You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2016/02/12 00:28:04 UTC
[29/43] ignite git commit: ignite-2587 Fixed continuous query notifications in offheap mode and BinaryObjectOffheapImpl usage.
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index 27edb0c..e6bfd87 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -56,6 +57,7 @@ import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMemoryMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
@@ -70,6 +72,7 @@ import static javax.cache.event.EventType.CREATED;
import static javax.cache.event.EventType.EXPIRED;
import static javax.cache.event.EventType.REMOVED;
import static javax.cache.event.EventType.UPDATED;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
@@ -79,7 +82,7 @@ import static org.apache.ignite.cache.CacheMode.REPLICATED;
*/
public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAbstractTest {
/** */
- private static volatile List<CacheEntryEvent<? extends Integer, ? extends Integer>> evts;
+ private static volatile List<CacheEntryEvent<?, ?>> evts;
/** */
private static volatile CountDownLatch evtsLatch;
@@ -91,7 +94,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
private Integer lastKey = 0;
/** */
- private CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg;
+ private CacheEntryListenerConfiguration<Object, Object> lsnrCfg;
+
+ /** */
+ private boolean useObjects;
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@@ -103,9 +109,18 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
cfg.setEagerTtl(eagerTtl());
+ cfg.setMemoryMode(memoryMode());
+
return cfg;
}
+ /**
+ * @return Cache memory mode.
+ */
+ protected CacheMemoryMode memoryMode() {
+ return ONHEAP_TIERED;
+ }
+
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
@@ -129,9 +144,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
* @throws Exception If failed.
*/
public void testExceptionIgnored() throws Exception {
- CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
- new Factory<CacheEntryListener<Integer, Integer>>() {
- @Override public CacheEntryListener<Integer, Integer> create() {
+ CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+ new Factory<CacheEntryListener<Object, Object>>() {
+ @Override public CacheEntryListener<Object, Object> create() {
return new ExceptionListener();
}
},
@@ -140,7 +155,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
false
);
- IgniteCache<Integer, Integer> cache = jcache();
+ IgniteCache<Object, Object> cache = jcache();
cache.registerCacheEntryListener(lsnrCfg);
@@ -158,13 +173,13 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
}
lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
- new Factory<CacheEntryListener<Integer, Integer>>() {
- @Override public CacheEntryListener<Integer, Integer> create() {
+ new Factory<CacheEntryListener<Object, Object>>() {
+ @Override public CacheEntryListener<Object, Object> create() {
return new CreateUpdateRemoveExpireListener();
}
},
- new Factory<CacheEntryEventSerializableFilter<? super Integer, ? super Integer>>() {
- @Override public CacheEntryEventSerializableFilter<? super Integer, ? super Integer> create() {
+ new Factory<CacheEntryEventSerializableFilter<Object, Object>>() {
+ @Override public CacheEntryEventSerializableFilter<Object, Object> create() {
return new ExceptionFilter();
}
},
@@ -192,9 +207,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
* @throws Exception If failed.
*/
public void testNoOldValue() throws Exception {
- CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
- new Factory<CacheEntryListener<Integer, Integer>>() {
- @Override public CacheEntryListener<Integer, Integer> create() {
+ CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+ new Factory<CacheEntryListener<Object, Object>>() {
+ @Override public CacheEntryListener<Object, Object> create() {
return new CreateUpdateRemoveExpireListener();
}
},
@@ -203,7 +218,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
true
);
- IgniteCache<Integer, Integer> cache = jcache();
+ IgniteCache<Object, Object> cache = jcache();
try {
for (Integer key : keys()) {
@@ -222,21 +237,30 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
* @throws Exception If failed.
*/
+ public void testSynchronousEventsObjectKeyValue() throws Exception {
+ useObjects = true;
+
+ testSynchronousEvents();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testSynchronousEvents() throws Exception {
- final CacheEntryCreatedListener<Integer, Integer> lsnr = new CreateUpdateRemoveExpireListener() {
- @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+ final CacheEntryCreatedListener<Object, Object> lsnr = new CreateUpdateRemoveExpireListener() {
+ @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts) {
super.onRemoved(evts);
awaitLatch();
}
- @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+ @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) {
super.onCreated(evts);
awaitLatch();
}
- @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
super.onUpdated(evts);
awaitLatch();
@@ -252,9 +276,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
}
};
- CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
- new Factory<CacheEntryListener<Integer, Integer>>() {
- @Override public CacheEntryListener<Integer, Integer> create() {
+ CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+ new Factory<CacheEntryListener<Object, Object>>() {
+ @Override public CacheEntryListener<Object, Object> create() {
return lsnr;
}
},
@@ -263,7 +287,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
true
);
- IgniteCache<Integer, Integer> cache = jcache();
+ IgniteCache<Object, Object> cache = jcache();
cache.registerCacheEntryListener(lsnrCfg);
@@ -299,7 +323,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
if (!eagerTtl()) {
U.sleep(1100);
- assertNull(primaryCache(key, cache.getName()).get(key));
+ assertNull(primaryCache(key, cache.getName()).get(key(key)));
evtsLatch.await(5000, MILLISECONDS);
@@ -378,13 +402,13 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
final CyclicBarrier barrier = new CyclicBarrier(THREADS);
- final IgniteCache<Integer, Integer> cache = jcache(0);
+ final IgniteCache<Object, Object> cache = jcache(0);
GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
- CacheEntryListenerConfiguration<Integer, Integer> cfg = new MutableCacheEntryListenerConfiguration<>(
- new Factory<CacheEntryListener<Integer, Integer>>() {
- @Override public CacheEntryListener<Integer, Integer> create() {
+ CacheEntryListenerConfiguration<Object, Object> cfg = new MutableCacheEntryListenerConfiguration<>(
+ new Factory<CacheEntryListener<Object, Object>>() {
+ @Override public CacheEntryListener<Object, Object> create() {
return new CreateUpdateRemoveExpireListener();
}
},
@@ -441,9 +465,13 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
* @param expEvts Expected events number.
* @throws Exception If failed.
*/
- private void syncEvent(Integer key, Integer val, IgniteCache<Integer, Integer> cache, int expEvts)
+ private void syncEvent(
+ Integer key,
+ Integer val,
+ IgniteCache<Object, Object> cache,
+ int expEvts)
throws Exception {
- evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<? extends Integer, ? extends Integer>>());
+ evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<?, ?>>());
evtsLatch = new CountDownLatch(expEvts);
@@ -466,9 +494,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
});
if (val != null)
- cache.put(key, val);
+ cache.put(key(key), value(val));
else
- cache.remove(key);
+ cache.remove(key(key));
done.set(true);
@@ -480,15 +508,45 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
}
/**
+ * @param key Integer key.
+ * @return Key instance.
+ */
+ private Object key(Integer key) {
+ assert key != null;
+
+ return useObjects ? new ListenerTestKey(key) : key;
+ }
+
+ /**
+ * @param val Integer value.
+ * @return Value instance.
+ */
+ private Object value(Integer val) {
+ if (val == null)
+ return null;
+
+ return useObjects ? new ListenerTestValue(val) : val;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testEventsObjectKeyValue() throws Exception {
+ useObjects = true;
+
+ testEvents();
+ }
+
+ /**
* @throws Exception If failed.
*/
public void testEvents() throws Exception {
- IgniteCache<Integer, Integer> cache = jcache();
+ IgniteCache<Object, Object> cache = jcache();
- Map<Integer, Integer> vals = new HashMap<>();
+ Map<Object, Object> vals = new HashMap<>();
for (int i = 0; i < 100; i++)
- vals.put(i + 2_000_000, i);
+ vals.put(key(i + 2_000_000), value(i));
cache.putAll(vals); // Put some data in cache to make sure events are not generated for existing entries.
@@ -518,7 +576,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
checkEvents(cache, new CreateUpdateRemoveExpireListenerFactory(), key, true, true, true, true);
}
- CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+ CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
new CreateUpdateRemoveExpireListenerFactory(),
new TestFilterFactory(),
true,
@@ -551,7 +609,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
* @throws Exception If failed.
*/
@SuppressWarnings("unchecked")
- private void checkListenerOnStart(Map<Integer, Integer> vals) throws Exception {
+ private void checkListenerOnStart(Map<Object, Object> vals) throws Exception {
lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
new CreateUpdateRemoveExpireListenerFactory(),
null,
@@ -564,7 +622,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
try {
awaitPartitionMapExchange();
- IgniteCache<Integer, Integer> cache = grid.cache(null);
+ IgniteCache<Object, Object> cache = grid.cache(null);
Integer key = Integer.MAX_VALUE;
@@ -588,7 +646,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
try {
awaitPartitionMapExchange();
- IgniteCache<Integer, Integer> cache = grid.cache(null);
+ IgniteCache<Object, Object> cache = grid.cache(null);
log.info("Check filter for listener in configuration.");
@@ -613,14 +671,14 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
* @throws Exception If failed.
*/
private void checkEvents(
- final IgniteCache<Integer, Integer> cache,
- final Factory<CacheEntryListener<Integer, Integer>> lsnrFactory,
+ final IgniteCache<Object, Object> cache,
+ final Factory<CacheEntryListener<Object, Object>> lsnrFactory,
Integer key,
boolean create,
boolean update,
boolean rmv,
boolean expire) throws Exception {
- CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+ CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
lsnrFactory,
null,
true,
@@ -642,8 +700,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
* @param vals Values in cache.
* @throws Exception If failed.
*/
- private void checkFilter(final IgniteCache<Integer, Integer> cache, Map<Integer, Integer> vals) throws Exception {
- evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<? extends Integer, ? extends Integer>>());
+ private void checkFilter(final IgniteCache<Object, Object> cache, Map<Object, Object> vals) throws Exception {
+ evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<?, ?>>());
final int expEvts = (vals.size() / 2) * 4; // Remove, create, update and expire for half of modified entries.
@@ -653,16 +711,18 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
cache.putAll(vals);
- final Map<Integer, Integer> newVals = new HashMap<>();
+ final Map<Object, Object> newVals = new HashMap<>();
- for (Integer key : vals.keySet())
- newVals.put(key, -1);
+ for (Object key : vals.keySet())
+ newVals.put(key, value(-1));
cache.withExpiryPolicy(new ModifiedExpiryPolicy(new Duration(MILLISECONDS, 500))).putAll(newVals);
+ U.sleep(1000);
+
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
- for (Integer key : newVals.keySet()) {
+ for (Object key : newVals.keySet()) {
if (primaryCache(key, cache.getName()).get(key) != null)
return false;
}
@@ -675,13 +735,20 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
assertEquals(expEvts, evts.size());
- Set<Integer> rmvd = new HashSet<>();
- Set<Integer> created = new HashSet<>();
- Set<Integer> updated = new HashSet<>();
- Set<Integer> expired = new HashSet<>();
+ Set<Object> rmvd = new HashSet<>();
+ Set<Object> created = new HashSet<>();
+ Set<Object> updated = new HashSet<>();
+ Set<Object> expired = new HashSet<>();
+
+ for (CacheEntryEvent<?, ?> evt : evts) {
+ Integer key;
+
+ if (useObjects)
+ key = ((ListenerTestKey)evt.getKey()).key;
+ else
+ key = (Integer)evt.getKey();
- for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) {
- assertTrue(evt.getKey() % 2 == 0);
+ assertTrue(key % 2 == 0);
assertTrue(vals.keySet().contains(evt.getKey()));
@@ -707,7 +774,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
break;
case UPDATED:
- assertEquals(-1, (int)evt.getValue());
+ assertEquals(value(-1), evt.getValue());
assertEquals(vals.get(evt.getKey()), evt.getOldValue());
@@ -722,7 +789,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
case EXPIRED:
assertNull(evt.getValue());
- assertEquals(-1, (int)evt.getOldValue());
+ assertEquals(value(-1), evt.getOldValue());
assertTrue(rmvd.contains(evt.getKey()));
@@ -757,8 +824,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
* @throws Exception If failed.
*/
private void checkEvents(
- final IgniteCache<Integer, Integer> cache,
- final CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg,
+ final IgniteCache<Object, Object> cache,
+ final CacheEntryListenerConfiguration<Object, Object> lsnrCfg,
Integer key,
boolean create,
boolean update,
@@ -789,64 +856,64 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
if (expire)
expEvts += 2;
- evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<? extends Integer, ? extends Integer>>());
+ evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<?, ?>>());
evtsLatch = new CountDownLatch(expEvts);
- cache.put(key, 0);
+ cache.put(key(key), value(0));
for (int i = 0; i < UPDATES; i++) {
if (i % 2 == 0)
- cache.put(key, i + 1);
+ cache.put(key(key), value(i + 1));
else
- cache.invoke(key, new EntrySetValueProcessor(i + 1));
+ cache.invoke(key(key), new EntrySetValueProcessor(value(i + 1)));
}
// Invoke processor does not update value, should not trigger event.
- assertEquals(String.valueOf(UPDATES), cache.invoke(key, new EntryToStringProcessor()));
+ assertEquals(String.valueOf(UPDATES), cache.invoke(key(key), new EntryToStringProcessor()));
- assertFalse(cache.putIfAbsent(key, -1));
+ assertFalse(cache.putIfAbsent(key(key), value(-1)));
- assertFalse(cache.remove(key, -1));
+ assertFalse(cache.remove(key(key), value(-1)));
- assertTrue(cache.remove(key));
+ assertTrue(cache.remove(key(key)));
- IgniteCache<Integer, Integer> expirePlcCache =
+ IgniteCache<Object, Object> expirePlcCache =
cache.withExpiryPolicy(new CreatedExpiryPolicy(new Duration(MILLISECONDS, 100)));
- expirePlcCache.put(key, 10);
+ expirePlcCache.put(key(key), value(10));
U.sleep(700);
if (!eagerTtl())
- assertNull(primaryCache(key, cache.getName()).get(key)); // Provoke expire event if eager ttl is disabled.
+ assertNull(primaryCache(key, cache.getName()).get(key(key))); // Provoke expire event if eager ttl is disabled.
- IgniteCache<Integer, Integer> cache1 = cache;
+ IgniteCache<Object, Object> cache1 = cache;
if (gridCount() > 1)
cache1 = jcache(1); // Do updates from another node.
- cache1.put(key, 1);
+ cache1.put(key(key), value(1));
- cache1.put(key, 2);
+ cache1.put(key(key), value(2));
- assertTrue(cache1.remove(key));
+ assertTrue(cache1.remove(key(key)));
- IgniteCache<Integer, Integer> expirePlcCache1 =
+ IgniteCache<Object, Object> expirePlcCache1 =
cache1.withExpiryPolicy(new CreatedExpiryPolicy(new Duration(MILLISECONDS, 100)));
- expirePlcCache1.put(key, 20);
+ expirePlcCache1.put(key(key), value(20));
U.sleep(200);
if (!eagerTtl())
- assertNull(primaryCache(key, cache.getName()).get(key)); // Provoke expire event if eager ttl is disabled.
+ assertNull(primaryCache(key, cache.getName()).get(key(key))); // Provoke expire event if eager ttl is disabled.
evtsLatch.await(5000, MILLISECONDS);
assertEquals(expEvts, evts.size());
- Iterator<CacheEntryEvent<? extends Integer, ? extends Integer>> iter = evts.iterator();
+ Iterator<CacheEntryEvent<?, ?>> iter = evts.iterator();
if (create)
checkEvent(iter, key, CREATED, 0, null);
@@ -886,11 +953,11 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
cache.deregisterCacheEntryListener(lsnrCfg);
- cache.put(key, 1);
+ cache.put(key(key), value(1));
- cache.put(key, 2);
+ cache.put(key(key), value(2));
- assertTrue(cache.remove(key));
+ assertTrue(cache.remove(key(key)));
U.sleep(500); // Sleep some time to ensure listener was really removed.
@@ -908,26 +975,26 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
* @param expVal Expected value.
* @param expOld Expected old value.
*/
- private void checkEvent(Iterator<CacheEntryEvent<? extends Integer, ? extends Integer>> iter,
+ private void checkEvent(Iterator<CacheEntryEvent<?, ?>> iter,
Integer expKey,
EventType expType,
@Nullable Integer expVal,
@Nullable Integer expOld) {
assertTrue(iter.hasNext());
- CacheEntryEvent<? extends Integer, ? extends Integer> evt = iter.next();
+ CacheEntryEvent<?, ?> evt = iter.next();
iter.remove();
assertTrue(evt.getSource() instanceof IgniteCacheProxy);
- assertEquals(expKey, evt.getKey());
+ assertEquals(key(expKey), evt.getKey());
assertEquals(expType, evt.getEventType());
- assertEquals(expVal, evt.getValue());
+ assertEquals(value(expVal), evt.getValue());
- assertEquals(expOld, evt.getOldValue());
+ assertEquals(value(expOld), evt.getOldValue());
if (expOld == null)
assertFalse(evt.isOldValueAvailable());
@@ -977,7 +1044,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
* @param evt Event.
*/
- private static void onEvent(CacheEntryEvent<? extends Integer, ? extends Integer> evt) {
+ private static void onEvent(CacheEntryEvent<?, ?> evt) {
// System.out.println("Received event [evt=" + evt + ", thread=" + Thread.currentThread().getName() + ']');
assertNotNull(evt);
@@ -993,9 +1060,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class CreateUpdateRemoveExpireListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> {
+ private static class CreateUpdateRemoveExpireListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
/** {@inheritDoc} */
- @Override public CacheEntryListener<Integer, Integer> create() {
+ @Override public CacheEntryListener<Object, Object> create() {
return new CreateUpdateRemoveExpireListener();
}
}
@@ -1003,9 +1070,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class NoOpCreateUpdateListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> {
+ private static class NoOpCreateUpdateListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
/** {@inheritDoc} */
- @Override public CacheEntryListener<Integer, Integer> create() {
+ @Override public CacheEntryListener<Object, Object> create() {
return new NoOpCreateUpdateListener();
}
}
@@ -1013,9 +1080,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class CreateUpdateListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> {
+ private static class CreateUpdateListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
/** {@inheritDoc} */
- @Override public CacheEntryListener<Integer, Integer> create() {
+ @Override public CacheEntryListener<Object, Object> create() {
return new CreateUpdateListener();
}
}
@@ -1023,9 +1090,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class CreateListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> {
+ private static class CreateListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
/** {@inheritDoc} */
- @Override public CacheEntryListener<Integer, Integer> create() {
+ @Override public CacheEntryListener<Object, Object> create() {
return new CreateListener();
}
}
@@ -1033,9 +1100,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class RemoveListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> {
+ private static class RemoveListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
/** {@inheritDoc} */
- @Override public CacheEntryListener<Integer, Integer> create() {
+ @Override public CacheEntryListener<Object, Object> create() {
return new RemoveListener();
}
}
@@ -1043,9 +1110,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class UpdateListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> {
+ private static class UpdateListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
/** {@inheritDoc} */
- @Override public CacheEntryListener<Integer, Integer> create() {
+ @Override public CacheEntryListener<Object, Object> create() {
return new UpdateListener();
}
}
@@ -1053,9 +1120,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class ExpireListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> {
+ private static class ExpireListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
/** {@inheritDoc} */
- @Override public CacheEntryListener<Integer, Integer> create() {
+ @Override public CacheEntryListener<Object, Object> create() {
return new ExpireListener();
}
}
@@ -1063,9 +1130,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class TestFilterFactory implements Factory<CacheEntryEventSerializableFilter<Integer, Integer>> {
+ private static class TestFilterFactory implements Factory<CacheEntryEventSerializableFilter<Object, Object>> {
/** {@inheritDoc} */
- @Override public CacheEntryEventSerializableFilter<Integer, Integer> create() {
+ @Override public CacheEntryEventSerializableFilter<Object, Object> create() {
return new TestFilter();
}
}
@@ -1073,10 +1140,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class CreateListener implements CacheEntryCreatedListener<Integer, Integer> {
+ private static class CreateListener implements CacheEntryCreatedListener<Object, Object> {
/** {@inheritDoc} */
- @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
- for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+ @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts)
onEvent(evt);
}
}
@@ -1084,10 +1151,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class UpdateListener implements CacheEntryUpdatedListener<Integer, Integer> {
+ private static class UpdateListener implements CacheEntryUpdatedListener<Object, Object> {
/** {@inheritDoc} */
- @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
- for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+ @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts)
onEvent(evt);
}
}
@@ -1095,10 +1162,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class RemoveListener implements CacheEntryRemovedListener<Integer, Integer> {
+ private static class RemoveListener implements CacheEntryRemovedListener<Object, Object> {
/** {@inheritDoc} */
- @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
- for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+ @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts)
onEvent(evt);
}
}
@@ -1106,10 +1173,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class ExpireListener implements CacheEntryExpiredListener<Integer, Integer> {
+ private static class ExpireListener implements CacheEntryExpiredListener<Object, Object> {
/** {@inheritDoc} */
- @Override public void onExpired(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
- for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+ @Override public void onExpired(Iterable<CacheEntryEvent<?, ?>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts)
onEvent(evt);
}
}
@@ -1117,32 +1184,39 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class TestFilter implements CacheEntryEventSerializableFilter<Integer, Integer> {
+ private static class TestFilter implements CacheEntryEventSerializableFilter<Object, Object> {
/** {@inheritDoc} */
- @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt) {
+ @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) {
assert evt != null;
assert evt.getSource() != null : evt;
assert evt.getEventType() != null : evt;
assert evt.getKey() != null : evt;
- return evt.getKey() % 2 == 0;
+ Integer key;
+
+ if (evt.getKey() instanceof ListenerTestKey)
+ key = ((ListenerTestKey)evt.getKey()).key;
+ else
+ key = (Integer)evt.getKey();
+
+ return key % 2 == 0;
}
}
/**
*
*/
- private static class CreateUpdateListener implements CacheEntryCreatedListener<Integer, Integer>,
- CacheEntryUpdatedListener<Integer, Integer> {
+ private static class CreateUpdateListener implements CacheEntryCreatedListener<Object, Object>,
+ CacheEntryUpdatedListener<Object, Object> {
/** {@inheritDoc} */
- @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
- for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+ @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts)
onEvent(evt);
}
/** {@inheritDoc} */
- @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
- for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+ @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts)
onEvent(evt);
}
}
@@ -1150,11 +1224,11 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class NoOpCreateUpdateListener implements CacheEntryCreatedListener<Integer, Integer>,
- CacheEntryUpdatedListener<Integer, Integer> {
+ private static class NoOpCreateUpdateListener implements CacheEntryCreatedListener<Object, Object>,
+ CacheEntryUpdatedListener<Object, Object> {
/** {@inheritDoc} */
- @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
- for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) {
+ @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts) {
assertNotNull(evt);
assertNotNull(evt.getSource());
assertNotNull(evt.getEventType());
@@ -1163,8 +1237,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
}
/** {@inheritDoc} */
- @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
- for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts) {
assertNotNull(evt);
assertNotNull(evt.getSource());
assertNotNull(evt.getEventType());
@@ -1177,16 +1251,16 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
*
*/
private static class CreateUpdateRemoveExpireListener extends CreateUpdateListener
- implements CacheEntryRemovedListener<Integer, Integer>, CacheEntryExpiredListener<Integer, Integer> {
+ implements CacheEntryRemovedListener<Object, Object>, CacheEntryExpiredListener<Object, Object> {
/** {@inheritDoc} */
- @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
- for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+ @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts)
onEvent(evt);
}
/** {@inheritDoc} */
- @Override public void onExpired(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
- for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+ @Override public void onExpired(Iterable<CacheEntryEvent<?, ?>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts)
onEvent(evt);
}
}
@@ -1194,9 +1268,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class ExceptionFilter implements CacheEntryEventSerializableFilter<Integer, Integer> {
+ private static class ExceptionFilter implements CacheEntryEventSerializableFilter<Object, Object> {
/** {@inheritDoc} */
- @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt) {
+ @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) {
throw new RuntimeException("Test filter error.");
}
}
@@ -1205,24 +1279,24 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
*
*/
private static class ExceptionListener extends CreateUpdateListener
- implements CacheEntryRemovedListener<Integer, Integer>, CacheEntryExpiredListener<Integer, Integer> {
+ implements CacheEntryRemovedListener<Object, Object>, CacheEntryExpiredListener<Object, Object> {
/** {@inheritDoc} */
- @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+ @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) {
error();
}
/** {@inheritDoc} */
- @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
error();
}
/** {@inheritDoc} */
- @Override public void onExpired(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+ @Override public void onExpired(Iterable<CacheEntryEvent<?, ?>> evts) {
error();
}
/** {@inheritDoc} */
- @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+ @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts) {
error();
}
@@ -1237,10 +1311,12 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- protected static class EntryToStringProcessor implements EntryProcessor<Integer, Integer, String> {
+ protected static class EntryToStringProcessor implements EntryProcessor<Object, Object, String> {
/** {@inheritDoc} */
- @Override public String process(MutableEntry<Integer, Integer> e, Object... arguments)
- throws EntryProcessorException {
+ @Override public String process(MutableEntry<Object, Object> e, Object... args) {
+ if (e.getValue() instanceof ListenerTestValue)
+ return String.valueOf(((ListenerTestValue)e.getValue()).val1);
+
return String.valueOf(e.getValue());
}
@@ -1253,19 +1329,19 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- protected static class EntrySetValueProcessor implements EntryProcessor<Integer, Integer, String> {
+ protected static class EntrySetValueProcessor implements EntryProcessor<Object, Object, String> {
/** */
- private Integer val;
+ private Object val;
/**
* @param val Value to set.
*/
- public EntrySetValueProcessor(Integer val) {
+ public EntrySetValueProcessor(Object val) {
this.val = val;
}
/** {@inheritDoc} */
- @Override public String process(MutableEntry<Integer, Integer> e, Object... arguments)
+ @Override public String process(MutableEntry<Object, Object> e, Object... args)
throws EntryProcessorException {
e.setValue(val);
@@ -1307,4 +1383,88 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
// No-op.
}
}
+
+ /**
+ * Serializable key class wrapping a single {@link Integer}; equality and hash code delegate to it.
+ */
+ static class ListenerTestKey implements Serializable {
+ /** Wrapped integer; {@code equals} and {@code hashCode} dereference it, so it must not be {@code null}. */
+ private final Integer key;
+
+ /**
+ * @param key Integer to wrap.
+ */
+ public ListenerTestKey(Integer key) {
+ this.key = key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ ListenerTestKey that = (ListenerTestKey)o;
+
+ return key.equals(that.key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return key.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ListenerTestKey.class, this);
+ }
+ }
+
+ /**
+ * Serializable value class holding an {@link Integer} together with its string form.
+ */
+ static class ListenerTestValue implements Serializable {
+ /** Integer passed to the constructor. */
+ private final Integer val1;
+
+ /** Result of {@code String.valueOf(val)} computed in the constructor. */
+ private final String val2;
+
+ /**
+ * @param val Integer stored in {@code val1}; its string form is stored in {@code val2}.
+ */
+ public ListenerTestValue(Integer val) {
+ this.val1 = val;
+ this.val2 = String.valueOf(val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ ListenerTestValue that = (ListenerTestValue) o;
+
+ return val1.equals(that.val1) && val2.equals(that.val2);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = val1.hashCode();
+
+ res = 31 * res + val2.hashCode();
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ListenerTestValue.class, this);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapTieredTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapTieredTest.java
new file mode 100644
index 0000000..69efb84
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapTieredTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+
+/**
+ * Variant of {@link IgniteCacheEntryListenerAtomicTest} whose {@link #memoryMode()} returns {@code OFFHEAP_TIERED}.
+ */
+public class IgniteCacheEntryListenerAtomicOffheapTieredTest extends IgniteCacheEntryListenerAtomicTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return OFFHEAP_TIERED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapValuesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapValuesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapValuesTest.java
new file mode 100644
index 0000000..23b1bc0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapValuesTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
+
+/**
+ * Variant of {@link IgniteCacheEntryListenerAtomicTest} whose {@link #memoryMode()} returns {@code OFFHEAP_VALUES}.
+ */
+public class IgniteCacheEntryListenerAtomicOffheapValuesTest extends IgniteCacheEntryListenerAtomicTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return OFFHEAP_VALUES;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapTieredTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapTieredTest.java
new file mode 100644
index 0000000..d552195
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapTieredTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+
+/**
+ * Variant of {@link IgniteCacheEntryListenerTxTest} whose {@link #memoryMode()} returns {@code OFFHEAP_TIERED}.
+ */
+public class IgniteCacheEntryListenerTxOffheapTieredTest extends IgniteCacheEntryListenerTxTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return OFFHEAP_TIERED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapValuesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapValuesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapValuesTest.java
new file mode 100644
index 0000000..32555c8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapValuesTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
+
+/**
+ * Variant of {@link IgniteCacheEntryListenerTxTest} whose {@link #memoryMode()} returns {@code OFFHEAP_VALUES}.
+ */
+public class IgniteCacheEntryListenerTxOffheapValuesTest extends IgniteCacheEntryListenerTxTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return OFFHEAP_VALUES;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
index a9e43d4..41725e7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
@@ -48,6 +48,7 @@ public class IgniteCacheEntryListenerTxTest extends IgniteCacheEntryListenerAbst
return null;
}
+ /** {@inheritDoc} */
@Override public void testEvents(){
fail("https://issues.apache.org/jira/browse/IGNITE-1600");
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 1c65f9b..a42f056 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -55,6 +55,7 @@ import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.CacheMemoryMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.query.ContinuousQuery;
@@ -97,6 +98,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheMemoryMode.*;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -142,6 +144,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
ccfg.setBackups(backups);
ccfg.setWriteSynchronizationMode(FULL_SYNC);
ccfg.setNearConfiguration(nearCacheConfiguration());
+ ccfg.setMemoryMode(memoryMode());
cfg.setCacheConfiguration(ccfg);
@@ -151,6 +154,13 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
}
/**
+ * @return Cache memory mode.
+ */
+ protected CacheMemoryMode memoryMode() {
+ return ONHEAP_TIERED;
+ }
+
+ /**
* @return Near cache configuration.
*/
protected NearCacheConfiguration nearCacheConfiguration() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.java
new file mode 100644
index 0000000..cc8590d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+
+/**
+ * Variant of {@link CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest} whose {@link #memoryMode()} returns {@code OFFHEAP_TIERED}.
+ */
+public class CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest
+ extends CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return OFFHEAP_TIERED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxOffheapTieredTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxOffheapTieredTest.java
new file mode 100644
index 0000000..cae06c3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxOffheapTieredTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+
+/**
+ * Variant of {@link CacheContinuousQueryFailoverTxSelfTest} whose {@link #memoryMode()} returns {@code OFFHEAP_TIERED}.
+ */
+public class CacheContinuousQueryFailoverTxOffheapTieredTest extends CacheContinuousQueryFailoverTxSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return OFFHEAP_TIERED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
new file mode 100644
index 0000000..d9b2091
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
@@ -0,0 +1,684 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
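+ * Applies random cache operations and checks the continuous query event (or absence of one) after each.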
+ */
+public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstractTest {
+ /** IP finder set on the discovery SPI of every configuration built by {@link #getConfiguration(String)}. */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** Total number of nodes started; the node with index {@code NODES - 1} is started after the client flag is set. */
+ private static final int NODES = 5;
+
+ /** Exclusive upper bound of the random integers used to build keys. */
+ private static final int KEYS = 10;
+
+ /** Exclusive upper bound of the random integers used to build values. */
+ private static final int VALS = 10;
+
+ /** Client mode flag copied into each configuration built by {@link #getConfiguration(String)}. */
+ private boolean client;
+
+ /** {@inheritDoc} Additionally sets the shared IP finder and the current value of the client mode flag. */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} Starts {@code NODES - 1} nodes, then sets the client flag and starts the last node. */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGridsMultiThreaded(NODES - 1);
+
+ client = true;
+
+ startGrid(NODES - 1);
+ }
+
+ /** {@inheritDoc} Stops all grids before delegating to the superclass. */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /**
+ * @throws Exception If the check fails for a PARTITIONED, ATOMIC, ONHEAP_TIERED cache with one backup.
+ */
+ public void testAtomic() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ ATOMIC,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg);
+ }
+
+ /**
+ * @throws Exception If the check fails for a REPLICATED, ATOMIC, ONHEAP_TIERED cache.
+ */
+ public void testAtomicReplicated() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+ 0,
+ ATOMIC,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg);
+ }
+
+ /**
+ * @throws Exception If the check fails for a PARTITIONED, ATOMIC, OFFHEAP_VALUES cache with one backup.
+ */
+ public void testAtomicOffheapValues() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ ATOMIC,
+ OFFHEAP_VALUES,
+ false);
+
+ testContinuousQuery(ccfg);
+ }
+
+ /**
+ * @throws Exception If the check fails for a PARTITIONED, ATOMIC, OFFHEAP_TIERED cache with one backup.
+ */
+ public void testAtomicOffheapTiered() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ ATOMIC,
+ OFFHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg);
+ }
+
+ /**
+ * @throws Exception If the check fails for a PARTITIONED, ATOMIC, ONHEAP_TIERED cache without backups.
+ */
+ public void testAtomicNoBackups() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 0,
+ ATOMIC,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg);
+ }
+
+ /**
+ * @throws Exception If the check fails for a PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED cache with one backup.
+ */
+ public void testTx() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ TRANSACTIONAL,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg);
+ }
+
+ /**
+ * @throws Exception If the check fails for a REPLICATED, TRANSACTIONAL, ONHEAP_TIERED cache.
+ */
+ public void testTxReplicated() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+ 0,
+ TRANSACTIONAL,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg);
+ }
+
+ /**
+ * @throws Exception If the check fails for a PARTITIONED, TRANSACTIONAL, OFFHEAP_VALUES cache with one backup.
+ */
+ public void testTxOffheapValues() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ TRANSACTIONAL,
+ OFFHEAP_VALUES,
+ false);
+
+ testContinuousQuery(ccfg);
+ }
+
+ /**
+ * @throws Exception If the check fails for a PARTITIONED, TRANSACTIONAL, OFFHEAP_TIERED cache with one backup.
+ */
+ public void testTxOffheapTiered() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ TRANSACTIONAL,
+ OFFHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg);
+ }
+
+ /**
+ * @throws Exception If the check fails for a PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED cache without backups.
+ */
+ public void testTxNoBackups() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 0,
+ TRANSACTIONAL,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg);
+ }
+
+ /**
+ * @param ccfg Configuration of the cache that is created on node 0, used from node {@code NODES - 1} and destroyed at the end.
+ * @throws Exception If failed.
+ */
+ private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg) throws Exception {
+ ignite(0).createCache(ccfg);
+
+ try {
+ IgniteCache<Object, Object> cache = ignite(NODES - 1).cache(ccfg.getName());
+
+ long seed = System.currentTimeMillis();
+
+ Random rnd = new Random(seed);
+
+ log.info("Random seed: " + seed);
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue =
+ new ArrayBlockingQueue<>(10_000);
+
+ qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts) {
+ // Queue each event for waitEvent()/checkNoEvent(); add() throws if the 10_000-slot queue is full.
+
+ evtsQueue.add(evt);
+ }
+ }
+ });
+
+ QueryCursor<?> cur = cache.query(qry);
+
+ ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>();
+
+ try {
+ for (int i = 0; i < 1000; i++) {
+ if (i % 100 == 0)
+ log.info("Iteration: " + i);
+
+ randomUpdate(rnd, evtsQueue, expData, cache);
+ }
+ }
+ finally {
+ cur.close();
+ }
+ }
+ finally {
+ ignite(0).destroyCache(ccfg.getName());
+ }
+ }
+
+ /**
+ * @param rnd Random generator used to pick the key, the values and the operation.
+ * @param evtsQueue Queue of received events that is polled after the operation.
+ * @param expData Expected cache contents, updated whenever the operation is expected to change the entry.
+ * @param cache Cache to operate on.
+ * @throws Exception If failed.
+ */
+ private void randomUpdate(
+ Random rnd,
+ BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue,
+ ConcurrentMap<Object, Object> expData,
+ IgniteCache<Object, Object> cache)
+ throws Exception {
+ Object key = new QueryTestKey(rnd.nextInt(KEYS));
+ Object newVal = value(rnd);
+ Object oldVal = expData.get(key);
+
+ int op = rnd.nextInt(11);
+
+ // Each case runs one cache operation, checks the resulting event (or its absence) and mirrors any change in expData.
+
+ switch (op) {
+ case 0: {
+ cache.put(key, newVal);
+
+ waitEvent(evtsQueue, key, newVal, oldVal);
+
+ expData.put(key, newVal);
+
+ break;
+ }
+
+ case 1: {
+ cache.getAndPut(key, newVal);
+
+ waitEvent(evtsQueue, key, newVal, oldVal);
+
+ expData.put(key, newVal);
+
+ break;
+ }
+
+ case 2: {
+ cache.remove(key);
+
+ waitEvent(evtsQueue, key, null, oldVal);
+
+ expData.remove(key);
+
+ break;
+ }
+
+ case 3: {
+ cache.getAndRemove(key);
+
+ waitEvent(evtsQueue, key, null, oldVal);
+
+ expData.remove(key);
+
+ break;
+ }
+
+ case 4: {
+ cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
+
+ waitEvent(evtsQueue, key, newVal, oldVal);
+
+ expData.put(key, newVal);
+
+ break;
+ }
+
+ case 5: {
+ cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean()));
+
+ waitEvent(evtsQueue, key, null, oldVal);
+
+ expData.remove(key);
+
+ break;
+ }
+
+ case 6: {
+ cache.putIfAbsent(key, newVal);
+
+ if (oldVal == null) {
+ waitEvent(evtsQueue, key, newVal, null);
+
+ expData.put(key, newVal);
+ }
+ else
+ checkNoEvent(evtsQueue);
+
+ break;
+ }
+
+ case 7: {
+ cache.getAndPutIfAbsent(key, newVal);
+
+ if (oldVal == null) {
+ waitEvent(evtsQueue, key, newVal, null);
+
+ expData.put(key, newVal);
+ }
+ else
+ checkNoEvent(evtsQueue);
+
+ break;
+ }
+
+ case 8: {
+ cache.replace(key, newVal);
+
+ if (oldVal != null) {
+ waitEvent(evtsQueue, key, newVal, oldVal);
+
+ expData.put(key, newVal);
+ }
+ else
+ checkNoEvent(evtsQueue);
+
+ break;
+ }
+
+ case 9: {
+ cache.getAndReplace(key, newVal);
+
+ if (oldVal != null) {
+ waitEvent(evtsQueue, key, newVal, oldVal);
+
+ expData.put(key, newVal);
+ }
+ else
+ checkNoEvent(evtsQueue);
+
+ break;
+ }
+
+ case 10: {
+ if (oldVal != null) {
+ Object replaceVal = value(rnd);
+
+ boolean success = replaceVal.equals(oldVal);
+
+ if (success) {
+ cache.replace(key, replaceVal, newVal);
+
+ waitEvent(evtsQueue, key, newVal, oldVal);
+
+ expData.put(key, newVal);
+ }
+ else {
+ cache.replace(key, replaceVal, newVal);
+
+ checkNoEvent(evtsQueue);
+ }
+ }
+ else {
+ cache.replace(key, value(rnd), newVal);
+
+ checkNoEvent(evtsQueue);
+ }
+
+ break;
+ }
+
+ default:
+ fail();
+ }
+ }
+
+ /**
+ * @param rnd Random generator.
+ * @return New {@code QueryTestValue} built from a random integer in {@code [0, VALS)}.
+ */
+ private static Object value(Random rnd) {
+ return new QueryTestValue(rnd.nextInt(VALS));
+ }
+
+ /**
+ * @param evtsQueue Event queue polled for up to 5 seconds.
+ * @param key Expected event key.
+ * @param val Expected event value; callers pass {@code null} after remove operations.
+ * @param oldVal Expected old value; if both this and {@code val} are {@code null}, no event is expected.
+ * @throws Exception If failed.
+ */
+ private void waitEvent(BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue,
+ Object key, Object val, Object oldVal) throws Exception {
+ if (val == null && oldVal == null) {
+ checkNoEvent(evtsQueue);
+
+ return;
+ }
+
+ CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS);
+
+ assertNotNull("Failed to wait for event [key=" + key +
+ ", val=" + val +
+ ", oldVal=" + oldVal + ']', evt);
+ assertEquals(key, evt.getKey());
+ assertEquals(val, evt.getValue());
+ assertEquals(oldVal, evt.getOldValue());
+ }
+
+ /**
+ * @param evtsQueue Event queue; polled for 50 milliseconds and asserted to yield no event.
+ * @throws Exception If failed.
+ */
+ private void checkNoEvent(BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue) throws Exception {
+ CacheEntryEvent<?, ?> evt = evtsQueue.poll(50, MILLISECONDS);
+
+ assertNull(evt);
+ }
+
+ /**
+ * Builds a cache configuration with FULL_SYNC write synchronization and PRIMARY atomic write order.
+ * @param cacheMode Cache mode.
+ * @param backups Number of backups; applied only when {@code cacheMode} is {@code PARTITIONED}.
+ * @param atomicityMode Cache atomicity mode.
+ * @param memoryMode Cache memory mode.
+ * @param store If {@code true} configures dummy cache store with read-through and write-through enabled.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration<Object, Object> cacheConfiguration(
+ CacheMode cacheMode,
+ int backups,
+ CacheAtomicityMode atomicityMode,
+ CacheMemoryMode memoryMode,
+ boolean store) {
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+ ccfg.setAtomicityMode(atomicityMode);
+ ccfg.setCacheMode(cacheMode);
+ ccfg.setMemoryMode(memoryMode);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setAtomicWriteOrderMode(PRIMARY);
+
+ if (cacheMode == PARTITIONED)
+ ccfg.setBackups(backups);
+
+ if (store) {
+ ccfg.setCacheStoreFactory(new TestStoreFactory());
+ ccfg.setReadThrough(true);
+ ccfg.setWriteThrough(true);
+ }
+
+ return ccfg;
+ }
+
+ /**
+ * Factory for a dummy cache store: {@code load} returns {@code null}, {@code write} and {@code delete} do nothing.
+ */
+ private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> {
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public CacheStore<Object, Object> create() {
+ return new CacheStoreAdapter() {
+ @Override public Object load(Object key) throws CacheLoaderException {
+ return null;
+ }
+
+ @Override public void write(Cache.Entry entry) throws CacheWriterException {
+ // No-op.
+ }
+
+ @Override public void delete(Object key) throws CacheWriterException {
+ // No-op.
+ }
+ };
+ }
+ }
+
+ /**
+ * Serializable test key wrapping an {@code Integer}; equality and hash code are based on that integer.
+ */
+ static class QueryTestKey implements Serializable {
+ /** Wrapped key value. */
+ private final Integer key;
+
+ /**
+ * @param key Wrapped key value; {@code equals} and {@code hashCode} dereference it, so it should not be {@code null}.
+ */
+ public QueryTestKey(Integer key) {
+ this.key = key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ QueryTestKey that = (QueryTestKey)o;
+
+ return key.equals(that.key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return key.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(QueryTestKey.class, this);
+ }
+ }
+
+ /**
+ * Serializable test value holding an {@code Integer} and its string form; both take part in equality and hash code.
+ */
+ static class QueryTestValue implements Serializable {
+ /** Integer value passed to the constructor. */
+ private final Integer val1;
+
+ /** Result of {@code String.valueOf} applied to the constructor argument. */
+ private final String val2;
+
+ /**
+ * @param val Value stored as is and, converted with {@code String.valueOf}, as the second field.
+ */
+ public QueryTestValue(Integer val) {
+ this.val1 = val;
+ this.val2 = String.valueOf(val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ QueryTestValue that = (QueryTestValue) o;
+
+ return val1.equals(that.val1) && val2.equals(that.val2);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = val1.hashCode();
+
+ res = 31 * res + val2.hashCode();
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(QueryTestValue.class, this);
+ }
+ }
+ /**
+ * Entry processor that sets the configured value on the entry, or removes the entry when that value is {@code null}.
+ */
+ protected static class EntrySetValueProcessor implements EntryProcessor<Object, Object, Object> {
+ /** Value to set; {@code null} means the entry is removed instead. */
+ private Object val;
+
+ /** If {@code true}, {@code process} returns the value read before the change; otherwise it returns {@code null}. */
+ private boolean retOld;
+
+ /**
+ * @param val Value to set, or {@code null} to remove the entry.
+ * @param retOld Return old value flag.
+ */
+ public EntrySetValueProcessor(Object val, boolean retOld) {
+ this.val = val;
+ this.retOld = retOld;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object process(MutableEntry<Object, Object> e, Object... args) {
+ Object old = retOld ? e.getValue() : null;
+
+ if (val != null)
+ e.setValue(val);
+ else
+ e.remove();
+
+ return old;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(EntrySetValueProcessor.class, this);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index 5abb98d..dbe282e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -40,6 +40,7 @@ import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMemoryMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.query.ContinuousQuery;
@@ -73,9 +74,9 @@ import org.jsr166.ConcurrentHashMap8;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
-
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
@@ -117,6 +118,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
cacheCfg.setReadThrough(true);
cacheCfg.setWriteThrough(true);
cacheCfg.setLoadPreviousValue(true);
+ cacheCfg.setMemoryMode(memoryMode());
cfg.setCacheConfiguration(cacheCfg);
}
@@ -135,6 +137,13 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
}
/**
+ * @return Cache memory mode set on the test cache configuration; {@code ONHEAP_TIERED} unless overridden.
+ */
+ protected CacheMemoryMode memoryMode() {
+ return ONHEAP_TIERED;
+ }
+
+ /**
* @return Peer class loading enabled flag.
*/
protected boolean peerClassLoadingEnabled() {
@@ -393,8 +402,8 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
}
});
- try (QueryCursor<Cache.Entry<Integer, Integer>> query2 = cache1.query(qry2);
- QueryCursor<Cache.Entry<Integer, Integer>> query1 = cache.query(qry1)) {
+ try (QueryCursor<Cache.Entry<Integer, Integer>> qryCur2 = cache1.query(qry2);
+ QueryCursor<Cache.Entry<Integer, Integer>> qryCur1 = cache.query(qry1)) {
for (int i = 0; i < gridCount(); i++) {
IgniteCache<Object, Object> cache0 = grid(i).cache(null);
@@ -448,7 +457,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
}
});
- QueryCursor<Cache.Entry<Integer, Integer>> query = cache.query(qry);
+ QueryCursor<Cache.Entry<Integer, Integer>> qryCur = cache.query(qry);
for (int key = 0; key < keyCnt; key++)
cache.put(key, key);
@@ -461,7 +470,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
}, 2000L);
}
finally {
- query.close();
+ qryCur.close();
}
}
else {
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapTieredTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapTieredTest.java
new file mode 100644
index 0000000..d6948e2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapTieredTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+
+/**
+ * Variant of {@link GridCacheContinuousQueryAtomicSelfTest} whose {@code memoryMode()} returns {@code OFFHEAP_TIERED}.
+ */
+public class GridCacheContinuousQueryAtomicOffheapTieredTest extends GridCacheContinuousQueryAtomicSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return OFFHEAP_TIERED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapValuesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapValuesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapValuesTest.java
new file mode 100644
index 0000000..4002435
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapValuesTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
+
+/**
+ * Variant of {@link GridCacheContinuousQueryAtomicSelfTest} whose {@code memoryMode()} returns {@code OFFHEAP_VALUES}.
+ */
+public class GridCacheContinuousQueryAtomicOffheapValuesTest extends GridCacheContinuousQueryAtomicSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return OFFHEAP_VALUES;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxOffheapTieredTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxOffheapTieredTest.java
new file mode 100644
index 0000000..bcba7b6
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxOffheapTieredTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+
+/**
+ * Variant of {@link GridCacheContinuousQueryTxSelfTest} whose {@code memoryMode()} returns {@code OFFHEAP_TIERED}.
+ */
+public class GridCacheContinuousQueryTxOffheapTieredTest extends GridCacheContinuousQueryTxSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return OFFHEAP_TIERED;
+ }
+}