You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/09 08:32:59 UTC
[10/24] incubator-ignite git commit: ignite-545: merge from
ignite-sprint-6
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java
new file mode 100644
index 0000000..f5de96f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Tests {@code removeAll()} on a cache while additional nodes are being started: afterwards every node must report an empty local cache.
+ */
+public class CacheRemoveAllSelfTest extends GridCacheAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 60000;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 4;
+ }
+
+ /** Fills the cache, calls {@code removeAll()} while extra grids are starting, then asserts that the local size on every grid is 0.
+ * @throws Exception If failed.
+ */
+ public void testRemoveAll() throws Exception {
+ IgniteCache<Integer, String> cache = grid(0).cache(null);
+ // Populate the cache with 10,000 entries before any new grid is started.
+ for (int i = 0; i < 10_000; ++i)
+ cache.put(i, "val");
+ // Indices for newly started grids begin at gridCount(); the counter is shared by all starter callables.
+ final AtomicInteger igniteId = new AtomicInteger(gridCount());
+ // Each callable starts two more grids; presumably 3 is the number of threads running it - confirm against GridTestUtils.
+ IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ for (int i = 0; i < 2; ++i)
+ startGrid(igniteId.getAndIncrement());
+
+ return true;
+ }
+ }, 3, "start-node-thread");
+ // Issue removeAll() without first waiting for the asynchronous grid starts.
+ cache.removeAll();
+ // Wait until the grid-starting callables have completed.
+ fut.get();
+ // NOTE(review): fixed pause, presumably to let the topology settle before sizes are checked - confirm.
+ U.sleep(5000);
+ // Visit every grid index handed out so far: the initial grids plus those started above.
+ for (int i = 0; i < igniteId.get(); ++i) {
+ IgniteCache locCache = grid(i).cache(null);
+ // The failure message breaks the local size down by peek mode.
+ assertEquals("Local size: " + locCache.localSize() + "\n" +
+ "On heap: " + locCache.localSize(CachePeekMode.ONHEAP) + "\n" +
+ "Off heap: " + locCache.localSize(CachePeekMode.OFFHEAP) + "\n" +
+ "Swap: " + locCache.localSize(CachePeekMode.SWAP) + "\n" +
+ "Primary: " + locCache.localSize(CachePeekMode.PRIMARY) + "\n" +
+ "Backup: " + locCache.localSize(CachePeekMode.BACKUP),
+ 0, locCache.localSize());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
index 4cb7365..5d9ad35 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
@@ -25,12 +25,12 @@ import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.testframework.*;
import org.apache.ignite.transactions.*;
import org.jetbrains.annotations.*;
import javax.cache.*;
-import java.util.*;
import java.util.concurrent.atomic.*;
import static org.apache.ignite.cache.CacheRebalanceMode.*;
@@ -70,6 +70,12 @@ public abstract class GridCacheAbstractFailoverSelfTest extends GridCacheAbstrac
cfg.getTransactionConfiguration().setTxSerializableEnabled(true);
+ TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
+
+ discoSpi.setSocketTimeout(10_000);
+ discoSpi.setAckTimeout(10_000);
+ discoSpi.setNetworkTimeout(10_000);
+
return cfg;
}
@@ -177,11 +183,11 @@ public abstract class GridCacheAbstractFailoverSelfTest extends GridCacheAbstrac
info("Run topology change.");
try {
+ String name = "new-node-" + Thread.currentThread().getName();
+
for (int i = 0; i < TOP_CHANGE_CNT && err.get() == null; i++) {
info("Topology change " + i);
- String name = UUID.randomUUID().toString();
-
try {
final Ignite g = startGrid(name);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 4dc371c..70d8f9c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -50,6 +50,7 @@ import static java.util.concurrent.TimeUnit.*;
import static org.apache.ignite.cache.CacheAtomicityMode.*;
import static org.apache.ignite.cache.CacheMemoryMode.*;
import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CachePeekMode.*;
import static org.apache.ignite.events.EventType.*;
import static org.apache.ignite.testframework.GridTestUtils.*;
import static org.apache.ignite.transactions.TransactionConcurrency.*;
@@ -121,15 +122,15 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
/**
* @return Cache memory mode used by the test ({@code ONHEAP_TIERED} unless overridden).
*/
- protected boolean offHeapValues() {
- return false;
+ protected CacheMemoryMode memoryMode() {
+ return ONHEAP_TIERED;
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- if (offHeapValues())
+ if (memoryMode() == OFFHEAP_TIERED || memoryMode() == OFFHEAP_VALUES)
cfg.setSwapSpaceSpi(new GridTestSwapSpaceSpi());
return cfg;
@@ -139,8 +140,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
@Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
CacheConfiguration ccfg = super.cacheConfiguration(gridName);
- if (offHeapValues()) {
- ccfg.setMemoryMode(CacheMemoryMode.OFFHEAP_VALUES);
+ if (memoryMode() == OFFHEAP_TIERED || memoryMode() == OFFHEAP_VALUES) {
+ ccfg.setMemoryMode(memoryMode());
ccfg.setOffHeapMaxMemory(0);
}
@@ -272,7 +273,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
// Will actually delete entry from map.
CU.invalidate(jcache(i), "key0");
- assertNull("Failed check for grid: " + i, jcache(i).localPeek("key0", CachePeekMode.ONHEAP));
+ assertNull("Failed check for grid: " + i, jcache(i).localPeek("key0", ONHEAP));
Collection<String> keysCol = mapped.get(grid(i).localNode());
@@ -288,20 +289,20 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
if (ctx.affinity().localNode(key, new AffinityTopologyVersion(ctx.discovery().topologyVersion())))
sum++;
- assertEquals("Incorrect key size on cache #" + i, sum, jcache(i).localSize(CachePeekMode.ALL));
+ assertEquals("Incorrect key size on cache #" + i, sum, jcache(i).localSize(ALL));
}
for (int i = 0; i < gridCount(); i++) {
Collection<String> keysCol = mapped.get(grid(i).localNode());
assertEquals("Failed check for grid: " + i, !F.isEmpty(keysCol) ? keysCol.size() : 0,
- jcache(i).localSize(CachePeekMode.PRIMARY));
+ jcache(i).localSize(PRIMARY));
}
int globalPrimarySize = map.size();
for (int i = 0; i < gridCount(); i++)
- assertEquals(globalPrimarySize, jcache(i).size(CachePeekMode.PRIMARY));
+ assertEquals(globalPrimarySize, jcache(i).size(PRIMARY));
int times = 1;
@@ -313,7 +314,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
int globalSize = globalPrimarySize * times;
for (int i = 0; i < gridCount(); i++)
- assertEquals(globalSize, jcache(i).size(CachePeekMode.ALL));
+ assertEquals(globalSize, jcache(i).size(ALL));
}
/**
@@ -735,7 +736,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertNull(cache.get("key3"));
for (int i = 0; i < gridCount(); i++)
- assertNull("Failed for cache: " + i, jcache(i).localPeek("key3", CachePeekMode.ONHEAP));
+ assertNull("Failed for cache: " + i, jcache(i).localPeek("key3", ONHEAP));
cache.remove("key1");
cache.put("key2", 1);
@@ -750,7 +751,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertNull(cache.get("key3"));
for (int i = 0; i < gridCount(); i++)
- assertNull(jcache(i).localPeek("key3", CachePeekMode.ONHEAP));
+ assertNull(jcache(i).localPeek("key3", ONHEAP));
}
/**
@@ -789,7 +790,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertNull(cache.get("key3"));
for (int i = 0; i < gridCount(); i++)
- assertNull("Failed for cache: " + i, jcache(i).localPeek("key3", CachePeekMode.ONHEAP));
+ assertNull("Failed for cache: " + i, jcache(i).localPeek("key3", ONHEAP));
cache.remove("key1");
cache.put("key2", 1);
@@ -804,7 +805,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertNull(cache.get("key3"));
for (int i = 0; i < gridCount(); i++)
- assertNull(jcache(i).localPeek("key3", CachePeekMode.ONHEAP));
+ assertNull(jcache(i).localPeek("key3", ONHEAP));
}
/**
@@ -874,9 +875,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
Map<String, EntryProcessorResult<String>> res = cache.invokeAll(F.asSet("key1", "key2", "key3"), RMV_PROCESSOR);
for (int i = 0; i < gridCount(); i++) {
- assertNull(jcache(i).localPeek("key1", CachePeekMode.ONHEAP));
- assertNull(jcache(i).localPeek("key2", CachePeekMode.ONHEAP));
- assertNull(jcache(i).localPeek("key3", CachePeekMode.ONHEAP));
+ assertNull(jcache(i).localPeek("key1", ONHEAP));
+ assertNull(jcache(i).localPeek("key2", ONHEAP));
+ assertNull(jcache(i).localPeek("key3", ONHEAP));
}
assertEquals("null", res.get("key1").get());
@@ -1232,7 +1233,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertNull(cache.get("key3"));
for (int i = 0; i < gridCount(); i++)
- assertNull(jcache(i).localPeek("key3", CachePeekMode.ONHEAP));
+ assertNull(jcache(i).localPeek("key3", ONHEAP));
}
/**
@@ -1271,7 +1272,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertNull(cache.get("k1"));
for (int i = 0; i < gridCount(); i++)
- assertNull(jcache(i).localPeek("k1", CachePeekMode.ONHEAP));
+ assertNull(jcache(i).localPeek("k1", ONHEAP));
final EntryProcessor<String, Integer, Integer> errProcessor = new EntryProcessor<String, Integer, Integer>() {
@Override public Integer process(MutableEntry<String, Integer> e, Object... args) {
@@ -1692,7 +1693,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
for (int i = 0; i < gridCount(); i++) {
info("Peek on node [i=" + i + ", id=" + grid(i).localNode().id() + ", val=" +
- grid(i).cache(null).localPeek("key", CachePeekMode.ONHEAP) + ']');
+ grid(i).cache(null).localPeek("key", ONHEAP) + ']');
}
assertEquals((Integer)1, cache.getAndPutIfAbsent("key", 2));
@@ -2001,10 +2002,10 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
for (int i = 0; i < gridCount(); i++) {
info("Peek key on grid [i=" + i + ", nodeId=" + grid(i).localNode().id() +
- ", peekVal=" + grid(i).cache(null).localPeek("key", CachePeekMode.ONHEAP) + ']');
+ ", peekVal=" + grid(i).cache(null).localPeek("key", ONHEAP) + ']');
info("Peek key2 on grid [i=" + i + ", nodeId=" + grid(i).localNode().id() +
- ", peekVal=" + grid(i).cache(null).localPeek("key2", CachePeekMode.ONHEAP) + ']');
+ ", peekVal=" + grid(i).cache(null).localPeek("key2", ONHEAP) + ']');
}
assertEquals((Integer)6, cache.get("key2"));
@@ -2233,7 +2234,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
* @throws Exception If failed.
*/
public void testDeletedEntriesFlag() throws Exception {
- if (cacheMode() != LOCAL && cacheMode() != REPLICATED) {
+ if (cacheMode() != LOCAL && cacheMode() != REPLICATED && memoryMode() != OFFHEAP_TIERED) {
int cnt = 3;
IgniteCache<String, Integer> cache = jcache();
@@ -2288,9 +2289,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
String key = String.valueOf(i);
if (grid(0).affinity(null).mapKeyToPrimaryAndBackups(key).contains(grid(g).localNode()))
- assertEquals((Integer)i, jcache(g).localPeek(key, CachePeekMode.ONHEAP));
+ assertEquals((Integer)i, peek(jcache(g), key));
else
- assertNull(jcache(g).localPeek(key, CachePeekMode.ONHEAP));
+ assertNull(peek(jcache(g), key));
}
}
}
@@ -2475,6 +2476,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
}
}, NullPointerException.class, null);
+ assertEquals(0, grid(0).cache(null).localSize());
+
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
cache.removeAll(null);
@@ -2569,7 +2572,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
Set<String> keys = new HashSet<>(primaryKeysForCache(cache, 2));
for (String key : keys)
- assertNull(cache.localPeek(key, CachePeekMode.ONHEAP));
+ assertNull(cache.localPeek(key, ONHEAP));
Map<String, Integer> vals = new HashMap<>();
@@ -2584,17 +2587,17 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
}
for (String key : keys)
- assertEquals(vals.get(key), cache.localPeek(key, CachePeekMode.ONHEAP));
+ assertEquals(vals.get(key), peek(cache, key));
cache.clear();
for (String key : keys)
- assertNull(cache.localPeek(key, CachePeekMode.ONHEAP));
+ assertNull(peek(cache, key));
loadAll(cache, keys, true);
for (String key : keys)
- assertEquals(vals.get(key), cache.localPeek(key, CachePeekMode.ONHEAP));
+ assertEquals(vals.get(key), peek(cache, key));
}
/**
@@ -2703,7 +2706,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
try {
cache.clear();
- assertEquals(vals.get(first), peek(cache, first));
+ assertEquals(vals.get(first), cache.localPeek(first, ONHEAP));
}
finally {
lock.unlock();
@@ -2734,14 +2737,14 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
cache.localEvict(Sets.union(ImmutableSet.of("key1", "key2"), keys));
- assert cache.localSize(CachePeekMode.ONHEAP) == 0;
+ assert cache.localSize(ONHEAP) == 0;
cache.clear();
cache.localPromote(ImmutableSet.of("key2", "key1"));
- assert cache.localPeek("key1", CachePeekMode.ONHEAP) == null;
- assert cache.localPeek("key2", CachePeekMode.ONHEAP) == null;
+ assert cache.localPeek("key1", ONHEAP) == null;
+ assert cache.localPeek("key2", ONHEAP) == null;
}
/**
@@ -2906,13 +2909,13 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
Ignite ignite = primaryIgnite("key");
IgniteCache<String, Integer> cache = ignite.cache(null);
- assert cache.localPeek("key", CachePeekMode.ONHEAP) == null;
+ assert peek(cache, "key") == null;
cache.put("key", 1);
cache.replace("key", 2);
- assert cache.localPeek("key", CachePeekMode.ONHEAP) == 2;
+ assertEquals(2, peek(cache, "key").intValue());
}
/**
@@ -2944,7 +2947,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
cache.remove("key");
assertNull(cache.get("key")); // localPeek ignores transactions.
- assertNotNull(cache.localPeek("key")); // localPeek ignores transactions.
+ assertNotNull(peek(cache, "key")); // localPeek ignores transactions.
tx.commit();
}
@@ -2960,7 +2963,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
cache.put("key", 1);
cache.remove("key");
- assertNull(cache.localPeek("key", CachePeekMode.ONHEAP));
+ assertNull(peek(cache, "key"));
}
/**
@@ -2986,11 +2989,11 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
// Expired entry should not be swapped.
cache.localEvict(Collections.singleton(key));
- assertNull(cache.localPeek(key, CachePeekMode.ONHEAP));
+ assertNull(peek(cache, "key"));
cache.localPromote(Collections.singleton(key));
- assertNull(cache.localPeek(key, CachePeekMode.ONHEAP));
+ assertNull(cache.localPeek(key, ONHEAP));
assertTrue(cache.localSize() == 0);
@@ -3021,7 +3024,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
c.put(key, 1);
- assertEquals(Integer.valueOf(1), c.localPeek(key, CachePeekMode.ONHEAP));
+ assertEquals(Integer.valueOf(1), peek(c, key));
int ttl = 500;
@@ -3031,7 +3034,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
Thread.sleep(ttl + 100);
- assert c.localPeek(key, CachePeekMode.ONHEAP) == null;
+ assert peek(c, key) == null;
assert c.localSize() == 0 : "Cache is not empty.";
}
@@ -3058,7 +3061,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
Thread.sleep(ttl + 100);
- assertNull(c.localPeek(key, CachePeekMode.ONHEAP));
+ assertNull(c.localPeek(key, ONHEAP));
assert c.localSize() == 0;
}
@@ -3092,6 +3095,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
* @throws Exception If failed.
*/
private void checkTtl(boolean inTx, boolean oldEntry) throws Exception {
+ if (memoryMode() == OFFHEAP_TIERED)
+ return;
+
int ttl = 1000;
final ExpiryPolicy expiry = new TouchedExpiryPolicy(new Duration(MILLISECONDS, ttl));
@@ -3361,15 +3367,15 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
cache.put(key2, 2);
cache.put(key3, 3);
- assert cache.localPeek(key1, CachePeekMode.ONHEAP) == 1;
- assert cache.localPeek(key2, CachePeekMode.ONHEAP) == 2;
- assert cache.localPeek(key3, CachePeekMode.ONHEAP) == 3;
+ assert peek(cache, key1) == 1;
+ assert peek(cache, key2) == 2;
+ assert peek(cache, key3) == 3;
cache.localEvict(F.asList(key1, key2));
- assert cache.localPeek(key1, CachePeekMode.ONHEAP) == null;
- assert cache.localPeek(key2, CachePeekMode.ONHEAP) == null;
- assert cache.localPeek(key3, CachePeekMode.ONHEAP) == 3;
+ assert cache.localPeek(key1, ONHEAP) == null;
+ assert cache.localPeek(key2, ONHEAP) == null;
+ assert peek(cache, key3) == 3;
loadAll(cache, ImmutableSet.of(key1, key2), true);
@@ -3391,7 +3397,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
* @throws Exception If failed.
*/
public void testUnswap() throws Exception {
- GridCacheAdapter<String, Integer> cache = ((IgniteKernal)grid(0)).internalCache();
+ IgniteCache<String, Integer> cache = grid(0).cache(null);
List<String> keys = primaryKeysForCache(jcache(), 3);
@@ -3408,17 +3414,11 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
Collection<String> locKeys = new HashSet<>();
- if (cache.context().affinityNode()) {
- locKeys.addAll(cache.primaryKeySet());
+ if (grid(0).context().cache().cache(null).context().affinityNode()) {
+ Iterable<Cache.Entry<String, Integer>> entries = cache.localEntries(PRIMARY, BACKUP);
- info("Local keys (primary): " + locKeys);
-
- locKeys.addAll(cache.keySet(new CacheEntryPredicateAdapter() {
- @Override public boolean apply(GridCacheEntryEx e) {
- return grid(0).affinity(null).isBackup(grid(0).localNode(),
- e.key().value(e.context().cacheObjectContext(), false));
- }
- }));
+ for (Cache.Entry<String, Integer> entry : entries)
+ locKeys.add(entry.getKey());
info("Local keys (primary + backup): " + locKeys);
}
@@ -3444,57 +3444,67 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
}, EVT_CACHE_OBJECT_SWAPPED, EVT_CACHE_OBJECT_UNSWAPPED);
}
- cache.evictAll(Collections.singleton(k2));
- cache.evictAll(Collections.singleton(k3));
+ cache.localEvict(F.asList(k2, k3));
- assertNotNull(cache.localPeek(k1, new CachePeekMode[] {CachePeekMode.ONHEAP, CachePeekMode.OFFHEAP}, null));
- assertNull(cache.localPeek(k2, new CachePeekMode[] {CachePeekMode.ONHEAP, CachePeekMode.OFFHEAP}, null));
- assertNull(cache.localPeek(k3, new CachePeekMode[] {CachePeekMode.ONHEAP, CachePeekMode.OFFHEAP}, null));
+ if (memoryMode() == OFFHEAP_TIERED) {
+ assertNotNull(cache.localPeek(k1, ONHEAP, OFFHEAP));
+ assertNotNull(cache.localPeek(k2, ONHEAP, OFFHEAP));
+ assertNotNull(cache.localPeek(k3, ONHEAP, OFFHEAP));
+ }
+ else {
+ assertNotNull(cache.localPeek(k1, ONHEAP, OFFHEAP));
+ assertNull(cache.localPeek(k2, ONHEAP, OFFHEAP));
+ assertNull(cache.localPeek(k3, ONHEAP, OFFHEAP));
+ }
int cnt = 0;
if (locKeys.contains(k2)) {
- assertNull(cache.localPeek(k2, ONHEAP_PEEK_MODES, null));
+ assertNull(cache.localPeek(k2, ONHEAP_PEEK_MODES));
- cache.promoteAll(Collections.singleton(k2));
+ cache.localPromote(Collections.singleton(k2));
- assertEquals((Integer) 2, cache.localPeek(k2, ONHEAP_PEEK_MODES, null));
+ assertEquals((Integer) 2, cache.localPeek(k2, ONHEAP_PEEK_MODES));
cnt++;
}
else {
- cache.promoteAll(Collections.singleton(k2));
+ cache.localPromote(Collections.singleton(k2));
- assertNull(cache.localPeek(k2, ONHEAP_PEEK_MODES, null));
+ assertNull(cache.localPeek(k2, ONHEAP_PEEK_MODES));
}
if (locKeys.contains(k3)) {
- assertNull(cache.localPeek(k3, ONHEAP_PEEK_MODES, null));
+ assertNull(cache.localPeek(k3, ONHEAP_PEEK_MODES));
- cache.promoteAll(Collections.singleton(k3));
+ cache.localPromote(Collections.singleton(k3));
- assertEquals((Integer)3, cache.localPeek(k3, ONHEAP_PEEK_MODES, null));
+ assertEquals((Integer)3, cache.localPeek(k3, ONHEAP_PEEK_MODES));
cnt++;
}
else {
- cache.promoteAll(Collections.singleton(k3));
+ cache.localPromote(Collections.singleton(k3));
- assertNull(cache.localPeek(k3, ONHEAP_PEEK_MODES, null));
+ assertNull(cache.localPeek(k3, ONHEAP_PEEK_MODES));
}
- assertEquals(cnt, swapEvts.get());
- assertEquals(cnt, unswapEvts.get());
+ if (memoryMode() != OFFHEAP_TIERED) {
+ assertEquals(cnt, swapEvts.get());
+ assertEquals(cnt, unswapEvts.get());
+ }
- cache.evictAll(Collections.singleton(k1));
+ cache.localEvict(Collections.singleton(k1));
assertEquals((Integer)1, cache.get(k1));
if (locKeys.contains(k1))
cnt++;
- assertEquals(cnt, swapEvts.get());
- assertEquals(cnt, unswapEvts.get());
+ if (memoryMode() != OFFHEAP_TIERED) {
+ assertEquals(cnt, swapEvts.get());
+ assertEquals(cnt, unswapEvts.get());
+ }
cache.clear();
@@ -3506,14 +3516,21 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
swapEvts.set(0);
unswapEvts.set(0);
- cache.evictAll(Collections.singleton(k2));
- cache.evictAll(Collections.singleton(k3));
+ cache.localEvict(Collections.singleton(k2));
+ cache.localEvict(Collections.singleton(k3));
- assertNotNull(cache.localPeek(k1, new CachePeekMode[] {CachePeekMode.ONHEAP, CachePeekMode.OFFHEAP}, null));
- assertNull(cache.localPeek(k2, new CachePeekMode[] {CachePeekMode.ONHEAP, CachePeekMode.OFFHEAP}, null));
- assertNull(cache.localPeek(k3, new CachePeekMode[] {CachePeekMode.ONHEAP, CachePeekMode.OFFHEAP}, null));
+ if (memoryMode() == OFFHEAP_TIERED) {
+ assertNotNull(cache.localPeek(k1, ONHEAP, OFFHEAP));
+ assertNotNull(cache.localPeek(k2, ONHEAP, OFFHEAP));
+ assertNotNull(cache.localPeek(k3, ONHEAP, OFFHEAP));
+ }
+ else {
+ assertNotNull(cache.localPeek(k1, ONHEAP, OFFHEAP));
+ assertNull(cache.localPeek(k2, ONHEAP, OFFHEAP));
+ assertNull(cache.localPeek(k3, ONHEAP, OFFHEAP));
+ }
- cache.promoteAll(F.asList(k2, k3));
+ cache.localPromote(F.asSet(k2, k3));
cnt = 0;
@@ -3523,8 +3540,10 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
if (locKeys.contains(k3))
cnt++;
- assertEquals(cnt, swapEvts.get());
- assertEquals(cnt, unswapEvts.get());
+ if (memoryMode() != OFFHEAP_TIERED) {
+ assertEquals(cnt, swapEvts.get());
+ assertEquals(cnt, unswapEvts.get());
+ }
}
/**
@@ -3557,7 +3576,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
Thread.sleep(ttl + 100);
// Peek will actually remove entry from cache.
- assertNull(cache.localPeek(key, CachePeekMode.ONHEAP));
+ assertNull(cache.localPeek(key, ONHEAP));
assert cache.localSize() == 0;
@@ -3654,6 +3673,13 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertTrue(cache.remove("key" + i));
}
});
+
+ CU.inTx(ignite(0), jcache(), concurrency, isolation, new CIX1<IgniteCache<String, Integer>>() {
+ @Override public void applyx(IgniteCache<String, Integer> cache) {
+ for (int i = 0; i < cnt; i++)
+ assertNull(cache.get("key" + i));
+ }
+ });
}
}
@@ -3745,6 +3771,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
* @throws Exception If failed.
*/
protected void checkSize(Collection<String> keys) throws Exception {
+ if (memoryMode() == OFFHEAP_TIERED)
+ return;
+
if (nearEnabled())
assertEquals(keys.size(), jcache().localSize(CachePeekMode.ALL));
else {
@@ -3768,7 +3797,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
}
}
- assertEquals("Incorrect size on cache #" + i, size, jcache(i).localSize(CachePeekMode.ALL));
+ assertEquals("Incorrect size on cache #" + i, size, jcache(i).localSize(ALL));
}
}
}
@@ -3779,8 +3808,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
*/
protected void checkKeySize(Collection<String> keys) throws Exception {
if (nearEnabled())
- assertEquals("Invalid key size: " + jcache().localSize(CachePeekMode.ALL),
- keys.size(), jcache().localSize(CachePeekMode.ALL));
+ assertEquals("Invalid key size: " + jcache().localSize(ALL),
+ keys.size(), jcache().localSize(ALL));
else {
for (int i = 0; i < gridCount(); i++) {
GridCacheContext<String, Integer> ctx = context(i);
@@ -3791,7 +3820,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx()))
size++;
- assertEquals("Incorrect key size on cache #" + i, size, jcache(i).localSize(CachePeekMode.ALL));
+ assertEquals("Incorrect key size on cache #" + i, size, jcache(i).localSize(ALL));
}
}
}
@@ -4390,7 +4419,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertFalse(cacheSkipStore.iterator().hasNext());
assertTrue(map.size() == 0);
- assertTrue(cache.size(CachePeekMode.ALL) == 0);
+ assertTrue(cache.size(ALL) == 0);
// putAll/removeAll from multiple nodes.
@@ -4479,8 +4508,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertTrue(map.containsKey(rmvKey));
- assertTrue(cache.size(CachePeekMode.ALL) == 0);
- assertTrue(cacheSkipStore.size(CachePeekMode.ALL) == 0);
+ assertTrue(cache.size(ALL) == 0);
+ assertTrue(cacheSkipStore.size(ALL) == 0);
cache.remove(rmvKey);
@@ -4723,8 +4752,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
for (int i = 0; i < keys.size(); i++)
putToStore(keys.get(i), i);
- assertTrue(cacheSkipStore.size(CachePeekMode.ALL) == 0);
- assertTrue(cache.size(CachePeekMode.ALL) == 0);
+ assertTrue(cacheSkipStore.size(ALL) == 0);
+ assertTrue(cache.size(ALL) == 0);
assertTrue(map.size() != 0);
try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
@@ -4813,8 +4842,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
*/
private void checkEmpty(IgniteCache<String, Integer> cache, IgniteCache<String, Integer> cacheSkipStore)
throws Exception {
- assertTrue(cache.size(CachePeekMode.ALL) == 0);
- assertTrue(cacheSkipStore.size(CachePeekMode.ALL) == 0);
+ assertTrue(cache.size(ALL) == 0);
+ assertTrue(cacheSkipStore.size(ALL) == 0);
assertTrue(map.size() == 0);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
index 342eb5a..efd0185 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
@@ -393,8 +393,8 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
@Nullable protected <K, V> V peek(IgniteCache<K, V> cache, K key) throws Exception {
- return offheapTiered(cache) ? cache.localPeek(key, CachePeekMode.SWAP) : cache.localPeek(key,
- CachePeekMode.ONHEAP);
+ return offheapTiered(cache) ? cache.localPeek(key, CachePeekMode.SWAP, CachePeekMode.OFFHEAP) :
+ cache.localPeek(key, CachePeekMode.ONHEAP);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapReloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapReloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapReloadSelfTest.java
index 78f01f0..7d4eefc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapReloadSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapReloadSelfTest.java
@@ -24,7 +24,6 @@ import org.apache.ignite.cache.store.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.spi.discovery.tcp.*;
@@ -35,7 +34,6 @@ import org.apache.ignite.spi.swapspace.file.*;
import org.apache.ignite.testframework.junits.common.*;
import javax.cache.*;
-import javax.cache.configuration.*;
import java.util.*;
import java.util.concurrent.*;
@@ -51,6 +49,18 @@ public class GridCacheSwapReloadSelfTest extends GridCommonAbstractTest {
/** IP finder. */
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+ /**
+ * Creates the swap space SPI that is set on the grid configuration: a {@link FileSwapSpaceSpi} with write buffer size 1.
+ * @return The configured swap space SPI.
+ */
+ protected SwapSpaceSpi spi() {
+ FileSwapSpaceSpi swap = new FileSwapSpaceSpi();
+ // NOTE(review): a buffer size of 1 presumably makes writes reach the swap file almost immediately - confirm.
+ swap.setWriteBufferSize(1);
+
+ return swap;
+ }
+
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
@@ -62,11 +72,9 @@ public class GridCacheSwapReloadSelfTest extends GridCommonAbstractTest {
cfg.setDiscoverySpi(disco);
- FileSwapSpaceSpi swap = new FileSwapSpaceSpi();
-
- swap.setWriteBufferSize(1);
+ SwapSpaceSpi spi = spi();
- cfg.setSwapSpaceSpi(swap);
+ cfg.setSwapSpaceSpi(spi);
CacheConfiguration cacheCfg = defaultCacheConfiguration();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index a873bb0..bb449e0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -77,7 +77,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
super.afterTest();
for (int i = 0; i < gridCount(); i++) {
- GridContinuousProcessor proc = ((IgniteKernal)grid(i)).context().continuous();
+ GridContinuousProcessor proc = grid(i).context().continuous();
ConcurrentMap<?, ?> syncMsgFuts = GridTestUtils.getFieldValue(proc, "syncMsgFuts");
@@ -712,7 +712,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
expirePlcCache.put(key, 10);
- U.sleep(200);
+ U.sleep(700);
if (!eagerTtl())
assertNull(primaryCache(key, cache.getName()).get(key)); // Provoke expire event if eager ttl is disabled.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java
new file mode 100644
index 0000000..5cc9d04
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.transactions.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+
+/**
+ * Checks that the DHT version carried by a near lock request, and the DHT version stored in the near cache entry
+ * after commit, both match the version of the entry held by the server node.
+ * <p>
+ * Two grids are started: grid 0 runs in client mode, grid 1 is the only server node.
+ */
+public class IgniteCacheNearLockValueSelfTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGridsMultiThreaded(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        // Stop the grids started in beforeTestsStarted() so that they do not outlive this test class.
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        // Grid 0 is the client from which the transactions are started.
+        if (getTestGridName(0).equals(gridName))
+            cfg.setClientMode(true);
+
+        // Every node records the near lock requests it sends.
+        cfg.setCommunicationSpi(new TestCommunicationSpi());
+
+        return cfg;
+    }
+
+    /**
+     * Reads a key from the client inside a pessimistic repeatable-read transaction three times and, each time,
+     * verifies that exactly one near lock request was sent and that DHT versions are consistent.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDhtVersion() throws Exception {
+        CacheConfiguration<Object, Object> pCfg = new CacheConfiguration<>("partitioned");
+
+        pCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+
+        try (IgniteCache<Object, Object> cache = ignite(0).getOrCreateCache(pCfg, new NearCacheConfiguration<>())) {
+            cache.put("key1", "val1");
+
+            for (int i = 0; i < 3; i++) {
+                // Forget requests recorded by the initial put or by the previous iteration.
+                commSpi(0).clear();
+                commSpi(1).clear();
+
+                try (Transaction tx = ignite(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    cache.get("key1");
+
+                    tx.commit();
+                }
+
+                TestCommunicationSpi comm = commSpi(0);
+
+                assertEquals(1, comm.requests().size());
+
+                GridCacheAdapter<Object, Object> primary = ((IgniteKernal)grid(1)).internalCache("partitioned");
+
+                GridCacheEntryEx dhtEntry = primary.peekEx(primary.context().toCacheKeyObject("key1"));
+
+                assertNotNull(dhtEntry);
+
+                GridNearLockRequest req = comm.requests().iterator().next();
+
+                assertEquals(dhtEntry.version(), req.dhtVersion(0));
+
+                // Check entry version in near cache after commit.
+                GridCacheAdapter<Object, Object> near = ((IgniteKernal)grid(0)).internalCache("partitioned");
+
+                GridNearCacheEntry nearEntry = (GridNearCacheEntry)near.peekEx(near.context().toCacheKeyObject("key1"));
+
+                assertNotNull(nearEntry);
+
+                assertEquals(dhtEntry.version(), nearEntry.dhtVersion());
+            }
+        }
+    }
+
+    /**
+     * @param idx Grid index.
+     * @return Recording communication SPI configured for the grid with the given index.
+     */
+    private TestCommunicationSpi commSpi(int idx) {
+        return (TestCommunicationSpi)ignite(idx).configuration().getCommunicationSpi();
+    }
+
+    /**
+     * Communication SPI that records every {@link GridNearLockRequest} handed to
+     * {@link #sendMessage(ClusterNode, Message)} before delegating to the parent implementation.
+     */
+    private static class TestCommunicationSpi extends TcpCommunicationSpi {
+        /** Recorded near lock requests. */
+        private final Collection<GridNearLockRequest> reqs = new ConcurrentLinkedDeque<>();
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+            if (msg instanceof GridIoMessage) {
+                GridIoMessage ioMsg = (GridIoMessage)msg;
+
+                if (ioMsg.message() instanceof GridNearLockRequest)
+                    reqs.add((GridNearLockRequest)ioMsg.message());
+            }
+
+            super.sendMessage(node, msg);
+        }
+
+        /**
+         * @return Collected requests (the live collection, not a copy).
+         */
+        public Collection<GridNearLockRequest> requests() {
+            return reqs;
+        }
+
+        /**
+         * Discards all requests recorded so far.
+         */
+        public void clear() {
+            reqs.clear();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
new file mode 100644
index 0000000..5aa0ac8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.query.annotations.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.*;
+
+import javax.cache.*;
+import java.io.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Checks that a cache operation fails with an exception caused by {@link IOException} when unmarshalling of its
+ * key fails. Failures are injected by {@link TestKey#readExternal(ObjectInput)}, driven by {@link #readCnt}.
+ */
+public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTest {
+ /**
+ * Countdown consulted by {@link TestKey#readExternal(ObjectInput)}: every call decrements it, and the call
+ * throws {@link IOException} when the decremented value is zero or negative.
+ */
+ protected static AtomicInteger readCnt = new AtomicInteger();
+
+ /** Number from which test keys are built; tests pre-increment it to obtain a fresh key. */
+ protected static int key = 0;
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 3;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return CacheMode.PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.ATOMIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicWriteOrderMode atomicWriteOrderMode() {
+ return CacheAtomicWriteOrderMode.PRIMARY;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected NearCacheConfiguration nearConfiguration() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ // The grid whose name ends with "0" is started in client mode; the tests issue operations through it.
+ if (gridName.endsWith("0"))
+ cfg.setClientMode(true);
+
+ return cfg;
+ }
+
+ /** Externalizable key whose deserialization can be made to fail through {@link #readCnt}. */
+ public static class TestKey implements Externalizable {
+ /**
+ * @param field Field value.
+ */
+ public TestKey(String field) {
+ this.field = field;
+ }
+
+ /** No-argument constructor used by {@link Externalizable} deserialization. */
+ public TestKey() {
+ }
+
+ /** Field. */
+ @QuerySqlField(index = true)
+ private String field;
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ TestKey key = (TestKey)o;
+
+ // Null-safe comparison of the field values.
+ return !(field != null ? !field.equals(key.field) : key.field != null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return field != null ? field.hashCode() : 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(field);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ field = (String)in.readObject();
+
+ // Fail only after the field has been read, once the countdown reaches zero (or is already below it).
+ if (readCnt.decrementAndGet() <= 0) {
+ throw new IOException("Class can not be unmarshalled");
+ }
+ }
+ }
+
+ /**
+ * Puts a test key through grid 0 and expects the put to fail with a {@link CacheException} caused by
+ * {@link IOException}; afterwards {@link #readCnt} must be exactly zero.
+ *
+ * @param k Value from which the test key is built.
+ */
+ protected void failAtomicPut(int k) {
+ try {
+ jcache(0).put(new TestKey(String.valueOf(k)), "");
+
+ assert false : "p2p marshalling failed, but error response was not sent";
+ }
+ catch (CacheException e) {
+ assert X.hasCause(e, IOException.class);
+ }
+
+ assert readCnt.get() == 0; //ensure we have read count as expected.
+ }
+
+ /**
+ * Gets a test key through grid 0 and expects the get to fail with a {@link CacheException} caused by
+ * {@link IOException}.
+ *
+ * @param k Value from which the test key is built.
+ */
+ protected void failAtomicGet(int k) {
+ try {
+ jcache(0).get(new TestKey(String.valueOf(k)));
+
+ assert false : "p2p marshalling failed, but error response was not sent";
+ }
+ catch (CacheException e) {
+ assert X.hasCause(e, IOException.class);
+ }
+ }
+
+ /**
+ * Tests that correct response will be sent to client node in case of unmarshalling failed.
+ * <p>
+ * Each step sets {@link #readCnt} to {@code N} so that the {@code N}-th key deserialization fails, or to
+ * {@link Integer#MAX_VALUE} so that deserialization keeps succeeding.
+ *
+ * @throws Exception If failed.
+ */
+ public void testResponseMessageOnUnmarshallingFailed() throws Exception {
+ //GridNearAtomicUpdateRequest unmarshalling failed test
+ readCnt.set(1);
+
+ failAtomicPut(++key);
+
+ //Check that cache is empty.
+ readCnt.set(Integer.MAX_VALUE);
+
+ assert jcache(0).get(new TestKey(String.valueOf(key))) == null;
+
+ //GridDhtAtomicUpdateRequest unmarshalling failed test
+ readCnt.set(2);
+
+ failAtomicPut(++key);
+
+ //Check that cache is not empty.
+ readCnt.set(Integer.MAX_VALUE);
+
+ assert jcache(0).get(new TestKey(String.valueOf(key))) != null;
+
+ //GridNearGetRequest unmarshalling failed test
+ readCnt.set(1);
+
+ failAtomicGet(++key);
+
+ //GridNearGetResponse unmarshalling failed test
+ readCnt.set(Integer.MAX_VALUE);
+
+ jcache(0).put(new TestKey(String.valueOf(++key)), "");
+
+ readCnt.set(2);
+
+ failAtomicGet(key);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java
new file mode 100644
index 0000000..732d12d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.eviction.fifo.*;
+import org.apache.ignite.configuration.*;
+
+/**
+ * Runs the key unmarshalling error scenario with a near cache and synchronized eviction configured on the
+ * first cache of every grid.
+ */
+public class IgniteCacheP2pUnmarshallingNearErrorTest extends IgniteCacheP2pUnmarshallingErrorTest {
+    /** {@inheritDoc} */
+    @Override protected NearCacheConfiguration nearConfiguration() {
+        return new NearCacheConfiguration();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+        // All eviction settings are applied to the first configured cache.
+        CacheConfiguration ccfg = igniteCfg.getCacheConfiguration()[0];
+
+        ccfg.setEvictMaxOverflowRatio(0);
+        ccfg.setEvictSynchronized(true);
+        ccfg.setEvictSynchronizedKeyBufferSize(1);
+        ccfg.setEvictionPolicy(new FifoEvictionPolicy(1));
+
+        return igniteCfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void testResponseMessageOnUnmarshallingFailed() throws InterruptedException {
+        // GridCacheEvictionRequest unmarshalling failure: the 5th key read fails; the first 4 succeed
+        // (2 reads for each put, per the original author's note).
+        readCnt.set(5);
+
+        for (int putIdx = 0; putIdx < 2; putIdx++)
+            jcache(0).put(new TestKey(String.valueOf(++key)), "");
+
+        // The eviction request fails to unmarshal, but the IO manager must not hang.
+        // Nothing is asserted here; the test only gives eviction some time.
+        // TODO: wait for eviction to complete instead of sleeping.
+        Thread.sleep(1000);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingRebalanceErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingRebalanceErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingRebalanceErrorTest.java
new file mode 100644
index 0000000..49f58f9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingRebalanceErrorTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.internal.util.typedef.*;
+
+import javax.cache.*;
+import java.io.*;
+
+/**
+ * Checks behavior on exception while unmarshalling key.
+ */
+public class IgniteCacheP2pUnmarshallingRebalanceErrorTest extends IgniteCacheP2pUnmarshallingErrorTest {
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 2;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testResponseMessageOnUnmarshallingFailed() throws Exception {
+ //GridDhtPartitionSupplyMessage unmarshalling failed test
+ readCnt.set(Integer.MAX_VALUE);
+
+ for (int i = 0; i <= 20; i++) {
+ jcache(0).put(new TestKey(String.valueOf(++key)), "");
+ }
+
+ readCnt.set(1);
+
+ startGrid(3);
+
+ //GridDhtPartitionSupplyMessage unmarshalling failed but ioManager does not hangs up.
+
+ Thread.sleep(1000);
+
+ //GridDhtForceKeysRequest unmarshalling failed test
+ stopGrid(3);
+
+ readCnt.set(Integer.MAX_VALUE);
+
+ for (int i = 0; i <= 1000; i++) {
+ jcache(0).put(new TestKey(String.valueOf(++key)), "");
+ }
+
+ startGrid(3);
+
+ Affinity<Object> aff = affinity(grid(3).cache(null));
+
+ while (!aff.isPrimary(grid(3).localNode(), new TestKey(String.valueOf(key)))) {
+ --key;
+ }
+
+ readCnt.set(1);
+
+ try {
+ jcache(3).get(new TestKey(String.valueOf(key)));
+ assert false : "p2p marshalling failed, but error response was not sent";
+ }
+ catch (CacheException e) {
+ assert X.hasCause(e, IOException.class);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingTxErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingTxErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingTxErrorTest.java
new file mode 100644
index 0000000..a4e2753
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingTxErrorTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.transactions.*;
+
+import javax.cache.*;
+import java.io.*;
+
+/**
+ * Checks behavior on exception while unmarshalling key.
+ */
+public class IgniteCacheP2pUnmarshallingTxErrorTest extends IgniteCacheP2pUnmarshallingErrorTest {
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.TRANSACTIONAL;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ if (!gridName.endsWith("0"))
+ cfg.getCacheConfiguration()[0].setRebalanceDelay(-1); //allows to check GridDhtLockRequest fail.
+
+ return cfg;
+ }
+
+ /**
+ * Sends put with optimistic lock and handles fail.
+ */
+ protected void failOptimistic() {
+ try (Transaction tx = grid(0).transactions().txStart(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
+
+ jcache(0).put(new TestKey(String.valueOf(++key)), "");
+
+ tx.commit();
+
+ assert false : "p2p marshalling failed, but error response was not sent";
+ }
+ catch (IgniteException e) {
+ assert X.hasCause(e, IOException.class);
+ }
+
+ assert readCnt.get() == 0; //ensure we have read count as expected.
+ }
+
+ /**
+ * Sends put with pessimistic lock and handles fail.
+ */
+ protected void failPessimictic() {
+ try (Transaction tx = grid(0).transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
+
+ jcache(0).put(new TestKey(String.valueOf(++key)), "");
+
+ assert false : "p2p marshalling failed, but error response was not sent";
+ }
+ catch (CacheException e) {
+ assert X.hasCause(e, IOException.class);
+ }
+
+ assert readCnt.get() == 0; //ensure we have read count as expected.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testResponseMessageOnUnmarshallingFailed() {
+ //GridNearTxPrepareRequest unmarshalling failed test
+ readCnt.set(2);
+
+ failOptimistic();
+
+ //GridDhtTxPrepareRequest unmarshalling failed test
+ readCnt.set(3);
+
+ failOptimistic();
+
+ //GridNearLockRequest unmarshalling failed test
+ readCnt.set(2);
+
+ failPessimictic();
+
+ //GridDhtLockRequest unmarshalling failed test
+ readCnt.set(3);
+
+ try (Transaction tx = grid(0).transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
+ jcache(0).put(new TestKey(String.valueOf(++key)), ""); //No failure at client side.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java
index c2eb56f..7cd8414 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.swapspace.*;
import org.apache.ignite.spi.swapspace.file.*;
import javax.cache.*;
@@ -54,11 +55,19 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- cfg.setSwapSpaceSpi(new FileSwapSpaceSpi());
+ cfg.setSwapSpaceSpi(spi());
return cfg;
}
+ /**
+ * Creates the swap space SPI that {@link #getConfiguration(String)} installs into each grid configuration.
+ *
+ * @return New {@link FileSwapSpaceSpi} instance.
+ */
+ protected SwapSpaceSpi spi() {
+ return new FileSwapSpaceSpi();
+ }
+
/** {@inheritDoc} */
@Override protected NearCacheConfiguration nearConfiguration() {
return null;
@@ -231,7 +240,7 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
for (Integer key : keys)
cache0.put(key, val);
- FileSwapSpaceSpi swap = (FileSwapSpaceSpi)ignite(nodeIdx).configuration().getSwapSpaceSpi();
+ SwapSpaceSpi swap = ignite(nodeIdx).configuration().getSwapSpaceSpi();
Set<Integer> swapKeys = new HashSet<>();
@@ -614,7 +623,7 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
* @return Tuple with primary and backup keys.
*/
private T2<List<Integer>, List<Integer>> swapKeys(int nodeIdx) {
- FileSwapSpaceSpi swap = (FileSwapSpaceSpi)ignite(nodeIdx).configuration().getSwapSpaceSpi();
+ SwapSpaceSpi swap = ignite(nodeIdx).configuration().getSwapSpaceSpi();
IgniteSpiCloseableIterator<Integer> it = swap.keyIterator(SPACE_NAME, null);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index 72b76d7..095221e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -26,6 +26,9 @@ import org.apache.ignite.internal.*;
import org.apache.ignite.internal.managers.discovery.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.apache.ignite.testframework.*;
import org.apache.ignite.testframework.junits.common.*;
@@ -40,6 +43,9 @@ import java.util.concurrent.atomic.*;
@SuppressWarnings("unchecked")
public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
/** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
private static final String DYNAMIC_CACHE_NAME = "TestDynamicCache";
/** */
@@ -51,8 +57,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
/** */
public static final IgnitePredicate<ClusterNode> NODE_FILTER = new IgnitePredicate<ClusterNode>() {
/** {@inheritDoc} */
- @Override
- public boolean apply(ClusterNode n) {
+ @Override public boolean apply(ClusterNode n) {
Boolean val = n.attribute(TEST_ATTRIBUTE_NAME);
return val != null && val;
@@ -78,6 +83,8 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
cfg.setUserAttributes(F.asMap(TEST_ATTRIBUTE_NAME, testAttribute));
CacheConfiguration cacheCfg = new CacheConfiguration();
@@ -157,8 +164,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
futs.clear();
GridTestUtils.runMultiThreaded(new Callable<Object>() {
- @Override
- public Object call() throws Exception {
+ @Override public Object call() throws Exception {
futs.add(kernal.context().cache().dynamicStopCache(DYNAMIC_CACHE_NAME));
return null;
@@ -218,8 +224,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
futs.clear();
GridTestUtils.runMultiThreaded(new Callable<Object>() {
- @Override
- public Object call() throws Exception {
+ @Override public Object call() throws Exception {
IgniteEx kernal = grid(ThreadLocalRandom.current().nextInt(nodeCount()));
futs.add(kernal.context().cache().dynamicStopCache(DYNAMIC_CACHE_NAME));
@@ -940,8 +945,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
latches[i] = new CountDownLatch(1);
lsnrs[i] = new IgnitePredicate<CacheEvent>() {
- @Override
- public boolean apply(CacheEvent e) {
+ @Override public boolean apply(CacheEvent e) {
switch (e.type()) {
case EventType.EVT_CACHE_NODES_LEFT:
latches[idx].countDown();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/OffHeapTieredTransactionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/OffHeapTieredTransactionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/OffHeapTieredTransactionSelfTest.java
new file mode 100644
index 0000000..671d6c4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/OffHeapTieredTransactionSelfTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.transactions.*;
+
+import java.util.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMemoryMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+
+/**
+ * Verifies transactional {@code putAll} on a replicated cache in {@code OFFHEAP_TIERED} memory mode.
+ */
+public class OffHeapTieredTransactionSelfTest extends GridCommonAbstractTest {
+    /** IP finder shared by every node started in this test. */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        // Discovery SPI backed by the shared static IP finder.
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        // Cache: replicated, transactional, off-heap tiered, swap enabled.
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setMemoryMode(OFFHEAP_TIERED);
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+        cacheCfg.setOffHeapMaxMemory(0);
+        cacheCfg.setSwapEnabled(true);
+        cacheCfg.setCacheMode(REPLICATED);
+
+        cfg.setCacheConfiguration(cacheCfg);
+
+        // testPutAll() also runs SERIALIZABLE transactions, so enable them.
+        cfg.getTransactionConfiguration().setTxSerializableEnabled(true);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 30_000;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrids(2);
+
+        awaitPartitionMapExchange();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * Runs {@code putAll} inside a transaction for each concurrency/isolation combination.
+     *
+     * @throws Exception In case of error.
+     */
+    public void testPutAll() throws Exception {
+        IgniteCache<String, Integer> cache = grid(0).cache(null);
+
+        final int keysCnt = 5;
+
+        Map<String, Integer> entries = new LinkedHashMap<>();
+
+        for (int idx = 0; idx < keysCnt; idx++)
+            entries.put("key_" + idx, idx);
+
+        TransactionConcurrency[] concurrencies = {OPTIMISTIC, PESSIMISTIC};
+        TransactionIsolation[] isolations = {READ_COMMITTED, REPEATABLE_READ, SERIALIZABLE};
+
+        // Order: every isolation level under OPTIMISTIC, then every one under PESSIMISTIC.
+        for (TransactionConcurrency concurrency : concurrencies) {
+            for (TransactionIsolation isolation : isolations)
+                checkPutAll(cache, entries, concurrency, isolation);
+        }
+    }
+
+    /**
+     * Puts the given entries in one transaction, commits it and asserts every value reads back.
+     *
+     * @param cache Cache under test.
+     * @param data Entries to put and then verify.
+     * @param txConcurrency Transaction concurrency.
+     * @param txIsolation Transaction isolation.
+     * @throws Exception In case of error.
+     */
+    private void checkPutAll(IgniteCache<String, Integer> cache, Map<String, Integer> data,
+        TransactionConcurrency txConcurrency, TransactionIsolation txIsolation) throws Exception {
+        Ignite ignite = cache.unwrap(Ignite.class);
+
+        try (Transaction tx = ignite.transactions().txStart(txConcurrency, txIsolation)) {
+            cache.putAll(data);
+
+            tx.commit();
+        }
+
+        for (String key : data.keySet())
+            assertEquals(data.get(key), cache.get(key));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java
new file mode 100644
index 0000000..798494f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.*;
+import javax.cache.configuration.*;
+import javax.cache.integration.*;
+import java.io.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ * Tests for cache data loading during simultaneous grids start.
+ */
+public class CacheLoadingConcurrentGridStartSelfTest extends GridCommonAbstractTest {
+    /** Total number of grids, including grid 0 which performs the load. */
+    private static final int GRIDS_CNT = 5;
+
+    /** Number of keys loaded into the cache. */
+    private static final int KEYS_CNT = 1_000_000;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(PARTITIONED);
+
+        ccfg.setBackups(1);
+
+        ccfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(new TestCacheStoreAdapter()));
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * Loads the cache through a data streamer while the remaining grids are starting.
+     *
+     * @throws Exception If failed.
+     */
+    public void testLoadCacheWithDataStreamer() throws Exception {
+        IgniteInClosure<Ignite> f = new IgniteInClosure<Ignite>() {
+            @Override public void apply(Ignite grid) {
+                try (IgniteDataStreamer<Integer, String> dataStreamer = grid.dataStreamer(null)) {
+                    for (int i = 0; i < KEYS_CNT; i++)
+                        dataStreamer.addData(i, Integer.toString(i));
+                }
+            }
+        };
+
+        loadCache(f);
+    }
+
+    /**
+     * Loads the cache from the configured store while the remaining grids are starting.
+     *
+     * @throws Exception If failed.
+     */
+    public void testLoadCacheFromStore() throws Exception {
+        loadCache(new IgniteInClosure<Ignite>() {
+            @Override public void apply(Ignite grid) {
+                grid.cache(null).loadCache(null);
+            }
+        });
+    }
+
+    /**
+     * Starts grid 0, launches the other grids asynchronously, runs the loading closure on grid 0
+     * and, after the asynchronous start has completed, asserts the cache size.
+     *
+     * @param f Cache loading closure.
+     * @throws Exception If failed.
+     */
+    private void loadCache(IgniteInClosure<Ignite> f) throws Exception {
+        Ignite g0 = startGrid(0);
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Ignite>() {
+            @Override public Ignite call() throws Exception {
+                return startGridsMultiThreaded(1, GRIDS_CNT - 1);
+            }
+        });
+
+        try {
+            f.apply(g0);
+        }
+        finally {
+            // Wait for the concurrent grid start even if loading failed.
+            fut.get();
+        }
+
+        assertCacheSize();
+    }
+
+    /**
+     * Asserts that both the cache-wide primary size and the sum of per-grid local primary sizes
+     * equal {@code KEYS_CNT}.
+     */
+    private void assertCacheSize() {
+        IgniteCache<Integer, String> cache = grid(0).cache(null);
+
+        assertEquals(KEYS_CNT, cache.size(CachePeekMode.PRIMARY));
+
+        int total = 0;
+
+        for (int i = 0; i < GRIDS_CNT; i++)
+            total += grid(i).cache(null).localSize(CachePeekMode.PRIMARY);
+
+        assertEquals(KEYS_CNT, total);
+    }
+
+    /**
+     * Cache store whose {@code loadCache} feeds {@code KEYS_CNT} entries to the closure;
+     * {@code load} returns {@code null}, writes and deletes are ignored.
+     */
+    private static class TestCacheStoreAdapter extends CacheStoreAdapter<Integer, String> implements Serializable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void loadCache(IgniteBiInClosure<Integer, String> f, Object... args) {
+            for (int i = 0; i < KEYS_CNT; i++)
+                f.apply(i, Integer.toString(i));
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public String load(Integer i) throws CacheLoaderException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<? extends Integer, ? extends String> entry)
+            throws CacheWriterException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object o) throws CacheWriterException {
+            // No-op.
+        }
+    }
+}