You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2017/02/13 10:32:09 UTC
[12/31] ignite git commit: IGNITE-4540: IndexingSPI can be used
without have default H2 Indexing enabled. This closes #1423.
IGNITE-4540: IndexingSPI can be used without have default H2 Indexing enabled. This closes #1423.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a922ac9d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a922ac9d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a922ac9d
Branch: refs/heads/ignite-4436-2
Commit: a922ac9d17f91f25aaa2bac9f0a2622dbd04c9bb
Parents: 8e622e4
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Jan 17 15:31:04 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Jan 17 15:31:04 2017 +0300
----------------------------------------------------------------------
.../cache/query/GridCacheQueryManager.java | 83 ++++++++++++++++++--
.../processors/query/GridQueryProcessor.java | 46 -----------
.../cache/query/IndexingSpiQuerySelfTest.java | 66 ++++++++--------
.../IndexingSpiQueryWithH2IndexingSelfTest.java | 36 +++++++++
4 files changed, 145 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a922ac9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 85c01d9..d64dff4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -1,4 +1,4 @@
-/*
+ /*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -45,6 +45,7 @@ import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.query.QueryMetrics;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
@@ -60,6 +61,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
@@ -167,6 +169,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
};
+ /** Default is @{code true} */
+ private final boolean isIndexingSpiAllowsBinary = !IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI);
+
/** */
private GridQueryProcessor qryProc;
@@ -205,15 +210,24 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
private boolean enabled;
/** */
+ private boolean qryProcEnabled;
+
+
+ /** */
private AffinityTopologyVersion qryTopVer;
/** {@inheritDoc} */
@Override public void start0() throws IgniteCheckedException {
CacheConfiguration ccfg = cctx.config();
+ qryProcEnabled = GridQueryProcessor.isEnabled(ccfg);
+
qryProc = cctx.kernalContext().query();
+
space = cctx.name();
+ enabled = qryProcEnabled || (isIndexingSpiEnabled() && !CU.isSystemCache(space));
+
maxIterCnt = ccfg.getMaxQueryIteratorsCount();
detailMetricsSz = ccfg.getQueryDetailMetricsSize();
@@ -259,8 +273,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
cctx.events().addListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
- enabled = GridQueryProcessor.isEnabled(ccfg);
-
qryTopVer = cctx.startTopologyVersion();
if (qryTopVer == null)
@@ -369,11 +381,21 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @throws IgniteCheckedException If failed.
*/
public void onSwap(CacheObject key) throws IgniteCheckedException {
+ if(!enabled)
+ return;
+
if (!enterBusy())
return; // Ignore index update when node is stopping.
try {
- qryProc.onSwap(space, key);
+ if (isIndexingSpiEnabled()) {
+ Object key0 = unwrapIfNeeded(key, cctx.cacheObjectContext());
+
+ cctx.kernalContext().indexing().onSwap(space, key0);
+ }
+
+ if(qryProcEnabled)
+ qryProc.onSwap(space, key);
}
finally {
leaveBusy();
@@ -381,6 +403,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
/**
+ * Checks if IndexinSPI is enabled.
+ * @return IndexingSPI enabled flag.
+ */
+ private boolean isIndexingSpiEnabled() {
+ return cctx.kernalContext().indexing().enabled();
+ }
+
+ /**
* Entry for given key unswapped.
*
* @param key Key.
@@ -388,11 +418,25 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @throws IgniteCheckedException If failed.
*/
public void onUnswap(CacheObject key, CacheObject val) throws IgniteCheckedException {
+ if(!enabled)
+ return;
+
if (!enterBusy())
return; // Ignore index update when node is stopping.
try {
- qryProc.onUnswap(space, key, val);
+ if (isIndexingSpiEnabled()) {
+ CacheObjectContext coctx = cctx.cacheObjectContext();
+
+ Object key0 = unwrapIfNeeded(key, coctx);
+
+ Object val0 = unwrapIfNeeded(val, coctx);
+
+ cctx.kernalContext().indexing().onUnswap(space, key0, val0);
+ }
+
+ if(qryProcEnabled)
+ qryProc.onUnswap(space, key, val);
}
finally {
leaveBusy();
@@ -429,7 +473,18 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
return; // Ignore index update when node is stopping.
try {
- qryProc.store(space, key, val, CU.versionToBytes(ver), expirationTime);
+ if (isIndexingSpiEnabled()) {
+ CacheObjectContext coctx = cctx.cacheObjectContext();
+
+ Object key0 = unwrapIfNeeded(key, coctx);
+
+ Object val0 = unwrapIfNeeded(val, coctx);
+
+ cctx.kernalContext().indexing().store(space, key0, val0, expirationTime);
+ }
+
+ if(qryProcEnabled)
+ qryProc.store(space, key, val, CU.versionToBytes(ver), expirationTime);
}
finally {
invalidateResultCache();
@@ -454,7 +509,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
return; // Ignore index update when node is stopping.
try {
- qryProc.remove(space, key, val);
+ if (isIndexingSpiEnabled()) {
+ Object key0 = unwrapIfNeeded(key, cctx.cacheObjectContext());
+
+ cctx.kernalContext().indexing().remove(space, key0);
+ }
+
+ if(qryProcEnabled)
+ qryProc.remove(space, key, val);
}
finally {
invalidateResultCache();
@@ -561,6 +623,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
public abstract CacheQueryFuture<?> queryFieldsDistributed(GridCacheQueryBean qry, Collection<ClusterNode> nodes);
/**
+ * Unwrap CacheObject if needed.
+ */
+ private Object unwrapIfNeeded(CacheObject obj, CacheObjectContext coctx) {
+ return isIndexingSpiAllowsBinary && cctx.cacheObjects().isBinaryObject(obj) ? obj : obj.value(coctx, false);
+ }
+
+ /**
* Performs query.
*
* @param qry Query.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a922ac9d/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index f4ac4ae..48ca2b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -160,9 +160,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
/** */
private static final ThreadLocal<AffinityTopologyVersion> requestTopVer = new ThreadLocal<>();
- /** Default is @{true} */
- private final boolean isIndexingSpiAllowsBinary = !IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI);
-
/**
* @param ctx Kernal context.
*/
@@ -682,16 +679,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
CacheObjectContext coctx = null;
- if (ctx.indexing().enabled()) {
- coctx = cacheObjectContext(space);
-
- Object key0 = unwrap(key, coctx);
-
- Object val0 = unwrap(val, coctx);
-
- ctx.indexing().store(space, key0, val0, expirationTime);
- }
-
if (idx == null)
return;
@@ -745,13 +732,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
- * Unwrap CacheObject if needed.
- */
- private Object unwrap(CacheObject obj, CacheObjectContext coctx) {
- return isIndexingSpiAllowsBinary && ctx.cacheObjects().isBinaryObject(obj) ? obj : obj.value(coctx, false);
- }
-
- /**
* @throws IgniteCheckedException If failed.
*/
private void checkEnabled() throws IgniteCheckedException {
@@ -1039,14 +1019,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
if (log.isDebugEnabled())
log.debug("Remove [space=" + space + ", key=" + key + ", val=" + val + "]");
- if (ctx.indexing().enabled()) {
- CacheObjectContext coctx = cacheObjectContext(space);
-
- Object key0 = unwrap(key, coctx);
-
- ctx.indexing().remove(space, key0);
- }
-
if (idx == null)
return;
@@ -1184,14 +1156,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
if (log.isDebugEnabled())
log.debug("Swap [space=" + spaceName + ", key=" + key + "]");
- if (ctx.indexing().enabled()) {
- CacheObjectContext coctx = cacheObjectContext(spaceName);
-
- Object key0 = unwrap(key, coctx);
-
- ctx.indexing().onSwap(spaceName, key0);
- }
-
if (idx == null)
return;
@@ -1221,16 +1185,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
if (log.isDebugEnabled())
log.debug("Unswap [space=" + spaceName + ", key=" + key + ", val=" + val + "]");
- if (ctx.indexing().enabled()) {
- CacheObjectContext coctx = cacheObjectContext(spaceName);
-
- Object key0 = unwrap(key, coctx);
-
- Object val0 = unwrap(val, coctx);
-
- ctx.indexing().onUnswap(spaceName, key0, val0);
- }
-
if (idx == null)
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/a922ac9d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java
index f66b99e..84a13df 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java
@@ -55,15 +55,40 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
- * Indexing Spi query test
+ * Indexing Spi query only test
*/
public class IndexingSpiQuerySelfTest extends TestCase {
+ public static final String CACHE_NAME = "test-cache";
+
/** {@inheritDoc} */
@Override public void tearDown() throws Exception {
Ignition.stopAll(true);
}
/**
+ * @return Configuration.
+ */
+ protected IgniteConfiguration configuration() {
+ IgniteConfiguration cfg = new IgniteConfiguration();
+
+ TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setMaxMissedHeartbeats(Integer.MAX_VALUE);
+
+ disco.setIpFinder(ipFinder);
+
+ cfg.setDiscoverySpi(disco);
+
+ return cfg;
+ }
+
+ /** */
+ protected <K,V> CacheConfiguration<K, V> cacheConfiguration(String cacheName) {
+ return new CacheConfiguration<>(cacheName);
+ }
+
+ /**
* @throws Exception If failed.
*/
public void testSimpleIndexingSpi() throws Exception {
@@ -73,9 +98,7 @@ public class IndexingSpiQuerySelfTest extends TestCase {
Ignite ignite = Ignition.start(cfg);
- CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>("test-cache");
-
- ccfg.setIndexedTypes(Integer.class, Integer.class);
+ CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(CACHE_NAME);
IgniteCache<Integer, Integer> cache = ignite.createCache(ccfg);
@@ -98,7 +121,7 @@ public class IndexingSpiQuerySelfTest extends TestCase {
Ignite ignite = Ignition.start(cfg);
- CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>("test-cache");
+ CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(CACHE_NAME);
IgniteCache<Integer, Integer> cache = ignite.createCache(ccfg);
@@ -121,9 +144,7 @@ public class IndexingSpiQuerySelfTest extends TestCase {
Ignite ignite = Ignition.start(cfg);
- CacheConfiguration<PersonKey, Person> ccfg = new CacheConfiguration<>("test-binary-cache");
-
- ccfg.setIndexedTypes(PersonKey.class, Person.class);
+ CacheConfiguration<PersonKey, Person> ccfg = cacheConfiguration(CACHE_NAME);
IgniteCache<PersonKey, Person> cache = ignite.createCache(ccfg);
@@ -155,9 +176,7 @@ public class IndexingSpiQuerySelfTest extends TestCase {
Ignite ignite = Ignition.start(cfg);
- CacheConfiguration<PersonKey, Person> ccfg = new CacheConfiguration<>("test-binary-cache");
-
- ccfg.setIndexedTypes(PersonKey.class, Person.class);
+ CacheConfiguration<PersonKey, Person> ccfg = cacheConfiguration(CACHE_NAME);
IgniteCache<PersonKey, Person> cache = ignite.createCache(ccfg);
@@ -187,10 +206,9 @@ public class IndexingSpiQuerySelfTest extends TestCase {
Ignite ignite = Ignition.start(cfg);
- CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>("test-cache");
+ CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(CACHE_NAME);
ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
- ccfg.setIndexedTypes(Integer.class, Integer.class);
final IgniteCache<Integer, Integer> cache = ignite.createCache(ccfg);
@@ -219,24 +237,6 @@ public class IndexingSpiQuerySelfTest extends TestCase {
}
/**
- * @return Configuration.
- */
- private IgniteConfiguration configuration() {
- IgniteConfiguration cfg = new IgniteConfiguration();
-
- TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
- TcpDiscoverySpi disco = new TcpDiscoverySpi();
-
- disco.setMaxMissedHeartbeats(Integer.MAX_VALUE);
-
- disco.setIpFinder(ipFinder);
-
- cfg.setDiscoverySpi(disco);
-
- return cfg;
- }
-
- /**
* Indexing Spi implementation for test
*/
private static class MyIndexingSpi extends IgniteSpiAdapter implements IndexingSpi {
@@ -350,7 +350,7 @@ public class IndexingSpiQuerySelfTest extends TestCase {
/**
*
*/
- private static class PersonKey implements Serializable, Comparable<PersonKey> {
+ static class PersonKey implements Serializable, Comparable<PersonKey> {
/** */
private int id;
@@ -385,7 +385,7 @@ public class IndexingSpiQuerySelfTest extends TestCase {
/**
*
*/
- private static class Person implements Serializable {
+ static class Person implements Serializable {
/** */
private String name;
http://git-wip-us.apache.org/repos/asf/ignite/blob/a922ac9d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryWithH2IndexingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryWithH2IndexingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryWithH2IndexingSelfTest.java
new file mode 100644
index 0000000..800c5a2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryWithH2IndexingSelfTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import org.apache.ignite.configuration.CacheConfiguration;
+
+/**
+ * Indexing Spi query with configured default indexer test
+ */
+public class IndexingSpiQueryWithH2IndexingSelfTest extends IndexingSpiQuerySelfTest {
+ /** */
+ protected <K, V> CacheConfiguration<K, V> cacheConfiguration(String cacheName) {
+ CacheConfiguration<K, V> ccfg = super.cacheConfiguration(cacheName);
+
+ ccfg.setIndexedTypes(PersonKey.class, Person.class);
+
+ ccfg.setIndexedTypes(Integer.class, Integer.class);
+
+ return ccfg;
+ }
+}
\ No newline at end of file