You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/06/23 17:22:54 UTC
[07/38] incubator-ignite git commit: ignite-973 - swap manager fix + offheap processor signatures fix
ignite-973 - swap manager fix + offheap processor signatures fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/23512df8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/23512df8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/23512df8
Branch: refs/heads/ignite-gg-10441
Commit: 23512df815c196f9a2292ad948b130380f123770
Parents: b23ea74
Author: S.Vladykin <sv...@gridgain.com>
Authored: Fri Jun 19 13:31:27 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Fri Jun 19 13:31:27 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheSwapManager.java | 12 +-
.../offheap/GridOffHeapProcessor.java | 19 +-
.../apache/ignite/internal/util/GridDebug.java | 37 ++--
.../cache/IgniteCacheOffheapEvictQueryTest.java | 179 +++++++++++++++++++
.../IgniteCacheQuerySelfTestSuite.java | 1 +
5 files changed, 220 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23512df8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index d0d9049..f709e03 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -977,15 +977,15 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
// First try offheap.
if (offheapEnabled) {
- byte[] val = offheap.remove(spaceName, part, key.value(cctx.cacheObjectContext(), false),
- key.valueBytes(cctx.cacheObjectContext()));
-
- if(val != null && cctx.config().isStatisticsEnabled())
- cctx.cache().metrics0().onOffHeapRemove();
+ // TODO Pass closure c to offheap.remove and apply it before the actual remove.
+ byte[] val = offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
if (val != null) {
+ if (cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onOffHeapRemove();
+
if (c != null)
- c.apply(val); // Probably we should read value and apply closure before removing...
+ c.apply(val);
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23512df8/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
index a99c4c0..81bf9f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.offheap;
import org.apache.ignite.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.*;
+import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.offheap.*;
@@ -101,7 +102,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
* @return Key bytes
* @throws IgniteCheckedException If failed.
*/
- private byte[] keyBytes(Object key, @Nullable byte[] keyBytes) throws IgniteCheckedException {
+ private byte[] keyBytes(KeyCacheObject key, @Nullable byte[] keyBytes) throws IgniteCheckedException {
assert key != null;
return keyBytes != null ? keyBytes : marsh.marshal(key);
@@ -130,7 +131,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
* @return {@code true} If offheap space contains value for the given key.
* @throws IgniteCheckedException If failed.
*/
- public boolean contains(@Nullable String spaceName, int part, Object key, byte[] keyBytes)
+ public boolean contains(@Nullable String spaceName, int part, KeyCacheObject key, byte[] keyBytes)
throws IgniteCheckedException {
GridOffHeapPartitionedMap m = offheap(spaceName);
@@ -147,7 +148,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
* @return Value bytes.
* @throws IgniteCheckedException If failed.
*/
- @Nullable public byte[] get(@Nullable String spaceName, int part, Object key, byte[] keyBytes)
+ @Nullable public byte[] get(@Nullable String spaceName, int part, KeyCacheObject key, byte[] keyBytes)
throws IgniteCheckedException {
GridOffHeapPartitionedMap m = offheap(spaceName);
@@ -166,7 +167,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
* @return Tuple where first value is pointer and second is value size.
* @throws IgniteCheckedException If failed.
*/
- @Nullable public IgniteBiTuple<Long, Integer> valuePointer(@Nullable String spaceName, int part, Object key,
+ @Nullable public IgniteBiTuple<Long, Integer> valuePointer(@Nullable String spaceName, int part, KeyCacheObject key,
byte[] keyBytes) throws IgniteCheckedException {
GridOffHeapPartitionedMap m = offheap(spaceName);
@@ -182,7 +183,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
* @param keyBytes Key bytes.
* @throws IgniteCheckedException If failed.
*/
- public void enableEviction(@Nullable String spaceName, int part, Object key, byte[] keyBytes)
+ public void enableEviction(@Nullable String spaceName, int part, KeyCacheObject key, byte[] keyBytes)
throws IgniteCheckedException {
GridOffHeapPartitionedMap m = offheap(spaceName);
@@ -201,7 +202,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
* @return Value bytes.
* @throws IgniteCheckedException If failed.
*/
- @Nullable public <T> T getValue(@Nullable String spaceName, int part, Object key, byte[] keyBytes,
+ @Nullable public <T> T getValue(@Nullable String spaceName, int part, KeyCacheObject key, byte[] keyBytes,
@Nullable ClassLoader ldr) throws IgniteCheckedException {
byte[] valBytes = get(spaceName, part, key, keyBytes);
@@ -221,7 +222,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
* @return Value bytes.
* @throws IgniteCheckedException If failed.
*/
- @Nullable public byte[] remove(@Nullable String spaceName, int part, Object key, byte[] keyBytes) throws IgniteCheckedException {
+ @Nullable public byte[] remove(@Nullable String spaceName, int part, KeyCacheObject key, byte[] keyBytes) throws IgniteCheckedException {
GridOffHeapPartitionedMap m = offheap(spaceName);
return m == null ? null : m.remove(part, U.hash(key), keyBytes(key, keyBytes));
@@ -237,7 +238,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
* @param valBytes Value bytes.
* @throws IgniteCheckedException If failed.
*/
- public void put(@Nullable String spaceName, int part, Object key, byte[] keyBytes, byte[] valBytes)
+ public void put(@Nullable String spaceName, int part, KeyCacheObject key, byte[] keyBytes, byte[] valBytes)
throws IgniteCheckedException {
GridOffHeapPartitionedMap m = offheap(spaceName);
@@ -258,7 +259,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
* @return {@code true} If succeeded.
* @throws IgniteCheckedException If failed.
*/
- public boolean removex(@Nullable String spaceName, int part, Object key, byte[] keyBytes) throws IgniteCheckedException {
+ public boolean removex(@Nullable String spaceName, int part, KeyCacheObject key, byte[] keyBytes) throws IgniteCheckedException {
GridOffHeapPartitionedMap m = offheap(spaceName);
return m != null && m.removex(part, U.hash(key), keyBytes(key, keyBytes));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23512df8/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java
index aadec74..98c8664 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.util;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;
import java.io.*;
@@ -173,13 +174,6 @@ public class GridDebug {
}
/**
- * Dump collected data to stdout.
- */
- public static void dump() {
- dump(que.get());
- }
-
- /**
* Dumps given number of last events.
*
* @param n Number of last elements to dump.
@@ -204,7 +198,7 @@ public class GridDebug {
* @param que Queue.
*/
@SuppressWarnings("TypeMayBeWeakened")
- public static void dump(ConcurrentLinkedQueue<Item> que) {
+ public static void dump(Collection<Item> que) {
if (que == null)
return;
@@ -226,7 +220,7 @@ public class GridDebug {
*/
public static String dumpWithStop(Object... x) {
debug(x);
- return dumpWithReset(null);
+ return dumpWithReset(null, null);
}
/**
@@ -235,16 +229,20 @@ public class GridDebug {
* @return Empty string (useful for assertions like {@code assert x == 0 : D.dumpWithReset();} ).
*/
public static String dumpWithReset() {
- return dumpWithReset(new ConcurrentLinkedQueue<Item>());
+ return dumpWithReset(new ConcurrentLinkedQueue<Item>(), null);
}
/**
* Dump existing queue to stdout and atomically replace it with given.
*
* @param q2 Queue.
+ * @param filter Filter for logged debug items.
* @return Empty string.
*/
- private static String dumpWithReset(@Nullable ConcurrentLinkedQueue<Item> q2) {
+ public static String dumpWithReset(
+ @Nullable ConcurrentLinkedQueue<Item> q2,
+ @Nullable IgnitePredicate<Item> filter
+ ) {
ConcurrentLinkedQueue<Item> q;
do {
@@ -255,7 +253,20 @@ public class GridDebug {
}
while (!que.compareAndSet(q, q2));
- dump(q);
+ Collection<Item> col = null;
+
+ if (filter == null)
+ col = q;
+ else if (q != null) {
+ col = new ArrayList<>();
+
+ for (Item item : q) {
+ if (filter.apply(item))
+ col.add(item);
+ }
+ }
+
+ dump(col);
return "";
}
@@ -281,7 +292,7 @@ public class GridDebug {
*/
private static String formatEntry(long ts, String threadName, long threadId, Object... data) {
return "<" + DEBUG_DATE_FMT.format(new Date(ts)) + "><~DBG~><" + threadName + " id:" + threadId + "> " +
- Arrays.toString(data);
+ Arrays.deepToString(data);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23512df8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapEvictQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapEvictQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapEvictQueryTest.java
new file mode 100644
index 0000000..fc6c125
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapEvictQueryTest.java
@@ -0,0 +1,179 @@
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.spi.swapspace.inmemory.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import javax.cache.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ */
+public class IgniteCacheOffheapEvictQueryTest extends GridCommonAbstractTest {
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(ipFinder);
+
+ cfg.setDiscoverySpi(disco);
+
+ cfg.setSwapSpaceSpi(new GridTestSwapSpaceSpi());
+
+ CacheConfiguration<?,?> cacheCfg = defaultCacheConfiguration();
+
+ cacheCfg.setCacheMode(PARTITIONED);
+ cacheCfg.setAtomicityMode(TRANSACTIONAL);
+ cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ cacheCfg.setSwapEnabled(true);
+ cacheCfg.setBackups(0);
+ cacheCfg.setMemoryMode(CacheMemoryMode.ONHEAP_TIERED);
+ cacheCfg.setEvictionPolicy(null);
+ cacheCfg.setNearConfiguration(null);
+
+ cacheCfg.setSqlOnheapRowCacheSize(128);
+
+ cacheCfg.setIndexedTypes(
+ Integer.class, Integer.class
+ );
+
+ cacheCfg.setOffHeapMaxMemory(2000); // Small offheap for evictions from offheap to swap.
+
+ cfg.setCacheConfiguration(cacheCfg);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testEvictAndRemove() throws Exception {
+ final int KEYS_CNT = 3000;
+ final int THREADS_CNT = 256;
+
+ final IgniteCache<Integer,Integer> c = startGrid().cache(null);
+
+ for (int i = 0; i < KEYS_CNT; i++) {
+ c.put(i, i);
+
+ if ((i & 1) == 0)
+ c.localEvict(F.asList(i));
+ }
+
+ X.println("___ Cache loaded...");
+
+ final CyclicBarrier b = new CyclicBarrier(THREADS_CNT, new Runnable() {
+ @Override public void run() {
+ X.println("___ go!");
+ }
+ });
+
+ final AtomicInteger keys = new AtomicInteger(KEYS_CNT);
+
+ IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
+ @Override public void run() {
+ Random rnd = new GridRandom();
+
+ try {
+ b.await();
+ }
+ catch (InterruptedException e) {
+ throw new IgniteInterruptedException(e);
+ }
+ catch (BrokenBarrierException e) {
+ throw new IllegalStateException(e);
+ }
+
+ while (keys.get() > 0) {
+ int k = rnd.nextInt(KEYS_CNT);
+
+ try {
+ switch (rnd.nextInt(4)) {
+ case 0:
+ c.localEvict(F.asList(k));
+
+ break;
+
+ case 1:
+ c.get(k);
+
+ break;
+
+ case 2:
+ if (c.remove(k))
+ keys.decrementAndGet();
+
+ break;
+
+ case 3:
+ c.query(new SqlFieldsQuery("select _val from Integer where _key between ? and ?")
+ .setArgs(k, k + 20).setLocal(true)).getAll();
+
+ break;
+ }
+ }
+ catch (CacheException e) {
+ String msgStart = "Failed to get value for key:";
+
+ for (Throwable th = e; th != null; th = th.getCause()) {
+ String msg = th.getMessage();
+
+ if (msg != null && msg.startsWith(msgStart)) {
+ int dot = msg.indexOf('.', msgStart.length());
+
+ assertTrue(dot != -1);
+
+ final Integer failedKey = Integer.parseInt(msg.substring(msgStart.length(), dot).trim());
+
+ X.println("___ failed key: " + failedKey);
+
+ break;
+ }
+ }
+
+ LT.warn(log, null, e.getMessage());
+
+ return;
+ }
+ }
+ }
+ }, THREADS_CNT);
+
+ try {
+ fut.get(60_000);
+
+ if (c.size(CachePeekMode.ALL) != 0)
+ fail("Not all keys removed.");
+
+ X.println("___ all keys removed");
+ }
+ catch (IgniteFutureTimeoutCheckedException e) {
+ X.println("___ timeout");
+ X.println("___ keys: " + keys.get());
+
+ keys.set(0);
+
+ fut.get();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23512df8/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index dee3078..b9205a9 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -63,6 +63,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheQueryEvictsMultiThreadedSelfTest.class);
suite.addTestSuite(IgniteCacheQueryOffheapMultiThreadedSelfTest.class);
// suite.addTestSuite(IgniteCacheQueryOffheapEvictsMultiThreadedSelfTest.class); TODO IGNITE-971.
+ suite.addTestSuite(IgniteCacheOffheapEvictQueryTest.class);
suite.addTestSuite(IgniteCacheSqlQueryMultiThreadedSelfTest.class);
suite.addTestSuite(IgniteCacheOffheapTieredMultithreadedSelfTest.class);
suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class);