You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2017/02/13 10:32:09 UTC

[12/31] ignite git commit: IGNITE-4540: IndexingSPI can be used without having default H2 Indexing enabled. This closes #1423.

IGNITE-4540: IndexingSPI can be used without having default H2 Indexing enabled. This closes #1423.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a922ac9d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a922ac9d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a922ac9d

Branch: refs/heads/ignite-4436-2
Commit: a922ac9d17f91f25aaa2bac9f0a2622dbd04c9bb
Parents: 8e622e4
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Jan 17 15:31:04 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Jan 17 15:31:04 2017 +0300

----------------------------------------------------------------------
 .../cache/query/GridCacheQueryManager.java      | 83 ++++++++++++++++++--
 .../processors/query/GridQueryProcessor.java    | 46 -----------
 .../cache/query/IndexingSpiQuerySelfTest.java   | 66 ++++++++--------
 .../IndexingSpiQueryWithH2IndexingSelfTest.java | 36 +++++++++
 4 files changed, 145 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a922ac9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 85c01d9..d64dff4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -1,4 +1,4 @@
-/*
+ /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -45,6 +45,7 @@ import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.query.QueryMetrics;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -60,6 +61,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
 import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
 import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
@@ -167,6 +169,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             }
         };
 
+    /** Default is {@code true} */
+    private final boolean isIndexingSpiAllowsBinary = !IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI);
+
     /** */
     private GridQueryProcessor qryProc;
 
@@ -205,15 +210,24 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
     private boolean enabled;
 
     /** */
+    private boolean qryProcEnabled;
+
+
+    /** */
     private AffinityTopologyVersion qryTopVer;
 
     /** {@inheritDoc} */
     @Override public void start0() throws IgniteCheckedException {
         CacheConfiguration ccfg = cctx.config();
 
+        qryProcEnabled = GridQueryProcessor.isEnabled(ccfg);
+
         qryProc = cctx.kernalContext().query();
+
         space = cctx.name();
 
+        enabled = qryProcEnabled || (isIndexingSpiEnabled() && !CU.isSystemCache(space));
+
         maxIterCnt = ccfg.getMaxQueryIteratorsCount();
 
         detailMetricsSz = ccfg.getQueryDetailMetricsSize();
@@ -259,8 +273,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
         cctx.events().addListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
 
-        enabled = GridQueryProcessor.isEnabled(ccfg);
-
         qryTopVer = cctx.startTopologyVersion();
 
         if (qryTopVer == null)
@@ -369,11 +381,21 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * @throws IgniteCheckedException If failed.
      */
     public void onSwap(CacheObject key) throws IgniteCheckedException {
+        if(!enabled)
+            return;
+
         if (!enterBusy())
             return; // Ignore index update when node is stopping.
 
         try {
-            qryProc.onSwap(space, key);
+            if (isIndexingSpiEnabled()) {
+                Object key0 = unwrapIfNeeded(key, cctx.cacheObjectContext());
+
+                cctx.kernalContext().indexing().onSwap(space, key0);
+            }
+
+            if(qryProcEnabled)
+                qryProc.onSwap(space, key);
         }
         finally {
             leaveBusy();
@@ -381,6 +403,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
     }
 
     /**
+     * Checks if IndexingSPI is enabled.
+     * @return IndexingSPI enabled flag.
+     */
+    private boolean isIndexingSpiEnabled() {
+        return cctx.kernalContext().indexing().enabled();
+    }
+
+    /**
      * Entry for given key unswapped.
      *
      * @param key Key.
@@ -388,11 +418,25 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * @throws IgniteCheckedException If failed.
      */
     public void onUnswap(CacheObject key, CacheObject val) throws IgniteCheckedException {
+        if(!enabled)
+            return;
+
         if (!enterBusy())
             return; // Ignore index update when node is stopping.
 
         try {
-            qryProc.onUnswap(space, key, val);
+            if (isIndexingSpiEnabled()) {
+                CacheObjectContext coctx = cctx.cacheObjectContext();
+
+                Object key0 = unwrapIfNeeded(key, coctx);
+
+                Object val0 = unwrapIfNeeded(val, coctx);
+
+                cctx.kernalContext().indexing().onUnswap(space, key0, val0);
+            }
+
+            if(qryProcEnabled)
+                qryProc.onUnswap(space, key, val);
         }
         finally {
             leaveBusy();
@@ -429,7 +473,18 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             return; // Ignore index update when node is stopping.
 
         try {
-            qryProc.store(space, key, val, CU.versionToBytes(ver), expirationTime);
+            if (isIndexingSpiEnabled()) {
+                CacheObjectContext coctx = cctx.cacheObjectContext();
+
+                Object key0 = unwrapIfNeeded(key, coctx);
+
+                Object val0 = unwrapIfNeeded(val, coctx);
+
+                cctx.kernalContext().indexing().store(space, key0, val0, expirationTime);
+            }
+
+            if(qryProcEnabled)
+                qryProc.store(space, key, val, CU.versionToBytes(ver), expirationTime);
         }
         finally {
             invalidateResultCache();
@@ -454,7 +509,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             return; // Ignore index update when node is stopping.
 
         try {
-            qryProc.remove(space, key, val);
+            if (isIndexingSpiEnabled()) {
+                Object key0 = unwrapIfNeeded(key, cctx.cacheObjectContext());
+
+                cctx.kernalContext().indexing().remove(space, key0);
+            }
+
+            if(qryProcEnabled)
+                qryProc.remove(space, key, val);
         }
         finally {
             invalidateResultCache();
@@ -561,6 +623,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
     public abstract CacheQueryFuture<?> queryFieldsDistributed(GridCacheQueryBean qry, Collection<ClusterNode> nodes);
 
     /**
+     * Unwrap CacheObject if needed.
+     */
+    private Object unwrapIfNeeded(CacheObject obj, CacheObjectContext coctx) {
+        return isIndexingSpiAllowsBinary && cctx.cacheObjects().isBinaryObject(obj) ? obj : obj.value(coctx, false);
+    }
+
+    /**
      * Performs query.
      *
      * @param qry Query.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a922ac9d/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index f4ac4ae..48ca2b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -160,9 +160,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     /** */
     private static final ThreadLocal<AffinityTopologyVersion> requestTopVer = new ThreadLocal<>();
 
-    /** Default is @{true} */
-    private final boolean isIndexingSpiAllowsBinary = !IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI);
-
     /**
      * @param ctx Kernal context.
      */
@@ -682,16 +679,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
 
         CacheObjectContext coctx = null;
 
-        if (ctx.indexing().enabled()) {
-            coctx = cacheObjectContext(space);
-
-            Object key0 = unwrap(key, coctx);
-
-            Object val0 = unwrap(val, coctx);
-
-            ctx.indexing().store(space, key0, val0, expirationTime);
-        }
-
         if (idx == null)
             return;
 
@@ -745,13 +732,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Unwrap CacheObject if needed.
-     */
-    private Object unwrap(CacheObject obj, CacheObjectContext coctx) {
-        return isIndexingSpiAllowsBinary && ctx.cacheObjects().isBinaryObject(obj) ? obj : obj.value(coctx, false);
-    }
-
-    /**
      * @throws IgniteCheckedException If failed.
      */
     private void checkEnabled() throws IgniteCheckedException {
@@ -1039,14 +1019,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         if (log.isDebugEnabled())
             log.debug("Remove [space=" + space + ", key=" + key + ", val=" + val + "]");
 
-        if (ctx.indexing().enabled()) {
-            CacheObjectContext coctx = cacheObjectContext(space);
-
-            Object key0 = unwrap(key, coctx);
-
-            ctx.indexing().remove(space, key0);
-        }
-
         if (idx == null)
             return;
 
@@ -1184,14 +1156,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         if (log.isDebugEnabled())
             log.debug("Swap [space=" + spaceName + ", key=" + key + "]");
 
-        if (ctx.indexing().enabled()) {
-            CacheObjectContext coctx = cacheObjectContext(spaceName);
-
-            Object key0 = unwrap(key, coctx);
-
-            ctx.indexing().onSwap(spaceName, key0);
-        }
-
         if (idx == null)
             return;
 
@@ -1221,16 +1185,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         if (log.isDebugEnabled())
             log.debug("Unswap [space=" + spaceName + ", key=" + key + ", val=" + val + "]");
 
-        if (ctx.indexing().enabled()) {
-            CacheObjectContext coctx = cacheObjectContext(spaceName);
-
-            Object key0 = unwrap(key, coctx);
-
-            Object val0 = unwrap(val, coctx);
-
-            ctx.indexing().onUnswap(spaceName, key0, val0);
-        }
-
         if (idx == null)
             return;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a922ac9d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java
index f66b99e..84a13df 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java
@@ -55,15 +55,40 @@ import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Indexing Spi query test
+ * Indexing Spi query only test
  */
 public class IndexingSpiQuerySelfTest extends TestCase {
+    public static final String CACHE_NAME = "test-cache";
+
     /** {@inheritDoc} */
     @Override public void tearDown() throws Exception {
         Ignition.stopAll(true);
     }
 
     /**
+     * @return Configuration.
+     */
+    protected IgniteConfiguration configuration() {
+        IgniteConfiguration cfg = new IgniteConfiguration();
+
+        TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setMaxMissedHeartbeats(Integer.MAX_VALUE);
+
+        disco.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(disco);
+
+        return cfg;
+    }
+
+    /** */
+    protected <K,V> CacheConfiguration<K, V> cacheConfiguration(String cacheName) {
+        return new CacheConfiguration<>(cacheName);
+    }
+
+    /**
      * @throws Exception If failed.
      */
     public void testSimpleIndexingSpi() throws Exception {
@@ -73,9 +98,7 @@ public class IndexingSpiQuerySelfTest extends TestCase {
 
         Ignite ignite = Ignition.start(cfg);
 
-        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>("test-cache");
-
-        ccfg.setIndexedTypes(Integer.class, Integer.class);
+        CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(CACHE_NAME);
 
         IgniteCache<Integer, Integer> cache = ignite.createCache(ccfg);
 
@@ -98,7 +121,7 @@ public class IndexingSpiQuerySelfTest extends TestCase {
 
         Ignite ignite = Ignition.start(cfg);
 
-        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>("test-cache");
+        CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(CACHE_NAME);
 
         IgniteCache<Integer, Integer> cache = ignite.createCache(ccfg);
 
@@ -121,9 +144,7 @@ public class IndexingSpiQuerySelfTest extends TestCase {
 
         Ignite ignite = Ignition.start(cfg);
 
-        CacheConfiguration<PersonKey, Person> ccfg = new CacheConfiguration<>("test-binary-cache");
-
-        ccfg.setIndexedTypes(PersonKey.class, Person.class);
+        CacheConfiguration<PersonKey, Person> ccfg = cacheConfiguration(CACHE_NAME);
 
         IgniteCache<PersonKey, Person> cache = ignite.createCache(ccfg);
 
@@ -155,9 +176,7 @@ public class IndexingSpiQuerySelfTest extends TestCase {
 
         Ignite ignite = Ignition.start(cfg);
 
-        CacheConfiguration<PersonKey, Person> ccfg = new CacheConfiguration<>("test-binary-cache");
-
-        ccfg.setIndexedTypes(PersonKey.class, Person.class);
+        CacheConfiguration<PersonKey, Person> ccfg = cacheConfiguration(CACHE_NAME);
 
         IgniteCache<PersonKey, Person> cache = ignite.createCache(ccfg);
 
@@ -187,10 +206,9 @@ public class IndexingSpiQuerySelfTest extends TestCase {
 
         Ignite ignite = Ignition.start(cfg);
 
-        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>("test-cache");
+        CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(CACHE_NAME);
 
         ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
-        ccfg.setIndexedTypes(Integer.class, Integer.class);
 
         final IgniteCache<Integer, Integer> cache = ignite.createCache(ccfg);
 
@@ -219,24 +237,6 @@ public class IndexingSpiQuerySelfTest extends TestCase {
     }
 
     /**
-     * @return Configuration.
-     */
-    private IgniteConfiguration configuration() {
-        IgniteConfiguration cfg = new IgniteConfiguration();
-
-        TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-        TcpDiscoverySpi disco = new TcpDiscoverySpi();
-
-        disco.setMaxMissedHeartbeats(Integer.MAX_VALUE);
-
-        disco.setIpFinder(ipFinder);
-
-        cfg.setDiscoverySpi(disco);
-
-        return cfg;
-    }
-
-    /**
      * Indexing Spi implementation for test
      */
     private static class MyIndexingSpi extends IgniteSpiAdapter implements IndexingSpi {
@@ -350,7 +350,7 @@ public class IndexingSpiQuerySelfTest extends TestCase {
     /**
      *
      */
-    private static class PersonKey implements Serializable, Comparable<PersonKey> {
+     static class PersonKey implements Serializable, Comparable<PersonKey> {
         /** */
         private int id;
 
@@ -385,7 +385,7 @@ public class IndexingSpiQuerySelfTest extends TestCase {
     /**
      *
      */
-    private static class Person implements Serializable {
+    static class Person implements Serializable {
         /** */
         private String name;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a922ac9d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryWithH2IndexingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryWithH2IndexingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryWithH2IndexingSelfTest.java
new file mode 100644
index 0000000..800c5a2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryWithH2IndexingSelfTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import org.apache.ignite.configuration.CacheConfiguration;
+
+/**
+ * Indexing Spi query with configured default indexer test
+ */
+public class IndexingSpiQueryWithH2IndexingSelfTest extends IndexingSpiQuerySelfTest {
+    /** */
+    protected <K, V> CacheConfiguration<K, V> cacheConfiguration(String cacheName) {
+        CacheConfiguration<K, V> ccfg = super.cacheConfiguration(cacheName);
+
+        ccfg.setIndexedTypes(PersonKey.class, Person.class);
+
+        ccfg.setIndexedTypes(Integer.class, Integer.class);
+
+        return ccfg;
+    }
+}
\ No newline at end of file