You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/03/28 21:35:16 UTC
ignite git commit: IGNITE-2630 Added CacheInterceptorEntry class with
getPartitionCounter method. Added tests.
Repository: ignite
Updated Branches:
refs/heads/ignite-2630 [created] 8a9a048c9
IGNITE-2630 Added CacheInterceptorEntry class with getPartitionCounter method. Added tests.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8a9a048c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8a9a048c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8a9a048c
Branch: refs/heads/ignite-2630
Commit: 8a9a048c92fb24eacd82a75646062e0c4aa2e406
Parents: 35fd10d
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Mon Mar 28 22:35:17 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Mon Mar 28 22:35:17 2016 +0300
----------------------------------------------------------------------
.../ignite/cache/CacheInterceptorEntry.java | 39 +
.../processors/cache/CacheLazyEntry.java | 46 +-
.../processors/cache/GridCacheMapEntry.java | 19 +-
...torPartitionCounterRandomOperationsTest.java | 967 +++++++++++++++++++
.../IgniteCacheInterceptorSelfTestSuite.java | 1 +
5 files changed, 1062 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8a9a048c/modules/core/src/main/java/org/apache/ignite/cache/CacheInterceptorEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheInterceptorEntry.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheInterceptorEntry.java
new file mode 100644
index 0000000..a9bd8f5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheInterceptorEntry.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache;
+
+import javax.cache.Cache;
+
+/**
+ * A cache interceptor map entry that additionally exposes the partition update counter of the update.
+ *
+ * @param <K> the type of key
+ * @param <V> the type of value
+ */
+public abstract class CacheInterceptorEntry<K, V> implements Cache.Entry<K, V> {
+ /**
+ * Each cache update increases the partition counter. The same cache update has the same counter value
+ * on primary and backup nodes. This value can be useful for communicating with external applications.
+ * The value is meaningful only for entries passed to {@link CacheInterceptor#onAfterPut(Cache.Entry)} and
+ * {@link CacheInterceptor#onAfterRemove(Cache.Entry)}. For entries obtained through other methods this
+ * method returns {@code 0}.
+ *
+ * @return Value of counter for this entry.
+ */
+ public abstract long getPartitionUpdateCounter();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8a9a048c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
index 6ec17c0..b7db62d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
@@ -17,15 +17,15 @@
package org.apache.ignite.internal.processors.cache;
-import javax.cache.Cache;
import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.CacheInterceptorEntry;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
/**
*
*/
-public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> {
+public class CacheLazyEntry<K, V> extends CacheInterceptorEntry<K, V> {
/** Cache context. */
protected GridCacheContext cctx;
@@ -46,6 +46,9 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> {
/** Keep binary flag. */
private boolean keepBinary;
+ /** Update counter. */
+ private Long updateCntr;
+
/**
* @param cctx Cache context.
* @param keyObj Key cache object.
@@ -85,6 +88,31 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> {
K key,
CacheObject valObj,
V val,
+ boolean keepBinary,
+ Long updateCntr
+ ) {
+ this.cctx = ctx;
+ this.keyObj = keyObj;
+ this.key = key;
+ this.valObj = valObj;
+ this.val = val;
+ this.keepBinary = keepBinary;
+ this.updateCntr = updateCntr;
+ }
+
+ /**
+ * @param ctx Cache context.
+ * @param keyObj Key cache object.
+ * @param key Key value.
+ * @param valObj Cache object
+ * @param keepBinary Keep binary flag.
+ * @param val Cache value.
+ */
+ public CacheLazyEntry(GridCacheContext<K, V> ctx,
+ KeyCacheObject keyObj,
+ K key,
+ CacheObject valObj,
+ V val,
boolean keepBinary
) {
this.cctx = ctx;
@@ -137,6 +165,20 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> {
}
/** {@inheritDoc} */
+    @Override public long getPartitionUpdateCounter() {
+        return updateCntr != null ? updateCntr : 0L;
+    }
+
+ /**
+ * Sets the update counter reported by {@link #getPartitionUpdateCounter()}.
+ *
+ * @param updateCntr Update counter; when {@code null}, {@link #getPartitionUpdateCounter()} returns {@code 0}.
+ */
+ public void updateCounter(Long updateCntr) {
+ this.updateCntr = updateCntr;
+ }
+
+ /** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public <T> T unwrap(Class<T> cls) {
if (cls.isAssignableFrom(Ignite.class))
http://git-wip-us.apache.org/repos/asf/ignite/blob/8a9a048c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index c5df29b..58347f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1253,7 +1253,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
cctx.store().put(tx, key, val, newVer);
if (intercept)
- cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, val, val0, keepBinary));
+ cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, val, val0, keepBinary, updateCntr0));
return valid ? new GridCacheUpdateTxResult(true, retval ? old : null, updateCntr0) :
new GridCacheUpdateTxResult(false, null);
@@ -1303,7 +1303,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
IgniteBiTuple<Boolean, Object> interceptRes = null;
- Cache.Entry entry0 = null;
+ CacheLazyEntry entry0 = null;
Long updateCntr0;
@@ -1468,8 +1468,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
onMarkedObsolete();
}
- if (intercept)
+ if (intercept) {
+ entry0.updateCounter(updateCntr0);
+
cctx.config().getInterceptor().onAfterRemove(entry0);
+ }
if (valid) {
CacheObject ret;
@@ -1816,9 +1819,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (intercept) {
if (op == GridCacheOperation.UPDATE)
- cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, updated, updated0, keepBinary));
+ cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, updated, updated0, keepBinary, 0L));
else
- cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(cctx, key, key0, old, old0, keepBinary));
+ cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(cctx, key, key0, old, old0, keepBinary, 0L));
}
}
@@ -2389,7 +2392,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
else {
if (intercept) {
interceptRes = cctx.config().getInterceptor().onBeforeRemove(new CacheLazyEntry(cctx, key, key0,
- oldVal, old0, keepBinary));
+ oldVal, old0, keepBinary, updateCntr0));
if (cctx.cancelRemove(interceptRes))
return new GridCacheUpdateAtomicResult(false,
@@ -2492,9 +2495,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (intercept) {
if (op == GridCacheOperation.UPDATE)
- cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, updated, updated0, keepBinary));
+ cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, updated, updated0, keepBinary, updateCntr0));
else
- cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(cctx, key, key0, oldVal, old0, keepBinary));
+ cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(cctx, key, key0, oldVal, old0, keepBinary, updateCntr0));
if (interceptRes != null)
oldVal = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2()));
http://git-wip-us.apache.org/repos/asf/ignite/blob/8a9a048c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterRandomOperationsTest.java
new file mode 100644
index 0000000..eaae7c5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterRandomOperationsTest.java
@@ -0,0 +1,967 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheInterceptorAdapter;
+import org.apache.ignite.cache.CacheInterceptorEntry;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.eclipse.jetty.util.BlockingArrayQueue;
+import org.jetbrains.annotations.NotNull;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
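+/**
+ * Checks the partition update counter of entries passed to the cache interceptor during random cache operations.
+ */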
+public class CacheInterceptorPartitionCounterRandomOperationsTest extends GridCommonAbstractTest {
+ /** IP finder installed into the discovery SPI of every node started by this test. */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** Total number of nodes; the last one is started in client mode. */
+ private static final int NODES = 5;
+
+ /** Size of the key range. NOTE(review): not referenced anywhere in this class - confirm intended use. */
+ private static final int KEYS = 50;
+
+ /** Exclusive upper bound for randomly generated values. */
+ private static final int VALS = 10;
+
+ /** Number of iterations; each iteration runs one random update through every node. */
+ public static final int ITERATION_CNT = 100;
+
+ /** Client mode flag applied to node configurations created while it is set. */
+ private boolean client;
+
+ /** Entries received by the interceptor's {@code onAfterPut} callback, queued per local node ID. */
+ private static ConcurrentMap<UUID, BlockingQueue<Cache.Entry<QueryTestKey, QueryTestValue>>>
+ afterPutEvts = new ConcurrentHashMap<>();
+
+ /** Entries received by the interceptor's {@code onAfterRemove} callback, queued per local node ID. */
+ private static ConcurrentMap<UUID, BlockingQueue<Cache.Entry<QueryTestKey, QueryTestValue>>>
+ afterRmvEvts = new ConcurrentHashMap<>();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)igniteCfg.getDiscoverySpi();
+
+        discoSpi.setIpFinder(ipFinder);
+
+        // The flag is flipped by beforeTestsStarted() right before the last node is started.
+        igniteCfg.setClientMode(client);
+
+        return igniteCfg;
+    }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ // All nodes but the last one are started while the client flag is still unset.
+ startGridsMultiThreaded(NODES - 1);
+
+ // getConfiguration() reads this flag, so the node started below is configured in client mode.
+ client = true;
+
+ startGrid(NODES - 1);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ // Stop every node started in beforeTestsStarted() before delegating to the base class.
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ afterPutEvts.clear();
+ afterRmvEvts.clear();
+
+ for (int i = 0; i < NODES; i++) {
+ afterRmvEvts.put(grid(i).cluster().localNode().id(),
+ new BlockingArrayQueue<Cache.Entry<QueryTestKey, QueryTestValue>>());
+ afterPutEvts.put(grid(i).cluster().localNode().id(),
+ new BlockingArrayQueue<Cache.Entry<QueryTestKey, QueryTestValue>>());
+ }
+ }
+    /**
+     * Runs the scenario on a {@code PARTITIONED}, {@code ATOMIC}, {@code ONHEAP_TIERED} cache with one backup.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomic() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(PARTITIONED, 1, ATOMIC, ONHEAP_TIERED, false));
+    }
+
+    /**
+     * Runs the scenario on a {@code REPLICATED}, {@code ATOMIC}, {@code ONHEAP_TIERED} cache.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicated() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(REPLICATED, 0, ATOMIC, ONHEAP_TIERED, false));
+    }
+
+    /**
+     * Runs the scenario on a {@code PARTITIONED}, {@code ATOMIC}, {@code OFFHEAP_VALUES} cache with one backup.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapValues() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(PARTITIONED, 1, ATOMIC, OFFHEAP_VALUES, false));
+    }
+
+    /**
+     * Runs the scenario on a {@code PARTITIONED}, {@code ATOMIC}, {@code OFFHEAP_TIERED} cache with one backup.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapTiered() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(PARTITIONED, 1, ATOMIC, OFFHEAP_TIERED, false));
+    }
+
+    /**
+     * Runs the scenario on a {@code PARTITIONED}, {@code ATOMIC}, {@code ONHEAP_TIERED} cache without backups.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicNoBackups() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED, false));
+    }
+
+    /**
+     * Runs the scenario on a {@code PARTITIONED}, {@code TRANSACTIONAL}, {@code ONHEAP_TIERED} cache with one backup.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTx() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, ONHEAP_TIERED, false));
+    }
+
+    /**
+     * Runs the scenario on a {@code PARTITIONED}, {@code TRANSACTIONAL}, {@code ONHEAP_TIERED} cache with one backup.
+     * The configuration is the same as the one used by {@code testTx()}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxExplicit() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, ONHEAP_TIERED, false));
+    }
+
+    /**
+     * Runs the scenario on a {@code REPLICATED}, {@code TRANSACTIONAL}, {@code ONHEAP_TIERED} cache.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxReplicated() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(REPLICATED, 0, TRANSACTIONAL, ONHEAP_TIERED, false));
+    }
+
+    /**
+     * Runs the scenario on a {@code PARTITIONED}, {@code TRANSACTIONAL}, {@code OFFHEAP_VALUES} cache with one backup.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapValues() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, OFFHEAP_VALUES, false));
+    }
+
+    /**
+     * Runs the scenario on a {@code PARTITIONED}, {@code TRANSACTIONAL}, {@code OFFHEAP_VALUES} cache with one backup.
+     * The configuration is the same as the one used by {@code testTxOffheapValues()}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapValuesExplicit() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, OFFHEAP_VALUES, false));
+    }
+
+    /**
+     * Runs the scenario on a {@code PARTITIONED}, {@code TRANSACTIONAL}, {@code OFFHEAP_TIERED} cache with one backup.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapTiered() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, OFFHEAP_TIERED, false));
+    }
+
+    /**
+     * Runs the scenario on a {@code PARTITIONED}, {@code TRANSACTIONAL}, {@code OFFHEAP_TIERED} cache with one backup.
+     * The configuration is the same as the one used by {@code testTxOffheapTiered()}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapTieredExplicit() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, OFFHEAP_TIERED, false));
+    }
+
+    /**
+     * Runs the scenario on a {@code PARTITIONED}, {@code TRANSACTIONAL}, {@code ONHEAP_TIERED} cache without backups.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxNoBackups() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, ONHEAP_TIERED, false));
+    }
+
+    /**
+     * Runs the scenario on a {@code PARTITIONED}, {@code TRANSACTIONAL}, {@code ONHEAP_TIERED} cache without backups.
+     * The configuration is the same as the one used by {@code testTxNoBackups()}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxNoBackupsExplicit() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, ONHEAP_TIERED, false));
+    }
+
+    /**
+     * Creates the cache, runs {@link #ITERATION_CNT} rounds of random updates (one update through every node per
+     * round) and destroys the cache afterwards, whether or not the checks passed.
+     *
+     * @param ccfg Cache configuration.
+     * @throws Exception If failed.
+     */
+    protected void doTestPartitionCounterOperation(CacheConfiguration<Object, Object> ccfg)
+        throws Exception {
+        ignite(0).createCache(ccfg);
+
+        try {
+            long seed = System.currentTimeMillis();
+
+            Random rnd = new Random(seed);
+
+            // The seed is logged so that a failing sequence of operations can be replayed.
+            log.info("Random seed: " + seed);
+
+            ConcurrentMap<Object, Object> expected = new ConcurrentHashMap<>();
+
+            Map<Integer, Long> cntrs = new ConcurrentHashMap<>();
+
+            int iter = 0;
+
+            while (iter < ITERATION_CNT) {
+                if (iter % 20 == 0)
+                    log.info("Iteration: " + iter);
+
+                for (int nodeIdx = 0; nodeIdx < NODES; nodeIdx++)
+                    randomUpdate(rnd, expected, cntrs, grid(nodeIdx).cache(ccfg.getName()));
+
+                iter++;
+            }
+        }
+        finally {
+            ignite(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * Performs one randomly chosen cache operation, inside an explicit transaction for some of the calls on
+     * transactional caches, and then checks the interceptor notification (or its absence) together with the
+     * expected partition update counter.
+     *
+     * @param rnd Random generator.
+     * @param expData Expected cache data.
+     * @param partCntr Partition counter.
+     * @param cache Cache.
+     * @throws Exception If failed.
+     */
+    private void randomUpdate(
+        Random rnd,
+        ConcurrentMap<Object, Object> expData,
+        Map<Integer, Long> partCntr,
+        IgniteCache<Object, Object> cache)
+        throws Exception {
+        // NOTE(review): nextInt(1) always returns 0, so a single key is exercised and KEYS stays unused - confirm
+        // that this is intended before widening the range (event queues are polled on the primary node only).
+        Object key = new QueryTestKey(rnd.nextInt(1));
+        Object newVal = value(rnd);
+        Object oldVal = expData.get(key);
+
+        int op = rnd.nextInt(11);
+
+        Ignite ignite = cache.unwrap(Ignite.class);
+
+        Transaction tx = null;
+
+        if (cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL && rnd.nextBoolean())
+            tx = ignite.transactions().txStart(txRandomConcurrency(rnd), txRandomIsolation(rnd));
+
+        try {
+            switch (op) {
+                case 0: {
+                    cache.put(key, newVal);
+
+                    commitIfStarted(tx);
+
+                    verifyPutEvent(cache, partCntr, expData, key, newVal, oldVal);
+
+                    break;
+                }
+
+                case 1: {
+                    cache.getAndPut(key, newVal);
+
+                    commitIfStarted(tx);
+
+                    verifyPutEvent(cache, partCntr, expData, key, newVal, oldVal);
+
+                    break;
+                }
+
+                case 2: {
+                    cache.remove(key);
+
+                    commitIfStarted(tx);
+
+                    verifyRemoveEvent(cache, partCntr, expData, key, oldVal);
+
+                    break;
+                }
+
+                case 3: {
+                    cache.getAndRemove(key);
+
+                    commitIfStarted(tx);
+
+                    verifyRemoveEvent(cache, partCntr, expData, key, oldVal);
+
+                    break;
+                }
+
+                case 4: {
+                    cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
+
+                    commitIfStarted(tx);
+
+                    verifyPutEvent(cache, partCntr, expData, key, newVal, oldVal);
+
+                    break;
+                }
+
+                case 5: {
+                    // A null value makes the entry processor remove the entry.
+                    cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean()));
+
+                    commitIfStarted(tx);
+
+                    verifyRemoveEvent(cache, partCntr, expData, key, oldVal);
+
+                    break;
+                }
+
+                case 6: {
+                    cache.putIfAbsent(key, newVal);
+
+                    commitIfStarted(tx);
+
+                    if (oldVal != null)
+                        checkNoEvent(getInterceptorQueues(cache, key, false));
+                    else
+                        verifyPutEvent(cache, partCntr, expData, key, newVal, null);
+
+                    break;
+                }
+
+                case 7: {
+                    cache.getAndPutIfAbsent(key, newVal);
+
+                    commitIfStarted(tx);
+
+                    if (oldVal != null)
+                        checkNoEvent(getInterceptorQueues(cache, key, false));
+                    else
+                        verifyPutEvent(cache, partCntr, expData, key, newVal, null);
+
+                    break;
+                }
+
+                case 8: {
+                    cache.replace(key, newVal);
+
+                    commitIfStarted(tx);
+
+                    if (oldVal == null)
+                        checkNoEvent(getInterceptorQueues(cache, key, false));
+                    else
+                        verifyPutEvent(cache, partCntr, expData, key, newVal, oldVal);
+
+                    break;
+                }
+
+                case 9: {
+                    cache.getAndReplace(key, newVal);
+
+                    commitIfStarted(tx);
+
+                    if (oldVal == null)
+                        checkNoEvent(getInterceptorQueues(cache, key, false));
+                    else
+                        verifyPutEvent(cache, partCntr, expData, key, newVal, oldVal);
+
+                    break;
+                }
+
+                case 10: {
+                    // The conditional replace is expected to take effect only if the guessed value equals the
+                    // value currently expected in the cache; an absent entry never matches.
+                    Object replaceVal = value(rnd);
+
+                    cache.replace(key, replaceVal, newVal);
+
+                    commitIfStarted(tx);
+
+                    if (replaceVal.equals(oldVal))
+                        verifyPutEvent(cache, partCntr, expData, key, newVal, oldVal);
+                    else
+                        checkNoEvent(getInterceptorQueues(cache, key, false));
+
+                    break;
+                }
+
+                default:
+                    fail("Op:" + op);
+            }
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
+    }
+
+    /**
+     * Commits the given transaction if one was started for the current operation.
+     *
+     * @param tx Transaction or {@code null}.
+     */
+    private void commitIfStarted(Transaction tx) {
+        if (tx != null)
+            tx.commit();
+    }
+
+    /**
+     * Advances the expected partition counter, waits for the after-put event and records the new expected value.
+     *
+     * @param cache Cache.
+     * @param partCntr Partition counters.
+     * @param expData Expected cache data.
+     * @param key Key.
+     * @param newVal Value that has been stored.
+     * @param oldVal Previously expected value, possibly {@code null}.
+     * @throws Exception If failed.
+     */
+    private void verifyPutEvent(
+        IgniteCache<Object, Object> cache,
+        Map<Integer, Long> partCntr,
+        ConcurrentMap<Object, Object> expData,
+        Object key,
+        Object newVal,
+        Object oldVal)
+        throws Exception {
+        updatePartitionCounter(cache, key, partCntr);
+
+        waitAndCheckEvent(cache, partCntr, affinity(cache), key, newVal, oldVal, false);
+
+        expData.put(key, newVal);
+    }
+
+    /**
+     * Advances the expected partition counter, waits for the after-remove event and drops the expected value.
+     *
+     * @param cache Cache.
+     * @param partCntr Partition counters.
+     * @param expData Expected cache data.
+     * @param key Key.
+     * @param oldVal Previously expected value, possibly {@code null}.
+     * @throws Exception If failed.
+     */
+    private void verifyRemoveEvent(
+        IgniteCache<Object, Object> cache,
+        Map<Integer, Long> partCntr,
+        ConcurrentMap<Object, Object> expData,
+        Object key,
+        Object oldVal)
+        throws Exception {
+        updatePartitionCounter(cache, key, partCntr);
+
+        waitAndCheckEvent(cache, partCntr, affinity(cache), key, null, oldVal, true);
+
+        expData.remove(key);
+    }
+
+    /**
+     * Collects the event queue of the first node returned by the affinity mapping for the key.
+     *
+     * @param cache Cache.
+     * @param key Key.
+     * @param rmv Remove operation.
+     * @return Queues.
+     */
+    @NotNull private List<BlockingQueue<Cache.Entry<QueryTestKey, QueryTestValue>>> getInterceptorQueues(
+        IgniteCache<Object, Object> cache,
+        Object key,
+        boolean rmv
+    ) {
+        ConcurrentMap<UUID, BlockingQueue<Cache.Entry<QueryTestKey, QueryTestValue>>> evts =
+            rmv ? afterRmvEvts : afterPutEvts;
+
+        // Only the first node of the primary-and-backups mapping is inspected.
+        ClusterNode first = affinity(cache).mapKeyToPrimaryAndBackups(key).iterator().next();
+
+        List<BlockingQueue<Cache.Entry<QueryTestKey, QueryTestValue>>> res = new ArrayList<>();
+
+        for (ClusterNode node : Collections.singletonList(first))
+            res.add(evts.get(node.id()));
+
+        return res;
+    }
+
+    /**
+     * Picks one of the three transaction isolation levels at random.
+     *
+     * @param rnd {@link Random}.
+     * @return {@link TransactionIsolation}.
+     */
+    private TransactionIsolation txRandomIsolation(Random rnd) {
+        switch (rnd.nextInt(3)) {
+            case 0:
+                return READ_COMMITTED;
+
+            case 1:
+                return REPEATABLE_READ;
+
+            default:
+                return SERIALIZABLE;
+        }
+    }
+
+    /**
+     * Picks the optimistic or the pessimistic concurrency mode at random.
+     *
+     * @param rnd {@link Random}.
+     * @return {@link TransactionConcurrency}.
+     */
+    private TransactionConcurrency txRandomConcurrency(Random rnd) {
+        if (rnd.nextBoolean())
+            return TransactionConcurrency.OPTIMISTIC;
+
+        return TransactionConcurrency.PESSIMISTIC;
+    }
+
+    /**
+     * Increments the expected update counter of the partition the key belongs to; a partition seen for the first
+     * time starts at {@code 1}.
+     *
+     * @param cache Cache.
+     * @param key Key
+     * @param cntrs Partition counters.
+     */
+    private void updatePartitionCounter(IgniteCache<Object, Object> cache, Object key, Map<Integer, Long> cntrs) {
+        Affinity<Object> keyAff = cache.unwrap(Ignite.class).affinity(cache.getName());
+
+        int part = keyAff.partition(key);
+
+        Long prev = cntrs.get(part);
+
+        long next = prev == null ? 1L : prev + 1;
+
+        cntrs.put(part, next);
+    }
+
+    /**
+     * Creates a test value wrapping a random number from {@code [0, VALS)}.
+     *
+     * @param rnd Random generator.
+     * @return Cache value.
+     */
+    private static Object value(Random rnd) {
+        int num = rnd.nextInt(VALS);
+
+        return new QueryTestValue(num);
+    }
+
+    /**
+     * Waits for the interceptor event of the given operation and checks its key, value and partition update
+     * counter. When both the new and the old value are {@code null}, verifies instead that no event with a value
+     * has been produced.
+     *
+     * @param cache Ignite cache.
+     * @param partCntrs Partition counters.
+     * @param aff Affinity function.
+     * @param key Key.
+     * @param val Value.
+     * @param oldVal Old value.
+     * @param rmv Remove operation.
+     * @throws Exception If failed.
+     */
+    private void waitAndCheckEvent(IgniteCache cache,
+        Map<Integer, Long> partCntrs,
+        Affinity<Object> aff,
+        Object key,
+        Object val,
+        Object oldVal,
+        boolean rmv)
+        throws Exception {
+        Collection<BlockingQueue<Cache.Entry<QueryTestKey, QueryTestValue>>> entries = getInterceptorQueues(cache, key,
+            rmv);
+
+        if (val == null && oldVal == null) {
+            checkNoEvent(entries);
+
+            return;
+        }
+
+        for (BlockingQueue<Cache.Entry<QueryTestKey, QueryTestValue>> evtsQueue : entries) {
+            Cache.Entry<QueryTestKey, QueryTestValue> entry = evtsQueue.poll(5, SECONDS);
+
+            assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', entry);
+            assertEquals(key, entry.getKey());
+            // A remove event carries the removed (old) value, a put event carries the new one.
+            assertEquals(rmv ? oldVal : val, entry.getValue());
+
+            // Keep the counter boxed until it is checked: unboxing a missing counter would throw a bare
+            // NullPointerException, and a null check on an already unboxed value can never fail.
+            Long cntr = partCntrs.get(aff.partition(key));
+            CacheInterceptorEntry qryEntryEvt = entry.unwrap(CacheInterceptorEntry.class);
+
+            assertNotNull("Expected partition counter is missing [key=" + key + ']', cntr);
+            assertNotNull(qryEntryEvt);
+
+            assertEquals(cntr.longValue(), qryEntryEvt.getPartitionUpdateCounter());
+        }
+    }
+
+    /**
+     * Polls every queue briefly and fails if an event carrying a non-null value shows up.
+     *
+     * @param evtsQueues Event queue.
+     * @throws Exception If failed.
+     */
+    private void checkNoEvent(Collection<BlockingQueue<Cache.Entry<QueryTestKey, QueryTestValue>>> evtsQueues)
+        throws Exception {
+        for (BlockingQueue<Cache.Entry<QueryTestKey, QueryTestValue>> queue : evtsQueues) {
+            Cache.Entry<QueryTestKey, QueryTestValue> polled = queue.poll(50, MILLISECONDS);
+
+            boolean noUpdate = polled == null || polled.getValue() == null;
+
+            assertTrue(noUpdate);
+        }
+    }
+
+    /**
+     * Builds a cache configuration with {@code FULL_SYNC} write synchronization, {@code PRIMARY} atomic write
+     * order and a {@link TestInterceptor} installed.
+     *
+     * @param cacheMode Cache mode.
+     * @param backups Number of backups.
+     * @param atomicityMode Cache atomicity mode.
+     * @param memoryMode Cache memory mode.
+     * @param store If {@code true} configures dummy cache store.
+     * @return Cache configuration.
+     */
+    protected CacheConfiguration<Object, Object> cacheConfiguration(
+        CacheMode cacheMode,
+        int backups,
+        CacheAtomicityMode atomicityMode,
+        CacheMemoryMode memoryMode,
+        boolean store) {
+        CacheConfiguration<QueryTestKey, QueryTestValue> cfg = new CacheConfiguration<>();
+
+        cfg.setAtomicityMode(atomicityMode);
+        cfg.setCacheMode(cacheMode);
+        cfg.setMemoryMode(memoryMode);
+        cfg.setWriteSynchronizationMode(FULL_SYNC);
+        cfg.setAtomicWriteOrderMode(PRIMARY);
+
+        // The number of backups is applied to partitioned caches only.
+        if (cacheMode == PARTITIONED)
+            cfg.setBackups(backups);
+
+        if (store) {
+            cfg.setCacheStoreFactory(new TestStoreFactory());
+            cfg.setReadThrough(true);
+            cfg.setWriteThrough(true);
+        }
+
+        cfg.setInterceptor(new TestInterceptor());
+
+        // The raw cast lets the typed configuration be handed out as <Object, Object>.
+        return (CacheConfiguration)cfg;
+    }
+
+ /**
+ * Factory of a dummy cache store: loading always returns {@code null}, writing and deleting do nothing.
+ */
+ private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> {
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public CacheStore<Object, Object> create() {
+ return new CacheStoreAdapter() {
+ @Override public Object load(Object key) throws CacheLoaderException {
+ // The store never holds any data.
+ return null;
+ }
+
+ @Override public void write(Cache.Entry entry) throws CacheWriterException {
+ // No-op.
+ }
+
+ @Override public void delete(Object key) throws CacheWriterException {
+ // No-op.
+ }
+ };
+ }
+ }
+
+    /**
+     * Test cache key wrapping an integer; equality, hash code and ordering are all defined by that integer.
+     */
+    public static class QueryTestKey implements Serializable, Comparable {
+        /** Wrapped key value. */
+        private final Integer key;
+
+        /**
+         * @param key Key.
+         */
+        public QueryTestKey(Integer key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            QueryTestKey that = (QueryTestKey)o;
+
+            return key.equals(that.key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(QueryTestKey.class, this);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compareTo(Object o) {
+            // Compare rather than subtract: the difference of two ints can overflow and report the wrong order.
+            return key.compareTo(((QueryTestKey)o).key);
+        }
+    }
+
+ /**
+ *
+ */
+ private static class TestInterceptor extends CacheInterceptorAdapter<QueryTestKey, QueryTestValue> {
+ /** {@inheritDoc} */
+ @Override public void onAfterPut(Cache.Entry<QueryTestKey, QueryTestValue> e) {
+ e.getKey();
+ e.getValue();
+
+ UUID id = e.unwrap(Ignite.class).cluster().localNode().id();
+
+ BlockingQueue<Cache.Entry<QueryTestKey, QueryTestValue>> ents = afterPutEvts.get(id);
+
+ if (ents == null) {
+ ents = new BlockingArrayQueue<>();
+
+ BlockingQueue<Cache.Entry<QueryTestKey, QueryTestValue>> oldVal = afterPutEvts.putIfAbsent(id, ents);
+
+ ents = oldVal == null ? ents : oldVal;
+ }
+
+ ents.add(e);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAfterRemove(Cache.Entry<QueryTestKey, QueryTestValue> e) {
+ e.getKey();
+ e.getValue();
+
+ UUID id = e.unwrap(Ignite.class).cluster().localNode().id();
+
+ BlockingQueue<Cache.Entry<QueryTestKey, QueryTestValue>> ents = afterRmvEvts.get(id);
+
+ if (ents == null) {
+ ents = new BlockingArrayQueue<>();
+
+ BlockingQueue<Cache.Entry<QueryTestKey, QueryTestValue>> oldVal = afterRmvEvts.putIfAbsent(id, ents);
+
+ ents = oldVal == null ? ents : oldVal;
+ }
+
+ ents.add(e);
+ }
+ }
+
+    /**
+     * Test cache value holding an integer together with its string representation.
+     */
+    public static class QueryTestValue implements Serializable {
+        /** Numeric value. */
+        @GridToStringInclude
+        protected final Integer val1;
+
+        /** String form of the numeric value. */
+        @GridToStringInclude
+        protected final String val2;
+
+        /**
+         * @param val Value.
+         */
+        public QueryTestValue(Integer val) {
+            val1 = val;
+            val2 = String.valueOf(val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            QueryTestValue other = (QueryTestValue)o;
+
+            if (!val1.equals(other.val1))
+                return false;
+
+            return val2.equals(other.val2);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return 31 * val1.hashCode() + val2.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(QueryTestValue.class, this);
+        }
+    }
+
+    /**
+     * Entry processor that stores the configured value, or removes the entry when that value is {@code null}.
+     */
+    protected static class EntrySetValueProcessor implements EntryProcessor<Object, Object, Object> {
+        /** Value to set; {@code null} means the entry is removed. */
+        private Object val;
+
+        /** Whether the previous value is returned from {@link #process(MutableEntry, Object...)}. */
+        private boolean retOld;
+
+        /**
+         * @param val Value to set.
+         * @param retOld Return old value flag.
+         */
+        public EntrySetValueProcessor(Object val, boolean retOld) {
+            this.val = val;
+            this.retOld = retOld;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object process(MutableEntry<Object, Object> e, Object... args) {
+            Object prev = null;
+
+            // The previous value has to be read before the entry is modified.
+            if (retOld)
+                prev = e.getValue();
+
+            if (val == null)
+                e.remove();
+            else
+                e.setValue(val);
+
+            return prev;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(EntrySetValueProcessor.class, this);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8a9a048c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
index 9b219b7..745fb90 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
@@ -55,6 +55,7 @@ public class IgniteCacheInterceptorSelfTestSuite extends TestSuite {
suite.addTestSuite(GridCacheOnCopyFlagReplicatedSelfTest.class);
suite.addTestSuite(GridCacheOnCopyFlagLocalSelfTest.class);
suite.addTestSuite(GridCacheOnCopyFlagAtomicSelfTest.class);
+ suite.addTestSuite(CacheInterceptorPartitionCounterRandomOperationsTest.class);
return suite;
}