You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/01/05 08:58:06 UTC

[03/26] ignite git commit: IGNITE-4408: Allow BinaryObjects pass to IndexingSpi. This closes #1353.

IGNITE-4408: Allow BinaryObjects pass to IndexingSpi. This closes #1353.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/708cc8c6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/708cc8c6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/708cc8c6

Branch: refs/heads/master
Commit: 708cc8c6849b21063a555895671f6f820d92184a
Parents: c103ac3
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Thu Dec 22 12:48:58 2016 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Thu Dec 22 12:48:58 2016 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |   8 +
 .../processors/cache/IgniteCacheProxy.java      |   2 +-
 .../cache/query/GridCacheQueryAdapter.java      |   2 +-
 .../processors/query/GridQueryProcessor.java    |  36 +++-
 .../apache/ignite/spi/indexing/IndexingSpi.java |   3 +
 .../cache/query/IndexingSpiQuerySelfTest.java   | 199 ++++++++++++++++++-
 6 files changed, 229 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/708cc8c6/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index de6cbed..fe78d88 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -500,6 +500,14 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_UNALIGNED_MEMORY_ACCESS = "IGNITE_UNALIGNED_MEMORY_ACCESS";
 
     /**
+     * When set to {@code true}, BinaryObject will be unwrapped before being passed to IndexingSpi to preserve
+     * the old behavior of the query processor with IndexingSpi.
+     * <p>
+     * @deprecated Should be removed in Apache Ignite 2.0.
+     */
+    public static final String IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI = "IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI";
+
+    /**
      * Enforces singleton.
      */
     private IgniteSystemProperties() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/708cc8c6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index f87fa1d..b9737c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -848,7 +848,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
      */
     private void validate(Query qry) {
         if (!GridQueryProcessor.isEnabled(ctx.config()) && !(qry instanceof ScanQuery) &&
-            !(qry instanceof ContinuousQuery))
+            !(qry instanceof ContinuousQuery) && !(qry instanceof SpiQuery))
             throw new CacheException("Indexing is disabled for cache: " + ctx.cache().name() +
                 ". Use setIndexedTypes or setTypeMetadata methods on CacheConfiguration to enable.");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/708cc8c6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 2355591..b29e5e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -430,7 +430,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
      * @throws IgniteCheckedException If query is invalid.
      */
     public void validate() throws IgniteCheckedException {
-        if ((type != SCAN && type != SET) && !GridQueryProcessor.isEnabled(cctx.config()))
+        if ((type != SCAN && type != SET && type != SPI) && !GridQueryProcessor.isEnabled(cctx.config()))
             throw new IgniteCheckedException("Indexing is disabled for cache: " + cctx.cache().name());
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/708cc8c6/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 8befa0e..6c093ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -44,7 +44,7 @@ import javax.cache.Cache;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.binary.BinaryField;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.binary.BinaryType;
@@ -160,6 +160,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     /** */
     private static final ThreadLocal<AffinityTopologyVersion> requestTopVer = new ThreadLocal<>();
 
+    /** Whether binary objects are passed to IndexingSpi without unwrapping. Default is {@code true}. */
+    private final boolean isIndexingSpiAllowsBinary = !IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI);
+
     /**
      * @param ctx Kernal context.
      */
@@ -680,7 +683,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         if (ctx.indexing().enabled()) {
             coctx = cacheObjectContext(space);
 
-            ctx.indexing().store(space, key.value(coctx, false), val.value(coctx, false), expirationTime);
+            Object key0 = unwrap(key, coctx);
+
+            Object val0 = unwrap(val, coctx);
+
+            ctx.indexing().store(space, key0, val0, expirationTime);
         }
 
         if (idx == null)
@@ -736,6 +743,13 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Unwrap CacheObject if needed.
+     */
+    private Object unwrap(CacheObject obj, CacheObjectContext coctx) {
+        return isIndexingSpiAllowsBinary && ctx.cacheObjects().isBinaryObject(obj) ? obj : obj.value(coctx, false);
+    }
+
+    /**
      * @throws IgniteCheckedException If failed.
      */
     private void checkEnabled() throws IgniteCheckedException {
@@ -1025,7 +1039,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         if (ctx.indexing().enabled()) {
             CacheObjectContext coctx = cacheObjectContext(space);
 
-            ctx.indexing().remove(space, key.value(coctx, false));
+            Object key0 = unwrap(key, coctx);
+
+            ctx.indexing().remove(space, key0);
         }
 
         if (idx == null)
@@ -1168,11 +1184,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         if (ctx.indexing().enabled()) {
             CacheObjectContext coctx = cacheObjectContext(spaceName);
 
-            ctx.indexing().onSwap(
-                spaceName,
-                key.value(
-                    coctx,
-                    false));
+            Object key0 = unwrap(key, coctx);
+
+            ctx.indexing().onSwap(spaceName, key0);
         }
 
         if (idx == null)
@@ -1207,7 +1221,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         if (ctx.indexing().enabled()) {
             CacheObjectContext coctx = cacheObjectContext(spaceName);
 
-            ctx.indexing().onUnswap(spaceName, key.value(coctx, false), val.value(coctx, false));
+            Object key0 = unwrap(key, coctx);
+
+            Object val0 = unwrap(val, coctx);
+
+            ctx.indexing().onUnswap(spaceName, key0, val0);
         }
 
         if (idx == null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/708cc8c6/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingSpi.java
index a3ea33e..bbe27c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingSpi.java
@@ -35,6 +35,9 @@ import org.jetbrains.annotations.Nullable;
  * methods. Note again that calling methods from this interface on the obtained instance can lead
  * to undefined behavior and explicitly not supported.
  *
+ * <b>NOTE:</b> Key and value arguments of IndexingSpi methods can be {@link org.apache.ignite.binary.BinaryObject} instances.
+ * BinaryObjects can be deserialized manually if the original objects are needed.
+ *
  * Here is a Java example on how to configure SPI.
  * <pre name="code" class="java">
  * IndexingSpi spi = new MyIndexingSpi();

http://git-wip-us.apache.org/repos/asf/ignite/blob/708cc8c6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java
index 94b0c8a..f66b99e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java
@@ -17,11 +17,22 @@
 
 package org.apache.ignite.internal.processors.cache.query;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import javax.cache.Cache;
 import junit.framework.TestCase;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.IgniteTransactions;
 import org.apache.ignite.Ignition;
+import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.SpiQuery;
@@ -40,17 +51,9 @@ import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.apache.ignite.transactions.TransactionState;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import javax.cache.Cache;
-
 /**
  * Indexing Spi query test
  */
@@ -88,6 +91,94 @@ public class IndexingSpiQuerySelfTest extends TestCase {
     /**
      * @throws Exception If failed.
      */
+    public void testIndexingSpiWithDisabledQueryProcessor() throws Exception {
+        IgniteConfiguration cfg = configuration();
+
+        cfg.setIndexingSpi(new MyIndexingSpi());
+
+        Ignite ignite = Ignition.start(cfg);
+
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>("test-cache");
+
+        IgniteCache<Integer, Integer> cache = ignite.createCache(ccfg);
+
+        for (int i = 0; i < 10; i++)
+            cache.put(i, i);
+
+        QueryCursor<Cache.Entry<Integer, Integer>> cursor = cache.query(new SpiQuery<Integer, Integer>().setArgs(2, 5));
+
+        for (Cache.Entry<Integer, Integer> entry : cursor)
+            System.out.println(entry);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testBinaryIndexingSpi() throws Exception {
+        IgniteConfiguration cfg = configuration();
+
+        cfg.setIndexingSpi(new MyBinaryIndexingSpi());
+
+        Ignite ignite = Ignition.start(cfg);
+
+        CacheConfiguration<PersonKey, Person> ccfg = new CacheConfiguration<>("test-binary-cache");
+
+        ccfg.setIndexedTypes(PersonKey.class, Person.class);
+
+        IgniteCache<PersonKey, Person> cache = ignite.createCache(ccfg);
+
+        for (int i = 0; i < 10; i++) {
+            PersonKey key = new PersonKey(i);
+
+            cache.put(key, new Person("John Doe " + i));
+        }
+
+        QueryCursor<Cache.Entry<PersonKey, Person>> cursor = cache.query(
+            new SpiQuery<PersonKey, Person>().setArgs(new PersonKey(2), new PersonKey(5)));
+
+        for (Cache.Entry<PersonKey, Person> entry : cursor)
+            System.out.println(entry);
+
+        cache.remove(new PersonKey(9));
+    }
+
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonBinaryIndexingSpi() throws Exception {
+        System.setProperty(IgniteSystemProperties.IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI, "true");
+
+        IgniteConfiguration cfg = configuration();
+
+        cfg.setIndexingSpi(new MyIndexingSpi());
+
+        Ignite ignite = Ignition.start(cfg);
+
+        CacheConfiguration<PersonKey, Person> ccfg = new CacheConfiguration<>("test-binary-cache");
+
+        ccfg.setIndexedTypes(PersonKey.class, Person.class);
+
+        IgniteCache<PersonKey, Person> cache = ignite.createCache(ccfg);
+
+        for (int i = 0; i < 10; i++) {
+            PersonKey key = new PersonKey(i);
+
+            cache.put(key, new Person("John Doe " + i));
+        }
+
+        QueryCursor<Cache.Entry<PersonKey, Person>> cursor = cache.query(
+            new SpiQuery<PersonKey, Person>().setArgs(new PersonKey(2), new PersonKey(5)));
+
+        for (Cache.Entry<PersonKey, Person> entry : cursor)
+            System.out.println(entry);
+
+        cache.remove(new PersonKey(9));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
     public void testIndexingSpiFailure() throws Exception {
         IgniteConfiguration cfg = configuration();
@@ -173,6 +264,9 @@ public class IndexingSpiQuerySelfTest extends TestCase {
             Object from = paramsIt.next();
             Object to = paramsIt.next();
 
+            from = from instanceof BinaryObject ? ((BinaryObject)from).deserialize() : from;
+            to = to instanceof BinaryObject ? ((BinaryObject)to).deserialize() : to;
+
             SortedMap<Object, Object> map = idx.subMap(from, to);
 
             Collection<Cache.Entry<?, ?>> res = new ArrayList<>(map.size());
@@ -186,6 +280,9 @@ public class IndexingSpiQuerySelfTest extends TestCase {
         /** {@inheritDoc} */
         @Override public void store(@Nullable String spaceName, Object key, Object val, long expirationTime)
             throws IgniteSpiException {
+            assertFalse(key instanceof BinaryObject);
+            assertFalse(val instanceof BinaryObject);
+
             idx.put(key, val);
         }
 
@@ -206,13 +303,95 @@ public class IndexingSpiQuerySelfTest extends TestCase {
     }
 
     /**
+     * Indexing Spi implementation for test. Accepts binary objects only.
+     */
+    private static class MyBinaryIndexingSpi extends MyIndexingSpi {
+
+        /** {@inheritDoc} */
+        @Override public void store(@Nullable String spaceName, Object key, Object val,
+            long expirationTime) throws IgniteSpiException {
+            assertTrue(key instanceof BinaryObject);
+
+            assertTrue(val instanceof BinaryObject);
+
+            super.store(spaceName, ((BinaryObject)key).deserialize(), ((BinaryObject)val).deserialize(), expirationTime);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void remove(@Nullable String spaceName, Object key) throws IgniteSpiException {
+            assertTrue(key instanceof BinaryObject);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onSwap(@Nullable String spaceName, Object key) throws IgniteSpiException {
+            assertTrue(key instanceof BinaryObject);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void onUnswap(@Nullable String spaceName, Object key, Object val) throws IgniteSpiException {
+            assertTrue(key instanceof BinaryObject);
+
+            assertTrue(val instanceof BinaryObject);
+        }
+    }
+
+    /**
      * Broken Indexing Spi implementation for test
      */
-    private class MyBrokenIndexingSpi extends MyIndexingSpi {
+    private static class MyBrokenIndexingSpi extends MyIndexingSpi {
         /** {@inheritDoc} */
         @Override public void store(@Nullable String spaceName, Object key, Object val,
             long expirationTime) throws IgniteSpiException {
             throw new IgniteSpiException("Test exception");
         }
     }
+
+    /**
+     *
+     */
+    private static class PersonKey implements Serializable, Comparable<PersonKey> {
+        /** */
+        private int id;
+
+        /** */
+        public PersonKey(int id) {
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compareTo(@NotNull PersonKey o) {
+            return Integer.compare(id, o.id);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            PersonKey key = (PersonKey)o;
+
+            return id == key.id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class Person implements Serializable {
+        /** */
+        private String name;
+
+        /** */
+        Person(String name) {
+            this.name = name;
+        }
+    }
 }
\ No newline at end of file