You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/03/01 12:09:37 UTC
[11/20] ignite git commit: ignite-2521: Configuration variations
tests framework + IgniteCacheBasicConfigVariationsFullApiTestSuite +
ignite-2554: Fixed Affinity.mapKeyToNode() for dynamically started LOCAL
cache
http://git-wip-us.apache.org/repos/asf/ignite/blob/953b575f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
new file mode 100644
index 0000000..2ba7bb9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
@@ -0,0 +1,5851 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import javax.cache.Cache;
+import javax.cache.CacheException;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.expiry.TouchedExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import javax.cache.processor.MutableEntry;
+import junit.framework.AssertionFailedError;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
+import org.apache.ignite.internal.util.typedef.CIX1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.IgniteCacheConfigVariationsAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CachePeekMode.ALL;
+import static org.apache.ignite.cache.CachePeekMode.BACKUP;
+import static org.apache.ignite.cache.CachePeekMode.OFFHEAP;
+import static org.apache.ignite.cache.CachePeekMode.ONHEAP;
+import static org.apache.ignite.cache.CachePeekMode.PRIMARY;
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_LOCKED;
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_SWAPPED;
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_UNLOCKED;
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_UNSWAPPED;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+import static org.apache.ignite.transactions.TransactionState.COMMITTED;
+
+/**
+ * Full API cache test.
+ */
+@SuppressWarnings({"TransientFieldInNonSerializableClass", "unchecked"})
+public class IgniteCacheConfigVariationsFullApiTest extends IgniteCacheConfigVariationsAbstractTest {
+ /** Test timeout in milliseconds (60 seconds); returned by {@link #getTestTimeout()}. */
+ private static final long TEST_TIMEOUT = 60 * 1000;
+
+ /** Entry processor that always fails by throwing a {@link RuntimeException} with the message "Failed!". */
+ public static final CacheEntryProcessor<String, Integer, String> ERR_PROCESSOR =
+ new CacheEntryProcessor<String, Integer, String>() {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ @Override public String process(MutableEntry<String, Integer> e, Object... args) {
+ throw new RuntimeException("Failed!");
+ }
+ };
+
+ /** Increment processor for invoke operations. */
+ public static final EntryProcessor<Object, Object, Object> INCR_PROCESSOR = new IncrementEntryProcessor();
+
+ /** Increment processor for invoke operations, exposed as a {@link CacheEntryProcessor}; delegates to {@link #INCR_PROCESSOR}. */
+ public static final CacheEntryProcessor<Object, Object, Object> INCR_IGNITE_PROCESSOR =
+ new CacheEntryProcessor<Object, Object, Object>() {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ @Override public Object process(MutableEntry<Object, Object> e, Object... args) {
+ return INCR_PROCESSOR.process(e, args);
+ }
+ };
+
+ /** Remove processor for invoke operations. */
+ public static final EntryProcessor<Object, Object, Object> RMV_PROCESSOR = new RemoveEntryProcessor();
+
+ /** Remove processor for invoke operations, exposed as a {@link CacheEntryProcessor}; delegates to {@link #RMV_PROCESSOR}. */
+ public static final CacheEntryProcessor<Object, Object, Object> RMV_IGNITE_PROCESSOR =
+ new CacheEntryProcessor<Object, Object, Object>() {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ @Override public Object process(MutableEntry<Object, Object> e, Object... args) {
+ return RMV_PROCESSOR.process(e, args);
+ }
+ };
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ // Use the class-wide limit declared in TEST_TIMEOUT (60 * 1000).
+ return TEST_TIMEOUT;
+ }
+
+ /**
+ * Checks {@code localSize()} and {@code size()} for the {@code PRIMARY} and {@code ALL} peek modes
+ * after populating the cache with ten entries and invalidating one of them.
+ *
+ * @throws Exception In case of error.
+ */
+ public void testSize() throws Exception {
+ // The cache is expected to start empty.
+ assert jcache().localSize() == 0;
+
+ int size = 10;
+
+ final Map<String, Integer> map = new HashMap<>();
+
+ for (int i = 0; i < size; i++)
+ map.put("key" + i, i);
+
+ // Put in primary nodes to avoid near readers which will prevent entry from being cleared.
+ Map<ClusterNode, Collection<String>> mapped = grid(0).<String>affinity(cacheName()).mapKeysToNodes(map.keySet());
+
+ for (int i = 0; i < gridCount(); i++) {
+ Collection<String> keys = mapped.get(grid(i).localNode());
+
+ if (!F.isEmpty(keys)) {
+ for (String key : keys)
+ jcache(i).put(key, map.get(key));
+ }
+ }
+
+ // "key0" is invalidated on every grid below, so drop it from the expected content
+ // and recompute the key-to-node mapping for the remaining keys.
+ map.remove("key0");
+
+ mapped = grid(0).<String>affinity(cacheName()).mapKeysToNodes(map.keySet());
+
+ for (int i = 0; i < gridCount(); i++) {
+ // Will actually delete entry from map.
+ CU.invalidate(jcache(i), "key0");
+
+ assertNull("Failed check for grid: " + i, jcache(i).localPeek("key0", ONHEAP));
+
+ Collection<String> keysCol = mapped.get(grid(i).localNode());
+
+ // A grid may only be empty if no remaining key is mapped to it.
+ assert jcache(i).localSize() != 0 || F.isEmpty(keysCol);
+ }
+
+ // Run the size check task against each grid (locally or in the remote JVM).
+ for (int i = 0; i < gridCount(); i++)
+ executeOnLocalOrRemoteJvm(i, new CheckCacheSizeTask(map, cacheName()));
+
+ // Local primary size must equal the number of keys mapped to the grid.
+ for (int i = 0; i < gridCount(); i++) {
+ Collection<String> keysCol = mapped.get(grid(i).localNode());
+
+ assertEquals("Failed check for grid: " + i, !F.isEmpty(keysCol) ? keysCol.size() : 0,
+ jcache(i).localSize(PRIMARY));
+ }
+
+ int globalPrimarySize = map.size();
+
+ for (int i = 0; i < gridCount(); i++)
+ assertEquals(globalPrimarySize, jcache(i).size(PRIMARY));
+
+ // Number of copies of every entry expected cluster-wide: one by default, one per non-client grid
+ // for REPLICATED, and "backups + 1" (capped by the grid count) for PARTITIONED.
+ int times = 1;
+
+ if (cacheMode() == REPLICATED)
+ times = gridCount() - clientsCount();
+ else if (cacheMode() == PARTITIONED)
+ times = Math.min(gridCount(), jcache().getConfiguration(CacheConfiguration.class).getBackups() + 1);
+
+ int globalSize = globalPrimarySize * times;
+
+ for (int i = 0; i < gridCount(); i++)
+ assertEquals(globalSize, jcache(i).size(ALL));
+ }
+
+ /**
+ * Puts a single entry and runs {@code checkContainsKey} for the stored key (expecting {@code true})
+ * and for a key that was never stored (expecting {@code false}).
+ *
+ * @throws Exception In case of error.
+ */
+ public void testContainsKey() throws Exception {
+ jcache().put("testContainsKey", 1);
+
+ checkContainsKey(true, "testContainsKey");
+ checkContainsKey(false, "testContainsKeyWrongKey");
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testContainsKeyTx() throws Exception {
+ if (!txEnabled())
+ return;
+
+ IgniteCache<String, Integer> cache = jcache();
+
+ IgniteTransactions txs = ignite(0).transactions();
+
+ for (int i = 0; i < 10; i++) {
+ String key = String.valueOf(i);
+
+ try (Transaction tx = txs.txStart()) {
+ assertNull(key, cache.get(key));
+
+ assertFalse(cache.containsKey(key));
+
+ tx.commit();
+ }
+
+ try (Transaction tx = txs.txStart()) {
+ assertNull(key, cache.get(key));
+
+ cache.put(key, i);
+
+ assertTrue(cache.containsKey(key));
+
+ tx.commit();
+ }
+ }
+ }
+
+ /**
+ * Verifies {@code containsKeys} inside transactions for a set of ten keys: {@code false} while none
+ * of the keys has been written, {@code true} once all of them were put in the same transaction.
+ * Does nothing when transactions are not enabled.
+ *
+ * @throws Exception If failed.
+ */
+ public void testContainsKeysTx() throws Exception {
+ if (!txEnabled())
+ return;
+
+ IgniteCache<String, Integer> c = jcache();
+
+ IgniteTransactions transactions = ignite(0).transactions();
+
+ Set<String> allKeys = new HashSet<>();
+
+ for (int n = 0; n < 10; n++)
+ allKeys.add(String.valueOf(n));
+
+ // Nothing stored yet.
+ try (Transaction readTx = transactions.txStart()) {
+ for (String k : allKeys)
+ assertNull(k, c.get(k));
+
+ assertFalse(c.containsKeys(allKeys));
+
+ readTx.commit();
+ }
+
+ // Store every key and check again within the same transaction.
+ try (Transaction writeTx = transactions.txStart()) {
+ for (String k : allKeys)
+ assertNull(k, c.get(k));
+
+ for (String k : allKeys)
+ c.put(k, 0);
+
+ assertTrue(c.containsKeys(allKeys));
+
+ writeTx.commit();
+ }
+ }
+
+ /**
+ * Removes a key while an explicit multi-key lock is held and then updates the remaining locked keys.
+ * Does nothing when locking is not enabled.
+ *
+ * @throws Exception If failed.
+ */
+ public void testRemoveInExplicitLocks() throws Exception {
+ if (!lockingEnabled())
+ return;
+
+ IgniteCache<String, Integer> c = jcache();
+
+ c.put("a", 1);
+
+ Lock keysLock = c.lockAll(ImmutableSet.of("a", "b", "c", "d"));
+
+ keysLock.lock();
+
+ try {
+ c.remove("a");
+
+ // Make sure single-key operation did not remove lock.
+ c.putAll(F.asMap("b", 2, "c", 3, "d", 4));
+ }
+ finally {
+ keysLock.unlock();
+ }
+ }
+
+ /**
+ * Checks that {@code removeAll()} issued through {@code withSkipStore()} leaves the values readable
+ * afterwards. Fails immediately in multi-JVM mode (IGNITE-1088) and does nothing when no store is enabled.
+ *
+ * @throws Exception If failed.
+ */
+ public void testRemoveAllSkipStore() throws Exception {
+ if (isMultiJvm())
+ fail("https://issues.apache.org/jira/browse/IGNITE-1088");
+
+ if (!storeEnabled())
+ return;
+
+ IgniteCache<String, Integer> c = jcache();
+
+ Map<String, Integer> entries = F.asMap("1", 1, "2", 2, "3", 3);
+
+ c.putAll(entries);
+
+ IgniteCache<String, Integer> skipStoreCache = c.withSkipStore();
+
+ skipStoreCache.removeAll();
+
+ // The values are still returned by get() after the skip-store removal.
+ assertEquals(Integer.valueOf(1), c.get("1"));
+ assertEquals(Integer.valueOf(2), c.get("2"));
+ assertEquals(Integer.valueOf(3), c.get("3"));
+ }
+
+ /**
+ * Exercises {@code getAndPutIfAbsent}, conditional {@code replace} and conditional {@code remove}
+ * on ten keys: {@code replace} is given a wrong old value for even indexes, so only odd-indexed
+ * keys end up with the value {@code -1} and can be removed by {@code remove(key, -1)}.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testAtomicOps() throws IgniteCheckedException {
+ IgniteCache<String, Integer> cache = jcache();
+
+ final int keysCnt = 10;
+
+ // All keys are new, so no previous value is returned.
+ for (int n = 0; n < keysCnt; n++) {
+ Integer prev = cache.getAndPutIfAbsent("k" + n, n);
+
+ assertNull(prev);
+ }
+
+ for (int n = 0; n < keysCnt; n++) {
+ boolean mismatch = (n % 2) == 0;
+
+ int expOld = mismatch ? n + 1 : n;
+
+ boolean replaced = cache.replace("k" + n, expOld, -1);
+
+ assertEquals(mismatch, !replaced);
+ }
+
+ for (int n = 0; n < keysCnt; n++) {
+ boolean odd = (n % 2) != 0;
+
+ boolean removed = cache.remove("k" + n, -1);
+
+ assertTrue(odd == removed);
+ }
+ }
+
+ /**
+ * Through {@code runInAllDataModes}, puts two entries and checks that {@code get} returns the stored
+ * values and {@code null} for a key that was not stored.
+ *
+ * @throws Exception In case of error.
+ */
+ public void testGet() throws Exception {
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() {
+ // Raw type: keys and values are produced by key(int)/value(int).
+ IgniteCache cache = jcache();
+
+ cache.put(key(1), value(1));
+ cache.put(key(2), value(2));
+
+ assertEquals(value(1), cache.get(key(1)));
+ assertEquals(value(2), cache.get(key(2)));
+ // Wrong key.
+ assertNull(cache.get(key(3)));
+ }
+ });
+ }
+
+ /**
+ * Checks {@code get} through the cache obtained from {@code withAsync()}: two stored keys yield their
+ * values and an unknown key yields {@code null}.
+ *
+ * @throws Exception In case of error.
+ */
+ public void testGetAsync() throws Exception {
+ IgniteCache<String, Integer> cache = jcache();
+
+ cache.put("key1", 1);
+ cache.put("key2", 2);
+
+ IgniteCache<String, Integer> cacheAsync = cache.withAsync();
+
+ // Each future() call is made right after the get() it belongs to; keep this pairing intact.
+ cacheAsync.get("key1");
+
+ IgniteFuture<Integer> fut1 = cacheAsync.future();
+
+ cacheAsync.get("key2");
+
+ IgniteFuture<Integer> fut2 = cacheAsync.future();
+
+ cacheAsync.get("wrongKey");
+
+ IgniteFuture<Integer> fut3 = cacheAsync.future();
+
+ assert fut1.get() == 1;
+ assert fut2.get() == 2;
+ assert fut3.get() == null;
+ }
+
+ /**
+ * Through {@code runInAllDataModes}, checks {@code getAll} for a {@code null} key set (expects
+ * {@link NullPointerException}), an empty key set and a set mixing two stored keys with a missing one.
+ * The lookups are performed outside of a transaction and, when {@code txShouldBeUsed()} returns
+ * {@code true}, once more inside a transaction.
+ *
+ * @throws Exception In case of error.
+ */
+ public void testGetAll() throws Exception {
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() {
+ final Object key1 = key(1);
+ final Object key2 = key(2);
+ final Object key9999 = key(9999);
+
+ final Object val1 = value(1);
+ final Object val2 = value(2);
+
+ Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;
+
+ final IgniteCache<Object, Object> cache = jcache();
+
+ // Store the two entries, inside a transaction when one was started above.
+ try {
+ cache.put(key1, val1);
+ cache.put(key2, val2);
+
+ if (tx != null)
+ tx.commit();
+ }
+ finally {
+ if (tx != null)
+ tx.close();
+ }
+
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ cache.getAll(null).isEmpty();
+
+ return null;
+ }
+ }, NullPointerException.class, null);
+
+ assert cache.getAll(Collections.<Object>emptySet()).isEmpty();
+
+ Map<Object, Object> map1 = cache.getAll(ImmutableSet.of(key1, key2, key9999));
+
+ info("Retrieved map1: " + map1);
+
+ // The missing key must not appear in the result.
+ assert 2 == map1.size() : "Invalid map: " + map1;
+
+ assertEquals(val1, map1.get(key1));
+ assertEquals(val2, map1.get(key2));
+ assertNull(map1.get(key9999));
+
+ Map<Object, Object> map2 = cache.getAll(ImmutableSet.of(key1, key2, key9999));
+
+ info("Retrieved map2: " + map2);
+
+ assert 2 == map2.size() : "Invalid map: " + map2;
+
+ assertEquals(val1, map2.get(key1));
+ assertEquals(val2, map2.get(key2));
+ assertNull(map2.get(key9999));
+
+ // Now do the same checks but within transaction.
+ if (txShouldBeUsed()) {
+ try (Transaction tx0 = transactions().txStart()) {
+ assert cache.getAll(Collections.<Object>emptySet()).isEmpty();
+
+ map1 = cache.getAll(ImmutableSet.of(key1, key2, key9999));
+
+ info("Retrieved map1: " + map1);
+
+ assert 2 == map1.size() : "Invalid map: " + map1;
+
+ // Verify the map fetched inside this transaction, not the stale map2 from before it.
+ assertEquals(val1, map1.get(key1));
+ assertEquals(val2, map1.get(key2));
+ assertNull(map1.get(key9999));
+
+ map2 = cache.getAll(ImmutableSet.of(key1, key2, key9999));
+
+ info("Retrieved map2: " + map2);
+
+ assert 2 == map2.size() : "Invalid map: " + map2;
+
+ assertEquals(val1, map2.get(key1));
+ assertEquals(val2, map2.get(key2));
+ assertNull(map2.get(key9999));
+
+ tx0.commit();
+ }
+ }
+ }
+ });
+ }
+
+ /**
+ * Checks that {@code getAll} throws {@link NullPointerException} when the key set contains a
+ * {@code null} element.
+ *
+ * @throws Exception In case of error.
+ */
+ public void testGetAllWithNulls() throws Exception {
+ final IgniteCache<String, Integer> cache = jcache();
+
+ // One regular key plus a null element.
+ final Set<String> keysWithNull = new HashSet<String>(Arrays.asList("key1", null));
+
+ Callable<Void> getAllCall = new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ cache.getAll(keysWithNull);
+
+ return null;
+ }
+ };
+
+ GridTestUtils.assertThrows(log, getAllCall, NullPointerException.class, null);
+ }
+
+ /**
+ * Checks that {@code get} for a key that was never stored returns {@code null} inside a transaction.
+ * Does nothing when {@code txShouldBeUsed()} returns {@code false}.
+ *
+ * @throws Exception If failed.
+ */
+ public void testGetTxNonExistingKey() throws Exception {
+ if (!txShouldBeUsed())
+ return;
+
+ // The transaction is never committed; it is closed when the block exits.
+ try (Transaction ignored = transactions().txStart()) {
+ Object val = jcache().get("key999123");
+
+ assert val == null;
+ }
+ }
+
+ /**
+ * Checks {@code getAll} through the cache obtained from {@code withAsync()}: a {@code null} key set
+ * raises {@link NullPointerException}, an empty key set yields an empty map and two stored keys
+ * yield their values.
+ *
+ * @throws Exception In case of error.
+ */
+ public void testGetAllAsync() throws Exception {
+ final IgniteCache<String, Integer> cache = jcache();
+
+ final IgniteCache<String, Integer> cacheAsync = cache.withAsync();
+
+ cache.put("key1", 1);
+ cache.put("key2", 2);
+
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ cacheAsync.getAll(null);
+
+ return null;
+ }
+ }, NullPointerException.class, null);
+
+ // Each future() call is made right after the getAll() it belongs to; keep this pairing intact.
+ cacheAsync.getAll(Collections.<String>emptySet());
+ IgniteFuture<Map<String, Integer>> fut2 = cacheAsync.future();
+
+ cacheAsync.getAll(ImmutableSet.of("key1", "key2"));
+ IgniteFuture<Map<String, Integer>> fut3 = cacheAsync.future();
+
+ assert fut2.get().isEmpty();
+ assert fut3.get().size() == 2 : "Invalid map: " + fut3.get();
+ assert fut3.get().get("key1") == 1;
+ assert fut3.get().get("key2") == 2;
+ }
+
+ /**
+ * Through {@code runInAllDataModes}, checks {@code getAndPut}: it returns {@code null} for a new key
+ * and the previously stored value for an existing key; {@code get} and {@code checkContainsKey}
+ * are used to confirm the stored state. No explicit transaction is started by this test.
+ *
+ * @throws Exception In case of error.
+ */
+ public void testPut() throws Exception {
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ IgniteCache cache = jcache();
+
+ final Object key1 = key(1);
+ final Object val1 = value(1);
+ final Object key2 = key(2);
+ final Object val2 = value(2);
+
+ // First put of each key: there is no previous value.
+ assert cache.getAndPut(key1, val1) == null;
+ assert cache.getAndPut(key2, val2) == null;
+
+ // Read back what was just stored.
+ assertEquals(val1, cache.get(key1));
+ assertEquals(val2, cache.get(key2));
+
+ // Put again to check returned values.
+ assertEquals(val1, cache.getAndPut(key1, val1));
+ assertEquals(val2, cache.getAndPut(key2, val2));
+
+ checkContainsKey(true, key1);
+ checkContainsKey(true, key2);
+
+ assert cache.get(key1) != null;
+ assert cache.get(key2) != null;
+ assert cache.get(key(100500)) == null;
+
+ // Repeat the presence and value checks.
+ checkContainsKey(true, key1);
+ checkContainsKey(true, key2);
+
+ assertEquals(val1, cache.get(key1));
+ assertEquals(val2, cache.get(key2));
+ assert cache.get(key(100500)) == null;
+
+ // Overwriting with new values still returns the old ones.
+ assertEquals(val1, cache.getAndPut(key1, value(10)));
+ assertEquals(val2, cache.getAndPut(key2, value(11)));
+ }
+ });
+ }
+
+ /**
+ * Checks {@code getAndPut} and {@code get} inside a transaction and verifies the committed state
+ * afterwards. Does nothing when {@code txShouldBeUsed()} returns {@code false}.
+ *
+ * @throws Exception In case of error.
+ */
+ public void testPutTx() throws Exception {
+ if (!txShouldBeUsed())
+ return;
+
+ IgniteCache<String, Integer> c = jcache();
+
+ try (Transaction putTx = transactions().txStart()) {
+ // New keys: no previous value.
+ assert c.getAndPut("key1", 1) == null;
+ assert c.getAndPut("key2", 2) == null;
+
+ // Values written in this transaction are visible to it.
+ assert c.get("key1") == 1;
+ assert c.get("key2") == 2;
+
+ // Second put returns the value written by the first one.
+ assert c.getAndPut("key1", 1) == 1;
+ assert c.getAndPut("key2", 2) == 2;
+
+ assert c.get("key1") != null;
+ assert c.get("key2") != null;
+ assert c.get("wrong") == null;
+
+ putTx.commit();
+ }
+
+ // State after commit.
+ checkContainsKey(true, "key1");
+ checkContainsKey(true, "key2");
+
+ assert c.get("key1") == 1;
+ assert c.get("key2") == 2;
+ assert c.get("wrong") == null;
+
+ assertEquals(Integer.valueOf(1), c.getAndPut("key1", 10));
+ assertEquals(Integer.valueOf(2), c.getAndPut("key2", 11));
+ }
+
+ /**
+ * Runs {@code checkInvoke(OPTIMISTIC, READ_COMMITTED)} through {@code runInAllDataModes}.
+ *
+ * @throws Exception If failed.
+ */
+ public void testInvokeOptimisticReadCommitted() throws Exception {
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ checkInvoke(OPTIMISTIC, READ_COMMITTED);
+ }
+ });
+ }
+
+ /**
+ * Runs {@code checkInvoke(OPTIMISTIC, REPEATABLE_READ)} through {@code runInAllDataModes}.
+ *
+ * @throws Exception If failed.
+ */
+ public void testInvokeOptimisticRepeatableRead() throws Exception {
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ checkInvoke(OPTIMISTIC, REPEATABLE_READ);
+ }
+ });
+ }
+
+ /**
+ * Runs {@code checkInvoke(PESSIMISTIC, READ_COMMITTED)} through {@code runInAllDataModes}.
+ *
+ * @throws Exception If failed.
+ */
+ public void testInvokePessimisticReadCommitted() throws Exception {
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ checkInvoke(PESSIMISTIC, READ_COMMITTED);
+ }
+ });
+ }
+
+ /**
+ * Runs {@code checkInvoke(PESSIMISTIC, REPEATABLE_READ)} through {@code runInAllDataModes}.
+ *
+ * @throws Exception If failed.
+ */
+ public void testInvokePessimisticRepeatableRead() throws Exception {
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ checkInvoke(PESSIMISTIC, REPEATABLE_READ);
+ }
+ });
+ }
+
+ /**
+ * Runs {@code checkIgniteInvoke(OPTIMISTIC, READ_COMMITTED)} through {@code runInAllDataModes}.
+ *
+ * @throws Exception If failed.
+ */
+ public void testIgniteInvokeOptimisticReadCommitted1() throws Exception {
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ checkIgniteInvoke(OPTIMISTIC, READ_COMMITTED);
+ }
+ });
+ }
+
+ /**
+ * Runs {@code checkIgniteInvoke(OPTIMISTIC, REPEATABLE_READ)} through {@code runInAllDataModes}.
+ *
+ * @throws Exception If failed.
+ */
+ public void testIgniteInvokeOptimisticRepeatableRead() throws Exception {
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ checkIgniteInvoke(OPTIMISTIC, REPEATABLE_READ);
+ }
+ });
+ }
+
+ /**
+ * Runs {@code checkIgniteInvoke(PESSIMISTIC, READ_COMMITTED)} through {@code runInAllDataModes}.
+ *
+ * @throws Exception If failed.
+ */
+ public void testIgniteInvokePessimisticReadCommitted() throws Exception {
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ checkIgniteInvoke(PESSIMISTIC, READ_COMMITTED);
+ }
+ });
+ }
+
+ /**
+ * Runs {@code checkIgniteInvoke(PESSIMISTIC, REPEATABLE_READ)} through {@code runInAllDataModes}.
+ *
+ * @throws Exception If failed.
+ */
+ public void testIgniteInvokePessimisticRepeatableRead() throws Exception {
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ checkIgniteInvoke(PESSIMISTIC, REPEATABLE_READ);
+ }
+ });
+ }
+
+ /**
+ * Runs the four-argument {@code checkInvoke} with the {@link CacheEntryProcessor} variants
+ * {@link #INCR_IGNITE_PROCESSOR} and {@link #RMV_IGNITE_PROCESSOR}.
+ *
+ * @param concurrency Concurrency.
+ * @param isolation Isolation.
+ * @throws Exception If failed.
+ */
+ private void checkIgniteInvoke(TransactionConcurrency concurrency, TransactionIsolation isolation)
+ throws Exception {
+ checkInvoke(concurrency, isolation, INCR_IGNITE_PROCESSOR, RMV_IGNITE_PROCESSOR);
+ }
+
+ /**
+ * Checks single-key {@code invoke} with the given processors. The same scenario is run twice:
+ * first inside a transaction with the given concurrency and isolation (only when
+ * {@code txShouldBeUsed()} returns {@code true}), then again without an explicit transaction.
+ * In both runs {@code key1} is expected to be absent beforehand, {@code key2} holds {@code value(1)}
+ * and {@code key3} holds {@code value(3)}.
+ *
+ * @param concurrency Transaction concurrency.
+ * @param isolation Transaction isolation.
+ * @param incrProcessor Increment processor.
+ * @param rmvProcessor Remove processor.
+ */
+ private void checkInvoke(TransactionConcurrency concurrency, TransactionIsolation isolation,
+ EntryProcessor<Object, Object, Object> incrProcessor,
+ EntryProcessor<Object, Object, Object> rmvProcessor) {
+ IgniteCache cache = jcache();
+
+ final Object key1 = key(1);
+ final Object key2 = key(2);
+ final Object key3 = key(3);
+
+ final Object val1 = value(1);
+ final Object val2 = value(2);
+ final Object val3 = value(3);
+
+ cache.put(key2, val1);
+ cache.put(key3, val3);
+
+ Transaction tx = txShouldBeUsed() ? ignite(0).transactions().txStart(concurrency, isolation) : null;
+
+ // No catch block: a failure propagates to the test framework, which reports the stack trace itself.
+ try {
+ // invoke() returns the value seen before the processor was applied.
+ assertNull(cache.invoke(key1, incrProcessor, dataMode));
+ assertEquals(val1, cache.invoke(key2, incrProcessor, dataMode));
+ assertEquals(val3, cache.invoke(key3, rmvProcessor));
+
+ if (tx != null)
+ tx.commit();
+ }
+ finally {
+ if (tx != null)
+ tx.close();
+ }
+
+ assertEquals(val1, cache.get(key1));
+ assertEquals(val2, cache.get(key2));
+ assertNull(cache.get(key3));
+
+ // The removed entry must not be left on-heap on any grid.
+ for (int i = 0; i < gridCount(); i++)
+ assertNull("Failed for cache: " + i, jcache(i).localPeek(key3, ONHEAP));
+
+ // Restore the initial state and repeat the scenario without an explicit transaction.
+ cache.remove(key1);
+ cache.put(key2, val1);
+ cache.put(key3, val3);
+
+ assertNull(cache.invoke(key1, incrProcessor, dataMode));
+ assertEquals(val1, cache.invoke(key2, incrProcessor, dataMode));
+ assertEquals(val3, cache.invoke(key3, rmvProcessor));
+
+ assertEquals(val1, cache.get(key1));
+ assertEquals(val2, cache.get(key2));
+ assertNull(cache.get(key3));
+
+ for (int i = 0; i < gridCount(); i++)
+ assertNull(jcache(i).localPeek(key3, ONHEAP));
+ }
+
+ /**
+ * Runs the four-argument {@code checkInvoke} with the plain {@link EntryProcessor} instances
+ * {@link #INCR_PROCESSOR} and {@link #RMV_PROCESSOR}.
+ *
+ * @param concurrency Concurrency.
+ * @param isolation Isolation.
+ * @throws Exception If failed.
+ */
+ private void checkInvoke(TransactionConcurrency concurrency, TransactionIsolation isolation) throws Exception {
+ checkInvoke(concurrency, isolation, INCR_PROCESSOR, RMV_PROCESSOR);
+ }
+
+ /**
+ * Runs {@code checkInvokeAll(OPTIMISTIC, READ_COMMITTED)} through {@code runInAllDataModes}.
+ *
+ * @throws Exception If failed.
+ */
+ public void testInvokeAllOptimisticReadCommitted() throws Exception {
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ checkInvokeAll(OPTIMISTIC, READ_COMMITTED);
+ }
+ });
+ }
+
+ /**
+ * Runs {@code checkInvokeAll(OPTIMISTIC, REPEATABLE_READ)} through {@code runInAllDataModes}.
+ *
+ * @throws Exception If failed.
+ */
+ public void testInvokeAllOptimisticRepeatableRead() throws Exception {
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ checkInvokeAll(OPTIMISTIC, REPEATABLE_READ);
+ }
+ });
+ }
+
+ /**
+ * Runs {@code checkInvokeAll(PESSIMISTIC, READ_COMMITTED)} through {@code runInAllDataModes}.
+ *
+ * @throws Exception If failed.
+ */
+ public void testInvokeAllPessimisticReadCommitted() throws Exception {
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ checkInvokeAll(PESSIMISTIC, READ_COMMITTED);
+ }
+ });
+ }
+
+ /**
+ * Runs {@code checkInvokeAll(PESSIMISTIC, REPEATABLE_READ)} through {@code runInAllDataModes}.
+ *
+ * @throws Exception If failed.
+ */
+ public void testInvokeAllPessimisticRepeatableRead() throws Exception {
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ checkInvokeAll(PESSIMISTIC, REPEATABLE_READ);
+ }
+ });
+ }
+
+ /**
+ * Checks {@code invokeAll} for three keys, of which {@code key1} is expected to be absent:
+ * with {@link #INCR_PROCESSOR} inside a transaction (only when {@code txShouldBeUsed()} returns
+ * {@code true}), then without an explicit transaction using {@link #RMV_PROCESSOR},
+ * {@link #INCR_PROCESSOR} with a key set, and {@link #INCR_PROCESSOR} with a key-to-processor map.
+ * Each time the result map is expected to hold two entries (none for {@code key1}).
+ *
+ * @param concurrency Transaction concurrency.
+ * @param isolation Transaction isolation.
+ * @throws Exception If failed.
+ */
+ private void checkInvokeAll(TransactionConcurrency concurrency, TransactionIsolation isolation) throws Exception {
+ // TODO IGNITE-2664: enable tests for all modes when IGNITE-2664 will be fixed.
+ // Until then, with more than one grid only the EXTERNALIZABLE data mode is exercised.
+ if (dataMode != DataMode.EXTERNALIZABLE && gridCount() > 1)
+ return;
+
+ final Object key1 = key(1);
+ final Object key2 = key(2);
+ final Object key3 = key(3);
+
+ final Object val1 = value(1);
+ final Object val2 = value(2);
+ final Object val3 = value(3);
+ final Object val4 = value(4);
+
+ final IgniteCache<Object, Object> cache = jcache();
+
+ // Initial state: key1 is not stored, key2 -> value(1), key3 -> value(3).
+ cache.put(key2, val1);
+ cache.put(key3, val3);
+
+ if (txShouldBeUsed()) {
+ Map<Object, EntryProcessorResult<Object>> res;
+
+ try (Transaction tx = ignite(0).transactions().txStart(concurrency, isolation)) {
+ res = cache.invokeAll(F.asSet(key1, key2, key3), INCR_PROCESSOR, dataMode);
+
+ tx.commit();
+ }
+
+ assertEquals(val1, cache.get(key1));
+ assertEquals(val2, cache.get(key2));
+ assertEquals(val4, cache.get(key3));
+
+ // Results carry the values seen before the increment; there is no result for key1.
+ assertNull(res.get(key1));
+ assertEquals(val1, res.get(key2).get());
+ assertEquals(val3, res.get(key3).get());
+
+ assertEquals(2, res.size());
+
+ // Restore the initial state for the next scenario.
+ cache.remove(key1);
+ cache.put(key2, val1);
+ cache.put(key3, val3);
+ }
+
+ Map<Object, EntryProcessorResult<Object>> res = cache.invokeAll(F.asSet(key1, key2, key3), RMV_PROCESSOR);
+
+ // After the remove processor none of the keys may be left on-heap on any grid.
+ for (int i = 0; i < gridCount(); i++) {
+ assertNull(jcache(i).localPeek(key1, ONHEAP));
+ assertNull(jcache(i).localPeek(key2, ONHEAP));
+ assertNull(jcache(i).localPeek(key3, ONHEAP));
+ }
+
+ assertNull(res.get(key1));
+ assertEquals(val1, res.get(key2).get());
+ assertEquals(val3, res.get(key3).get());
+
+ assertEquals(2, res.size());
+
+ cache.remove(key1);
+ cache.put(key2, val1);
+ cache.put(key3, val3);
+
+ // Same processor for every key, passed together with a key set.
+ res = cache.invokeAll(F.asSet(key1, key2, key3), INCR_PROCESSOR, dataMode);
+
+ assertEquals(val1, cache.get(key1));
+ assertEquals(val2, cache.get(key2));
+ assertEquals(val4, cache.get(key3));
+
+ assertNull(res.get(key1));
+ assertEquals(val1, res.get(key2).get());
+ assertEquals(val3, res.get(key3).get());
+
+ assertEquals(2, res.size());
+
+ cache.remove(key1);
+ cache.put(key2, val1);
+ cache.put(key3, val3);
+
+ // Processor given per key through a map.
+ res = cache.invokeAll(F.asMap(key1, INCR_PROCESSOR, key2, INCR_PROCESSOR, key3, INCR_PROCESSOR), dataMode);
+
+ assertEquals(val1, cache.get(key1));
+ assertEquals(val2, cache.get(key2));
+ assertEquals(val4, cache.get(key3));
+
+ assertNull(res.get(key1));
+ assertEquals(val1, res.get(key2).get());
+ assertEquals(val3, res.get(key3).get());
+
+ assertEquals(2, res.size());
+ }
+
+ /**
+ * Through {@code runInAllDataModes}, checks that {@code invokeAll} throws
+ * {@link NullPointerException} for a {@code null} key set, a {@code null} processor and a key set
+ * containing a {@code null} element.
+ *
+ * @throws Exception If failed.
+ */
+ public void testInvokeAllWithNulls() throws Exception {
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ final Object key1 = key(1);
+
+ final IgniteCache<Object, Object> cache = jcache();
+
+ // Null key set.
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ cache.invokeAll((Set<Object>)null, INCR_PROCESSOR, dataMode);
+
+ return null;
+ }
+ }, NullPointerException.class, null);
+
+ // Null processor.
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ cache.invokeAll(F.asSet(key1), null);
+
+ return null;
+ }
+ }, NullPointerException.class, null);
+
+ // Key set with a null element; insertion order is kept so the valid key comes first.
+ final Set<Object> keys = new LinkedHashSet<>(2);
+
+ keys.add(key1);
+ keys.add(null);
+
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ cache.invokeAll(keys, INCR_PROCESSOR, dataMode);
+
+ return null;
+ }
+ }, NullPointerException.class, null);
+ }
+ });
+ }
+
+ /**
+ * Runs {@code checkInvokeSequential0(false, OPTIMISTIC)} (no initial value) through
+ * {@code runInAllDataModes}.
+ *
+ * @throws Exception If failed.
+ */
+ public void testInvokeSequentialOptimisticNoStart() throws Exception {
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ checkInvokeSequential0(false, OPTIMISTIC);
+ }
+ });
+ }
+
+ /**
+ * Runs {@code checkInvokeSequential0(false, PESSIMISTIC)} (no initial value) through
+ * {@code runInAllDataModes}.
+ *
+ * @throws Exception If failed.
+ */
+ public void testInvokeSequentialPessimisticNoStart() throws Exception {
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ checkInvokeSequential0(false, PESSIMISTIC);
+ }
+ });
+ }
+
+ /**
+ * Runs {@code checkInvokeSequential0(true, OPTIMISTIC)} (with an initial value) through
+ * {@code runInAllDataModes}.
+ *
+ * @throws Exception If failed.
+ */
+ public void testInvokeSequentialOptimisticWithStart() throws Exception {
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ checkInvokeSequential0(true, OPTIMISTIC);
+ }
+ });
+ }
+
+ /**
+ * Runs {@code checkInvokeSequential0(true, PESSIMISTIC)} (with an initial value) through
+ * {@code runInAllDataModes}.
+ *
+ * @throws Exception If failed.
+ */
+ public void testInvokeSequentialPessimisticWithStart() throws Exception {
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ checkInvokeSequential0(true, PESSIMISTIC);
+ }
+ });
+ }
+
+ /**
+ * Applies {@link #INCR_PROCESSOR} to the same key three times in a row, inside a
+ * {@code READ_COMMITTED} transaction when {@code txShouldBeUsed()} returns {@code true}, and checks
+ * both the value returned by every invocation and the final stored value.
+ *
+ * @param startVal Whether to put value.
+ * @param concurrency Concurrency.
+ * @throws Exception If failed.
+ */
+ private void checkInvokeSequential0(boolean startVal, TransactionConcurrency concurrency)
+ throws Exception {
+ final Object one = value(1);
+ final Object two = value(2);
+ final Object three = value(3);
+
+ IgniteCache<Object, Object> c = jcache();
+
+ final Object k = primaryTestObjectKeysForCache(c, 1).get(0);
+
+ Transaction tx = null;
+
+ if (txShouldBeUsed())
+ tx = ignite(0).transactions().txStart(concurrency, READ_COMMITTED);
+
+ try {
+ Object prev;
+
+ if (startVal) {
+ c.put(k, two);
+
+ prev = two;
+ }
+ else {
+ assertNull(c.get(k));
+
+ prev = null;
+ }
+
+ // Every invocation is expected to return the value seen before the increment.
+ assertEquals(prev, c.invoke(k, INCR_PROCESSOR, dataMode));
+
+ prev = startVal ? three : one;
+
+ assertEquals(prev, c.invoke(k, INCR_PROCESSOR, dataMode));
+
+ prev = value(valueOf(prev) + 1);
+
+ assertEquals(prev, c.invoke(k, INCR_PROCESSOR, dataMode));
+
+ if (tx != null)
+ tx.commit();
+ }
+ finally {
+ if (tx != null)
+ tx.close();
+ }
+
+ // Three increments on top of the optional start value of 2.
+ Object finalVal = value(startVal ? 5 : 3);
+
+ assertEquals(finalVal, c.get(k));
+
+ // Only grids that are primary or backup for the key are peeked.
+ for (int idx = 0; idx < gridCount(); idx++) {
+ boolean owner = ignite(idx).affinity(cacheName()).isPrimaryOrBackup(grid(idx).localNode(), k);
+
+ if (owner)
+ assertEquals(finalVal, peek(jcache(idx), k));
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeAfterRemoveOptimistic() throws Exception {
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ checkInvokeAfterRemove(OPTIMISTIC);
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeAfterRemovePessimistic() throws Exception {
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ checkInvokeAfterRemove(PESSIMISTIC);
+ }
+ });
+ }
+
+    /**
+     * Puts {@code value(4)}, then (inside a READ_COMMITTED transaction when {@code txShouldBeUsed()}
+     * is true) removes the entry and applies {@code INCR_PROCESSOR} three times; the cache is
+     * expected to hold {@code value(3)} afterwards.
+     *
+     * @param concurrency Transaction concurrency used when a transaction is started.
+     * @throws Exception If failed.
+     */
+    private void checkInvokeAfterRemove(TransactionConcurrency concurrency) throws Exception {
+        IgniteCache<Object, Object> cache = jcache();
+
+        Object testKey = key(1);
+
+        cache.put(testKey, value(4));
+
+        Transaction tx = null;
+
+        if (txShouldBeUsed())
+            tx = ignite(0).transactions().txStart(concurrency, READ_COMMITTED);
+
+        try {
+            cache.remove(testKey);
+
+            for (int n = 0; n < 3; n++)
+                cache.invoke(testKey, INCR_PROCESSOR, dataMode);
+
+            if (tx != null)
+                tx.commit();
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
+
+        assertEquals(value(3), cache.get(testKey));
+    }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReturnValueGetOptimisticReadCommitted() throws Exception {
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ checkInvokeReturnValue(false, OPTIMISTIC, READ_COMMITTED);
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReturnValueGetOptimisticRepeatableRead() throws Exception {
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ checkInvokeReturnValue(false, OPTIMISTIC, REPEATABLE_READ);
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReturnValueGetPessimisticReadCommitted() throws Exception {
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ checkInvokeReturnValue(false, PESSIMISTIC, READ_COMMITTED);
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReturnValueGetPessimisticRepeatableRead() throws Exception {
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ checkInvokeReturnValue(false, PESSIMISTIC, REPEATABLE_READ);
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReturnValuePutInTx() throws Exception {
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ checkInvokeReturnValue(true, OPTIMISTIC, READ_COMMITTED);
+ }
+ });
+ }
+
+    /**
+     * Stores {@code value(1)} (either before or inside the optional transaction), applies
+     * {@code INCR_PROCESSOR} once and verifies that reads return {@code value(2)}.
+     *
+     * @param put If {@code true} the initial put happens after the transaction start, otherwise before it.
+     * @param concurrency Transaction concurrency used when a transaction is started.
+     * @param isolation Transaction isolation used when a transaction is started.
+     * @throws Exception If failed.
+     */
+    private void checkInvokeReturnValue(boolean put,
+        TransactionConcurrency concurrency,
+        TransactionIsolation isolation)
+        throws Exception {
+        IgniteCache<Object, Object> cache = jcache();
+
+        Object testKey = key(1);
+        Object initial = value(1);
+        Object incremented = value(2);
+
+        boolean putBeforeTx = !put;
+
+        if (putBeforeTx)
+            cache.put(testKey, initial);
+
+        Transaction tx = null;
+
+        if (txShouldBeUsed())
+            tx = ignite(0).transactions().txStart(concurrency, isolation);
+
+        try {
+            if (!putBeforeTx)
+                cache.put(testKey, initial);
+
+            cache.invoke(testKey, INCR_PROCESSOR, dataMode);
+
+            assertEquals(incremented, cache.get(testKey));
+
+            if (tx != null) {
+                // Second get inside tx. Make sure read value is not transformed twice.
+                assertEquals(incremented, cache.get(testKey));
+
+                tx.commit();
+            }
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
+    }
+
+    /**
+     * Issues two asynchronous {@code getAndPut} calls and verifies that the futures return the
+     * previous values and that the cache holds the new ones.
+     *
+     * @throws Exception In case of error.
+     */
+    public void testGetAndPutAsync() throws Exception {
+        IgniteCache<String, Integer> syncCache = jcache();
+
+        IgniteCache<String, Integer> asyncCache = syncCache.withAsync();
+
+        syncCache.put("key1", 1);
+        syncCache.put("key2", 2);
+
+        asyncCache.getAndPut("key1", 10);
+
+        IgniteFuture<Integer> firstFut = asyncCache.future();
+
+        asyncCache.getAndPut("key2", 11);
+
+        IgniteFuture<Integer> secondFut = asyncCache.future();
+
+        // Futures carry the values that were replaced.
+        assertEquals((Integer)1, firstFut.get(5000));
+        assertEquals((Integer)2, secondFut.get(5000));
+
+        assertEquals((Integer)10, syncCache.get("key1"));
+        assertEquals((Integer)11, syncCache.get("key2"));
+    }
+
+    /**
+     * Issues two asynchronous {@code getAndPut} calls for keys that were not put by this test and
+     * verifies that both futures complete with {@code null}.
+     *
+     * @throws Exception In case of error.
+     */
+    public void testPutAsync0() throws Exception {
+        // Typed references instead of a raw IgniteCache.
+        IgniteCache<String, Integer> cache = jcache();
+
+        IgniteCache<String, Integer> cacheAsync = cache.withAsync();
+
+        cacheAsync.getAndPut("key1", 0);
+
+        IgniteFuture<Integer> fut1 = cacheAsync.future();
+
+        cacheAsync.getAndPut("key2", 1);
+
+        IgniteFuture<Integer> fut2 = cacheAsync.future();
+
+        // JUnit assertions are enforced even when the JVM runs without -ea.
+        assertNull(fut1.get(5000));
+        assertNull(fut2.get(5000));
+    }
+
+    /**
+     * Applies entry processors through the asynchronous cache facade: {@code INCR_PROCESSOR} on an
+     * absent key and on an existing key, and {@code RMV_PROCESSOR} on a third key. Verifies the
+     * resulting cache contents once all futures are done.
+     *
+     * @throws Exception If failed.
+     */
+    public void testInvokeAsync() throws Exception {
+        TestRunnable scenario = new TestRunnable() {
+            @Override public void run() throws Exception {
+                final Object absentKey = key(1);
+                final Object presentKey = key(2);
+                final Object removedKey = key(3);
+
+                final Object one = value(1);
+                final Object two = value(2);
+                final Object three = value(3);
+
+                IgniteCache<Object, Object> syncCache = jcache();
+
+                syncCache.put(presentKey, one);
+                syncCache.put(removedKey, three);
+
+                IgniteCache<Object, Object> asyncCache = syncCache.withAsync();
+
+                // In async mode the direct call result is expected to be null; the outcome is in the future.
+                assertNull(asyncCache.invoke(absentKey, INCR_PROCESSOR, dataMode));
+
+                IgniteFuture<?> absentFut = asyncCache.future();
+
+                assertNull(asyncCache.invoke(presentKey, INCR_PROCESSOR, dataMode));
+
+                IgniteFuture<?> presentFut = asyncCache.future();
+
+                assertNull(asyncCache.invoke(removedKey, RMV_PROCESSOR));
+
+                IgniteFuture<?> removedFut = asyncCache.future();
+
+                absentFut.get();
+                presentFut.get();
+                removedFut.get();
+
+                assertEquals(one, syncCache.get(absentKey));
+                assertEquals(two, syncCache.get(presentKey));
+                assertNull(syncCache.get(removedKey));
+
+                for (int idx = 0; idx < gridCount(); idx++) {
+                    assertNull(jcache(idx).localPeek(removedKey, ONHEAP));
+                }
+            }
+        };
+
+        runInAllDataModes(scenario);
+    }
+
+    /**
+     * Checks synchronous {@code invoke}: {@code INCR_PROCESSOR} on an absent and on an existing key
+     * (the call returns the value seen before the increment), removal through
+     * {@code RemoveAndReturnNullEntryProcessor}, and propagation of a processor failure as
+     * {@code EntryProcessorException}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testInvoke() throws Exception {
+        runInAllDataModes(new TestRunnable() {
+            @Override public void run() throws Exception {
+                final Object k0 = key(0);
+                final Object k1 = key(1);
+
+                final Object val1 = value(1);
+                final Object val2 = value(2);
+                final Object val3 = value(3);
+
+                final IgniteCache<Object, Object> cache = jcache();
+
+                assertNull(cache.invoke(k0, INCR_PROCESSOR, dataMode));
+
+                // The cached object is a value, so it is compared with value(1) and not with key(1);
+                // the next invoke below confirms that value(1) is what is stored.
+                assertEquals(val1, cache.get(k0));
+
+                assertEquals(val1, cache.invoke(k0, INCR_PROCESSOR, dataMode));
+
+                assertEquals(val2, cache.get(k0));
+
+                cache.put(k1, val1);
+
+                assertEquals(val1, cache.invoke(k1, INCR_PROCESSOR, dataMode));
+
+                assertEquals(val2, cache.get(k1));
+
+                assertEquals(val2, cache.invoke(k1, INCR_PROCESSOR, dataMode));
+
+                assertEquals(val3, cache.get(k1));
+
+                RemoveAndReturnNullEntryProcessor c = new RemoveAndReturnNullEntryProcessor();
+
+                assertNull(cache.invoke(k1, c));
+                assertNull(cache.get(k1));
+
+                // The removed entry must not be visible on heap on any node.
+                for (int i = 0; i < gridCount(); i++)
+                    assertNull(jcache(i).localPeek(k1, ONHEAP));
+
+                final EntryProcessor<Object, Object, Object> errProcessor = new FailedEntryProcessor();
+
+                GridTestUtils.assertThrows(log, new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        cache.invoke(k1, errProcessor);
+
+                        return null;
+                    }
+                }, EntryProcessorException.class, "Test entry processor exception.");
+            }
+        });
+    }
+
+    /**
+     * Runs {@code checkPut} inside a transaction; does nothing when {@code txShouldBeUsed()} is false.
+     *
+     * @throws Exception In case of error.
+     */
+    public void testPutx() throws Exception {
+        if (!txShouldBeUsed())
+            return;
+
+        checkPut(true);
+    }
+
+    /**
+     * Runs {@code checkPut} without starting a transaction.
+     *
+     * @throws Exception In case of error.
+     */
+    public void testPutxNoTx() throws Exception {
+        final boolean inTx = false;
+
+        checkPut(inTx);
+    }
+
+    /**
+     * Puts two entries (optionally inside a transaction) and verifies they are readable both before
+     * and after the commit, that the cache size matches, and that an unknown key is absent.
+     *
+     * @param inTx Whether to start transaction.
+     * @throws Exception If failed.
+     */
+    private void checkPut(boolean inTx) throws Exception {
+        Transaction tx = inTx ? transactions().txStart() : null;
+
+        IgniteCache<String, Integer> cache = jcache();
+
+        try {
+            cache.put("key1", 1);
+            cache.put("key2", 2);
+
+            // Check inside transaction. JUnit assertions are used so the checks run without -ea
+            // and a missing value is reported as a failure instead of an unboxing NPE.
+            assertEquals((Integer)1, cache.get("key1"));
+            assertEquals((Integer)2, cache.get("key2"));
+
+            if (tx != null)
+                tx.commit();
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
+
+        checkSize(F.asSet("key1", "key2"));
+
+        // Check outside transaction.
+        checkContainsKey(true, "key1");
+        checkContainsKey(true, "key2");
+        checkContainsKey(false, "wrong");
+
+        assertEquals((Integer)1, cache.get("key1"));
+        assertEquals((Integer)2, cache.get("key2"));
+        assertNull(cache.get("wrong"));
+    }
+
+    /**
+     * Performs asynchronous puts (inside a transaction committed asynchronously when
+     * {@code txShouldBeUsed()} is true) and verifies the futures, the transaction state and the
+     * resulting cache contents.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutAsync() throws Exception {
+        Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;
+
+        // Typed references instead of a raw IgniteCache, so that reads are Integer-typed.
+        IgniteCache<String, Integer> cache = jcache();
+
+        IgniteCache<String, Integer> cacheAsync = cache.withAsync();
+
+        try {
+            cache.put("key2", 1);
+
+            cacheAsync.put("key1", 10);
+
+            IgniteFuture<?> fut1 = cacheAsync.future();
+
+            cacheAsync.put("key2", 11);
+
+            IgniteFuture<?> fut2 = cacheAsync.future();
+
+            IgniteFuture<Transaction> f = null;
+
+            if (tx != null) {
+                // Switch the transaction to async mode so that commit() produces a future.
+                tx = (Transaction)tx.withAsync();
+
+                tx.commit();
+
+                f = tx.future();
+            }
+
+            assertNull(fut1.get());
+            assertNull(fut2.get());
+
+            if (f != null)
+                assertEquals(COMMITTED, f.get().state());
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
+
+        checkSize(F.asSet("key1", "key2"));
+
+        assertEquals((Integer)10, cache.get("key1"));
+        assertEquals((Integer)11, cache.get("key2"));
+    }
+
+    /**
+     * Verifies that {@code putAll} stores new entries and overwrites existing ones.
+     *
+     * @throws Exception In case of error.
+     */
+    public void testPutAll() throws Exception {
+        Map<String, Integer> map = F.asMap("key1", 1, "key2", 2);
+
+        IgniteCache<String, Integer> cache = jcache();
+
+        cache.putAll(map);
+
+        checkSize(F.asSet("key1", "key2"));
+
+        // JUnit assertions are enforced even when the JVM runs without -ea.
+        assertEquals((Integer)1, cache.get("key1"));
+        assertEquals((Integer)2, cache.get("key2"));
+
+        map.put("key1", 10);
+        map.put("key2", 20);
+
+        cache.putAll(map);
+
+        checkSize(F.asSet("key1", "key2"));
+
+        assertEquals((Integer)10, cache.get("key1"));
+        assertEquals((Integer)20, cache.get("key2"));
+    }
+
+    /**
+     * For 100 keys, verifies that a transaction in which a {@code null} key is passed to
+     * {@code put}, {@code remove} or {@code putAll} fails with {@link NullPointerException} and that
+     * the other updates made in that transaction are not visible afterwards. Skipped when
+     * {@code txShouldBeUsed()} is false.
+     *
+     * @throws Exception In case of error.
+     */
+    public void testNullInTx() throws Exception {
+        if (!txShouldBeUsed())
+            return;
+
+        final IgniteCache<String, Integer> cache = jcache();
+
+        for (int idx = 0; idx < 100; idx++) {
+            final String key = "key-" + idx;
+
+            assertNull(cache.get(key));
+
+            Callable<Void> putNullKey = new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    IgniteTransactions txs = transactions();
+
+                    try (Transaction tx = txs.txStart()) {
+                        cache.put(key, 1);
+
+                        cache.put(null, 2);
+
+                        tx.commit();
+                    }
+
+                    return null;
+                }
+            };
+
+            GridTestUtils.assertThrows(log, putNullKey, NullPointerException.class, null);
+
+            // The put made before the failure must not be visible.
+            assertNull(cache.get(key));
+
+            cache.put(key, 1);
+
+            assertEquals(1, (int)cache.get(key));
+
+            Callable<Void> removeNullKey = new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    IgniteTransactions txs = transactions();
+
+                    try (Transaction tx = txs.txStart()) {
+                        cache.put(key, 2);
+
+                        cache.remove(null);
+
+                        tx.commit();
+                    }
+
+                    return null;
+                }
+            };
+
+            GridTestUtils.assertThrows(log, removeNullKey, NullPointerException.class, null);
+
+            assertEquals(1, (int)cache.get(key));
+
+            cache.put(key, 2);
+
+            assertEquals(2, (int)cache.get(key));
+
+            Callable<Void> putAllNullKey = new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    IgniteTransactions txs = transactions();
+
+                    Map<String, Integer> batch = new LinkedHashMap<>();
+
+                    batch.put("k1", 1);
+                    batch.put("k2", 2);
+                    batch.put(null, 3);
+
+                    try (Transaction tx = txs.txStart()) {
+                        cache.put(key, 1);
+
+                        cache.putAll(batch);
+
+                        tx.commit();
+                    }
+
+                    return null;
+                }
+            };
+
+            GridTestUtils.assertThrows(log, putAllNullKey, NullPointerException.class, null);
+
+            assertNull(cache.get("k1"));
+            assertNull(cache.get("k2"));
+
+            assertEquals(2, (int)cache.get(key));
+
+            cache.put(key, 3);
+
+            assertEquals(3, (int)cache.get(key));
+        }
+    }
+
+    /**
+     * Verifies that {@code putAll} with a {@code null} key or value, and single-entry update
+     * operations ({@code put}, {@code getAndPut}, {@code replace}, {@code getAndReplace}) with a
+     * {@code null} argument, fail with {@link NullPointerException}.
+     *
+     * @throws Exception In case of error.
+     */
+    public void testPutAllWithNulls() throws Exception {
+        final IgniteCache<String, Integer> cache = jcache();
+
+        // Map containing a null key.
+        final Map<String, Integer> nullKeyMap = new LinkedHashMap<>(2);
+
+        nullKeyMap.put("key1", 1);
+        nullKeyMap.put(null, 2);
+
+        Callable<Void> putAllNullKey = new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                cache.putAll(nullKeyMap);
+
+                return null;
+            }
+        };
+
+        GridTestUtils.assertThrows(log, putAllNullKey, NullPointerException.class, null);
+
+        cache.put("key1", 1);
+
+        assertEquals(1, (int)cache.get("key1"));
+
+        // Map containing a null value.
+        final Map<String, Integer> nullValMap = new LinkedHashMap<>(2);
+
+        nullValMap.put("key3", 3);
+        nullValMap.put("key4", null);
+
+        Callable<Void> putAllNullVal = new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                cache.putAll(nullValMap);
+
+                return null;
+            }
+        };
+
+        GridTestUtils.assertThrows(log, putAllNullVal, NullPointerException.class, null);
+
+        // Once the null value is replaced the same map must be accepted.
+        nullValMap.put("key4", 4);
+
+        cache.putAll(nullValMap);
+
+        assertEquals(3, (int)cache.get("key3"));
+        assertEquals(4, (int)cache.get("key4"));
+
+        Callable<Object> putNullVal = new Callable<Object>() {
+            @Nullable @Override public Object call() throws Exception {
+                cache.put("key1", null);
+
+                return null;
+            }
+        };
+
+        assertThrows(log, putNullVal, NullPointerException.class, A.NULL_MSG_PREFIX);
+
+        Callable<Object> getAndPutNullVal = new Callable<Object>() {
+            @Nullable @Override public Object call() throws Exception {
+                cache.getAndPut("key1", null);
+
+                return null;
+            }
+        };
+
+        assertThrows(log, getAndPutNullVal, NullPointerException.class, A.NULL_MSG_PREFIX);
+
+        Callable<Object> putNullKey = new Callable<Object>() {
+            @Nullable @Override public Object call() throws Exception {
+                cache.put(null, 1);
+
+                return null;
+            }
+        };
+
+        assertThrows(log, putNullKey, NullPointerException.class, A.NULL_MSG_PREFIX);
+
+        Callable<Object> replaceNullKey = new Callable<Object>() {
+            @Nullable @Override public Object call() throws Exception {
+                cache.replace(null, 1);
+
+                return null;
+            }
+        };
+
+        assertThrows(log, replaceNullKey, NullPointerException.class, A.NULL_MSG_PREFIX);
+
+        Callable<Object> getAndReplaceNullKey = new Callable<Object>() {
+            @Nullable @Override public Object call() throws Exception {
+                cache.getAndReplace(null, 1);
+
+                return null;
+            }
+        };
+
+        assertThrows(log, getAndReplaceNullKey, NullPointerException.class, A.NULL_MSG_PREFIX);
+
+        Callable<Object> replaceNullVal = new Callable<Object>() {
+            @Nullable @Override public Object call() throws Exception {
+                cache.replace("key", null);
+
+                return null;
+            }
+        };
+
+        assertThrows(log, replaceNullVal, NullPointerException.class, A.NULL_MSG_PREFIX);
+
+        Callable<Object> getAndReplaceNullVal = new Callable<Object>() {
+            @Nullable @Override public Object call() throws Exception {
+                cache.getAndReplace("key", null);
+
+                return null;
+            }
+        };
+
+        assertThrows(log, getAndReplaceNullVal, NullPointerException.class, A.NULL_MSG_PREFIX);
+
+        Callable<Object> condReplaceNullKey = new Callable<Object>() {
+            @Nullable @Override public Object call() throws Exception {
+                cache.replace(null, 1, 2);
+
+                return null;
+            }
+        };
+
+        assertThrows(log, condReplaceNullKey, NullPointerException.class, A.NULL_MSG_PREFIX);
+
+        Callable<Object> condReplaceNullOld = new Callable<Object>() {
+            @Nullable @Override public Object call() throws Exception {
+                cache.replace("key", null, 2);
+
+                return null;
+            }
+        };
+
+        assertThrows(log, condReplaceNullOld, NullPointerException.class, A.NULL_MSG_PREFIX);
+
+        Callable<Object> condReplaceNullNew = new Callable<Object>() {
+            @Nullable @Override public Object call() throws Exception {
+                cache.replace("key", 1, null);
+
+                return null;
+            }
+        };
+
+        assertThrows(log, condReplaceNullNew, NullPointerException.class, A.NULL_MSG_PREFIX);
+    }
+
+    /**
+     * Issues two asynchronous {@code putAll} calls (the second one overwriting the first) and
+     * verifies the futures and the resulting cache contents.
+     *
+     * @throws Exception In case of error.
+     */
+    public void testPutAllAsync() throws Exception {
+        Map<String, Integer> map = F.asMap("key1", 1, "key2", 2);
+
+        IgniteCache<String, Integer> cache = jcache();
+
+        IgniteCache<String, Integer> cacheAsync = cache.withAsync();
+
+        cacheAsync.putAll(map);
+
+        IgniteFuture<?> f1 = cacheAsync.future();
+
+        map.put("key1", 10);
+        map.put("key2", 20);
+
+        cacheAsync.putAll(map);
+
+        IgniteFuture<?> f2 = cacheAsync.future();
+
+        assertNull(f2.get());
+        assertNull(f1.get());
+
+        checkSize(F.asSet("key1", "key2"));
+
+        // JUnit assertions are enforced even when the JVM runs without -ea.
+        assertEquals((Integer)10, cache.get("key1"));
+        assertEquals((Integer)20, cache.get("key2"));
+    }
+
+    /**
+     * Verifies {@code getAndPutIfAbsent}: it returns {@code null} and stores the value for an absent
+     * key, and returns the existing value without overwriting it otherwise. When a store is enabled
+     * the same is checked for a locally evicted entry and (if previous values are loaded and the
+     * test is not multi-JVM) for an entry that exists only in the store.
+     *
+     * @throws Exception In case of error.
+     */
+    public void testGetAndPutIfAbsent() throws Exception {
+        Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;
+
+        IgniteCache<String, Integer> cache = jcache();
+
+        try {
+            // JUnit assertions are used so the checks run without -ea and a missing value
+            // is reported as a failure instead of an unboxing NPE.
+            assertNull(cache.getAndPutIfAbsent("key", 1));
+
+            assertNotNull(cache.get("key"));
+            assertEquals((Integer)1, cache.get("key"));
+
+            assertNotNull(cache.getAndPutIfAbsent("key", 2));
+            assertEquals((Integer)1, cache.getAndPutIfAbsent("key", 2));
+
+            assertNotNull(cache.get("key"));
+            assertEquals((Integer)1, cache.get("key"));
+
+            if (tx != null)
+                tx.commit();
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
+
+        assertNotNull(cache.getAndPutIfAbsent("key", 2));
+
+        for (int i = 0; i < gridCount(); i++) {
+            info("Peek on node [i=" + i + ", id=" + grid(i).localNode().id() + ", val=" +
+                grid(i).cache(cacheName()).localPeek("key", ONHEAP) + ']');
+        }
+
+        assertEquals((Integer)1, cache.getAndPutIfAbsent("key", 2));
+
+        assertNotNull(cache.get("key"));
+        assertEquals((Integer)1, cache.get("key"));
+
+        if (!storeEnabled())
+            return;
+
+        // Check swap.
+        cache.put("key2", 1);
+
+        cache.localEvict(Collections.singleton("key2"));
+
+        if (!isLoadPreviousValue())
+            cache.get("key2");
+
+        assertEquals((Integer)1, cache.getAndPutIfAbsent("key2", 3));
+
+        // Check db. The store is known to be enabled at this point.
+        if (isLoadPreviousValue() && !isMultiJvm()) {
+            putToStore("key3", 3);
+
+            assertEquals((Integer)3, cache.getAndPutIfAbsent("key3", 4));
+
+            assertEquals((Integer)3, cache.get("key3"));
+        }
+
+        assertEquals((Integer)1, cache.get("key2"));
+
+        cache.localEvict(Collections.singleton("key2"));
+
+        if (!isLoadPreviousValue())
+            cache.get("key2");
+
+        // Same checks inside tx.
+        tx = txShouldBeUsed() ? transactions().txStart() : null;
+
+        try {
+            assertEquals((Integer)1, cache.getAndPutIfAbsent("key2", 3));
+
+            if (tx != null)
+                tx.commit();
+
+            assertEquals((Integer)1, cache.get("key2"));
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
+    }
+
+    /**
+     * Asynchronous counterpart of the {@code getAndPutIfAbsent} checks: the future returns
+     * {@code null} for an absent key and the existing value otherwise. When a store is enabled the
+     * same is checked for a locally evicted entry and (if previous values are loaded and the test is
+     * not multi-JVM) for an entry that exists only in the store.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGetAndPutIfAbsentAsync() throws Exception {
+        Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;
+
+        IgniteCache<String, Integer> cache = jcache();
+
+        IgniteCache<String, Integer> cacheAsync = cache.withAsync();
+
+        try {
+            cacheAsync.getAndPutIfAbsent("key", 1);
+
+            IgniteFuture<Integer> fut1 = cacheAsync.future();
+
+            assertNull(fut1.get());
+            assertEquals((Integer)1, cache.get("key"));
+
+            cacheAsync.getAndPutIfAbsent("key", 2);
+
+            IgniteFuture<Integer> fut2 = cacheAsync.future();
+
+            assertEquals((Integer)1, fut2.get());
+            assertEquals((Integer)1, cache.get("key"));
+
+            if (tx != null)
+                tx.commit();
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
+
+        if (!storeEnabled())
+            return;
+
+        // Check swap.
+        cache.put("key2", 1);
+
+        cache.localEvict(Collections.singleton("key2"));
+
+        if (!isLoadPreviousValue())
+            cache.get("key2");
+
+        cacheAsync.getAndPutIfAbsent("key2", 3);
+
+        assertEquals((Integer)1, cacheAsync.<Integer>future().get());
+
+        // Check db. The store is known to be enabled at this point.
+        if (isLoadPreviousValue() && !isMultiJvm()) {
+            putToStore("key3", 3);
+
+            cacheAsync.getAndPutIfAbsent("key3", 4);
+
+            assertEquals((Integer)3, cacheAsync.<Integer>future().get());
+        }
+
+        cache.localEvict(Collections.singleton("key2"));
+
+        if (!isLoadPreviousValue())
+            cache.get("key2");
+
+        // Same checks inside tx.
+        tx = txShouldBeUsed() ? transactions().txStart() : null;
+
+        try {
+            cacheAsync.getAndPutIfAbsent("key2", 3);
+
+            // Typed future and boxed expectation, consistent with the other checks in this test.
+            assertEquals((Integer)1, cacheAsync.<Integer>future().get());
+
+            if (tx != null)
+                tx.commit();
+
+            assertEquals((Integer)1, cache.get("key2"));
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
+    }
+
+    /**
+     * Verifies {@code putIfAbsent}: it succeeds for an absent key and is rejected, leaving the value
+     * intact, for an existing one. When a store is enabled the same is checked for a locally evicted
+     * entry and (if previous values are loaded and the test is not multi-JVM) for an entry that
+     * exists only in the store.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutIfAbsent() throws Exception {
+        IgniteCache<String, Integer> cache = jcache();
+
+        // JUnit assertions are used so the checks run without -ea and a missing value
+        // is reported as a failure instead of an unboxing NPE.
+        assertNull(cache.get("key"));
+        assertTrue(cache.putIfAbsent("key", 1));
+        assertEquals((Integer)1, cache.get("key"));
+        assertFalse(cache.putIfAbsent("key", 2));
+        assertEquals((Integer)1, cache.get("key"));
+
+        if (!storeEnabled())
+            return;
+
+        // Check swap.
+        cache.put("key2", 1);
+
+        cache.localEvict(Collections.singleton("key2"));
+
+        if (!isLoadPreviousValue())
+            cache.get("key2");
+
+        assertFalse(cache.putIfAbsent("key2", 3));
+
+        // Check db.
+        if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) {
+            putToStore("key3", 3);
+
+            assertFalse(cache.putIfAbsent("key3", 4));
+        }
+
+        cache.localEvict(Collections.singleton("key2"));
+
+        if (!isLoadPreviousValue())
+            cache.get("key2");
+
+        // Same checks inside tx.
+        Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;
+
+        try {
+            assertFalse(cache.putIfAbsent("key2", 3));
+
+            if (tx != null)
+                tx.commit();
+
+            assertEquals((Integer)1, cache.get("key2"));
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
+    }
+
+    /**
+     * Runs {@code checkPutxIfAbsentAsync} with a transaction; does nothing when
+     * {@code txShouldBeUsed()} is false.
+     *
+     * @throws Exception In case of error.
+     */
+    public void testPutxIfAbsentAsync() throws Exception {
+        if (!txShouldBeUsed())
+            return;
+
+        checkPutxIfAbsentAsync(true);
+    }
+
+    /**
+     * Runs {@code checkPutxIfAbsentAsync} without starting a transaction.
+     *
+     * @throws Exception In case of error.
+     */
+    public void testPutxIfAbsentAsyncNoTx() throws Exception {
+        final boolean inTx = false;
+
+        checkPutxIfAbsentAsync(inTx);
+    }
+
+    /**
+     * Verifies asynchronous {@code putIfAbsent}: the future is {@code true} for an absent key and
+     * {@code false} for an existing one. When a store is enabled the same is checked for a locally
+     * evicted entry and (if previous values are loaded and the test is not multi-JVM) for an entry
+     * that exists only in the store, both outside and inside an optional transaction.
+     *
+     * @param inTx In tx flag.
+     * @throws Exception If failed.
+     */
+    private void checkPutxIfAbsentAsync(boolean inTx) throws Exception {
+        IgniteCache<String, Integer> cache = jcache();
+
+        IgniteCache<String, Integer> cacheAsync = cache.withAsync();
+
+        cacheAsync.putIfAbsent("key", 1);
+
+        IgniteFuture<Boolean> fut1 = cacheAsync.future();
+
+        // JUnit assertions are used so the checks run without -ea and a missing value
+        // is reported as a failure instead of an unboxing NPE.
+        assertTrue(fut1.get());
+        assertEquals((Integer)1, cache.get("key"));
+
+        cacheAsync.putIfAbsent("key", 2);
+
+        IgniteFuture<Boolean> fut2 = cacheAsync.future();
+
+        assertFalse(fut2.get());
+        assertEquals((Integer)1, cache.get("key"));
+
+        if (!storeEnabled())
+            return;
+
+        // Check swap.
+        cache.put("key2", 1);
+
+        cache.localEvict(Collections.singleton("key2"));
+
+        if (!isLoadPreviousValue())
+            cache.get("key2");
+
+        cacheAsync.putIfAbsent("key2", 3);
+
+        assertFalse(cacheAsync.<Boolean>future().get());
+
+        // Check db.
+        if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) {
+            putToStore("key3", 3);
+
+            cacheAsync.putIfAbsent("key3", 4);
+
+            assertFalse(cacheAsync.<Boolean>future().get());
+        }
+
+        cache.localEvict(Collections.singleton("key2"));
+
+        if (!isLoadPreviousValue())
+            cache.get("key2");
+
+        // Same checks inside tx.
+        Transaction tx = inTx ? transactions().txStart() : null;
+
+        try {
+            cacheAsync.putIfAbsent("key2", 3);
+
+            assertFalse(cacheAsync.<Boolean>future().get());
+
+            if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) {
+                cacheAsync.putIfAbsent("key3", 4);
+
+                assertFalse(cacheAsync.<Boolean>future().get());
+            }
+
+            if (tx != null)
+                tx.commit();
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
+
+        assertEquals((Integer)1, cache.get("key2"));
+
+        if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm())
+            assertEquals((Integer)3, cache.get("key3"));
+    }
+
+    /**
+     * Issues two asynchronous {@code putIfAbsent} calls for different keys before waiting for either
+     * future, and verifies that both succeed.
+     *
+     * @throws Exception In case of error.
+     */
+    public void testPutIfAbsentAsyncConcurrent() throws Exception {
+        // Typed references instead of a raw IgniteCache.
+        IgniteCache<String, Integer> cache = jcache();
+
+        IgniteCache<String, Integer> cacheAsync = cache.withAsync();
+
+        cacheAsync.putIfAbsent("key1", 1);
+
+        IgniteFuture<Boolean> fut1 = cacheAsync.future();
+
+        cacheAsync.putIfAbsent("key2", 2);
+
+        IgniteFuture<Boolean> fut2 = cacheAsync.future();
+
+        // JUnit assertions are enforced even when the JVM runs without -ea.
+        assertTrue(fut1.get());
+        assertTrue(fut2.get());
+    }
+
+    /**
+     * Verifies {@code getAndReplace} and the conditional {@code replace(key, oldVal, newVal)}:
+     * replacement of an existing value, no effect for an absent key or a non-matching old value.
+     * When a store is enabled the conditional replace is additionally checked after local eviction,
+     * for an entry that exists only in the store, and inside an optional transaction.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGetAndReplace() throws Exception {
+        IgniteCache<String, Integer> cache = jcache();
+
+        cache.put("key", 1);
+
+        // JUnit assertions are used so the checks run without -ea and a missing value
+        // is reported as a failure instead of an unboxing NPE.
+        assertEquals((Integer)1, cache.get("key"));
+
+        info("key 1 -> 2");
+
+        assertEquals((Integer)1, cache.getAndReplace("key", 2));
+
+        assertEquals((Integer)2, cache.get("key"));
+
+        assertNull(cache.getAndReplace("wrong", 0));
+
+        assertNull(cache.get("wrong"));
+
+        info("key 0 -> 3");
+
+        assertFalse(cache.replace("key", 0, 3));
+
+        assertEquals((Integer)2, cache.get("key"));
+
+        info("key 0 -> 3");
+
+        assertFalse(cache.replace("key", 0, 3));
+
+        assertEquals((Integer)2, cache.get("key"));
+
+        info("key 2 -> 3");
+
+        assertTrue(cache.replace("key", 2, 3));
+
+        assertEquals((Integer)3, cache.get("key"));
+
+        if (!storeEnabled())
+            return;
+
+        info("evict key");
+
+        cache.localEvict(Collections.singleton("key"));
+
+        info("key 3 -> 4");
+
+        if (!isLoadPreviousValue())
+            cache.get("key");
+
+        assertTrue(cache.replace("key", 3, 4));
+
+        assertEquals((Integer)4, cache.get("key"));
+
+        if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) {
+            putToStore("key2", 5);
+
+            info("key2 5 -> 6");
+
+            assertTrue(cache.replace("key2", 5, 6));
+        }
+
+        for (int i = 0; i < gridCount(); i++) {
+            info("Peek key on grid [i=" + i + ", nodeId=" + grid(i).localNode().id() +
+                ", peekVal=" + grid(i).cache(cacheName()).localPeek("key", ONHEAP) + ']');
+
+            info("Peek key2 on grid [i=" + i + ", nodeId=" + grid(i).localNode().id() +
+                ", peekVal=" + grid(i).cache(cacheName()).localPeek("key2", ONHEAP) + ']');
+        }
+
+        if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm())
+            assertEquals((Integer)6, cache.get("key2"));
+
+        cache.localEvict(Collections.singleton("key"));
+
+        if (!isLoadPreviousValue())
+            cache.get("key");
+
+        Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;
+
+        try {
+            assertTrue(cache.replace("key", 4, 5));
+
+            if (tx != null)
+                tx.commit();
+
+            assertEquals((Integer)5, cache.get("key"));
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
+    }
+
+    /**
+     * Verifies unconditional {@code replace(key, val)}: it succeeds for an existing key and is
+     * rejected for an absent one. When a store is enabled the same is checked after local eviction,
+     * for an entry that exists only in the store, and inside an optional transaction.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReplace() throws Exception {
+        IgniteCache<String, Integer> cache = jcache();
+
+        cache.put("key", 1);
+
+        // JUnit assertions are used so the checks run without -ea and a missing value
+        // is reported as a failure instead of an unboxing NPE.
+        assertEquals((Integer)1, cache.get("key"));
+
+        assertTrue(cache.replace("key", 2));
+
+        assertEquals((Integer)2, cache.get("key"));
+
+        assertFalse(cache.replace("wrong", 2));
+
+        if (!storeEnabled())
+            return;
+
+        cache.localEvict(Collections.singleton("key"));
+
+        if (!isLoadPreviousValue())
+            assertEquals((Integer)2, cache.get("key"));
+
+        assertTrue(cache.replace("key", 4));
+
+        assertEquals((Integer)4, cache.get("key"));
+
+        if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) {
+            putToStore("key2", 5);
+
+            // The replace outcome is checked too; the following read already requires it to succeed.
+            assertTrue(cache.replace("key2", 6));
+
+            assertEquals((Integer)6, cache.get("key2"));
+        }
+
+        cache.localEvict(Collections.singleton("key"));
+
+        if (!isLoadPreviousValue())
+            assertEquals((Integer)4, cache.get("key"));
+
+        Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;
+
+        try {
+            assertTrue(cache.replace("key", 5));
+
+            if (tx != null)
+                tx.commit();
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
+
+        assertEquals((Integer)5, cache.get("key"));
+    }
+
+    /**
+     * Asynchronous counterpart of the {@code getAndReplace} / conditional {@code replace} checks.
+     * When a store is enabled the conditional replace is additionally checked after local eviction,
+     * for an entry that exists only in the store, and inside an optional transaction.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGetAndReplaceAsync() throws Exception {
+        IgniteCache<String, Integer> cache = jcache();
+
+        IgniteCache<String, Integer> cacheAsync = cache.withAsync();
+
+        cache.put("key", 1);
+
+        // JUnit assertions are used so the checks run without -ea and a missing value
+        // is reported as a failure instead of an unboxing NPE.
+        assertEquals((Integer)1, cache.get("key"));
+
+        cacheAsync.getAndReplace("key", 2);
+
+        assertEquals((Integer)1, cacheAsync.<Integer>future().get());
+
+        assertEquals((Integer)2, cache.get("key"));
+
+        cacheAsync.getAndReplace("wrong", 0);
+
+        assertNull(cacheAsync.future().get());
+
+        assertNull(cache.get("wrong"));
+
+        cacheAsync.replace("key", 0, 3);
+
+        assertFalse(cacheAsync.<Boolean>future().get());
+
+        assertEquals((Integer)2, cache.get("key"));
+
+        cacheAsync.replace("key", 0, 3);
+
+        assertFalse(cacheAsync.<Boolean>future().get());
+
+        assertEquals((Integer)2, cache.get("key"));
+
+        cacheAsync.replace("key", 2, 3);
+
+        assertTrue(cacheAsync.<Boolean>future().get());
+
+        assertEquals((Integer)3, cache.get("key"));
+
+        if (!storeEnabled())
+            return;
+
+        cache.localEvict(Collections.singleton("key"));
+
+        if (!isLoadPreviousValue())
+            cache.get("key");
+
+        cacheAsync.replace("key", 3, 4);
+
+        assertTrue(cacheAsync.<Boolean>future().get());
+
+        assertEquals((Integer)4, cache.get("key"));
+
+        if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) {
+            putToStore("key2", 5);
+
+            cacheAsync.replace("key2", 5, 6);
+
+            assertTrue(cacheAsync.<Boolean>future().get());
+
+            assertEquals((Integer)6, cache.get("key2"));
+        }
+
+        cache.localEvict(Collections.singleton("key"));
+
+        if (!isLoadPreviousValue())
+            cache.get("key");
+
+        Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;
+
+        try {
+            cacheAsync.replace("key", 4, 5);
+
+            assertTrue(cacheAsync.<Boolean>future().get());
+
+            if (tx != null)
+                tx.commit();
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
+
+        assertEquals((Integer)5, cache.get("key"));
+    }
+
+    /**
+     * Asynchronous counterpart of the unconditional {@code replace(key, val)} checks. When a store
+     * is enabled the same is checked after local eviction, for an entry that exists only in the
+     * store, and inside an optional transaction.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReplacexAsync() throws Exception {
+        IgniteCache<String, Integer> cache = jcache();
+
+        IgniteCache<String, Integer> cacheAsync = cache.withAsync();
+
+        cache.put("key", 1);
+
+        // JUnit assertions are used so the checks run without -ea and a missing value
+        // is reported as a failure instead of an unboxing NPE.
+        assertEquals((Integer)1, cache.get("key"));
+
+        cacheAsync.replace("key", 2);
+
+        assertTrue(cacheAsync.<Boolean>future().get());
+
+        info("Finished replace.");
+
+        assertEquals((Integer)2, cache.get("key"));
+
+        // Replace of a key that was never put must be rejected.
+        cacheAsync.replace("wrond", 2);
+
+        assertFalse(cacheAsync.<Boolean>future().get());
+
+        if (!storeEnabled())
+            return;
+
+        cache.localEvict(Collections.singleton("key"));
+
+        if (!isLoadPreviousValue())
+            cache.get("key");
+
+        cacheAsync.replace("key", 4);
+
+        assertTrue(cacheAsync.<Boolean>future().get());
+
+        assertEquals((Integer)4, cache.get("key"));
+
+        if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) {
+            putToStore("key2", 5);
+
+            cacheAsync.replace("key2", 6);
+
+            assertTrue(cacheAsync.<Boolean>future().get());
+
+            assertEquals((Integer)6, cache.get("key2"));
+        }
+
+        cache.localEvict(Collections.singleton("key"));
+
+        if (!isLoadPreviousValue())
+            cache.get("key");
+
+        Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;
+
+        try {
+            cacheAsync.replace("key", 5);
+
+            assertTrue(cacheAsync.<Boolean>future().get());
+
+            if (tx != null)
+                tx.commit();
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
+
+        assertEquals((Integer)5, cache.get("key"));
+    }
+
+    /**
+     * Verifies conditional {@code remove(key, val)} and {@code getAndRemove}: a non-matching value
+     * leaves the entry intact, a matching value removes it, and {@code getAndRemove} returns the
+     * removed value or {@code null} when nothing is stored.
+     *
+     * @throws Exception In case of error.
+     */
+    public void testGetAndRemove() throws Exception {
+        IgniteCache<String, Integer> cache = jcache();
+
+        cache.put("key1", 1);
+        cache.put("key2", 2);
+
+        // JUnit assertions are used so the checks run without -ea and a missing value
+        // is reported as a failure instead of an unboxing NPE.
+        assertFalse(cache.remove("key1", 0));
+        assertEquals((Integer)1, cache.get("key1"));
+        assertTrue(cache.remove("key1", 1));
+        assertNull(cache.get("key1"));
+        assertEquals((Integer)2, cache.getAndRemove("key2"));
+        assertNull(cache.get("key2"));
+        assertNull(cache.getAndRemove("key2"));
+    }
+
+    /**
+     * Verifies {@code remove} and {@code getAndRemove} with {@code SerializableObject} values:
+     * a non-matching value leaves the entry intact and {@code getAndRemove} returns an object equal
+     * to the stored one.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGetAndRemoveObject() throws Exception {
+        IgniteCache<String, SerializableObject> cache = ignite(0).cache(cacheName());
+
+        SerializableObject val1 = new SerializableObject(1);
+        SerializableObject val2 = new SerializableObject(2);
+
+        cache.put("key1", val1);
+        cache.put("key2", val2);
+
+        // JUnit assertions are enforced even when the JVM runs without -ea.
+        assertFalse(cache.remove("key1", new SerializableObject(0)));
+
+        SerializableObject oldVal = cache.get("key1");
+
+        assertNotNull(oldVal);
+        assertEquals(val1, oldVal);
+
+        assertTrue(cache.remove("key1"));
+
+        assertNull(cache.get("key1"));
+
+        SerializableObject oldVal2 = cache.getAndRemove("key2");
+
+        assertEquals(val2, oldVal2);
+
+        assertNull(cache.get("key2"));
+        assertNull(cache.getAndRemove("key2"));
+    }
+
+    /**
+     * Verifies that {@code getAndPut} with {@code SerializableObject} values returns the previously
+     * stored object and that the new object is readable afterwards.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGetAndPutSerializableObject() throws Exception {
+        IgniteCache<String, SerializableObject> objCache = ignite(0).cache(cacheName());
+
+        SerializableObject first = new SerializableObject(1);
+        SerializableObject second = new SerializableObject(2);
+
+        objCache.put("key1", first);
+
+        SerializableObject stored = objCache.get("key1");
+
+        assertEquals(first, stored);
+
+        SerializableObject replaced = objCache.getAndPut("key1", second);
+
+        assertEquals(first, replaced);
+
+        SerializableObject current = objCache.get("key1");
+
+        assertEquals(second, current);
+    }
+
+    /**
+     * Puts and then removes three entries and runs {@code CheckEntriesDeletedTask} on every grid.
+     * The check is skipped for LOCAL and REPLICATED cache modes and for OFFHEAP_TIERED memory mode.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDeletedEntriesFlag() throws Exception {
+        if (cacheMode() == LOCAL || cacheMode() == REPLICATED || memoryMode() == OFFHEAP_TIERED)
+            return;
+
+        final int cnt = 3;
+
+        IgniteCache<String, Integer> cache = jcache();
+
+        for (int idx = 0; idx < cnt; idx++) {
+            cache.put(String.valueOf(idx), idx);
+        }
+
+        for (int idx = 0; idx < cnt; idx++) {
+            cache.remove(String.valueOf(idx));
+        }
+
+        for (int gridIdx = 0; gridIdx < gridCount(); gridIdx++) {
+            executeOnLocalOrRemoteJvm(gridIdx, new CheckEntriesDeletedTask(cnt, cacheName()));
+        }
+    }
+
+ /**
+ * Removes a set of keys from the cache, writes them to the store via {@code putToStore}, calls
+ * {@code localLoadCache(null)} on every grid and then checks that each grid peeks the value only
+ * for keys it is primary or backup for.
+ * <p>
+ * Fails immediately in multi-JVM mode (see the referenced JIRA issue) and does nothing when the
+ * store is disabled.
+ *
+ * @throws Exception If failed.
+ */
+ public void testRemoveLoad() throws Exception {
+ if (isMultiJvm()) {
+ fail("https://issues.apache.org/jira/browse/IGNITE-1088");
+ }
+
+ if (!storeEnabled()) {
+ return;
+ }
+
+ int keyCnt = 10;
+
+ Set<String> keySet = new HashSet<>();
+
+ for (int n = 0; n < keyCnt; n++) {
+ keySet.add(String.valueOf(n));
+ }
+
+ jcache().removeAll(keySet);
+
+ // Each key is stored with its own numeric value.
+ for (String k : keySet) {
+ putToStore(k, Integer.parseInt(k));
+ }
+
+ for (int gridIdx = 0; gridIdx < gridCount(); gridIdx++) {
+ grid(gridIdx).cache(cacheName()).localLoadCache(null);
+ }
+
+ for (int gridIdx = 0; gridIdx < gridCount(); gridIdx++) {
+ for (int n = 0; n < keyCnt; n++) {
+ String k = String.valueOf(n);
+
+ boolean ownedByGrid =
+ grid(0).affinity(cacheName()).mapKeyToPrimaryAndBackups(k).contains(grid(gridIdx).localNode());
+
+ if (!ownedByGrid) {
+ assertNull(peek(jcache(gridIdx), k));
+ }
+ else {
+ assertEquals((Integer)n, peek(jcache(gridIdx), k));
+ }
+ }
+ }
+ }
+
+ /**
+ * Checks conditional {@code remove(key, val)} and {@code getAndRemove(key)} issued through the
+ * {@code withAsync()} facade.
+ * <p>
+ * Every asynchronous call is awaited through {@code future().get()} inside a JUnit assertion, not
+ * an {@code assert} statement: with JVM assertions disabled an {@code assert} would skip the wait,
+ * so later steps would race with the pending operation and nothing would be verified.
+ *
+ * @throws Exception In case of error.
+ */
+ public void testRemoveAsync() throws Exception {
+ IgniteCache<String, Integer> cache = jcache();
+
+ IgniteCache<String, Integer> cacheAsync = cache.withAsync();
+
+ cache.put("key1", 1);
+ cache.put("key2", 2);
+
+ // Value mismatch: the conditional remove is expected to report false and keep the entry.
+ cacheAsync.remove("key1", 0);
+
+ assertFalse(cacheAsync.<Boolean>future().get());
+
+ assertEquals(Integer.valueOf(1), cache.get("key1"));
+
+ // Matching value: the entry must be removed.
+ cacheAsync.remove("key1", 1);
+
+ assertTrue(cacheAsync.<Boolean>future().get());
+
+ assertNull(cache.get("key1"));
+
+ cacheAsync.getAndRemove("key2");
+
+ assertEquals(Integer.valueOf(2), cacheAsync.<Integer>future().get());
+
+ assertNull(cache.get("key2"));
+
+ // Second getAndRemove on a missing key yields null.
+ cacheAsync.getAndRemove("key2");
+
+ assertNull(cacheAsync.future().get());
+ }
+
+ /**
+ * Checks that {@code remove(key)} returns {@code true} for an existing key, that the key is gone
+ * afterwards, and that a repeated removal returns {@code false}.
+ * <p>
+ * JUnit assertions are used so the removals run and are verified regardless of the {@code -ea} flag.
+ *
+ * @throws Exception In case of error.
+ */
+ public void testRemove() throws Exception {
+ IgniteCache<String, Integer> cache = jcache();
+
+ cache.put("key1", 1);
+
+ assertTrue(cache.remove("key1"));
+ assertNull(cache.get("key1"));
+ assertFalse(cache.remove("key1"));
+ }
+
+ /**
+ * Checks {@code remove(key)} issued through the {@code withAsync()} facade: the first removal of
+ * an existing key reports {@code true}, a repeated one reports {@code false}.
+ * <p>
+ * The futures are awaited inside JUnit assertions so the wait happens even when JVM assertions
+ * are disabled.
+ *
+ * @throws Exception In case of error.
+ */
+ public void testRemovexAsync() throws Exception {
+ IgniteCache<String, Integer> cache = jcache();
+
+ IgniteCache<String, Integer> cacheAsync = cache.withAsync();
+
+ cache.put("key1", 1);
+
+ cacheAsync.remove("key1");
+
+ assertTrue(cacheAsync.<Boolean>future().get());
+
+ assertNull(cache.get("key1"));
+
+ // Nothing left to remove the second time.
+ cacheAsync.remove("key1");
+
+ assertFalse(cacheAsync.<Boolean>future().get());
+ }
+
+ /**
+ * Runs the {@code removeAll} scenario of {@link #globalRemoveAll(boolean)} with synchronous calls.
+ *
+ * @throws Exception In case of error.
+ */
+ public void testGlobalRemoveAll() throws Exception {
+ final boolean async = false;
+
+ globalRemoveAll(async);
+ }
+
+ /**
+ * Runs the {@code removeAll} scenario of {@link #globalRemoveAll(boolean)} with asynchronous calls.
+ *
+ * @throws Exception In case of error.
+ */
+ public void testGlobalRemoveAllAsync() throws Exception {
+ final boolean async = true;
+
+ globalRemoveAll(async);
+ }
+
+ /**
+ * Exercises {@code removeAll(keys)} and {@code removeAll()} either synchronously or through the
+ * {@code withAsync()} facade.
+ * <p>
+ * Steps: remove two of three keys and check that only the third survives; re-populate and call
+ * {@code removeAll()} on the cache obtained for grid index 1 (index 0 when only one grid is
+ * started); finally put {@link #hugeRemoveAllEntryCount()} entries and remove them all.
+ * {@code atomicClockModeDelay(cache)} (defined elsewhere in this class) is invoked before each removal.
+ *
+ * @param async If {@code true} uses asynchronous operation.
+ * @throws Exception In case of error.
+ */
+ private void globalRemoveAll(boolean async) throws Exception {
+ IgniteCache<String, Integer> cache = jcache();
+
+ cache.put("key1", 1);
+ cache.put("key2", 2);
+ cache.put("key3", 3);
+
+ checkSize(F.asSet("key1", "key2", "key3"));
+
+ atomicClockModeDelay(cache);
+
+ IgniteCache<String, Integer> asyncCache = cache.withAsync();
+
+ if (async) {
+ asyncCache.removeAll(F.asSet("key1", "key2"));
+
+ asyncCache.future().get();
+ }
+ else
+ cache.removeAll(F.asSet("key1", "key2"));
+
+ checkSize(F.asSet("key3"));
+
+ checkContainsKey(false, "key1");
+ checkContainsKey(false, "key2");
+ checkContainsKey(true, "key3");
+
+ // Put values again.
+ cache.put("key1", 1);
+ cache.put("key2", 2);
+ cache.put("key3", 3);
+
+ atomicClockModeDelay(cache);
+
+ // Use a grid other than the one that did the puts whenever one is available.
+ int rmvGridIdx = gridCount() > 1 ? 1 : 0;
+
+ if (async) {
+ // Wildcard instead of a raw type: only removeAll() and future() are needed here.
+ IgniteCache<?, ?> asyncCache0 = jcache(rmvGridIdx).withAsync();
+
+ asyncCache0.removeAll();
+
+ asyncCache0.future().get();
+ }
+ else
+ jcache(rmvGridIdx).removeAll();
+
+ assertEquals(0, cache.localSize());
+
+ long entryCnt = hugeRemoveAllEntryCount();
+
+ for (int i = 0; i < entryCnt; i++)
+ cache.put(String.valueOf(i), i);
+
+ for (int i = 0; i < entryCnt; i++)
+ assertEquals(Integer.valueOf(i), cache.get(String.valueOf(i)));
+
+ atomicClockModeDelay(cache);
+
+ if (async) {
+ asyncCache.removeAll();
+
+ asyncCache.future().get();
+ }
+ else
+ cache.removeAll();
+
+ for (int i = 0; i < entryCnt; i++)
+ assertNull(cache.get(String.valueOf(i)));
+ }
+
+ /**
+ * Number of entries put and then removed by the bulk phase of the {@code removeAll()} test.
+ * Protected, so subclasses may override it.
+ *
+ * @return Count of entries to be removed in removeAll() test.
+ */
+ protected long hugeRemoveAllEntryCount() {
+ return 1_000L;
+ }
+
+ /**
+ * Checks that removal operations reject {@code null} arguments with a {@link NullPointerException}:
+ * {@code removeAll} with a set containing {@code null}, {@code removeAll(null)}, {@code remove(null)},
+ * {@code getAndRemove(null)} and {@code remove(key, null)}.
+ *
+ * @throws Exception In case of error.
+ */
+ public void testRemoveAllWithNulls() throws Exception {
+ final IgniteCache<String, Integer> testedCache = jcache();
+
+ // Insertion-ordered set: a regular key first, then the null element.
+ final Set<String> keysWithNull = new LinkedHashSet<>();
+
+ keysWithNull.add("key1");
+ keysWithNull.add(null);
+
+ // Key set containing a null element.
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ testedCache.removeAll(keysWithNull);
+
+ return null;
+ }
+ }, NullPointerException.class, null);
+
+ assertEquals(0, jcache().localSize());
+
+ // Null key set.
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ testedCache.removeAll(null);
+
+ return null;
+ }
+ }, NullPointerException.class, null);
+
+ // Null key for remove.
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ testedCache.remove(null);
+
+ return null;
+ }
+ }, NullPointerException.class, null);
+
+ // Null key for getAndRemove.
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ testedCache.getAndRemove(null);
+
+ return null;
+ }
+ }, NullPointerException.class, null);
+
+ // Null expected value for conditional remove.
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ testedCache.remove("key1", null);
+
+ return null;
+ }
+ }, NullPointerException.class, null);
+ }
+
+ /**
+ * Calls {@code removeAll} with a key set built from a repeated key and expects no exception.
+ * Note that {@code ImmutableSet.of} ignores repeated elements, so the set actually passed
+ * holds the single key {@code "key1"}.
+ *
+ * @throws Exception In case of error.
+ */
+ public void testRemoveAllDuplicates() throws Exception {
+ Set<String> repeatedKey = ImmutableSet.of("key1", "key1", "key1");
+
+ jcache().removeAll(repeatedKey);
+ }
+
+ /**
+ * Same call as {@code testRemoveAllDuplicates}, but executed inside an explicit transaction that
+ * is then committed. Does nothing when {@code txShouldBeUsed()} returns {@code false}.
+ *
+ * @throws Exception In case of error.
+ */
+ public void testRemoveAllDuplicatesTx() throws Exception {
+ if (!txShouldBeUsed()) {
+ return;
+ }
+
+ try (Transaction tx = transactions().txStart()) {
+ Set<String> repeatedKey = ImmutableSet.of("key1", "key1", "key1");
+
+ jcache().removeAll(repeatedKey);
+
+ tx.commit();
+ }
+ }
+
+ /**
+ * @throws Exception In case of error.
+ */
+ public void testRemoveAllEmpty() throws Exception {
+ jcache().removeAll();
+ }
+
+ /**
+ * Removes two of three keys with {@code removeAll(keys)} through the {@code withAsync()} facade,
+ * checks that the resulting future yields {@code null} and that only the third key remains.
+ *
+ * @throws Exception In case of error.
+ */
+ public void testRemoveAllAsync() throws Exception {
+ IgniteCache<String, Integer> syncCache = jcache();
+
+ IgniteCache<String, Integer> asyncFacade = syncCache.withAsync();
+
+ syncCache.put("key1", 1);
+ syncCache.put("key2", 2);
+ syncCache.put("key3", 3);
+
+ checkSize(F.asSet("key1", "key2", "key3"));
+
+ asyncFacade.removeAll(F.asSet("key1", "key2"));
+
+ // Wait for the removal to finish; removeAll produces no result value.
+ Object rmvRes = asyncFacade.future().get();
+
+ assertNull(rmvRes);
+
+ checkSize(F.asSet("key3"));
+
+ checkContainsKey(false, "key1");
+ checkContainsKey(false, "key2");
+ checkContainsKey(true, "key3");
+ }
+
+ /**
+ * Puts values for two keys returned by {@code primaryKeysForCache(2)}, clears the cache, then
+ * reloads the keys with {@code loadAll(cache, keys, true)} and checks that the same values are
+ * peeked again. Does nothing when the store is disabled.
+ *
+ * @throws Exception In case of error.
+ */
+ public void testLoadAll() throws Exception {
+ if (!storeEnabled()) {
+ return;
+ }
+
+ IgniteCache<String, Integer> cache = jcache();
+
+ Set<String> keySet = new HashSet<>(primaryKeysForCache(2));
+
+ // Nothing must be on heap for these keys before the test starts.
+ for (String k : keySet) {
+ assertNull(cache.localPeek(k, ONHEAP));
+ }
+
+ Map<String, Integer> expVals = new HashMap<>();
+
+ int nextVal = 0;
+
+ for (String k : keySet) {
+ cache.put(k, nextVal);
+
+ expVals.put(k, nextVal);
+
+ nextVal++;
+ }
+
+ for (String k : keySet) {
+ assertEquals(expVals.get(k), peek(cache, k));
+ }
+
+ cache.clear();
+
+ for (String k : keySet) {
+ assertNull(peek(cache, k));
+ }
+
+ loadAll(cache, keySet, true);
+
+ // After the reload the values put earlier must be visible again.
+ for (String k : keySet) {
+ assertEquals(expVals.get(k), peek(cache, k));
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRemoveAfterClear() throws Exception {
+ IgniteEx ignite = grid(0);
+
+ boolean affNode = ignite.context().cache().internalCache(cacheName()).context().affinityNode();
+
+ if (!affNode) {
+ if (gridCount() < 2)
+ return;
+
+ ignite = grid(1);
+ }
+
+ IgniteCache<Integer, Integer> cache = ignite.cache(cacheName());
+
+ int key = 0;
+
+ Collection<Integer> keys = new ArrayList<>();
+
+ for (int k = 0; k < 2; k++) {
+ while (!ignite.affinity(cacheName()).isPrimary(ignite.localNode(), key))
+ key++;
+
+ keys.add(key);
+
+ key++;
+ }
+
+ info("Keys: " + keys);
+
+ for (Integer k : keys)
+ cache.put(k, k);
+
+ cache.clear();
+
+ for (int g = 0; g < gridCount(); g++) {
+ Ignite
<TRUNCATED>