You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/22 16:44:56 UTC
[5/8] ignite git commit: ignite-5075
ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bf4bc78c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bf4bc78c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bf4bc78c
Branch: refs/heads/ignite-5075-pds
Commit: bf4bc78c63d295c68a6e2b4a7f26bab280ddd91d
Parents: b5eab96
Author: sboikov <sb...@gridgain.com>
Authored: Mon May 22 11:18:58 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon May 22 11:18:58 2017 +0300
----------------------------------------------------------------------
.../processors/cache/IgniteCacheGroupsTest.java | 402 ++++++++++++++++++-
1 file changed, 397 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/bf4bc78c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
index f1b5345..ee76c6e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
@@ -18,38 +18,49 @@
package org.apache.ignite.internal.processors.cache;
import java.io.Serializable;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import javax.cache.Cache;
import java.util.concurrent.locks.Lock;
+import javax.cache.Cache;
import javax.cache.CacheException;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.Ignition;
+import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheExistsException;
+import org.apache.ignite.cache.CacheInterceptor;
+import org.apache.ignite.cache.CacheInterceptorAdapter;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.affinity.AffinityKeyMapper;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicyFactory;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.lang.GridPlainCallable;
@@ -1819,6 +1830,387 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
}
/**
+ * @return Cache configurations.
+ */
+ private CacheConfiguration[] interceptorConfigurations() {
+ CacheConfiguration[] ccfgs = new CacheConfiguration[6];
+
+ ccfgs[0] = cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 2, false).setInterceptor(new Interceptor1());
+ ccfgs[1] = cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 2, false).setInterceptor(new Interceptor2());
+ ccfgs[2] = cacheConfiguration(GROUP1, "c3", PARTITIONED, TRANSACTIONAL, 2, false).setInterceptor(new Interceptor1());
+ ccfgs[3] = cacheConfiguration(GROUP1, "c4", PARTITIONED, TRANSACTIONAL, 2, false).setInterceptor(new Interceptor2());
+ ccfgs[4] = cacheConfiguration(GROUP1, "c5", PARTITIONED, ATOMIC, 2, false);
+ ccfgs[5] = cacheConfiguration(GROUP1, "c6", PARTITIONED, TRANSACTIONAL, 2, false);
+
+ return ccfgs;
+ }
+
+ /**
+ * Tests caches in the same group with different {@link CacheInterceptor}s.
+ *
+ * @throws Exception If failed.
+ */
+ public void testInterceptors() throws Exception {
+ for (int i = 0; i < 4; i++) {
+ ccfgs = interceptorConfigurations();
+
+ startGrid(i);
+ }
+
+ Ignite node = ignite(0);
+
+ checkInterceptorPut(node.cache("c1"), "v1");
+ checkInterceptorPut(node.cache("c2"), "v2");
+ checkInterceptorPut(node.cache("c3"), "v1");
+ checkInterceptorPut(node.cache("c4"), "v2");
+
+ checkCache(0, "c5", 10);
+ checkCache(0, "c6", 10);
+ }
+
+ /**
+ * @param cache Cache.
+ * @param expVal Expected value.
+ */
+ private void checkInterceptorPut(IgniteCache cache, String expVal) {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ for (int i = 0; i < 10; i++) {
+ Integer key = rnd.nextInt();
+
+ cache.put(key, i);
+
+ assertEquals(expVal, cache.get(key));
+ }
+ }
+
+ /**
+ * @return Cache configurations.
+ */
+ private CacheConfiguration[] cacheStoreConfigurations() {
+ CacheConfiguration[] ccfgs = new CacheConfiguration[6];
+
+ ccfgs[0] = cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 2, false).
+ setCacheStoreFactory(new StoreFactory1()).setReadThrough(true).setWriteThrough(true);
+
+ ccfgs[1] = cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 2, false).
+ setCacheStoreFactory(new StoreFactory2()).setReadThrough(true).setWriteThrough(true);
+
+ ccfgs[2] = cacheConfiguration(GROUP1, "c3", PARTITIONED, TRANSACTIONAL, 2, false).
+ setCacheStoreFactory(new StoreFactory1()).setReadThrough(true).setWriteThrough(true);
+
+ ccfgs[3] = cacheConfiguration(GROUP1, "c4", PARTITIONED, TRANSACTIONAL, 2, false).
+ setCacheStoreFactory(new StoreFactory2()).setReadThrough(true).setWriteThrough(true);
+
+ ccfgs[4] = cacheConfiguration(GROUP1, "c5", PARTITIONED, ATOMIC, 2, false);
+ ccfgs[5] = cacheConfiguration(GROUP1, "c6", PARTITIONED, TRANSACTIONAL, 2, false);
+
+ return ccfgs;
+ }
+
+ /**
+ * Tests caches in the same group with different {@link CacheStore}s.
+ *
+ * @throws Exception If failed.
+ */
+ public void testCacheStores() throws Exception {
+ for (int i = 0; i < 4; i++) {
+ ccfgs = cacheStoreConfigurations();
+
+ startGrid(i);
+ }
+
+ Ignite node = ignite(0);
+
+ checkStorePut(node.cache("c1"), Store1.map);
+ assertTrue(Store2.map.isEmpty());
+
+ checkStorePut(node.cache("c3"), Store1.map);
+ assertTrue(Store2.map.isEmpty());
+
+ Store1.map.clear();
+
+ checkStorePut(node.cache("c2"), Store2.map);
+ assertTrue(Store1.map.isEmpty());
+
+ checkStorePut(node.cache("c4"), Store2.map);
+ assertTrue(Store1.map.isEmpty());
+
+ Store2.map.clear();
+
+ checkCache(0, "c5", 10);
+ checkCache(0, "c6", 10);
+
+ assertTrue(Store1.map.isEmpty());
+ assertTrue(Store2.map.isEmpty());
+ }
+
+ /**
+ * @param cache Cache.
+ * @param storeMap Cache store data.
+ */
+ private void checkStorePut(IgniteCache cache, ConcurrentHashMap<Object, Object> storeMap) {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ for (int i = 0; i < 10; i++) {
+ Integer key = rnd.nextInt();
+
+ storeMap.put(key, i);
+
+ assertEquals(i, cache.get(key));
+
+ cache.put(key, 10_000);
+
+ assertEquals(10_000, cache.get(key));
+ assertEquals(10_000, storeMap.get(key));
+ }
+ }
+
+ /**
+ * @return Cache configurations.
+ */
+ private CacheConfiguration[] mapperConfigurations() {
+ CacheConfiguration[] ccfgs = new CacheConfiguration[6];
+
+ ccfgs[0] = cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 2, false).setAffinityMapper(new Mapper1());
+ ccfgs[1] = cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 2, false).setAffinityMapper(new Mapper2());
+ ccfgs[2] = cacheConfiguration(GROUP1, "c3", PARTITIONED, TRANSACTIONAL, 2, false).setAffinityMapper(new Mapper1());
+ ccfgs[3] = cacheConfiguration(GROUP1, "c4", PARTITIONED, TRANSACTIONAL, 2, false).setAffinityMapper(new Mapper2());
+ ccfgs[4] = cacheConfiguration(GROUP1, "c5", PARTITIONED, ATOMIC, 2, false);
+ ccfgs[5] = cacheConfiguration(GROUP1, "c6", PARTITIONED, TRANSACTIONAL, 2, false);
+
+ return ccfgs;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAffinityMappers() throws Exception {
+ for (int i = 0; i < 4; i++) {
+ ccfgs = mapperConfigurations();
+
+ startGrid(i);
+ }
+
+ for (int i = 0; i < 4; i++)
+ checkAffinityMappers(ignite(i));
+
+ client = true;
+
+ startGrid(4);
+
+ checkAffinityMappers(ignite(4));
+
+ for (int i = 0; i < 5; i++) {
+ checkCache(i, "c1", 10);
+ checkCache(i, "c2", 10);
+ checkCache(i, "c3", 10);
+ checkCache(i, "c4", 10);
+ checkCache(i, "c5", 10);
+ checkCache(i, "c6", 10);
+ }
+ }
+
+ /**
+ * @param node Node.
+ */
+ private void checkAffinityMappers(Ignite node) {
+ Affinity aff1 = node.affinity("c1");
+ Affinity aff2 = node.affinity("c2");
+ Affinity aff3 = node.affinity("c3");
+ Affinity aff4 = node.affinity("c4");
+ Affinity aff5 = node.affinity("c5");
+ Affinity aff6 = node.affinity("c6");
+
+ RendezvousAffinityFunction func = new RendezvousAffinityFunction();
+
+ for (int i = 0; i < 100; i++) {
+ MapperTestKey1 k = new MapperTestKey1(i, i + 10);
+
+ assertEquals(i, aff1.partition(k));
+ assertEquals(i, aff3.partition(k));
+ assertEquals(i + 10, aff2.partition(k));
+ assertEquals(i + 10, aff4.partition(k));
+
+ int part;
+
+ if (node.configuration().getMarshaller() instanceof BinaryMarshaller)
+ part = func.partition(node.binary().toBinary(k));
+ else
+ part = func.partition(k);
+
+ assertEquals(part, aff5.partition(k));
+ assertEquals(part, aff6.partition(k));
+ }
+ }
+
+ /**
+ *
+ */
+ static class Mapper1 implements AffinityKeyMapper {
+ /** {@inheritDoc} */
+ @Override public Object affinityKey(Object key) {
+ if (key instanceof MapperTestKey1)
+ return ((MapperTestKey1)key).p1;
+ else if (key instanceof BinaryObject)
+ ((BinaryObject) key).field("p1");
+
+ return key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void reset() {
+ // No-op.
+ }
+ }
+
+ /**
+ *
+ */
+ static class Mapper2 implements AffinityKeyMapper {
+ /** {@inheritDoc} */
+ @Override public Object affinityKey(Object key) {
+ if (key instanceof MapperTestKey1)
+ return ((MapperTestKey1)key).p2;
+ else if (key instanceof BinaryObject)
+ ((BinaryObject) key).field("p2");
+
+ return key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void reset() {
+ // No-op.
+ }
+ }
+
+ /**
+ *
+ */
+ static class MapperTestKey1 {
+ /** */
+ final int p1;
+
+ /** */
+ final int p2;
+
+ /**
+ * @param p1 Field1.
+ * @param p2 Field2.
+ */
+ public MapperTestKey1(int p1, int p2) {
+ this.p1 = p1;
+ this.p2 = p2;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ MapperTestKey1 testKey1 = (MapperTestKey1)o;
+
+ return p1 == testKey1.p1 && p2 == testKey1.p2;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = p1;
+ res = 31 * res + p2;
+ return res;
+ }
+ }
+
+ /**
+ *
+ */
+ static class Interceptor1 extends CacheInterceptorAdapter<Object, Object> {
+ /** {@inheritDoc} */
+ @Override public Object onBeforePut(Cache.Entry<Object, Object> entry, Object newVal) {
+ return "v1";
+ }
+ }
+
+ /**
+ *
+ */
+ static class Interceptor2 extends CacheInterceptorAdapter<Object, Object> {
+ /** {@inheritDoc} */
+ @Override public Object onBeforePut(Cache.Entry<Object, Object> entry, Object newVal) {
+ return "v2";
+ }
+ }
+
+ /**
+ *
+ */
+ static class StoreFactory1 implements Factory<CacheStore> {
+ /** {@inheritDoc} */
+ @Override public CacheStore create() {
+ return new Store1();
+ }
+ }
+
+ /**
+ *
+ */
+ static class Store1 extends CacheStoreAdapter {
+ /** */
+ static ConcurrentHashMap<Object, Object> map = new ConcurrentHashMap<>();
+
+ /** {@inheritDoc} */
+ @Override public Object load(Object key) throws CacheLoaderException {
+ return map.get(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Cache.Entry entry) throws CacheWriterException {
+ map.put(entry.getKey(), entry.getValue());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(Object key) throws CacheWriterException {
+ map.remove(key);
+ }
+ }
+
+ /**
+ *
+ */
+ static class StoreFactory2 implements Factory<CacheStore> {
+ /** {@inheritDoc} */
+ @Override public CacheStore create() {
+ return new Store2();
+ }
+ }
+
+ /**
+ *
+ */
+ static class Store2 extends CacheStoreAdapter {
+ /** */
+ static ConcurrentHashMap<Object, Object> map = new ConcurrentHashMap<>();
+
+ /** {@inheritDoc} */
+ @Override public Object load(Object key) throws CacheLoaderException {
+ return map.get(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Cache.Entry entry) throws CacheWriterException {
+ map.put(entry.getKey(), entry.getValue());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(Object key) throws CacheWriterException {
+ map.remove(key);
+ }
+ }
+
+ /**
* @param rnd Random.
* @param cache Cache.
*/