You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/22 16:44:56 UTC

[5/8] ignite git commit: ignite-5075

ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bf4bc78c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bf4bc78c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bf4bc78c

Branch: refs/heads/ignite-5075-pds
Commit: bf4bc78c63d295c68a6e2b4a7f26bab280ddd91d
Parents: b5eab96
Author: sboikov <sb...@gridgain.com>
Authored: Mon May 22 11:18:58 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon May 22 11:18:58 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/IgniteCacheGroupsTest.java | 402 ++++++++++++++++++-
 1 file changed, 397 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bf4bc78c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
index f1b5345..ee76c6e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
@@ -18,38 +18,49 @@
 package org.apache.ignite.internal.processors.cache;
 
 import java.io.Serializable;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeSet;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import javax.cache.Cache;
 import java.util.concurrent.locks.Lock;
+import javax.cache.Cache;
 import javax.cache.CacheException;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.Ignition;
+import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheExistsException;
+import org.apache.ignite.cache.CacheInterceptor;
+import org.apache.ignite.cache.CacheInterceptorAdapter;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.affinity.AffinityKeyMapper;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicyFactory;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.lang.GridPlainCallable;
@@ -1819,6 +1830,387 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @return Cache configurations.
+     */
+    private CacheConfiguration[] interceptorConfigurations() {
+        // Six caches sharing GROUP1: c1/c3 use Interceptor1, c2/c4 use Interceptor2,
+        // c5/c6 are configured without an interceptor.
+        return new CacheConfiguration[] {
+            cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 2, false).setInterceptor(new Interceptor1()),
+            cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 2, false).setInterceptor(new Interceptor2()),
+            cacheConfiguration(GROUP1, "c3", PARTITIONED, TRANSACTIONAL, 2, false).setInterceptor(new Interceptor1()),
+            cacheConfiguration(GROUP1, "c4", PARTITIONED, TRANSACTIONAL, 2, false).setInterceptor(new Interceptor2()),
+            cacheConfiguration(GROUP1, "c5", PARTITIONED, ATOMIC, 2, false),
+            cacheConfiguration(GROUP1, "c6", PARTITIONED, TRANSACTIONAL, 2, false)
+        };
+    }
+
+    /**
+     * Checks that caches sharing one group each apply their own {@link CacheInterceptor}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testInterceptors() throws Exception {
+        for (int idx = 0; idx < 4; idx++) {
+            ccfgs = interceptorConfigurations();
+
+            startGrid(idx);
+        }
+
+        Ignite ignite0 = ignite(0);
+
+        // Pairs of {cache name, value its interceptor is expected to store}.
+        for (String[] pair : new String[][] {{"c1", "v1"}, {"c2", "v2"}, {"c3", "v1"}, {"c4", "v2"}})
+            checkInterceptorPut(ignite0.cache(pair[0]), pair[1]);
+
+        // c5 and c6 are configured without an interceptor.
+        for (String name : new String[] {"c5", "c6"})
+            checkCache(0, name, 10);
+    }
+
+    /**
+     * @param cache Cache whose reads are compared against {@code expVal} after each put.
+     * @param expVal Value expected to be read back after every put.
+     */
+    private void checkInterceptorPut(IgniteCache cache, String expVal) {
+        for (int val = 0; val < 10; val++) {
+            Integer randKey = ThreadLocalRandom.current().nextInt();
+
+            // Whatever is written, the read is expected to return the interceptor's value.
+            cache.put(randKey, val);
+
+            Object stored = cache.get(randKey);
+            assertEquals(expVal, stored);
+        }
+    }
+
+    /**
+     * @return Configurations of six caches in one group: four with a cache store, two without.
+     */
+    private CacheConfiguration[] cacheStoreConfigurations() {
+        // c1/c3 are backed by Store1, c2/c4 by Store2 (read- and write-through enabled);
+        // c5/c6 have no store configured.
+        return new CacheConfiguration[] {
+            cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 2, false)
+                .setCacheStoreFactory(new StoreFactory1()).setReadThrough(true).setWriteThrough(true),
+
+            cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 2, false)
+                .setCacheStoreFactory(new StoreFactory2()).setReadThrough(true).setWriteThrough(true),
+
+            cacheConfiguration(GROUP1, "c3", PARTITIONED, TRANSACTIONAL, 2, false)
+                .setCacheStoreFactory(new StoreFactory1()).setReadThrough(true).setWriteThrough(true),
+
+            cacheConfiguration(GROUP1, "c4", PARTITIONED, TRANSACTIONAL, 2, false)
+                .setCacheStoreFactory(new StoreFactory2()).setReadThrough(true).setWriteThrough(true),
+
+            cacheConfiguration(GROUP1, "c5", PARTITIONED, ATOMIC, 2, false),
+            cacheConfiguration(GROUP1, "c6", PARTITIONED, TRANSACTIONAL, 2, false)
+        };
+    }
+
+    /**
+     * Checks that caches sharing one group each talk only to their own {@link CacheStore}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCacheStores() throws Exception {
+        for (int idx = 0; idx < 4; idx++) {
+            ccfgs = cacheStoreConfigurations();
+
+            startGrid(idx);
+        }
+
+        Ignite ignite0 = ignite(0);
+
+        // c1 and c3 are backed by Store1, so Store2 must stay untouched.
+        for (String name : new String[] {"c1", "c3"}) {
+            checkStorePut(ignite0.cache(name), Store1.map);
+            assertTrue(Store2.map.isEmpty());
+        }
+
+        Store1.map.clear();
+
+        // c2 and c4 are backed by Store2, so the just-cleared Store1 must stay empty.
+        for (String name : new String[] {"c2", "c4"}) {
+            checkStorePut(ignite0.cache(name), Store2.map);
+            assertTrue(Store1.map.isEmpty());
+        }
+
+        Store2.map.clear();
+
+        for (String name : new String[] {"c5", "c6"})
+            checkCache(0, name, 10);
+
+        assertTrue(Store1.map.isEmpty());
+        assertTrue(Store2.map.isEmpty());
+    }
+
+    /**
+     * @param cache Cache.
+     * @param storeMap Cache store data.
+     */
+    private void checkStorePut(IgniteCache cache, ConcurrentHashMap<Object, Object> storeMap) {
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        for (int i = 0; i < 10; i++) {
+            Integer key = rnd.nextInt();
+
+            storeMap.put(key, i);
+
+            assertEquals(i, cache.get(key));
+
+            cache.put(key, 10_000);
+
+            assertEquals(10_000, cache.get(key));
+            assertEquals(10_000, storeMap.get(key));
+        }
+    }
+
+    /**
+     * @return Configurations of six caches in one group: four with an affinity mapper, two without.
+     */
+    private CacheConfiguration[] mapperConfigurations() {
+        // Six caches sharing GROUP1: c1/c3 use Mapper1, c2/c4 use Mapper2,
+        // c5/c6 are configured without an affinity mapper.
+        return new CacheConfiguration[] {
+            cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 2, false).setAffinityMapper(new Mapper1()),
+            cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 2, false).setAffinityMapper(new Mapper2()),
+            cacheConfiguration(GROUP1, "c3", PARTITIONED, TRANSACTIONAL, 2, false).setAffinityMapper(new Mapper1()),
+            cacheConfiguration(GROUP1, "c4", PARTITIONED, TRANSACTIONAL, 2, false).setAffinityMapper(new Mapper2()),
+            cacheConfiguration(GROUP1, "c5", PARTITIONED, ATOMIC, 2, false),
+            cacheConfiguration(GROUP1, "c6", PARTITIONED, TRANSACTIONAL, 2, false)
+        };
+    }
+
+    /**
+     * Checks that caches sharing one group each apply their own {@link AffinityKeyMapper}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAffinityMappers() throws Exception {
+        for (int idx = 0; idx < 4; idx++) {
+            ccfgs = mapperConfigurations();
+
+            startGrid(idx);
+        }
+
+        for (int idx = 0; idx < 4; idx++)
+            checkAffinityMappers(ignite(idx));
+
+        // Node 4 is started after the client flag is set and gets the same mapping check.
+        client = true;
+
+        startGrid(4);
+
+        checkAffinityMappers(ignite(4));
+
+        // Run the cache check for every cache on all five nodes.
+        for (int idx = 0; idx < 5; idx++) {
+            for (String name : new String[] {"c1", "c2", "c3", "c4", "c5", "c6"})
+                checkCache(idx, name, 10);
+        }
+    }
+
+    /**
+     * @param node Node whose affinity for caches c1..c6 is verified.
+     */
+    private void checkAffinityMappers(Ignite node) {
+        Affinity aff1 = node.affinity("c1");
+        Affinity aff2 = node.affinity("c2");
+        Affinity aff3 = node.affinity("c3");
+        Affinity aff4 = node.affinity("c4");
+        Affinity aff5 = node.affinity("c5");
+        Affinity aff6 = node.affinity("c6");
+
+        RendezvousAffinityFunction dfltFunc = new RendezvousAffinityFunction();
+        boolean binary = node.configuration().getMarshaller() instanceof BinaryMarshaller;
+
+        for (int p1 = 0; p1 < 100; p1++) {
+            int p2 = p1 + 10;
+            MapperTestKey1 key = new MapperTestKey1(p1, p2);
+
+            // c1/c3 are expected to map by field p1, c2/c4 by field p2.
+            assertEquals(p1, aff1.partition(key));
+            assertEquals(p1, aff3.partition(key));
+            assertEquals(p2, aff2.partition(key));
+            assertEquals(p2, aff4.partition(key));
+
+            // c5/c6 have no mapper: compare against a default-constructed rendezvous function.
+            Object affKey = binary ? node.binary().toBinary(key) : key;
+            int expPart = dfltFunc.partition(affKey);
+
+            assertEquals(expPart, aff5.partition(key));
+            assertEquals(expPart, aff6.partition(key));
+        }
+    }
+
+    /**
+     * Affinity mapper using field {@code p1} of a {@link MapperTestKey1} (plain or binary) as the affinity key.
+     */
+    static class Mapper1 implements AffinityKeyMapper {
+        /** {@inheritDoc} */
+        @Override public Object affinityKey(Object key) {
+            if (key instanceof MapperTestKey1)
+                return ((MapperTestKey1)key).p1;
+            else if (key instanceof BinaryObject && ((BinaryObject)key).hasField("p1"))
+                return ((BinaryObject)key).field("p1");
+
+            return key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void reset() {
+            // No-op.
+        }
+    }
+
+    /**
+     * Affinity mapper using field {@code p2} of a {@link MapperTestKey1} (plain or binary) as the affinity key.
+     */
+    static class Mapper2 implements AffinityKeyMapper {
+        /** {@inheritDoc} */
+        @Override public Object affinityKey(Object key) {
+            if (key instanceof MapperTestKey1)
+                return ((MapperTestKey1)key).p2;
+            else if (key instanceof BinaryObject && ((BinaryObject)key).hasField("p2"))
+                return ((BinaryObject)key).field("p2");
+
+            return key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void reset() {
+            // No-op.
+        }
+    }
+
+    /**
+     * Key with two int fields; {@code Mapper1} maps it by {@code p1}, {@code Mapper2} by {@code p2}.
+     */
+    static class MapperTestKey1 {
+        /** First field. */
+        final int p1;
+
+        /** Second field. */
+        final int p2;
+
+        /**
+         * @param p1 Value of the first field.
+         * @param p2 Value of the second field.
+         */
+        public MapperTestKey1(int p1, int p2) {
+            this.p1 = p1;
+            this.p2 = p2;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || o.getClass() != getClass())
+                return false;
+
+            MapperTestKey1 that = (MapperTestKey1)o;
+
+            return that.p1 == p1 && that.p2 == p2;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int h = 31 * p1;
+
+            return h + p2;
+        }
+    }
+
+    /**
+     * Interceptor whose {@code onBeforePut} always returns the constant {@code "v1"}, ignoring the new value.
+     */
+    static class Interceptor1 extends CacheInterceptorAdapter<Object, Object> {
+        /** {@inheritDoc} */
+        @Override public Object onBeforePut(Cache.Entry<Object, Object> entry, Object newVal) {
+            return "v1";
+        }
+    }
+
+    /**
+     * Interceptor whose {@code onBeforePut} always returns the constant {@code "v2"}, ignoring the new value.
+     */
+    static class Interceptor2 extends CacheInterceptorAdapter<Object, Object> {
+        /** {@inheritDoc} */
+        @Override public Object onBeforePut(Cache.Entry<Object, Object> entry, Object newVal) {
+            return "v2";
+        }
+    }
+
+    /**
+     * Factory creating a new {@link Store1} on every call.
+     */
+    static class StoreFactory1 implements Factory<CacheStore> {
+        /** {@inheritDoc} */
+        @Override public CacheStore create() {
+            return new Store1();
+        }
+    }
+
+    /**
+     * Test store keeping its data in a static map shared by all instances of this class.
+     */
+    static class Store1 extends CacheStoreAdapter {
+        /** Store contents; static so that test code can read and modify it directly. */
+        static final ConcurrentHashMap<Object, Object> map = new ConcurrentHashMap<>();
+
+        /** {@inheritDoc} */
+        @Override public Object load(Object key) throws CacheLoaderException {
+            return map.get(key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry entry) throws CacheWriterException {
+            map.put(entry.getKey(), entry.getValue());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) throws CacheWriterException {
+            map.remove(key);
+        }
+    }
+
+    /**
+     * Factory creating a new {@link Store2} on every call.
+     */
+    static class StoreFactory2 implements Factory<CacheStore> {
+        /** {@inheritDoc} */
+        @Override public CacheStore create() {
+            return new Store2();
+        }
+    }
+
+    /**
+     * Test store keeping its data in a static map shared by all instances of this class.
+     */
+    static class Store2 extends CacheStoreAdapter {
+        /** Store contents; static so that test code can read and modify it directly. */
+        static final ConcurrentHashMap<Object, Object> map = new ConcurrentHashMap<>();
+
+        /** {@inheritDoc} */
+        @Override public Object load(Object key) throws CacheLoaderException {
+            return map.get(key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry entry) throws CacheWriterException {
+            map.put(entry.getKey(), entry.getValue());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) throws CacheWriterException {
+            map.remove(key);
+        }
+    }
+
+    /**
      * @param rnd Random.
      * @param cache Cache.
      */