You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/01/05 08:58:06 UTC
[03/26] ignite git commit: IGNITE-4408: Allow BinaryObjects pass to
IndexingSpi. This closes #1353.
IGNITE-4408: Allow BinaryObjects pass to IndexingSpi. This closes #1353.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/708cc8c6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/708cc8c6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/708cc8c6
Branch: refs/heads/master
Commit: 708cc8c6849b21063a555895671f6f820d92184a
Parents: c103ac3
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Thu Dec 22 12:48:58 2016 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Thu Dec 22 12:48:58 2016 +0300
----------------------------------------------------------------------
.../apache/ignite/IgniteSystemProperties.java | 8 +
.../processors/cache/IgniteCacheProxy.java | 2 +-
.../cache/query/GridCacheQueryAdapter.java | 2 +-
.../processors/query/GridQueryProcessor.java | 36 +++-
.../apache/ignite/spi/indexing/IndexingSpi.java | 3 +
.../cache/query/IndexingSpiQuerySelfTest.java | 199 ++++++++++++++++++-
6 files changed, 229 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/708cc8c6/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index de6cbed..fe78d88 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -500,6 +500,14 @@ public final class IgniteSystemProperties {
public static final String IGNITE_UNALIGNED_MEMORY_ACCESS = "IGNITE_UNALIGNED_MEMORY_ACCESS";
/**
+ * When set to {@code true}, BinaryObjects will be unwrapped before being passed to IndexingSpi in order to
+ * preserve the old behavior of the query processor with IndexingSpi.
+ * <p>
+ * @deprecated Should be removed in Apache Ignite 2.0.
+ */
+ public static final String IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI = "IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI";
+
+ /**
* Enforces singleton.
*/
private IgniteSystemProperties() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/708cc8c6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index f87fa1d..b9737c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -848,7 +848,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
*/
private void validate(Query qry) {
if (!GridQueryProcessor.isEnabled(ctx.config()) && !(qry instanceof ScanQuery) &&
- !(qry instanceof ContinuousQuery))
+ !(qry instanceof ContinuousQuery) && !(qry instanceof SpiQuery))
throw new CacheException("Indexing is disabled for cache: " + ctx.cache().name() +
". Use setIndexedTypes or setTypeMetadata methods on CacheConfiguration to enable.");
http://git-wip-us.apache.org/repos/asf/ignite/blob/708cc8c6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 2355591..b29e5e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -430,7 +430,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
* @throws IgniteCheckedException If query is invalid.
*/
public void validate() throws IgniteCheckedException {
- if ((type != SCAN && type != SET) && !GridQueryProcessor.isEnabled(cctx.config()))
+ if ((type != SCAN && type != SET && type != SPI) && !GridQueryProcessor.isEnabled(cctx.config()))
throw new IgniteCheckedException("Indexing is disabled for cache: " + cctx.cache().name());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/708cc8c6/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 8befa0e..6c093ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -44,7 +44,7 @@ import javax.cache.Cache;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.binary.BinaryField;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryType;
@@ -160,6 +160,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
/** */
private static final ThreadLocal<AffinityTopologyVersion> requestTopVer = new ThreadLocal<>();
+ /** Whether BinaryObjects are passed to IndexingSpi as is, without unwrapping. Default is {@code true}. */
+ private final boolean isIndexingSpiAllowsBinary = !IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI);
+
/**
* @param ctx Kernal context.
*/
@@ -680,7 +683,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
if (ctx.indexing().enabled()) {
coctx = cacheObjectContext(space);
- ctx.indexing().store(space, key.value(coctx, false), val.value(coctx, false), expirationTime);
+ Object key0 = unwrap(key, coctx);
+
+ Object val0 = unwrap(val, coctx);
+
+ ctx.indexing().store(space, key0, val0, expirationTime);
}
if (idx == null)
@@ -736,6 +743,13 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * Unwrap CacheObject if needed.
+ * <p>
+ * Returns {@code obj} itself when {@code isIndexingSpiAllowsBinary} is set and
+ * {@code ctx.cacheObjects().isBinaryObject(obj)} holds; otherwise returns {@code obj.value(coctx, false)}.
+ *
+ * @param obj Cache object (key or value) that is about to be passed to IndexingSpi.
+ * @param coctx Cache object context used to extract the value.
+ * @return Object to pass to IndexingSpi.
+ */
+ private Object unwrap(CacheObject obj, CacheObjectContext coctx) {
+ return isIndexingSpiAllowsBinary && ctx.cacheObjects().isBinaryObject(obj) ? obj : obj.value(coctx, false);
+ }
+
+ /**
* @throws IgniteCheckedException If failed.
*/
private void checkEnabled() throws IgniteCheckedException {
@@ -1025,7 +1039,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
if (ctx.indexing().enabled()) {
CacheObjectContext coctx = cacheObjectContext(space);
- ctx.indexing().remove(space, key.value(coctx, false));
+ Object key0 = unwrap(key, coctx);
+
+ ctx.indexing().remove(space, key0);
}
if (idx == null)
@@ -1168,11 +1184,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
if (ctx.indexing().enabled()) {
CacheObjectContext coctx = cacheObjectContext(spaceName);
- ctx.indexing().onSwap(
- spaceName,
- key.value(
- coctx,
- false));
+ Object key0 = unwrap(key, coctx);
+
+ ctx.indexing().onSwap(spaceName, key0);
}
if (idx == null)
@@ -1207,7 +1221,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
if (ctx.indexing().enabled()) {
CacheObjectContext coctx = cacheObjectContext(spaceName);
- ctx.indexing().onUnswap(spaceName, key.value(coctx, false), val.value(coctx, false));
+ Object key0 = unwrap(key, coctx);
+
+ Object val0 = unwrap(val, coctx);
+
+ ctx.indexing().onUnswap(spaceName, key0, val0);
}
if (idx == null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/708cc8c6/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingSpi.java
index a3ea33e..bbe27c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingSpi.java
@@ -35,6 +35,9 @@ import org.jetbrains.annotations.Nullable;
* methods. Note again that calling methods from this interface on the obtained instance can lead
* to undefined behavior and explicitly not supported.
*
+ * <b>NOTE:</b> Key and value arguments of IndexingSpi methods can be {@link org.apache.ignite.binary.BinaryObject} instances.
+ * BinaryObjects can be deserialized manually if the original objects are needed.
+ *
* Here is a Java example on how to configure SPI.
* <pre name="code" class="java">
* IndexingSpi spi = new MyIndexingSpi();
http://git-wip-us.apache.org/repos/asf/ignite/blob/708cc8c6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java
index 94b0c8a..f66b99e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java
@@ -17,11 +17,22 @@
package org.apache.ignite.internal.processors.cache.query;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import javax.cache.Cache;
import junit.framework.TestCase;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.Ignition;
+import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SpiQuery;
@@ -40,17 +51,9 @@ import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionState;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import javax.cache.Cache;
-
/**
* Indexing Spi query test
*/
@@ -88,6 +91,94 @@ public class IndexingSpiQuerySelfTest extends TestCase {
/**
+ * Runs an {@link SpiQuery} with {@code MyIndexingSpi} configured against a cache whose configuration
+ * declares no indexed types. The returned entries are only printed; no assertion is made on them.
+ *
* @throws Exception If failed.
*/
+ public void testIndexingSpiWithDisabledQueryProcessor() throws Exception {
+ IgniteConfiguration cfg = configuration();
+
+ cfg.setIndexingSpi(new MyIndexingSpi());
+
+ Ignite ignite = Ignition.start(cfg);
+
+ CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>("test-cache");
+
+ IgniteCache<Integer, Integer> cache = ignite.createCache(ccfg);
+
+ for (int i = 0; i < 10; i++)
+ cache.put(i, i);
+
+ QueryCursor<Cache.Entry<Integer, Integer>> cursor = cache.query(new SpiQuery<Integer, Integer>().setArgs(2, 5));
+
+ for (Cache.Entry<Integer, Integer> entry : cursor)
+ System.out.println(entry);
+ }
+
+ /**
+ * Puts {@code PersonKey}/{@code Person} pairs into a cache with {@code MyBinaryIndexingSpi} configured,
+ * whose callbacks assert that keys and values arrive as {@link BinaryObject} instances, then runs an
+ * {@link SpiQuery} over a key range and removes one key. Query results are only printed.
+ *
+ * @throws Exception If failed.
+ */
+ public void testBinaryIndexingSpi() throws Exception {
+ IgniteConfiguration cfg = configuration();
+
+ cfg.setIndexingSpi(new MyBinaryIndexingSpi());
+
+ Ignite ignite = Ignition.start(cfg);
+
+ CacheConfiguration<PersonKey, Person> ccfg = new CacheConfiguration<>("test-binary-cache");
+
+ ccfg.setIndexedTypes(PersonKey.class, Person.class);
+
+ IgniteCache<PersonKey, Person> cache = ignite.createCache(ccfg);
+
+ for (int i = 0; i < 10; i++) {
+ PersonKey key = new PersonKey(i);
+
+ cache.put(key, new Person("John Doe " + i));
+ }
+
+ QueryCursor<Cache.Entry<PersonKey, Person>> cursor = cache.query(
+ new SpiQuery<PersonKey, Person>().setArgs(new PersonKey(2), new PersonKey(5)));
+
+ for (Cache.Entry<PersonKey, Person> entry : cursor)
+ System.out.println(entry);
+
+ cache.remove(new PersonKey(9));
+ }
+
+
+ /**
+ * Sets {@link IgniteSystemProperties#IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI} to {@code true} before the node
+ * is started, then puts {@code PersonKey}/{@code Person} pairs into a cache with {@code MyIndexingSpi}
+ * configured, whose {@code store} asserts that key and value are not {@link BinaryObject} instances.
+ * Query results are only printed.
+ * <p>
+ * NOTE(review): the system property is not cleared inside this method; confirm it is reset elsewhere,
+ * otherwise it stays set for any test that runs later in the same JVM.
+ *
+ * @throws Exception If failed.
+ */
+ public void testNonBinaryIndexingSpi() throws Exception {
+ System.setProperty(IgniteSystemProperties.IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI, "true");
+
+ IgniteConfiguration cfg = configuration();
+
+ cfg.setIndexingSpi(new MyIndexingSpi());
+
+ Ignite ignite = Ignition.start(cfg);
+
+ CacheConfiguration<PersonKey, Person> ccfg = new CacheConfiguration<>("test-binary-cache");
+
+ ccfg.setIndexedTypes(PersonKey.class, Person.class);
+
+ IgniteCache<PersonKey, Person> cache = ignite.createCache(ccfg);
+
+ for (int i = 0; i < 10; i++) {
+ PersonKey key = new PersonKey(i);
+
+ cache.put(key, new Person("John Doe " + i));
+ }
+
+ QueryCursor<Cache.Entry<PersonKey, Person>> cursor = cache.query(
+ new SpiQuery<PersonKey, Person>().setArgs(new PersonKey(2), new PersonKey(5)));
+
+ for (Cache.Entry<PersonKey, Person> entry : cursor)
+ System.out.println(entry);
+
+ cache.remove(new PersonKey(9));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
public void testIndexingSpiFailure() throws Exception {
IgniteConfiguration cfg = configuration();
@@ -173,6 +264,9 @@ public class IndexingSpiQuerySelfTest extends TestCase {
Object from = paramsIt.next();
Object to = paramsIt.next();
+ from = from instanceof BinaryObject ? ((BinaryObject)from).deserialize() : from;
+ to = to instanceof BinaryObject ? ((BinaryObject)to).deserialize() : to;
+
SortedMap<Object, Object> map = idx.subMap(from, to);
Collection<Cache.Entry<?, ?>> res = new ArrayList<>(map.size());
@@ -186,6 +280,9 @@ public class IndexingSpiQuerySelfTest extends TestCase {
/** {@inheritDoc} */
@Override public void store(@Nullable String spaceName, Object key, Object val, long expirationTime)
throws IgniteSpiException {
+ assertFalse(key instanceof BinaryObject);
+ assertFalse(val instanceof BinaryObject);
+
idx.put(key, val);
}
@@ -206,13 +303,95 @@ public class IndexingSpiQuerySelfTest extends TestCase {
}
/**
+ * Indexing Spi implementation for test. Accepts binary objects only.
+ * <p>
+ * Every callback asserts that the passed key (and value, where present) is a {@link BinaryObject}.
+ * {@code store} deserializes both before delegating to the parent implementation; {@code remove},
+ * {@code onSwap} and {@code onUnswap} only perform the assertion and do not delegate.
+ */
+ private static class MyBinaryIndexingSpi extends MyIndexingSpi {
+
+ /** {@inheritDoc} */
+ @Override public void store(@Nullable String spaceName, Object key, Object val,
+ long expirationTime) throws IgniteSpiException {
+ assertTrue(key instanceof BinaryObject);
+
+ assertTrue(val instanceof BinaryObject);
+
+ super.store(spaceName, ((BinaryObject)key).deserialize(), ((BinaryObject)val).deserialize(), expirationTime);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove(@Nullable String spaceName, Object key) throws IgniteSpiException {
+ assertTrue(key instanceof BinaryObject);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSwap(@Nullable String spaceName, Object key) throws IgniteSpiException {
+ assertTrue(key instanceof BinaryObject);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void onUnswap(@Nullable String spaceName, Object key, Object val) throws IgniteSpiException {
+ assertTrue(key instanceof BinaryObject);
+
+ assertTrue(val instanceof BinaryObject);
+ }
+ }
+
+ /**
* Broken Indexing Spi implementation for test
*/
- private class MyBrokenIndexingSpi extends MyIndexingSpi {
+ private static class MyBrokenIndexingSpi extends MyIndexingSpi {
/** {@inheritDoc} */
@Override public void store(@Nullable String spaceName, Object key, Object val,
long expirationTime) throws IgniteSpiException {
throw new IgniteSpiException("Test exception");
}
}
+
+ /**
+ * Test cache key wrapping a single {@code int} identifier. Ordering, equality and hash code are all
+ * derived from that identifier only.
+ */
+ private static class PersonKey implements Serializable, Comparable<PersonKey> {
+ /** Identifier. */
+ private int id;
+
+ /** @param id Identifier. */
+ public PersonKey(int id) {
+ this.id = id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int compareTo(@NotNull PersonKey o) {
+ return Integer.compare(id, o.id);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ PersonKey key = (PersonKey)o;
+
+ return id == key.id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return id;
+ }
+ }
+
+ /**
+ * Test cache value holding a single name string.
+ */
+ private static class Person implements Serializable {
+ /** Name. */
+ private String name;
+
+ /** @param name Name. */
+ Person(String name) {
+ this.name = name;
+ }
+ }
}
\ No newline at end of file