You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/23 15:57:45 UTC

[10/40] incubator-ignite git commit: ignite-973 - swap manager fix + offheap processor signatures fix

ignite-973 - swap manager fix + offheap processor signatures fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/23512df8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/23512df8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/23512df8

Branch: refs/heads/IgniteReflectionFactory-doc
Commit: 23512df815c196f9a2292ad948b130380f123770
Parents: b23ea74
Author: S.Vladykin <sv...@gridgain.com>
Authored: Fri Jun 19 13:31:27 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Fri Jun 19 13:31:27 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheSwapManager.java  |  12 +-
 .../offheap/GridOffHeapProcessor.java           |  19 +-
 .../apache/ignite/internal/util/GridDebug.java  |  37 ++--
 .../cache/IgniteCacheOffheapEvictQueryTest.java | 179 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite.java          |   1 +
 5 files changed, 220 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23512df8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index d0d9049..f709e03 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -977,15 +977,15 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         // First try offheap.
         if (offheapEnabled) {
-            byte[] val = offheap.remove(spaceName, part, key.value(cctx.cacheObjectContext(), false),
-                key.valueBytes(cctx.cacheObjectContext()));
-
-            if(val != null && cctx.config().isStatisticsEnabled())
-                cctx.cache().metrics0().onOffHeapRemove();
+            // TODO Pass closure c to offheap.remove and apply it before the actual remove.
+            byte[] val = offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
 
             if (val != null) {
+                if (cctx.config().isStatisticsEnabled())
+                    cctx.cache().metrics0().onOffHeapRemove();
+
                 if (c != null)
-                    c.apply(val); // Probably we should read value and apply closure before removing...
+                    c.apply(val);
 
                 return;
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23512df8/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
index a99c4c0..81bf9f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.offheap;
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.*;
+import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.offheap.*;
@@ -101,7 +102,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
      * @return Key bytes
      * @throws IgniteCheckedException If failed.
      */
-    private byte[] keyBytes(Object key, @Nullable byte[] keyBytes) throws IgniteCheckedException {
+    private byte[] keyBytes(KeyCacheObject key, @Nullable byte[] keyBytes) throws IgniteCheckedException {
         assert key != null;
 
         return keyBytes != null ? keyBytes : marsh.marshal(key);
@@ -130,7 +131,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
      * @return {@code true} If offheap space contains value for the given key.
      * @throws IgniteCheckedException If failed.
      */
-    public boolean contains(@Nullable String spaceName, int part, Object key, byte[] keyBytes)
+    public boolean contains(@Nullable String spaceName, int part, KeyCacheObject key, byte[] keyBytes)
         throws IgniteCheckedException {
         GridOffHeapPartitionedMap m = offheap(spaceName);
 
@@ -147,7 +148,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
      * @return Value bytes.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable public byte[] get(@Nullable String spaceName, int part, Object key, byte[] keyBytes)
+    @Nullable public byte[] get(@Nullable String spaceName, int part, KeyCacheObject key, byte[] keyBytes)
         throws IgniteCheckedException {
         GridOffHeapPartitionedMap m = offheap(spaceName);
 
@@ -166,7 +167,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
      * @return Tuple where first value is pointer and second is value size.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable public IgniteBiTuple<Long, Integer> valuePointer(@Nullable String spaceName, int part, Object key,
+    @Nullable public IgniteBiTuple<Long, Integer> valuePointer(@Nullable String spaceName, int part, KeyCacheObject key,
         byte[] keyBytes) throws IgniteCheckedException {
         GridOffHeapPartitionedMap m = offheap(spaceName);
 
@@ -182,7 +183,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
      * @param keyBytes Key bytes.
      * @throws IgniteCheckedException If failed.
      */
-    public void enableEviction(@Nullable String spaceName, int part, Object key, byte[] keyBytes)
+    public void enableEviction(@Nullable String spaceName, int part, KeyCacheObject key, byte[] keyBytes)
         throws IgniteCheckedException {
         GridOffHeapPartitionedMap m = offheap(spaceName);
 
@@ -201,7 +202,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
      * @return Value bytes.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable public <T> T getValue(@Nullable String spaceName, int part, Object key, byte[] keyBytes,
+    @Nullable public <T> T getValue(@Nullable String spaceName, int part, KeyCacheObject key, byte[] keyBytes,
         @Nullable ClassLoader ldr) throws IgniteCheckedException {
         byte[] valBytes = get(spaceName, part, key, keyBytes);
 
@@ -221,7 +222,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
      * @return Value bytes.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable public byte[] remove(@Nullable String spaceName, int part, Object key, byte[] keyBytes) throws IgniteCheckedException {
+    @Nullable public byte[] remove(@Nullable String spaceName, int part, KeyCacheObject key, byte[] keyBytes) throws IgniteCheckedException {
         GridOffHeapPartitionedMap m = offheap(spaceName);
 
         return m == null ? null : m.remove(part, U.hash(key), keyBytes(key, keyBytes));
@@ -237,7 +238,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
      * @param valBytes Value bytes.
      * @throws IgniteCheckedException If failed.
      */
-    public void put(@Nullable String spaceName, int part, Object key, byte[] keyBytes, byte[] valBytes)
+    public void put(@Nullable String spaceName, int part, KeyCacheObject key, byte[] keyBytes, byte[] valBytes)
         throws IgniteCheckedException {
         GridOffHeapPartitionedMap m = offheap(spaceName);
 
@@ -258,7 +259,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
      * @return {@code true} If succeeded.
      * @throws IgniteCheckedException If failed.
      */
-    public boolean removex(@Nullable String spaceName, int part, Object key, byte[] keyBytes) throws IgniteCheckedException {
+    public boolean removex(@Nullable String spaceName, int part, KeyCacheObject key, byte[] keyBytes) throws IgniteCheckedException {
         GridOffHeapPartitionedMap m = offheap(spaceName);
 
         return m != null && m.removex(part, U.hash(key), keyBytes(key, keyBytes));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23512df8/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java
index aadec74..98c8664 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.util;
 
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
@@ -173,13 +174,6 @@ public class GridDebug {
     }
 
     /**
-     * Dump collected data to stdout.
-     */
-    public static void dump() {
-        dump(que.get());
-    }
-
-    /**
      * Dumps given number of last events.
      *
      * @param n Number of last elements to dump.
@@ -204,7 +198,7 @@ public class GridDebug {
      * @param que Queue.
      */
     @SuppressWarnings("TypeMayBeWeakened")
-    public static void dump(ConcurrentLinkedQueue<Item> que) {
+    public static void dump(Collection<Item> que) {
         if (que == null)
             return;
 
@@ -226,7 +220,7 @@ public class GridDebug {
      */
     public static String dumpWithStop(Object... x) {
         debug(x);
-        return dumpWithReset(null);
+        return dumpWithReset(null, null);
     }
 
     /**
@@ -235,16 +229,20 @@ public class GridDebug {
      * @return Empty string (useful for assertions like {@code assert x == 0 : D.dumpWithReset();} ).
      */
     public static String dumpWithReset() {
-        return dumpWithReset(new ConcurrentLinkedQueue<Item>());
+        return dumpWithReset(new ConcurrentLinkedQueue<Item>(), null);
     }
 
     /**
      * Dump existing queue to stdout and atomically replace it with given.
      *
      * @param q2 Queue.
+     * @param filter Filter for logged debug items.
      * @return Empty string.
      */
-    private static String dumpWithReset(@Nullable ConcurrentLinkedQueue<Item> q2) {
+    public static String dumpWithReset(
+        @Nullable ConcurrentLinkedQueue<Item> q2,
+        @Nullable IgnitePredicate<Item> filter
+    ) {
         ConcurrentLinkedQueue<Item> q;
 
         do {
@@ -255,7 +253,20 @@ public class GridDebug {
         }
         while (!que.compareAndSet(q, q2));
 
-        dump(q);
+        Collection<Item> col = null;
+
+        if (filter == null)
+            col = q;
+        else if (q != null) {
+            col = new ArrayList<>();
+
+            for (Item item : q) {
+                if (filter.apply(item))
+                    col.add(item);
+            }
+        }
+
+        dump(col);
 
         return "";
     }
@@ -281,7 +292,7 @@ public class GridDebug {
      */
     private static String formatEntry(long ts, String threadName, long threadId, Object... data) {
         return "<" + DEBUG_DATE_FMT.format(new Date(ts)) + "><~DBG~><" + threadName + " id:" + threadId + "> " +
-            Arrays.toString(data);
+            Arrays.deepToString(data);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23512df8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapEvictQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapEvictQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapEvictQueryTest.java
new file mode 100644
index 0000000..fc6c125
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapEvictQueryTest.java
@@ -0,0 +1,179 @@
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.spi.swapspace.inmemory.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import javax.cache.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
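+/**
+ * Concurrently evicts, gets, removes and SQL-queries keys of a swap-enabled cache with a small offheap limit. */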
+public class IgniteCacheOffheapEvictQueryTest extends GridCommonAbstractTest {
+    /** IP finder passed to the discovery SPI in {@link #getConfiguration(String)}. */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(disco);
+
+        cfg.setSwapSpaceSpi(new GridTestSwapSpaceSpi());
+
+        CacheConfiguration<?,?> cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(PARTITIONED);
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+        cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        cacheCfg.setSwapEnabled(true);
+        cacheCfg.setBackups(0);
+        cacheCfg.setMemoryMode(CacheMemoryMode.ONHEAP_TIERED);
+        cacheCfg.setEvictionPolicy(null);
+        cacheCfg.setNearConfiguration(null);
+
+        cacheCfg.setSqlOnheapRowCacheSize(128);
+
+        cacheCfg.setIndexedTypes(
+            Integer.class, Integer.class
+        );
+
+        cacheCfg.setOffHeapMaxMemory(2000); // Small offheap for evictions from offheap to swap.
+
+        cfg.setCacheConfiguration(cacheCfg);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEvictAndRemove() throws Exception {
+        final int KEYS_CNT = 3000;
+        final int THREADS_CNT = 256;
+
+        final IgniteCache<Integer,Integer> c = startGrid().cache(null);
+
+        for (int i = 0; i < KEYS_CNT; i++) {
+            c.put(i, i);
+
+            if ((i & 1) == 0)
+                c.localEvict(F.asList(i));
+        }
+
+        X.println("___ Cache loaded...");
+
+        final CyclicBarrier b = new CyclicBarrier(THREADS_CNT, new Runnable() {
+            @Override public void run() {
+                X.println("___ go!");
+            }
+        });
+
+        final AtomicInteger keys = new AtomicInteger(KEYS_CNT);
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
+            @Override public void run() {
+                Random rnd = new GridRandom();
+
+                try {
+                    b.await();
+                }
+                catch (InterruptedException e) {
+                    throw new IgniteInterruptedException(e);
+                }
+                catch (BrokenBarrierException e) {
+                    throw new IllegalStateException(e);
+                }
+
+                while (keys.get() > 0) {
+                    int k = rnd.nextInt(KEYS_CNT);
+
+                    try {
+                        switch (rnd.nextInt(4)) {
+                            case 0:
+                                c.localEvict(F.asList(k));
+
+                                break;
+
+                            case 1:
+                                c.get(k);
+
+                                break;
+
+                            case 2:
+                                if (c.remove(k))
+                                    keys.decrementAndGet();
+
+                                break;
+
+                            case 3:
+                                c.query(new SqlFieldsQuery("select _val from Integer where _key between ? and ?")
+                                    .setArgs(k, k + 20).setLocal(true)).getAll();
+
+                                break;
+                        }
+                    }
+                    catch (CacheException e) {
+                        String msgStart = "Failed to get value for key:";
+
+                        for (Throwable th = e; th != null; th = th.getCause()) {
+                            String msg = th.getMessage();
+
+                            if (msg != null && msg.startsWith(msgStart)) {
+                                int dot = msg.indexOf('.', msgStart.length());
+
+                                assertTrue(dot != -1);
+
+                                final Integer failedKey = Integer.parseInt(msg.substring(msgStart.length(), dot).trim());
+
+                                X.println("___ failed key: " + failedKey);
+
+                                break;
+                            }
+                        }
+
+                        LT.warn(log, null, e.getMessage());
+
+                        return;
+                    }
+                }
+            }
+        }, THREADS_CNT);
+
+        try {
+            fut.get(60_000);
+
+            if (c.size(CachePeekMode.ALL) != 0)
+                fail("Not all keys removed.");
+
+            X.println("___ all keys removed");
+        }
+        catch (IgniteFutureTimeoutCheckedException e) {
+            X.println("___ timeout");
+            X.println("___ keys: " + keys.get());
+
+            keys.set(0);
+
+            fut.get();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23512df8/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index dee3078..b9205a9 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -63,6 +63,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCacheQueryEvictsMultiThreadedSelfTest.class);
         suite.addTestSuite(IgniteCacheQueryOffheapMultiThreadedSelfTest.class);
         // suite.addTestSuite(IgniteCacheQueryOffheapEvictsMultiThreadedSelfTest.class); TODO IGNITE-971.
+        suite.addTestSuite(IgniteCacheOffheapEvictQueryTest.class);
         suite.addTestSuite(IgniteCacheSqlQueryMultiThreadedSelfTest.class);
         suite.addTestSuite(IgniteCacheOffheapTieredMultithreadedSelfTest.class);
         suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class);