You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/19 10:46:46 UTC

ignite git commit: ignite-1232 Avoid error on cache start if affinity key is not part of table

Repository: ignite
Updated Branches:
  refs/heads/ignite-1232 080d50391 -> 7fab7051a


ignite-1232 Avoid error on cache start if affinity key is not part of table


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7fab7051
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7fab7051
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7fab7051

Branch: refs/heads/ignite-1232
Commit: 7fab7051a6cabe8b8d3b4a09776232444d7f2882
Parents: 080d503
Author: sboikov <sb...@gridgain.com>
Authored: Fri Feb 19 12:46:39 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Feb 19 12:46:39 2016 +0300

----------------------------------------------------------------------
 .../processors/query/h2/IgniteH2Indexing.java   |  13 +
 .../processors/query/h2/opt/GridH2Table.java    |  34 ++-
 .../cache/IgniteCacheJoinQueryTest.java         | 303 +++++++++++++++++--
 .../IgniteCacheQuerySelfTestSuite.java          |   3 +
 4 files changed, 325 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7fab7051/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index cb41250..1215141 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.h2;
 
+import java.util.Objects;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -1130,6 +1131,18 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                     twoStepQry.spaces(spaces);
                 }
 
+                if (twoStepQry.distributedJoins()) {
+                    for (String table : twoStepQry.tables()) {
+                        GridH2Table tbl = dataTable(table);
+
+                        Objects.requireNonNull(tbl, table);
+
+                        if (!tbl.affinityColumnExists())
+                            throw new CacheException("Failed to run distributed join query, " +
+                                "affinity key is not included in table: " + table);
+                    }
+                }
+
                 meta = meta(stmt.getMetaData());
             }
             catch (IgniteCheckedException e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7fab7051/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index f31f091..b4a056c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -96,6 +96,9 @@ public class GridH2Table extends TableBase {
     /** */
     private final boolean snapshotEnabled;
 
+    /** */
+    private final boolean affinityColExists;
+
     /**
      * Creates table.
      *
@@ -113,17 +116,33 @@ public class GridH2Table extends TableBase {
         this.desc = desc;
         this.spaceName = spaceName;
 
+        boolean affinityColExists0 = true;
+
         if (desc != null && desc.context() != null) {
             String affKey = desc.type().affinityKey();
 
-            int affKeyColId = affKey == null ? KEY_COL : getColumn(desc.context().config().isSqlEscapeAll() ?
-                affKey : affKey.toUpperCase()).getColumnId();
+            int affKeyColId = -1;
+
+            if (affKey != null) {
+                String colName = desc.context().config().isSqlEscapeAll() ? affKey : affKey.toUpperCase();
+
+                if (doesColumnExist(colName))
+                    affKeyColId = getColumn(colName).getColumnId();
+                else
+                    affinityColExists0 = false;
+            }
+            else
+                affKeyColId = KEY_COL;
 
-            affKeyCol = indexColumn(affKeyColId, SortOrder.ASCENDING);
+            if (affinityColExists0) {
+                affKeyCol = indexColumn(affKeyColId, SortOrder.ASCENDING);
 
-            assert affKeyCol != null;
+                assert affKeyCol != null;
+            }
         }
 
+        affinityColExists = affinityColExists0;
+
         // Indexes must be created in the end when everything is ready.
         idxs = idxsFactory.createIndexes(this);
 
@@ -139,6 +158,13 @@ public class GridH2Table extends TableBase {
     }
 
     /**
+     * @return {@code True} if affinity key exists in this table.
+     */
+    public boolean affinityColumnExists() {
+        return affinityColExists;
+    }
+
+    /**
      * @return {@code true} If this is a partitioned table.
      */
     public boolean isPartitioned() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7fab7051/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryTest.java
index 5a86da9..6daa2e1 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryTest.java
@@ -17,27 +17,47 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import org.apache.ignite.Ignite;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.cache.CacheException;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheKeyConfiguration;
+import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
 /**
- * TODO IGNITE-1232.
+ *
  */
+@SuppressWarnings("unchecked")
 public class IgniteCacheJoinQueryTest extends GridCommonAbstractTest {
     /** */
     private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
 
     /** */
-    private static final int NODES = 1;
+    private static final int NODES = 4;
+
+    /** */
+    private boolean client;
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
@@ -47,12 +67,12 @@ public class IgniteCacheJoinQueryTest extends GridCommonAbstractTest {
 
         CacheKeyConfiguration keyCfg = new CacheKeyConfiguration();
 
-        keyCfg.setTypeName(TestKey1.class.getName());
+        keyCfg.setTypeName(PersonKeyWithAffinity.class.getName());
         keyCfg.setAffinityKeyFieldName("affKey");
 
         cfg.setCacheKeyConfiguration(keyCfg);
 
-        cfg.setMarshaller(null);
+        cfg.setClientMode(client);
 
         return cfg;
     }
@@ -61,7 +81,11 @@ public class IgniteCacheJoinQueryTest extends GridCommonAbstractTest {
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
 
-        startGridsMultiThreaded(NODES);
+        startGridsMultiThreaded(NODES - 1);
+
+        client = true;
+
+        startGrid(NODES - 1);
     }
 
     /** {@inheritDoc} */
@@ -74,44 +98,204 @@ public class IgniteCacheJoinQueryTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testAffinityKey() throws Exception {
-        CacheConfiguration ccfg = new CacheConfiguration();
+    public void testAffinityKeyNotQueryField() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 1, true, false);
+
+        IgniteCache cache = ignite(0).createCache(ccfg);
+
+        try {
+            putData(cache, true);
+
+            for (int i = 0; i < NODES; i++) {
+                final IgniteCache cache0 = ignite(i).cache(ccfg.getName());
+
+                final String QRY = "select o.name, p.name " +
+                    "from Organization o, Person p " +
+                    "where p.orgId = o._key";
+
+                SqlFieldsQuery qry = new SqlFieldsQuery(QRY);
+
+                cache0.query(qry).getAll();
+
+                GridTestUtils.assertThrows(log, new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        SqlFieldsQuery qry = new SqlFieldsQuery(QRY);
+
+                        qry.setDistributedJoins(true);
+
+                        cache0.query(qry).getAll();
+
+                        return null;
+                    }
+                }, CacheException.class, null);
+
+                cache0.query(qry).getAll();
+            }
+        }
+        finally {
+            ignite(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinQuery() throws Exception {
+        testJoinQuery(PARTITIONED, 0, false);
+
+        testJoinQuery(PARTITIONED, 1, false);
+
+        testJoinQuery(REPLICATED, 0, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinQueryWithAffinityKey() throws Exception {
+        testJoinQuery(PARTITIONED, 0, true);
 
-        QueryEntity qryEntity = new QueryEntity();
-        qryEntity.setKeyType(TestKey1.class.getName());
-        qryEntity.setValueType(Integer.class.getName());
+        testJoinQuery(PARTITIONED, 1, true);
 
-        ccfg.setQueryEntities(F.asList(qryEntity));
+        testJoinQuery(REPLICATED, 0, true);
+    }
 
-        Ignite ignite = ignite(0);
+    /**
+     * @param cacheMode Cache mode.
+     * @param backups Number of backups.
+     * @param affKey If {@code true} uses key with affinity key field.
+     */
+    public void testJoinQuery(CacheMode cacheMode, int backups, boolean affKey) {
+        CacheConfiguration ccfg = cacheConfiguration(cacheMode, backups, affKey, affKey);
 
-        IgniteCache cache = ignite.createCache(ccfg);
+        IgniteCache cache = ignite(0).createCache(ccfg);
 
         try {
+            Map<Integer, Integer> cnts = putData(cache, affKey);
 
+            for (int i = 0; i < NODES; i++)
+                checkJoin(ignite(i).cache(ccfg.getName()), cnts);
         }
         finally {
-            ignite.destroyCache(ccfg.getName());
+            ignite(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     * @param personCnt Count of persons per organization.
+     */
+    private void checkJoin(IgniteCache cache, Map<Integer, Integer> personCnt) {
+        SqlFieldsQuery qry = new SqlFieldsQuery("select o.name, p.name " +
+            "from Organization o, Person p " +
+            "where p.orgId = o._key and o._key=?");
+
+        qry.setDistributedJoins(true);
+
+        for (int i = 0; i < personCnt.size(); i++) {
+            qry.setArgs(i);
+
+            List<List<Object>> res = cache.query(qry).getAll();
+
+            assertEquals((int)personCnt.get(i), res.size());
         }
     }
 
     /**
+     * @param cacheMode Cache mode.
+     * @param backups Number of backups.
+     * @param affKey If {@code true} uses key with affinity key field.
+     * @param includeAffKey If {@code true} includes affinity key field in query fields.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration(CacheMode cacheMode,
+        int backups,
+        boolean affKey,
+        boolean includeAffKey) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(cacheMode);
+
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(backups);
+
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        QueryEntity person = new QueryEntity();
+        person.setKeyType(affKey ? PersonKeyWithAffinity.class.getName() : PersonKey.class.getName());
+        person.setValueType(Person.class.getName());
+        person.addQueryField("orgId", Integer.class.getName(), null);
+        person.addQueryField("id", Integer.class.getName(), null);
+        person.addQueryField("name", String.class.getName(), null);
+
+        if (includeAffKey)
+            person.addQueryField("affKey", Integer.class.getName(), null);
+
+        QueryEntity org = new QueryEntity();
+        org.setKeyType(Integer.class.getName());
+        org.setValueType(Organization.class.getName());
+        org.addQueryField("name", String.class.getName(), null);
+
+        ccfg.setQueryEntities(F.asList(person, org));
+
+        return ccfg;
+    }
+
+    /**
+     * @param cache Cache.
+     * @param affKey If {@code true} uses key with affinity key field.
+     * @return Count of persons per organization.
+     */
+    private Map<Integer, Integer> putData(IgniteCache cache, boolean affKey) {
+        Map<Integer, Integer> personCnt = new HashMap<>();
+
+        final int ORG_CNT = 10;
+
+        for (int i = 0; i < ORG_CNT; i++)
+            cache.put(i, new Organization("org-" + i));
+
+        Set<Integer> ids = new HashSet<>();
+
+        for (int i = 0; i < ORG_CNT; i++) {
+            int cnt = ThreadLocalRandom.current().nextInt(100);
+
+            for (int j = 0; j < cnt; j++) {
+                int personId = ThreadLocalRandom.current().nextInt();
+
+                while (!ids.add(personId))
+                    personId = ThreadLocalRandom.current().nextInt();
+
+                Object key = affKey ? new PersonKeyWithAffinity(personId) : new PersonKey(personId);
+
+                String name = "person-" + personId;
+
+                cache.put(key, new Person(i, name));
+
+                assertEquals(name, ((Person)cache.get(key)).name);
+            }
+
+            personCnt.put(i, cnt);
+        }
+
+        return personCnt;
+    }
+
+    /**
      *
      */
-    public static class TestKey1 {
+    public static class PersonKeyWithAffinity {
         /** */
-        private int key;
+        private int id;
 
         /** */
         private int affKey;
 
         /**
-         * @param key Key.
+         * @param id Key.
          */
-        public TestKey1(int key) {
-            this.key = key;
+        public PersonKeyWithAffinity(int id) {
+            this.id = id;
 
-            affKey = key + 1;
+            affKey = id + 1;
         }
 
         /** {@inheritDoc} */
@@ -122,14 +306,85 @@ public class IgniteCacheJoinQueryTest extends GridCommonAbstractTest {
             if (o == null || getClass() != o.getClass())
                 return false;
 
-            TestKey1 testKey1 = (TestKey1)o;
+            PersonKeyWithAffinity other = (PersonKeyWithAffinity)o;
 
-            return key == testKey1.key;
+            return id == other.id;
         }
 
         /** {@inheritDoc} */
         @Override public int hashCode() {
-            return key;
+            return id;
+        }
+    }
+
+    /**
+     *
+     */
+    public static class PersonKey {
+        /** */
+        private int id;
+
+        /**
+         * @param id Key.
+         */
+        public PersonKey(int id) {
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            PersonKey other = (PersonKey)o;
+
+            return id == other.id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class Person implements Serializable {
+        /** */
+        @QuerySqlField
+        int orgId;
+
+        /** */
+        @QuerySqlField
+        String name;
+
+        /**
+         * @param orgId Organization ID.
+         * @param name Name.
+         */
+        public Person(int orgId, String name) {
+            this.orgId = orgId;
+            this.name = name;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class Organization implements Serializable {
+        /** */
+        @QuerySqlField
+        String name;
+
+        /**
+         * @param name Name.
+         */
+        public Organization(String name) {
+            this.name = name;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7fab7051/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 6ed4004..685afc3 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.IgniteBinaryWrappedObjectFiel
 import org.apache.ignite.internal.processors.cache.IgniteCacheCollocatedQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheDuplicateEntityConfigurationSelfTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheFieldsQueryNoDataSelfTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheJoinQueryTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheLargeResultSelfTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheNoClassQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapEvictQueryTest;
@@ -246,6 +247,8 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCacheP2pUnmarshallingQueryErrorTest.class);
         suite.addTestSuite(IgniteCacheNoClassQuerySelfTest.class);
 
+        suite.addTestSuite(IgniteCacheJoinQueryTest.class);
+
         return suite;
     }
 }