You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2020/06/21 14:14:26 UTC
[ignite] branch master updated: IGNITE-12808 Allow register started
caches in indexing to enable SQL query on them. (#7627)
This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 3485b92 IGNITE-12808 Allow register started caches in indexing to enable SQL query on them. (#7627)
3485b92 is described below
commit 3485b929a03a975f67a3e7ee44a78adb013db65a
Author: Ivan Daschinskiy <iv...@gmail.com>
AuthorDate: Sun Jun 21 17:14:09 2020 +0300
IGNITE-12808 Allow register started caches in indexing to enable SQL query on them. (#7627)
This patch introduce support of the `CREATE TABLE T1 ... WITH 'cache_name=some-cache` where "some-cache" is the existing cache name.
---
.../processors/cache/CacheMetricsImpl.java | 2 +-
.../processors/cache/ClusterCachesInfo.java | 2 +-
.../processors/cache/DynamicCacheDescriptor.java | 31 +-
.../processors/cache/GridCacheContext.java | 37 +-
.../processors/cache/GridCacheContextInfo.java | 31 +-
.../processors/cache/GridCacheProcessor.java | 16 +-
.../internal/processors/cache/GridCacheUtils.java | 38 ++
.../processors/cache/IgniteCacheProxyImpl.java | 2 +-
.../cache/ValidationOnNodeJoinUtils.java | 31 +-
.../GridCacheDatabaseSharedManager.java | 79 +--
.../IgniteCacheDatabaseSharedManager.java | 7 -
.../cache/query/GridCacheQueryManager.java | 14 +-
.../processors/query/GridQueryProcessor.java | 385 ++++++++++---
.../internal/processors/query/QuerySchema.java | 50 +-
.../query/schema/SchemaOperationException.java | 6 +
.../query/schema/SchemaOperationWorker.java | 3 +-
.../operation/SchemaAddQueryEntityOperation.java | 82 +++
.../main/resources/META-INF/classnames.properties | 1 +
.../processors/query/h2/CommandProcessor.java | 41 +-
.../processors/query/h2/IgniteH2Indexing.java | 7 +-
.../processors/query/h2/opt/GridH2Table.java | 2 +-
.../IgniteDynamicEnableIndexingRestoreTest.java | 274 +++++++++
.../index/DynamicEnableIndexingAbstractTest.java | 305 ++++++++++
.../index/DynamicEnableIndexingBasicSelfTest.java | 160 ++++++
.../DynamicEnableIndexingConcurrentSelfTest.java | 636 +++++++++++++++++++++
.../cache/index/H2DynamicTableSelfTest.java | 104 +++-
.../IgniteBinaryCacheQueryTestSuite.java | 2 +
.../IgniteBinaryCacheQueryTestSuite2.java | 5 +
28 files changed, 2145 insertions(+), 208 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index c120993..097ab04 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -351,7 +351,7 @@ public class CacheMetricsImpl implements CacheMetrics {
"Number of partitions need to be cleared before actual rebalance start.");
mreg.register("IsIndexRebuildInProgress", () -> {
- IgniteInternalFuture fut = cctx.shared().database().indexRebuildFuture(cctx.cacheId());
+ IgniteInternalFuture fut = cctx.shared().kernalContext().query().indexRebuildFuture(cctx.cacheId());
return fut != null && !fut.isDone();
}, "True if index rebuild is in progress.");
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 58e3afa..8259334 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -2017,7 +2017,7 @@ public class ClusterCachesInfo {
else if (!active && isMergeConfigSupport) {
DynamicCacheDescriptor desc = registeredCaches.get(cfg.getName());
- QuerySchemaPatch schemaPatch = desc.makeSchemaPatch(cacheInfo.cacheData().queryEntities());
+ QuerySchemaPatch schemaPatch = desc.makeSchemaPatch(cacheInfo.cacheData());
if (schemaPatch.hasConflicts()) {
hasSchemaPatchConflict = true;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index ae0ba34..95c24ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -29,6 +29,8 @@ import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProces
import org.apache.ignite.internal.processors.query.QuerySchema;
import org.apache.ignite.internal.processors.query.QuerySchemaPatch;
import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaAddQueryEntityOperation;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -363,6 +365,24 @@ public class DynamicCacheDescriptor {
public void schemaChangeFinish(SchemaFinishDiscoveryMessage msg) {
synchronized (schemaMux) {
schema.finish(msg);
+
+ if (msg.operation() instanceof SchemaAddQueryEntityOperation) {
+ cacheCfg = GridCacheUtils.patchCacheConfiguration(cacheCfg,
+ (SchemaAddQueryEntityOperation)msg.operation());
+ }
+ }
+ }
+
+ /**
+ * Make schema patch for this cache.
+ *
+ * @param cacheData Stored cache by which current schema should be expanded.
+ * @return Patch which contains operations for expanding schema of this cache.
+ * @see QuerySchemaPatch
+ */
+ public QuerySchemaPatch makeSchemaPatch(StoredCacheData cacheData) {
+ synchronized (schemaMux) {
+ return schema.makePatch(cacheData.config(), cacheData.queryEntities());
}
}
@@ -387,7 +407,16 @@ public class DynamicCacheDescriptor {
*/
public boolean applySchemaPatch(QuerySchemaPatch patch) {
synchronized (schemaMux) {
- return schema.applyPatch(patch);
+ boolean res = schema.applyPatch(patch);
+
+ if (res) {
+ for (SchemaAbstractOperation op: patch.getPatchOperations()) {
+ if (op instanceof SchemaAddQueryEntityOperation)
+ cacheCfg = GridCacheUtils.patchCacheConfiguration(cacheCfg, (SchemaAddQueryEntityOperation)op);
+ }
+ }
+
+ return res;
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 03f26be..8986e6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -48,6 +48,7 @@ import org.apache.ignite.binary.BinaryField;
import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.cache.CacheInterceptor;
import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -95,6 +96,7 @@ import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProces
import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
import org.apache.ignite.internal.processors.platform.cache.PlatformCacheManager;
import org.apache.ignite.internal.processors.plugin.CachePluginManager;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaAddQueryEntityOperation;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.lang.GridFunc;
@@ -157,7 +159,7 @@ public class GridCacheContext<K, V> implements Externalizable {
private IgniteLogger log;
/** Cache configuration. */
- private CacheConfiguration cacheCfg;
+ private volatile CacheConfiguration cacheCfg;
/** Affinity manager. */
private GridCacheAffinityManager affMgr;
@@ -2351,6 +2353,39 @@ public class GridCacheContext<K, V> implements Externalizable {
}
/**
+ * Apply changes from {@link SchemaAddQueryEntityOperation}.
+ *
+ * @param op Add query entity schema operation.
+ */
+ public void onSchemaAddQueryEntity(SchemaAddQueryEntityOperation op) {
+ onSchemaAddQueryEntity(op.entities(), op.schemaName(), op.isSqlEscape(),
+ op.queryParallelism());
+ }
+
+ /**
+ * Apply changes on enable indexing.
+ *
+ * @param entities New query entities.
+ * @param sqlSchema Sql schema name.
+ * @param isSqlEscape Sql escape flag.
+ * @param qryParallelism Query parallelism parameter.
+ */
+ public void onSchemaAddQueryEntity(
+ Collection<QueryEntity> entities,
+ String sqlSchema,
+ boolean isSqlEscape,
+ int qryParallelism
+ ) {
+ CacheConfiguration oldCfg = cacheCfg;
+
+ if (oldCfg != null)
+ cacheCfg = GridCacheUtils.patchCacheConfiguration(oldCfg, entities, sqlSchema, isSqlEscape, qryParallelism);
+
+ if (qryMgr != null)
+ qryMgr.enable();
+ }
+
+ /**
* Returns future that assigned to last performing {@link GlobalRemoveAllJob}.
*/
public AtomicReference<IgniteInternalFuture<Boolean>> lastRemoveAllJobFut() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContextInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContextInfo.java
index c20e421..d1b5506 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContextInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContextInfo.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaAddQueryEntityOperation;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.lang.IgniteUuid;
@@ -35,7 +36,7 @@ public class GridCacheContextInfo<K, V> {
private final IgniteUuid dynamicDeploymentId;
/** Cache configuration. */
- private final CacheConfiguration config;
+ private volatile CacheConfiguration<K, V> config;
/** Cache group ID. */
private final int groupId;
@@ -44,7 +45,7 @@ public class GridCacheContextInfo<K, V> {
private final int cacheId;
/** Full cache context. Can be {@code null} in case a cache is not started. */
- @Nullable private volatile GridCacheContext cctx;
+ @Nullable private volatile GridCacheContext<K, V> cctx;
/**
* Constructor of full cache context.
@@ -80,7 +81,7 @@ public class GridCacheContextInfo<K, V> {
/**
* @return Cache configuration.
*/
- public CacheConfiguration config() {
+ public CacheConfiguration<K, V> config() {
return config;
}
@@ -115,7 +116,7 @@ public class GridCacheContextInfo<K, V> {
/**
* @return Cache context. {@code null} for not started cache.
*/
- @Nullable public GridCacheContext cacheContext() {
+ @Nullable public GridCacheContext<K, V> cacheContext() {
return cctx;
}
@@ -123,7 +124,7 @@ public class GridCacheContextInfo<K, V> {
* @return Dynamic deployment ID.
*/
public IgniteUuid dynamicDeploymentId() {
- GridCacheContext cctx0 = cctx;
+ GridCacheContext<K, V> cctx0 = cctx;
if (cctx0 != null)
return cctx0.dynamicDeploymentId();
@@ -138,7 +139,7 @@ public class GridCacheContextInfo<K, V> {
*
* @param cctx Initted cache context.
*/
- public void initCacheContext(GridCacheContext<?, ?> cctx) {
+ public void initCacheContext(GridCacheContext<K, V> cctx) {
assert this.cctx == null : this.cctx;
assert cctx != null;
@@ -167,6 +168,24 @@ public class GridCacheContextInfo<K, V> {
return cctx != null;
}
+ /**
+ * Apply changes from {@link SchemaAddQueryEntityOperation}.
+ *
+ * @param op Add query entity schema operation.
+ */
+ public void onSchemaAddQueryEntity(SchemaAddQueryEntityOperation op) {
+ if (cctx != null) {
+ cctx.onSchemaAddQueryEntity(op);
+
+ config = cctx.config();
+ }
+ else {
+ CacheConfiguration<K, V> oldCfg = config;
+
+ config = GridCacheUtils.patchCacheConfiguration(oldCfg, op);
+ }
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return "GridCacheContextInfo: " + name() + " " + (isCacheContextInited() ? "started" : "not started");
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 060555b..d994988 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -924,15 +924,23 @@ public class GridCacheProcessor extends GridProcessorAdapter {
reconnected.add(cache);
if (cache.context().userCache()) {
- // Re-create cache structures inside indexing in order to apply recent schema changes.
- GridCacheContextInfo cacheInfo = new GridCacheContextInfo(cache.context(), false);
+ DynamicCacheDescriptor desc = cacheDescriptor(cache.name());
- DynamicCacheDescriptor desc = cacheDescriptor(cacheInfo.name());
+ assert desc != null : cache.name();
- assert desc != null : cacheInfo.name();
+ if (!QueryUtils.isEnabled(cache.context().config())
+ && QueryUtils.isEnabled(desc.cacheConfiguration())) {
+ CacheConfiguration newCfg = desc.cacheConfiguration();
+
+ cache.context().onSchemaAddQueryEntity(newCfg.getQueryEntities(), newCfg.getSqlSchema(),
+ newCfg.isSqlEscapeAll(), newCfg.getQueryParallelism());
+ }
boolean rmvIdx = !cache.context().group().persistenceEnabled();
+ // Re-create cache structures inside indexing in order to apply recent schema changes.
+ GridCacheContextInfo cacheInfo = new GridCacheContextInfo(cache.context(), false);
+
ctx.query().onCacheStop0(cacheInfo, rmvIdx);
ctx.query().onCacheStart0(cacheInfo, desc.schema(), desc.sql());
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 9fd2409..ac62e5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -83,6 +83,7 @@ import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaAddQueryEntityOperation;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.IgniteInClosureX;
@@ -2122,6 +2123,43 @@ public class GridCacheUtils {
}
/**
+ * Patch cache configuration with {@link SchemaAddQueryEntityOperation}.
+ *
+ * @param oldCfg Old cache config.
+ * @param op Schema add query entity operation.
+ */
+ public static <K, V> CacheConfiguration<K, V> patchCacheConfiguration(
+ CacheConfiguration<K, V> oldCfg,
+ SchemaAddQueryEntityOperation op
+ ) {
+ return patchCacheConfiguration(oldCfg, op.entities(), op.schemaName(), op.isSqlEscape(),
+ op.queryParallelism());
+ }
+
+ /**
+ * Patch cache configuration with {@link SchemaAddQueryEntityOperation}.
+ *
+ * @param oldCfg Old cache config.
+ * @param entities New query entities.
+ * @param sqlSchema Sql schema name.
+ * @param isSqlEscape Sql escape flag.
+ * @param qryParallelism Query parallelism parameter.
+ */
+ public static <K, V> CacheConfiguration<K, V> patchCacheConfiguration(
+ CacheConfiguration<K, V> oldCfg,
+ Collection<QueryEntity> entities,
+ String sqlSchema,
+ boolean isSqlEscape,
+ int qryParallelism
+ ) {
+ return new CacheConfiguration<>(oldCfg)
+ .setQueryEntities(entities)
+ .setSqlSchema(sqlSchema)
+ .setSqlEscapeAll(isSqlEscape)
+ .setQueryParallelism(qryParallelism);
+ }
+
+ /**
*
*/
public interface BackupPostProcessingClosure extends IgniteInClosure<Collection<GridCacheEntryInfo>>,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
index 05bc7db..0c6c117 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
@@ -2223,7 +2223,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
@Override public IgniteFuture<?> indexReadyFuture() {
GridCacheContext<K, V> ctx = getContextSafe();
- IgniteInternalFuture fut = ctx.shared().database().indexRebuildFuture(ctx.cacheId());
+ IgniteInternalFuture fut = ctx.shared().kernalContext().query().indexRebuildFuture(ctx.cacheId());
if (fut == null)
return new IgniteFinishedFutureImpl<>();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java
index 9371e85..e5ef064 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java
@@ -92,11 +92,16 @@ import static org.apache.ignite.internal.processors.security.SecurityUtils.nodeS
* Util class for joining node validation.
*/
public class ValidationOnNodeJoinUtils {
- /** Template of message of conflicts during configuration merge */
+ /** Template of message of conflicts of sql schema name. */
+ private static final String SQL_SCHEMA_CONFLICTS_MESSAGE =
+ "Failed to join node to the active cluster, configuration conflict for cache '%s': " +
+ "schema '%s' from joining node differs to '%s'";
+
+ /** Template of message of conflicts during configuration merge. */
private static final String MERGE_OF_CONFIG_CONFLICTS_MESSAGE =
"Conflicts during configuration merge for cache '%s' : \n%s";
- /** Template of message of node join was fail because it requires to merge of config */
+ /** Template of message of node join was fail because it requires to merge of config. */
private static final String MERGE_OF_CONFIG_REQUIRED_MESSAGE = "Failed to join node to the active cluster " +
"(the config of the cache '%s' has to be merged which is impossible on active grid). " +
"Deactivate grid and retry node join or clean the joining node.";
@@ -173,15 +178,29 @@ public class ValidationOnNodeJoinUtils {
if (locDesc == null)
continue;
- QuerySchemaPatch schemaPatch = locDesc.makeSchemaPatch(cacheInfo.cacheData().queryEntities());
+ String joinedSchema = cacheInfo.cacheData().config().getSqlSchema();
+ Collection<QueryEntity> joinedQryEntities = cacheInfo.cacheData().queryEntities();
+ String locSchema = locDesc.cacheConfiguration().getSqlSchema();
+
+ // Peform checks of SQL schema. If schemas' names not equal, only valid case is if local or joined
+ // QuerySchema is empty and schema name is null (when indexing enabled dynamically).
+ if (!F.eq(joinedSchema, locSchema)
+ && (locSchema != null || !locDesc.schema().isEmpty())
+ && (joinedSchema != null || !F.isEmpty(joinedQryEntities))) {
+ errorMsg.append(String.format(SQL_SCHEMA_CONFLICTS_MESSAGE, locDesc.cacheName(), joinedSchema,
+ locSchema));
+ }
+
+ QuerySchemaPatch schemaPatch = locDesc.makeSchemaPatch(joinedQryEntities);
if (schemaPatch.hasConflicts() || (isGridActive && !schemaPatch.isEmpty())) {
if (errorMsg.length() > 0)
errorMsg.append("\n");
- if (schemaPatch.hasConflicts())
- errorMsg.append(String.format(MERGE_OF_CONFIG_CONFLICTS_MESSAGE,
- locDesc.cacheName(), schemaPatch.getConflictsMessage()));
+ if (schemaPatch.hasConflicts()) {
+ errorMsg.append(String.format(MERGE_OF_CONFIG_CONFLICTS_MESSAGE, locDesc.cacheName(),
+ schemaPatch.getConflictsMessage()));
+ }
else
errorMsg.append(String.format(MERGE_OF_CONFIG_REQUIRED_MESSAGE, locDesc.cacheName()));
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index aee5649..4b01715 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -81,7 +81,6 @@ import org.apache.ignite.configuration.DataPageEvictionMode;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.GridKernalContext;
@@ -122,7 +121,6 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
-import org.apache.ignite.internal.processors.cache.ExchangeActions;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
@@ -194,7 +192,6 @@ import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedHashMap;
import static java.nio.file.StandardOpenOption.READ;
-import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_READ_LOCK_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_JVM_PAUSE_DETECTOR_THRESHOLD;
@@ -366,9 +363,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** Thread local with buffers for the checkpoint threads. Each buffer represent one page for durable memory. */
private ThreadLocal<ByteBuffer> threadBuf;
- /** Map from a cacheId to a future indicating that there is an in-progress index rebuild for the given cache. */
- private final ConcurrentMap<Integer, GridFutureAdapter<Void>> idxRebuildFuts = new ConcurrentHashMap<>();
-
/**
* Lock holder for compatible folders mode. Null if lock holder was created at start node. <br>
* In this case lock is held on PDS resover manager and it is not required to manage locking here
@@ -1488,35 +1482,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
fut.timeBag().finishGlobalStage("Restore partition states");
}
- if (cctx.kernalContext().query().moduleEnabled()) {
- ExchangeActions acts = fut.exchangeActions();
-
- if (acts != null) {
- if (!F.isEmpty(acts.cacheStartRequests())) {
- for (ExchangeActions.CacheActionData actionData : acts.cacheStartRequests())
- prepareIndexRebuildFuture(CU.cacheId(actionData.request().cacheName()));
- }
- else if (acts.localJoinContext() != null && !F.isEmpty(acts.localJoinContext().caches())) {
- for (T2<DynamicCacheDescriptor, NearCacheConfiguration> tup : acts.localJoinContext().caches())
- prepareIndexRebuildFuture(tup.get1().cacheId());
- }
- }
- }
- }
-
- /**
- * Creates a new index rebuild future that should be completed later after exchange is done. The future
- * has to be created before exchange is initialized to guarantee that we will capture a correct future
- * after activation or restore completes.
- * If there was an old future for the given ID, it will be completed.
- *
- * @param cacheId Cache ID.
- */
- private void prepareIndexRebuildFuture(int cacheId) {
- GridFutureAdapter<Void> old = idxRebuildFuts.put(cacheId, new GridFutureAdapter<>());
-
- if (old != null)
- old.onDone();
+ if (cctx.kernalContext().query().moduleEnabled())
+ cctx.kernalContext().query().beforeExchange(fut);
}
/** {@inheritDoc} */
@@ -1541,43 +1508,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
if (!cacheCtx.startTopologyVersion().equals(exchangeFut.initialVersion()))
continue;
- int cacheId = cacheCtx.cacheId();
- GridFutureAdapter<Void> usrFut = idxRebuildFuts.get(cacheId);
-
IgniteInternalFuture<?> rebuildFut = qryProc.rebuildIndexesFromHash(cacheCtx);
- if (nonNull(rebuildFut)) {
- if (log.isInfoEnabled())
- log.info("Started indexes rebuilding for cache [" + cacheInfo(cacheCtx) + ']');
-
- assert nonNull(usrFut) : "Missing user future for cache: " + cacheCtx.name();
-
- rebuildFut.listen(fut -> {
- idxRebuildFuts.remove(cacheId, usrFut);
-
- Throwable err = fut.error();
-
- usrFut.onDone(err);
-
- if (isNull(err)) {
- if (log.isInfoEnabled())
- log.info("Finished indexes rebuilding for cache [" + cacheInfo(cacheCtx) + ']');
- }
- else {
- if (!(err instanceof NodeStoppingException))
- log.error("Failed to rebuild indexes for cache [" + cacheInfo(cacheCtx) + ']', err);
- }
-
- rebuildIndexesCompleteCntr.countDown(true);
- });
- }
- else if (nonNull(usrFut)) {
- idxRebuildFuts.remove(cacheId, usrFut);
-
- usrFut.onDone();
-
+ if (nonNull(rebuildFut))
+ rebuildFut.listen(fut -> rebuildIndexesCompleteCntr.countDown(true));
+ else
rebuildIndexesCompleteCntr.countDown(false);
- }
}
}
@@ -1594,11 +1530,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/** {@inheritDoc} */
- @Nullable @Override public IgniteInternalFuture indexRebuildFuture(int cacheId) {
- return idxRebuildFuts.get(cacheId);
- }
-
- /** {@inheritDoc} */
@Override public void onCacheGroupsStopped(
Collection<IgniteBiTuple<CacheGroupContext, Boolean>> stoppedGrps
) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index b679669..32eb60d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -1005,13 +1005,6 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
}
/**
- * @return Future that will be completed when indexes for given cache are restored.
- */
- @Nullable public IgniteInternalFuture indexRebuildFuture(int cacheId) {
- return null;
- }
-
- /**
* Reserve update history for exchange.
*
* @return Reserved update counters per cache and partition.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 784f705..5462ed2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -217,10 +217,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
private GridLocalEventListener lsnr;
/** */
- private boolean enabled;
+ private volatile boolean enabled;
/** */
- private boolean qryProcEnabled;
+ private volatile boolean qryProcEnabled;
/** */
private AffinityTopologyVersion qryTopVer;
@@ -296,6 +296,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
return enabled;
}
+ /**
+ * Enable query manager.
+ */
+ public void enable() {
+ qryProcEnabled = true;
+ enabled = true;
+ }
+
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
busyLock.block();
@@ -425,7 +433,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
*/
public void remove(KeyCacheObject key, @Nullable CacheDataRow prevRow)
throws IgniteCheckedException {
- if (!QueryUtils.isEnabled(cctx.config()))
+ if (!qryProcEnabled)
return; // No-op.
if (!enterBusy())
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 8a94782..b6cc81e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -56,6 +56,7 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.events.CacheQueryExecutedEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -68,12 +69,14 @@ import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
+import org.apache.ignite.internal.processors.cache.ExchangeActions;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
@@ -95,6 +98,7 @@ import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDi
import org.apache.ignite.internal.processors.query.schema.message.SchemaOperationStatusMessage;
import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaAddQueryEntityOperation;
import org.apache.ignite.internal.processors.query.schema.operation.SchemaAlterTableAddColumnOperation;
import org.apache.ignite.internal.processors.query.schema.operation.SchemaAlterTableDropColumnOperation;
import org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexCreateOperation;
@@ -129,6 +133,7 @@ import org.jetbrains.annotations.Nullable;
import static java.util.Collections.newSetFromMap;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
import static org.apache.ignite.internal.GridTopic.TOPIC_SCHEMA;
import static org.apache.ignite.internal.IgniteComponentType.INDEXING;
@@ -192,6 +197,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
/** Active propose messages. */
private final LinkedHashMap<UUID, SchemaProposeDiscoveryMessage> activeProposals = new LinkedHashMap<>();
+ /** Map from a cacheId to a future indicating that there is an in-progress index rebuild for the given cache. */
+ private final ConcurrentMap<Integer, GridFutureAdapter<Void>> idxRebuildFuts = new ConcurrentHashMap<>();
+
/** General state mutex. */
private final Object stateMux = new Object();
@@ -199,7 +207,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
private ClusterNode crd;
/** Registered cache names. */
- private final Collection<String> cacheNames = newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+ private final Collection<String> cacheNames = ConcurrentHashMap.newKeySet();
/** ID history for index create/drop discovery messages. */
private final GridBoundedConcurrentLinkedHashSet<IgniteUuid> dscoMsgIdHist =
@@ -225,7 +233,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
private boolean skipFieldLookup;
/** Cache name - value typeId pairs for which type mismatch message was logged. */
- private final Set<Long> missedCacheTypes = newSetFromMap(new ConcurrentHashMap<>());
+ private final Set<Long> missedCacheTypes = ConcurrentHashMap.newKeySet();
/**
* @param ctx Kernal context.
@@ -432,6 +440,41 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * Prepare index rebuild futures if needed before exchange.
+ *
+ * @param fut Exchange future.
+ */
+ public void beforeExchange(GridDhtPartitionsExchangeFuture fut) {
+ ExchangeActions acts = fut.exchangeActions();
+
+ if (acts != null) {
+ if (!F.isEmpty(acts.cacheStartRequests())) {
+ for (ExchangeActions.CacheActionData actionData : acts.cacheStartRequests())
+ prepareIndexRebuildFuture(CU.cacheId(actionData.request().cacheName()));
+ }
+ else if (acts.localJoinContext() != null && !F.isEmpty(acts.localJoinContext().caches())) {
+ for (T2<DynamicCacheDescriptor, NearCacheConfiguration> tup : acts.localJoinContext().caches())
+ prepareIndexRebuildFuture(tup.get1().cacheId());
+ }
+ }
+ }
+
+ /**
+ * Creates a new index rebuild future that should be completed later after exchange is done. The future
+ * has to be created before exchange is initialized to guarantee that we will capture a correct future
+ * after activation or restore completes.
+ * If there was an old future for the given ID, it will be completed.
+ *
+ * @param cacheId Cache ID.
+ */
+ private void prepareIndexRebuildFuture(int cacheId) {
+ GridFutureAdapter<Void> old = idxRebuildFuts.put(cacheId, new GridFutureAdapter<>());
+
+ if (old != null)
+ old.onDone();
+ }
+
+ /**
*
* @return Information about secondary indexes inline size. Key is a full index name, value is a effective inline size.
* @see GridQueryIndexing#secondaryIndexesInlineSize()
@@ -770,10 +813,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
err = res.get3();
}
else {
- // If cache is not started yet, there is no schema. Take schema from cache descriptor and validate.
- QuerySchema schema = cacheDesc.schema();
-
- T2<Boolean, SchemaOperationException> res = prepareChangeOnNotStartedCache(op, schema);
+ T2<Boolean, SchemaOperationException> res = prepareChangeOnNotStartedCache(op, cacheDesc);
assert res.get1() != null;
@@ -832,8 +872,26 @@ public class GridQueryProcessor extends GridProcessorAdapter {
*/
public void onCacheStart0(GridCacheContextInfo<?, ?> cacheInfo, QuerySchema schema, boolean isSql)
throws IgniteCheckedException {
- if (!cacheSupportSql(cacheInfo.config()))
- return;
+ if (!cacheSupportSql(cacheInfo.config())) {
+ synchronized (stateMux) {
+ boolean proceed = false;
+
+ for (SchemaAbstractDiscoveryMessage msg: activeProposals.values()) {
+ if (msg.operation() instanceof SchemaAddQueryEntityOperation) {
+ SchemaAddQueryEntityOperation op = (SchemaAddQueryEntityOperation)msg.operation();
+
+ if (op.cacheName().equals(cacheInfo.name())) {
+ proceed = true;
+
+ break;
+ }
+ }
+ }
+
+ if (!proceed)
+ return;
+ }
+ }
ctx.cache().context().database().checkpointReadLock();
@@ -848,53 +906,14 @@ public class GridQueryProcessor extends GridProcessorAdapter {
String schemaName = QueryUtils.normalizeSchemaName(cacheName, cacheInfo.config().getSqlSchema());
- // Prepare candidates.
- List<Class<?>> mustDeserializeClss = new ArrayList<>();
-
- Collection<QueryTypeCandidate> cands = new ArrayList<>();
-
- Collection<QueryEntity> qryEntities = schema.entities();
-
- if (!F.isEmpty(qryEntities)) {
- for (QueryEntity qryEntity : qryEntities) {
- QueryTypeCandidate cand = QueryUtils.typeForQueryEntity(
- ctx,
- cacheName,
- schemaName,
- cacheInfo,
- qryEntity,
- mustDeserializeClss,
- escape
- );
-
- cands.add(cand);
- }
- }
+ T3<Collection<QueryTypeCandidate>, Map<String, QueryTypeDescriptorImpl>, Map<String, QueryTypeDescriptorImpl>>
+ candRes = createQueryCandidates(cacheName, schemaName, cacheInfo, schema.entities(), escape);
// Ensure that candidates has unique index names.
// Otherwise we will not be able to apply pending operations.
- Map<String, QueryTypeDescriptorImpl> tblTypMap = new HashMap<>();
- Map<String, QueryTypeDescriptorImpl> idxTypMap = new HashMap<>();
-
- for (QueryTypeCandidate cand : cands) {
- QueryTypeDescriptorImpl desc = cand.descriptor();
-
- QueryTypeDescriptorImpl oldDesc = tblTypMap.put(desc.tableName(), desc);
-
- if (oldDesc != null)
- throw new IgniteException("Duplicate table name [cache=" + cacheName +
- ",tblName=" + desc.tableName() +
- ", type1=" + desc.name() + ", type2=" + oldDesc.name() + ']');
-
- for (String idxName : desc.indexes().keySet()) {
- oldDesc = idxTypMap.put(idxName, desc);
-
- if (oldDesc != null)
- throw new IgniteException("Duplicate index name [cache=" + cacheName +
- ",idxName=" + idxName +
- ", type1=" + desc.name() + ", type2=" + oldDesc.name() + ']');
- }
- }
+ Collection<QueryTypeCandidate> cands = candRes.get1();
+ Map<String, QueryTypeDescriptorImpl> tblTypMap = candRes.get2();
+ Map<String, QueryTypeDescriptorImpl> idxTypMap = candRes.get3();
// Apply pending operation which could have been completed as no-op at this point.
// There could be only one in-flight operation for a cache.
@@ -952,8 +971,24 @@ public class GridQueryProcessor extends GridProcessorAdapter {
processDynamicDropColumn(typeDesc, opDropCol.columns());
}
+ else if (op0 instanceof SchemaAddQueryEntityOperation) {
+ SchemaAddQueryEntityOperation opEnableIdx =
+ (SchemaAddQueryEntityOperation)op0;
+
+ cacheInfo.onSchemaAddQueryEntity(opEnableIdx);
+
+ cands = createQueryCandidates(
+ opEnableIdx.cacheName(),
+ opEnableIdx.schemaName(),
+ cacheInfo,
+ opEnableIdx.entities(),
+ opEnableIdx.isSqlEscape()
+ ).get1();
+
+ schemaName = opEnableIdx.schemaName();
+ }
else
- assert false;
+ assert false : "Unsupported operation: " + op0;
}
}
}
@@ -964,16 +999,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
// Ready to register at this point.
registerCache0(cacheName, schemaName, cacheInfo, cands, isSql);
-
- // Warn about possible implicit deserialization.
- if (!mustDeserializeClss.isEmpty()) {
- U.warnDevOnly(log, "Some classes in query configuration cannot be written in binary format " +
- "because they either implement Externalizable interface or have writeObject/readObject " +
- "methods. Instances of these classes will be deserialized in order to build indexes. Please " +
- "ensure that all nodes have these classes in classpath. To enable binary serialization " +
- "either implement " + Binarylizable.class.getSimpleName() + " interface or set explicit " +
- "serializer using BinaryTypeConfiguration.setSerializer() method: " + mustDeserializeClss);
- }
}
}
finally {
@@ -1153,6 +1178,83 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * Create query candidates and mappings of index and type descriptors names to them.
+ *
+ * @param cacheName Cache name.
+ * @param schemaName Schema name.
+ * @param cacheInfo Grid cache info.
+ * @param entities Collection of query entities.
+ * @param escape Sql escale flag.
+ * @return Triple of query candidates and mappings of index and type descriptors names to them.
+ * @throws IgniteCheckedException If failed.
+ */
+ private T3<Collection<QueryTypeCandidate>, Map<String, QueryTypeDescriptorImpl>, Map<String, QueryTypeDescriptorImpl>>
+ createQueryCandidates(
+ String cacheName,
+ String schemaName,
+ GridCacheContextInfo<?, ?> cacheInfo,
+ Collection<QueryEntity> entities,
+ boolean escape
+ ) throws IgniteCheckedException {
+ Collection<QueryTypeCandidate> cands = new ArrayList<>();
+
+ List<Class<?>> mustDeserializeClss = new ArrayList<>();
+
+ if (!F.isEmpty(entities)) {
+ for (QueryEntity qryEntity : entities) {
+ QueryTypeCandidate cand = QueryUtils.typeForQueryEntity(
+ ctx,
+ cacheName,
+ schemaName,
+ cacheInfo,
+ qryEntity,
+ mustDeserializeClss,
+ escape
+ );
+
+ cands.add(cand);
+ }
+ }
+
+ // Ensure that candidates has unique index names.
+ // Otherwise we will not be able to apply pending operations.
+ Map<String, QueryTypeDescriptorImpl> tblTypMap = new HashMap<>();
+ Map<String, QueryTypeDescriptorImpl> idxTypMap = new HashMap<>();
+
+ for (QueryTypeCandidate cand : cands) {
+ QueryTypeDescriptorImpl desc = cand.descriptor();
+
+ QueryTypeDescriptorImpl oldDesc = tblTypMap.put(desc.tableName(), desc);
+
+ if (oldDesc != null)
+ throw new IgniteException("Duplicate table name [cache=" + cacheName +
+ ",tblName=" + desc.tableName() +
+ ", type1=" + desc.name() + ", type2=" + oldDesc.name() + ']');
+
+ for (String idxName : desc.indexes().keySet()) {
+ oldDesc = idxTypMap.put(idxName, desc);
+
+ if (oldDesc != null)
+ throw new IgniteException("Duplicate index name [cache=" + cacheName +
+ ",idxName=" + idxName +
+ ", type1=" + desc.name() + ", type2=" + oldDesc.name() + ']');
+ }
+ }
+
+ // Warn about possible implicit deserialization.
+ if (!mustDeserializeClss.isEmpty()) {
+ U.warnDevOnly(log, "Some classes in query configuration cannot be written in binary format " +
+ "because they either implement Externalizable interface or have writeObject/readObject " +
+ "methods. Instances of these classes will be deserialized in order to build indexes. Please " +
+ "ensure that all nodes have these classes in classpath. To enable binary serialization " +
+ "either implement " + Binarylizable.class.getSimpleName() + " interface or set explicit " +
+ "serializer using BinaryTypeConfiguration.setSerializer() method: " + mustDeserializeClss);
+ }
+
+ return new T3<>(cands, tblTypMap, idxTypMap);
+ }
+
+ /**
* Register class metadata locally if it didn't do it earlier.
*
* @param cls Class for which the metadata should be registered.
@@ -1329,6 +1431,10 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
}
}
+ else if (op instanceof SchemaAddQueryEntityOperation) {
+ if (cacheNames.contains(op.cacheName()))
+ err = new SchemaOperationException(SchemaOperationException.CODE_CACHE_ALREADY_INDEXED, op.cacheName());
+ }
else
err = new SchemaOperationException("Unsupported operation: " + op);
@@ -1364,15 +1470,25 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* Prepare operation on non-started cache.
*
* @param op Operation.
- * @param schema Known cache schema.
+ * @param desc Dynamic cache descriptor.
* @return Result: nop flag, error.
*/
- private T2<Boolean, SchemaOperationException> prepareChangeOnNotStartedCache(SchemaAbstractOperation op,
- QuerySchema schema) {
+ private T2<Boolean, SchemaOperationException> prepareChangeOnNotStartedCache(
+ SchemaAbstractOperation op,
+ DynamicCacheDescriptor desc
+ ) {
boolean nop = false;
SchemaOperationException err = null;
+ if (op instanceof SchemaAddQueryEntityOperation) {
+ if (cacheSupportSql(desc.cacheConfiguration()))
+ err = new SchemaOperationException(SchemaOperationException.CODE_CACHE_ALREADY_INDEXED, desc.cacheName());
+
+ return new T2<>(nop, err);
+ }
+
// Build table and index maps.
+ QuerySchema schema = desc.schema();
Map<String, QueryEntity> tblMap = new HashMap<>();
Map<String, T2<QueryEntity, QueryIndex>> idxMap = new HashMap<>();
@@ -1639,7 +1755,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
idxs.remove(idxKey);
}
else {
- assert (op instanceof SchemaAlterTableAddColumnOperation ||
+ assert (op instanceof SchemaAddQueryEntityOperation || op instanceof SchemaAlterTableAddColumnOperation ||
op instanceof SchemaAlterTableDropColumnOperation);
// No-op - all processing is done at "local" stage
@@ -1698,7 +1814,19 @@ public class GridQueryProcessor extends GridProcessorAdapter {
String cacheName = op.cacheName();
- GridCacheContextInfo cacheInfo = idx.registeredCacheInfo(cacheName);
+ GridCacheContextInfo<?, ?> cacheInfo = null;
+
+ if (op instanceof SchemaAddQueryEntityOperation) {
+ GridCacheContext<?, ?> cctx = ctx.cache().context().cacheContext(CU.cacheId(cacheName));
+
+ if (cctx != null)
+ cacheInfo = new GridCacheContextInfo<>(cctx, false);
+ else
+ return;
+
+ }
+ else
+ cacheInfo = idx.registeredCacheInfo(cacheName);
if (cacheInfo == null || !F.eq(depId, cacheInfo.dynamicDeploymentId()))
throw new SchemaOperationException(SchemaOperationException.CODE_CACHE_NOT_FOUND, cacheName);
@@ -1776,6 +1904,21 @@ public class GridQueryProcessor extends GridProcessorAdapter {
idx.dynamicDropColumn(op0.schemaName(), op0.tableName(), op0.columns(), op0.ifTableExists(),
op0.ifExists());
}
+ else if (op instanceof SchemaAddQueryEntityOperation) {
+ SchemaAddQueryEntityOperation op0 = (SchemaAddQueryEntityOperation)op;
+
+ if (!cacheNames.contains(op0.cacheName())) {
+ cacheInfo.onSchemaAddQueryEntity(op0);
+
+ T3<Collection<QueryTypeCandidate>, Map<String, QueryTypeDescriptorImpl>, Map<String, QueryTypeDescriptorImpl>>
+ candRes = createQueryCandidates(op0.cacheName(), op0.schemaName(), cacheInfo, op0.entities(),
+ op0.isSqlEscape());
+
+ registerCache0(op0.cacheName(), op.schemaName(), cacheInfo, candRes.get1(), false);
+ }
+
+ rebuildIndexesFromHash0(cacheInfo.cacheContext());
+ }
else
throw new SchemaOperationException("Unsupported operation: " + op);
}
@@ -1987,7 +2130,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @param destroy Destroy flag.
*/
public void onCacheStop0(GridCacheContextInfo cacheInfo, boolean destroy) {
- if (idx == null || !cacheSupportSql(cacheInfo.config()))
+ if (idx == null || !cacheNames.contains(cacheInfo.name()))
return;
String cacheName = cacheInfo.name();
@@ -2142,7 +2285,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
// Indexing module is disabled, nothing to rebuild.
if (rebuildIsMeaningless(cctx))
- return null;
+ return chainIndexRebuildFuture(null, cctx);
// No need to rebuild if cache has no data.
boolean empty = true;
@@ -2156,7 +2299,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
if (empty)
- return null;
+ return chainIndexRebuildFuture(null, cctx);
if (!busyLock.enterBusy()) {
return new GridFinishedFuture<>(new NodeStoppingException("Failed to rebuild indexes from hash " +
@@ -2164,7 +2307,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
try {
- return idx.rebuildIndexesFromHash(cctx);
+ return rebuildIndexesFromHash0(cctx);
}
finally {
busyLock.leaveBusy();
@@ -2172,6 +2315,68 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * @param cctx Cache context.
+ */
+ private IgniteInternalFuture<?> rebuildIndexesFromHash0(GridCacheContext<?, ?> cctx) {
+ IgniteInternalFuture<?> idxFut = idx.rebuildIndexesFromHash(cctx);
+
+ return chainIndexRebuildFuture(idxFut, cctx);
+ }
+
+ /**
+ * Chain real index rebuild future with user future and do some post processing.
+ *
+ * @param idxFut Real index future. If {@code null} simply completes existing user future.
+ * @param cctx Cache context.
+ * @return Chained user future.
+ */
+ private @Nullable IgniteInternalFuture<?> chainIndexRebuildFuture(
+ @Nullable IgniteInternalFuture<?> idxFut,
+ GridCacheContext<?, ?> cctx
+ ) {
+ int cacheId = cctx.cacheId();
+
+ if (nonNull(idxFut)) {
+ GridFutureAdapter<Void> res = idxRebuildFuts.computeIfAbsent(cacheId, id -> new GridFutureAdapter<>());
+
+ String cacheInfo = "[name=" + cctx.name() + ", grpName=" + cctx.group().name() + "]";
+
+ if (log.isInfoEnabled())
+ log.info("Started indexes rebuilding for cache " + cacheInfo);
+
+ idxFut.listen(fut -> {
+ idxRebuildFuts.remove(cacheId, res);
+
+ Throwable err = fut.error();
+
+ if (isNull(err) && log.isInfoEnabled())
+ log.info("Finished indexes rebuilding for cache " + cacheInfo);
+ else if (!(err instanceof NodeStoppingException))
+ log.error("Failed to rebuild indexes for cache " + cacheInfo, err);
+
+ res.onDone(err);
+ });
+
+ return res;
+ }
+ else {
+ GridFutureAdapter<Void> fut = idxRebuildFuts.remove(cacheId);
+
+ if (fut != null)
+ fut.onDone();
+
+ return null;
+ }
+ }
+
+ /**
+ * @return Future that will be completed when indexes for given cache are restored.
+ */
+ @Nullable public IgniteInternalFuture<?> indexRebuildFuture(int cacheId) {
+ return idxRebuildFuts.get(cacheId);
+ }
+
+ /**
* @param cacheName Cache name.
* @return Cache object context.
*/
@@ -2835,6 +3040,42 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * Enable dynamically indexing of existing cache.
+ *
+ * @param cacheName Cache name
+ * @param schemaName Target schema name.
+ * @param entity Instance of {@code QueryEntity}.
+ * @param qryParallelism Query parallelism.
+ * @param sqlEscape Escape flag, see{@link QueryUtils#normalizeQueryEntity}.
+ */
+ public IgniteInternalFuture<?> dynamicAddQueryEntity(
+ String cacheName,
+ String schemaName,
+ QueryEntity entity,
+ Integer qryParallelism,
+ boolean sqlEscape
+ ) {
+ assert qryParallelism == null || qryParallelism > 0;
+
+ CacheConfiguration cfg = ctx.cache().cacheConfiguration(cacheName);
+
+ if (qryParallelism != null && qryParallelism > 1 && cfg.getCacheMode() != PARTITIONED)
+ throw new IgniteSQLException("Segmented indices are supported for PARTITIONED mode only.");
+
+ QueryEntity entity0 = QueryUtils.normalizeQueryEntity(entity, sqlEscape);
+
+ SchemaAddQueryEntityOperation op = new SchemaAddQueryEntityOperation(
+ UUID.randomUUID(),
+ cacheName,
+ schemaName,
+ Collections.singletonList(entity0),
+ qryParallelism != null ? qryParallelism : CacheConfiguration.DFLT_QUERY_PARALLELISM,
+ sqlEscape);
+
+ return startIndexOperationDistributed(op);
+ }
+
+ /**
* Start distributed index change operation.
*
* @param op Operation.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchema.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchema.java
index 082d52f..5fac11c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchema.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchema.java
@@ -20,17 +20,21 @@ package org.apache.ignite.internal.processors.query;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryEntityPatch;
import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage;
import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaAddQueryEntityOperation;
import org.apache.ignite.internal.processors.query.schema.operation.SchemaAlterTableAddColumnOperation;
import org.apache.ignite.internal.processors.query.schema.operation.SchemaAlterTableDropColumnOperation;
import org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexCreateOperation;
@@ -94,7 +98,32 @@ public class QuerySchema implements Serializable {
* @see QuerySchemaPatch
*/
public QuerySchemaPatch makePatch(Collection<QueryEntity> target) {
+ return makePatch(null, target);
+ }
+
+ /**
+ * Make query schema patch.
+ *
+ * @param targetCfg Cache configuration when it should be changed (enabling indexing dynamically).
+ * @param target Query entity list to which current schema should be expanded.
+ * @return Patch to achieve entity which is a result of merging current one and target.
+ * @see QuerySchemaPatch
+ */
+ public QuerySchemaPatch makePatch(CacheConfiguration<?, ?> targetCfg, Collection<QueryEntity> target) {
synchronized (mux) {
+ if (entities.isEmpty() && targetCfg != null) {
+ SchemaAddQueryEntityOperation op = new SchemaAddQueryEntityOperation(
+ UUID.randomUUID(),
+ targetCfg.getName(),
+ targetCfg.getSqlSchema(),
+ target,
+ targetCfg.getQueryParallelism(),
+ targetCfg.isSqlEscapeAll()
+ );
+
+ return new QuerySchemaPatch(Collections.singletonList(op), Collections.emptyList(), "");
+ }
+
Map<String, QueryEntity> localEntities = new HashMap<>();
for (QueryEntity entity : entities) {
@@ -276,9 +305,7 @@ public class QuerySchema implements Serializable {
if (replaceTarget)
((List<QueryEntity>)entities).set(targetIdx, target);
}
- else {
- assert op instanceof SchemaAlterTableDropColumnOperation;
-
+ else if (op instanceof SchemaAlterTableDropColumnOperation) {
SchemaAlterTableDropColumnOperation op0 = (SchemaAlterTableDropColumnOperation)op;
int targetIdx = -1;
@@ -305,6 +332,14 @@ public class QuerySchema implements Serializable {
+ ", ifExists=" + op0.ifExists() + ']';
}
}
+ else {
+ assert op instanceof SchemaAddQueryEntityOperation : "Unsupported schema operation [" + op.toString() + "]";
+
+ assert entities.isEmpty();
+
+ for (QueryEntity opEntity: ((SchemaAddQueryEntityOperation)op).entities())
+ entities.add(QueryUtils.copy(opEntity));
+ }
}
}
@@ -317,6 +352,15 @@ public class QuerySchema implements Serializable {
}
}
+ /**
+ * @return {@code True} if entities is not empty.
+ */
+ public boolean isEmpty() {
+ synchronized (mux) {
+ return entities.isEmpty();
+ }
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(QuerySchema.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java
index f0db026..d4e1693 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java
@@ -51,6 +51,9 @@ public class SchemaOperationException extends IgniteCheckedException {
/** Code: index already exists. */
public static final int CODE_INDEX_EXISTS = 7;
+ /** Code: cache already indexed. */
+ public static final int CODE_CACHE_ALREADY_INDEXED = 8;
+
/** Error code. */
private final int code;
@@ -129,6 +132,9 @@ public class SchemaOperationException extends IgniteCheckedException {
case CODE_INDEX_EXISTS:
return "Index already exists: " + objName;
+ case CODE_CACHE_ALREADY_INDEXED:
+ return "Cache is already indexed: " + objName;
+
default:
assert false;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationWorker.java
index 160fea0..e460ad9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationWorker.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.QueryTypeDescriptorImpl;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaAddQueryEntityOperation;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteInClosure;
@@ -95,7 +96,7 @@ public class SchemaOperationWorker extends GridWorker {
if (err != null)
fut.onDone(err);
- else if (nop || !cacheRegistered)
+ else if (nop || (!cacheRegistered && !(op instanceof SchemaAddQueryEntityOperation)))
fut.onDone();
pubFut = publicFuture(fut);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAddQueryEntityOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAddQueryEntityOperation.java
new file mode 100644
index 0000000..feace59
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAddQueryEntityOperation.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema.operation;
+
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.cache.QueryEntity;
+
+/**
+ * Enabling indexing on cache operation.
+ */
+public class SchemaAddQueryEntityOperation extends SchemaAbstractOperation {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final Collection<QueryEntity> entities;
+
+ /** */
+ private final int qryParallelism;
+
+ /** */
+ private final boolean sqlEscape;
+
+ /**
+ * @param opId Operation ID.
+ * @param cacheName Cache name.
+ * @param schemaName Schema name.
+ * @param entities Collection of QueryEntity.
+ * @param qryParallelism Query parallelism.
+ * @param sqlEscape Sql escape flag.
+ */
+ public SchemaAddQueryEntityOperation(
+ UUID opId,
+ String cacheName,
+ String schemaName,
+ Collection<QueryEntity> entities,
+ int qryParallelism,
+ boolean sqlEscape
+ ) {
+ super(opId, cacheName, schemaName);
+ this.entities = entities;
+ this.qryParallelism = qryParallelism;
+ this.sqlEscape = sqlEscape;
+ }
+
+ /**
+ * @return Collection of query entities.
+ */
+ public Collection<QueryEntity> entities() {
+ return entities;
+ }
+
+ /**
+ * @return Query parallelism.
+ */
+ public int queryParallelism() {
+ return qryParallelism;
+ }
+
+ /**
+ * @return Sql escape flag.
+ */
+ public boolean isSqlEscape() {
+ return sqlEscape;
+ }
+}
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 2763ce1..ccd6087 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1655,6 +1655,7 @@ org.apache.ignite.internal.processors.query.schema.operation.SchemaAlterTableDro
org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexAbstractOperation
org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexCreateOperation
org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexDropOperation
+org.apache.ignite.internal.processors.query.schema.operation.SchemaAddQueryEntityOperation
org.apache.ignite.internal.processors.resource.GridResourceIoc$AnnotationSet
org.apache.ignite.internal.processors.resource.GridResourceIoc$ResourceAnnotation
org.apache.ignite.internal.processors.rest.GridRestCommand
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
index fa1c753..de106eb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
@@ -787,21 +787,32 @@ public class CommandProcessor {
if (err != null)
throw err;
- ctx.query().dynamicTableCreate(
- cmd.schemaName(),
- e,
- cmd.templateName(),
- cmd.cacheName(),
- cmd.cacheGroup(),
- cmd.dataRegionName(),
- cmd.affinityKey(),
- cmd.atomicityMode(),
- cmd.writeSynchronizationMode(),
- cmd.backups(),
- cmd.ifNotExists(),
- cmd.encrypted(),
- cmd.parallelism()
- );
+ if (!F.isEmpty(cmd.cacheName()) && ctx.cache().cacheDescriptor(cmd.cacheName()) != null) {
+ ctx.query().dynamicAddQueryEntity(
+ cmd.cacheName(),
+ cmd.schemaName(),
+ e,
+ cmd.parallelism(),
+ true
+ ).get();
+ }
+ else {
+ ctx.query().dynamicTableCreate(
+ cmd.schemaName(),
+ e,
+ cmd.templateName(),
+ cmd.cacheName(),
+ cmd.cacheGroup(),
+ cmd.dataRegionName(),
+ cmd.affinityKey(),
+ cmd.atomicityMode(),
+ cmd.writeSynchronizationMode(),
+ cmd.backups(),
+ cmd.ifNotExists(),
+ cmd.encrypted(),
+ cmd.parallelism()
+ );
+ }
}
}
else if (cmdH2 instanceof GridSqlDropTable) {
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index df6089c..fdeb89c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1957,19 +1957,16 @@ public class IgniteH2Indexing implements GridQueryIndexing {
@Override public IgniteInternalFuture<?> rebuildIndexesFromHash(GridCacheContext cctx) {
assert nonNull(cctx);
- // No data in fresh in-memory cache.
- if (!cctx.group().persistenceEnabled())
+ if (!CU.affinityNode(cctx.localNode(), cctx.config().getNodeFilter()))
return null;
IgnitePageStoreManager pageStore = cctx.shared().pageStore();
- assert nonNull(pageStore);
-
SchemaIndexCacheVisitorClosure clo;
String cacheName = cctx.name();
- if (!pageStore.hasIndexStore(cctx.groupId())) {
+ if (pageStore == null || !pageStore.hasIndexStore(cctx.groupId())) {
// If there are no index store, rebuild all indexes.
clo = new IndexRebuildFullClosure(cctx.queries(), cctx.mvccEnabled());
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index 4fa8906..b1ca683 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -854,7 +854,7 @@ public class GridH2Table extends TableBase {
* @param clo Closure.
*/
public void collectIndexesForPartialRebuild(IndexRebuildPartialClosure clo) {
- for (int i = sysIdxsCnt; i < idxs.size(); i++) {
+ for (int i = 0; i < idxs.size(); i++) {
Index idx = idxs.get(i);
if (idx instanceof H2TreeIndex) {
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicEnableIndexingRestoreTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicEnableIndexingRestoreTest.java
new file mode 100644
index 0000000..bf67c9d
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicEnableIndexingRestoreTest.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteClientReconnectAbstractTest;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.index.DynamicEnableIndexingAbstractTest;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests different scenarious to ensure that enabling indexing on persistence CACHE
+ * correctly persisted and validated on topology change.
+ */
+public class IgniteDynamicEnableIndexingRestoreTest extends DynamicEnableIndexingAbstractTest {
+ /** */
+ private static final String WRONG_SCHEMA_NAME = "DOMAIN_1";
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setClusterStateOnStart(ClusterState.INACTIVE);
+
+ DataStorageConfiguration memCfg = new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration().setMaxSize(200 * 1024 * 1024).setPersistenceEnabled(true))
+ .setWalMode(WALMode.LOG_ONLY);
+
+ cfg.setDataStorageConfiguration(memCfg);
+ cfg.setConsistentId(gridName);
+ cfg.setSqlSchemas(POI_SCHEMA_NAME, WRONG_SCHEMA_NAME);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testMergeCacheConfig_StartWithInitialCoordinator() throws Exception {
+ testMergeCacheConfig(0, 1);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testMergeCacheConfig_StartWithInitialSecondNode() throws Exception {
+ testMergeCacheConfig(1, 0);
+ }
+
+ /**
+ * @param firstIdx Index of first starting node after cluster stopping.
+ * @param secondIdx Index of second starting node after cluster stopping.
+ */
+ private void testMergeCacheConfig(int firstIdx, int secondIdx) throws Exception {
+ prepareTestGrid();
+
+ // Check when start from firstIdx node.
+ IgniteEx ig = startGrid(firstIdx);
+
+ startGrid(secondIdx);
+
+ ig.cluster().state(ClusterState.ACTIVE);
+
+ awaitPartitionMapExchange();
+
+ performQueryingIntegrityCheck(ig);
+
+ // Restart and start from the beginning.
+ stopAllGrids();
+
+ ig = startGrids(2);
+
+ ig.cluster().state(ClusterState.ACTIVE);
+
+ awaitPartitionMapExchange();
+
+ performQueryingIntegrityCheck(ig);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testFailJoiningNodeBecauseNeedConfigUpdateOnActiveGrid() throws Exception {
+ prepareTestGrid();
+
+ IgniteEx ig = startGrid(1);
+ ig.cluster().state(ClusterState.ACTIVE);
+
+ try {
+ startGrid(0);
+
+ fail("Node should start with fail");
+ }
+ catch (Exception e) {
+ assertThat(X.cause(e, IgniteSpiException.class).getMessage(),
+ containsString("the config of the cache 'poi' has to be merged which is impossible on active grid"));
+ }
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testFailJoiningNodeDifferentSchemasOnDynamicIndexes() throws Exception {
+ prepareTestGrid();
+
+ IgniteEx ig = startGrid(1);
+ ig.cluster().state(ClusterState.ACTIVE);
+
+ // Enable indexing with different schema name.
+ createTable(ig.cache(POI_CACHE_NAME), WRONG_SCHEMA_NAME, CacheConfiguration.DFLT_QUERY_PARALLELISM);
+
+ try {
+ startGrid(0);
+
+ fail("Node should start with fail");
+ }
+ catch (Exception e) {
+ assertThat(X.cause(e, IgniteSpiException.class).getMessage(),
+ containsString("schema 'DOMAIN' from joining node differs to 'DOMAIN_1'"));
+ }
+ }
+
+ /**
+ * Check that client reconnects to restarted grid. Start grid from node with enabled indexing.
+ *
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testReconnectClient_RestartFromNodeWithEnabledIndexing() throws Exception {
+ testReconnectClient(true);
+ }
+
+ /**
+ * Check that client reconnects to restarted grid. Start grid from node that stopped before enabled indexing.
+ *
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testReconnectClient_RestartFromNodeWithDisabledIndexing() throws Exception {
+ testReconnectClient(false);
+ }
+
+ /**
+ * @param startFromEnabledIndexing If @{code true}, start grid from node with enabled indexing.
+ *
+ * @throws Exception if failed.
+ */
+ private void testReconnectClient(boolean startFromEnabledIndexing) throws Exception {
+ IgniteEx srv0 = startGrids(2);
+
+ IgniteEx cli = startClientGrid(2);
+
+ cli.cluster().state(ClusterState.ACTIVE);
+
+ IgniteCache<?, ?> cache = srv0.createCache(testCacheConfiguration(POI_CACHE_NAME));
+
+ loadData(srv0, 0, NUM_ENTRIES);
+
+ stopGrid(1);
+
+ createTable(cache, POI_SCHEMA_NAME, CacheConfiguration.DFLT_QUERY_PARALLELISM);
+
+ performQueryingIntegrityCheck(srv0);
+
+ IgniteClientReconnectAbstractTest.reconnectClientNode(log, cli, srv0, () -> {
+ try {
+ stopGrid(0);
+
+ if (startFromEnabledIndexing)
+ startGrid(0);
+ else
+ startGrid(1);
+ }
+ catch (Exception e) {
+ throw new IgniteException("Failed to restart cluster", e);
+ }
+ });
+
+ assertEquals(2, cli.cluster().nodes().size());
+ cli.cluster().state(ClusterState.ACTIVE);
+
+ if (startFromEnabledIndexing) {
+ awaitPartitionMapExchange();
+
+ performQueryingIntegrityCheck(cli);
+ }
+ else
+ assertEquals(NUM_ENTRIES, cli.getOrCreateCache(POI_CACHE_NAME).size(CachePeekMode.PRIMARY));
+ }
+
+ /**
+ * Prepare test grid:
+ * 1) Start two nodes, start cache and fill it with data.
+ * 2) Stop second node.
+ * 3) Enable indexing on cache on first node.
+ * 4) Stop cluster.
+ */
+ private void prepareTestGrid() throws Exception {
+ IgniteEx ig = startGrids(2);
+
+ ig.cluster().state(ClusterState.ACTIVE);
+
+ IgniteCache<?, ?> cache = ig.createCache(testCacheConfiguration(POI_CACHE_NAME));
+
+ loadData(ig, 0, NUM_ENTRIES);
+
+ stopGrid(1);
+
+ createTable(cache, POI_SCHEMA_NAME, CacheConfiguration.DFLT_QUERY_PARALLELISM);
+
+ performQueryingIntegrityCheck(ig);
+
+ stopAllGrids();
+ }
+
+ /** */
+ private CacheConfiguration<?, ?> testCacheConfiguration(String name) {
+ //Set transactional because of https://issues.apache.org/jira/browse/IGNITE-5564.
+ return new CacheConfiguration<>(name)
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+ .setCacheMode(CacheMode.REPLICATED)
+ .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicEnableIndexingAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicEnableIndexingAbstractTest.java
new file mode 100644
index 0000000..845be8e
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicEnableIndexingAbstractTest.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.query.h2.H2TableDescriptor;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.SchemaManager;
+import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Base class for enable indexing tests.
+ */
+public class DynamicEnableIndexingAbstractTest extends GridCommonAbstractTest {
+ /** Node index for regular server (coordinator). */
+ protected static final int IDX_SRV_CRD = 0;
+
+ /** Node index for regular server (not coordinator). */
+ protected static final int IDX_SRV_NON_CRD = 1;
+
+ /** Node index for regular client. */
+ protected static final int IDX_CLI = 2;
+
+ /** Node index for server which doesn't pass node filter. */
+ protected static final int IDX_SRV_FILTERED = 3;
+
+ /** Node index for client with near-only cache. */
+ protected static final int IDX_CLI_NEAR_ONLY = 4;
+
+ /** Attribute to filter node out of cache data nodes. */
+ protected static final String ATTR_FILTERED = "FILTERED";
+
+ /** */
+ protected static final String POI_CACHE_NAME = "poi";
+
+ /** */
+ protected static final int NUM_ENTRIES = 1000;
+
+ /** */
+ protected static final String POI_SCHEMA_NAME = "DOMAIN";
+
+ /** */
+ protected static final String POI_TABLE_NAME = "POI";
+
+ /** */
+ protected static final String POI_CLASS_NAME = "PointOfInterest";
+
+ /** */
+ protected static final String ID_FIELD_NAME = "id";
+
+ /** */
+ protected static final String NAME_FIELD_NAME = "name";
+
+ /** */
+ protected static final String KEY_PK_IDX_NAME = "_key_PK";
+
+ /** */
+ protected static final String LATITUDE_FIELD_NAME = "latitude";
+
+ /** */
+ protected static final String LONGITUDE_FIELD_NAME = "longitude";
+
+ /** */
+ protected static final String SELECT_ALL_QUERY = String.format("SELECT * FROM %s", POI_TABLE_NAME);
+
+ /** */
+ protected static final int QUERY_PARALLELISM = 4;
+
+ /** */
+ protected void createTable(IgniteCache<?, ?> cache, int qryParallelism) {
+ createTable(cache, POI_SCHEMA_NAME, qryParallelism);
+ }
+
+ /** */
+ protected void createTable(IgniteCache<?, ?> cache, String schemaName, int qryParallelism) {
+ String sql = String.format(
+ "CREATE TABLE %s.%s " +
+ "(%s INT, %s VARCHAR," +
+ " %s DOUBLE PRECISION," +
+ " %s DOUBLE PRECISION," +
+ " PRIMARY KEY (%s)" +
+ ") WITH " +
+ " \"CACHE_NAME=%s,VALUE_TYPE=%s,PARALLELISM=%d\"",
+ schemaName, POI_TABLE_NAME, ID_FIELD_NAME, NAME_FIELD_NAME, LATITUDE_FIELD_NAME, LONGITUDE_FIELD_NAME,
+ ID_FIELD_NAME, POI_CACHE_NAME, POI_CLASS_NAME, qryParallelism);
+
+ cache.query(new SqlFieldsQuery(sql));
+ }
+
+ /** */
+ protected List<IgniteConfiguration> configurations() throws Exception {
+ return Arrays.asList(
+ serverConfiguration(IDX_SRV_CRD),
+ serverConfiguration(IDX_SRV_NON_CRD),
+ clientConfiguration(IDX_CLI),
+ serverConfiguration(IDX_SRV_FILTERED, true),
+ clientConfiguration(IDX_CLI_NEAR_ONLY)
+ );
+ }
+
+ /** */
+ protected IgniteConfiguration clientConfiguration(int idx) throws Exception {
+ return commonConfiguration(idx).setClientMode(true);
+ }
+
+ /** */
+ protected IgniteConfiguration serverConfiguration(int idx) throws Exception {
+ return serverConfiguration(idx, false);
+ }
+
+ /** */
+ protected IgniteConfiguration serverConfiguration(int idx, boolean filter) throws Exception {
+ IgniteConfiguration cfg = commonConfiguration(idx);
+
+ if (filter)
+ cfg.setUserAttributes(Collections.singletonMap(ATTR_FILTERED, true));
+
+ return cfg;
+ }
+
+ /** */
+ protected IgniteConfiguration commonConfiguration(int idx) throws Exception {
+ String gridName = getTestIgniteInstanceName(idx);
+
+ IgniteConfiguration cfg = getConfiguration(gridName);
+
+ cfg.setClusterStateOnStart(ClusterState.INACTIVE);
+
+ DataStorageConfiguration memCfg = new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setMaxSize(128 * 1024 * 1024));
+
+ cfg.setDataStorageConfiguration(memCfg);
+ cfg.setConsistentId(gridName);
+ cfg.setSqlSchemas(POI_SCHEMA_NAME);
+
+ return optimize(cfg);
+ }
+
+ /** */
+ protected CacheConfiguration<?, ?> testCacheConfiguration(
+ String name,
+ CacheMode mode,
+ CacheAtomicityMode atomicityMode
+ ) {
+ return new CacheConfiguration<>(name)
+ .setNodeFilter(new DynamicEnableIndexingBasicSelfTest.NodeFilter())
+ .setAtomicityMode(atomicityMode)
+ .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+ .setCacheMode(mode);
+ }
+
+ /** */
+ protected void loadData(IgniteEx node, int start, int end) {
+ try (IgniteDataStreamer<Object, Object> streamer = node.dataStreamer(POI_CACHE_NAME)) {
+ Random rnd = ThreadLocalRandom.current();
+
+ for (int i = start; i < end; i++) {
+ BinaryObject bo = node.binary().builder(POI_CLASS_NAME)
+ .setField(NAME_FIELD_NAME, "POI_" + i, String.class)
+ .setField(LATITUDE_FIELD_NAME, rnd.nextDouble(), Double.class)
+ .setField(LONGITUDE_FIELD_NAME, rnd.nextDouble(), Double.class)
+ .build();
+
+ streamer.addData(i, bo);
+ }
+ }
+ }
+
+ /** */
+ protected void performQueryingIntegrityCheck(Ignite ig) throws Exception {
+ performQueryingIntegrityCheck(ig, 100);
+ }
+
+ /** */
+ protected List<List<?>> query(Ignite ig, String sql) throws Exception {
+ IgniteCache<Object, Object> cache = ig.cache(POI_CACHE_NAME).withKeepBinary();
+
+ return cache.query(new SqlFieldsQuery(sql).setSchema(POI_SCHEMA_NAME)).getAll();
+ }
+
+ /** */
+ protected void performQueryingIntegrityCheck(Ignite ig, int key) throws Exception {
+ IgniteCache<Object, Object> cache = ig.cache(POI_CACHE_NAME).withKeepBinary();
+
+ String sql = String.format("DELETE FROM %s WHERE %s = %d", POI_TABLE_NAME, ID_FIELD_NAME, key);
+ List<List<?>> res = cache.query(new SqlFieldsQuery(sql).setSchema(POI_SCHEMA_NAME)).getAll();
+
+ assertEquals(1, res.size());
+ assertNull(cache.get(key));
+
+ sql = String.format("INSERT INTO %s(%s) VALUES (%s)", POI_TABLE_NAME,
+ String.join(",", ID_FIELD_NAME, NAME_FIELD_NAME), String.join(",", String.valueOf(key), "'test'"));
+
+ res = cache.query(new SqlFieldsQuery(sql).setSchema(POI_SCHEMA_NAME)).getAll();
+
+ assertEquals(1, res.size());
+ assertNotNull(cache.get(key));
+
+ sql = String.format("UPDATE %s SET %s = '%s' WHERE ID = %d", POI_TABLE_NAME, NAME_FIELD_NAME, "POI_" + key, key);
+ res = cache.query(new SqlFieldsQuery(sql).setSchema(POI_SCHEMA_NAME)).getAll();
+
+ assertEquals(1, res.size());
+ assertEquals("POI_" + key, ((BinaryObject)cache.get(key)).field(NAME_FIELD_NAME));
+
+ assertIndexUsed(cache, "SELECT * FROM " + POI_TABLE_NAME + " WHERE ID = " + key, KEY_PK_IDX_NAME);
+ }
+
+ /** */
+ protected String explainPlan(IgniteCache<?, ?> cache, String sql) {
+ return cache.query(new SqlFieldsQuery("EXPLAIN " + sql).setSchema(POI_SCHEMA_NAME))
+ .getAll().get(0).get(0).toString().toLowerCase();
+ }
+
+ /** */
+ protected void assertIndexUsed(IgniteCache<?, ?> cache, String sql, String idx) throws IgniteCheckedException {
+ AtomicReference<String> currPlan = new AtomicReference<>();
+
+ boolean res = GridTestUtils.waitForCondition(() -> {
+ String plan = explainPlan(cache, sql);
+
+ currPlan.set(plan);
+
+ return plan.contains(idx.toLowerCase());
+ }, 1_000);
+
+ assertTrue("Query \"" + sql + "\" executed without usage of " + idx + ", see plan:\n\"" +
+ currPlan.get() + "\"", res);
+ }
+
+ /** */
+ protected void checkQueryParallelism(IgniteEx ig, CacheMode cacheMode) {
+ int expectedParallelism = cacheMode != CacheMode.REPLICATED ? QUERY_PARALLELISM :
+ CacheConfiguration.DFLT_QUERY_PARALLELISM;
+
+ IgniteH2Indexing indexing = (IgniteH2Indexing)ig.context().query().getIndexing();
+
+ SchemaManager schemaMgr = indexing.schemaManager();
+
+ H2TableDescriptor descr = schemaMgr.tableForType(POI_SCHEMA_NAME, POI_CACHE_NAME, POI_CLASS_NAME);
+
+ assertNotNull(descr);
+
+ if (descr.table().getIndex(KEY_PK_IDX_NAME) instanceof H2TreeIndex) {
+ H2TreeIndex pkIdx = (H2TreeIndex)descr.table().getIndex(KEY_PK_IDX_NAME);
+
+ assertNotNull(pkIdx);
+
+ assertEquals(expectedParallelism, pkIdx.segmentsCount());
+ }
+
+ CacheConfiguration<?, ?> cfg = ig.context().cache().cacheConfiguration(POI_CACHE_NAME);
+
+ assertEquals(expectedParallelism, cfg.getQueryParallelism());
+ }
+
+ /** */
+ protected static class NodeFilter implements IgnitePredicate<ClusterNode> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(ClusterNode node) {
+ return node.attribute(ATTR_FILTERED) == null;
+ }
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicEnableIndexingBasicSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicEnableIndexingBasicSelfTest.java
new file mode 100644
index 0000000..492084d
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicEnableIndexingBasicSelfTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Tests basic functionality of enabling indexing.
+ */
+@RunWith(Parameterized.class)
+public class DynamicEnableIndexingBasicSelfTest extends DynamicEnableIndexingAbstractTest {
+ /** Test parameters. */
+ @Parameters(name = "hasNear={0},nodeIdx={1},cacheMode={2},atomicityMode={3}")
+ public static Iterable<Object[]> params() {
+ int[] opNodes = new int[] {IDX_CLI, IDX_SRV_CRD, IDX_SRV_NON_CRD, IDX_SRV_FILTERED};
+
+ CacheMode[] cacheModes = new CacheMode[] {CacheMode.PARTITIONED, CacheMode.REPLICATED};
+
+ CacheAtomicityMode[] atomicityModes = new CacheAtomicityMode[] {
+ CacheAtomicityMode.ATOMIC,
+ CacheAtomicityMode.TRANSACTIONAL,
+ CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT
+ };
+
+ List<Object[]> res = new ArrayList<>();
+
+ for (int node : opNodes) {
+ for (CacheMode cacheMode : cacheModes) {
+ for (CacheAtomicityMode atomicityMode : atomicityModes) {
+ res.add(new Object[] {true, node, cacheMode, atomicityMode});
+
+ // For TRANSACTIONAL_SNAPSHOT near caches is forbidden.
+ if (atomicityMode != CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT)
+ res.add(new Object[] {false, node, cacheMode, atomicityMode});
+
+ }
+ }
+ }
+
+ return res;
+ }
+
+ /** */
+ @Parameter(0)
+ public Boolean hasNear;
+
+ /** */
+ @Parameter(1)
+ public int nodeIdx;
+
+ /** */
+ @Parameter(2)
+ public CacheMode cacheMode;
+
+ /** */
+ @Parameter(3)
+ public CacheAtomicityMode atomicityMode;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ for (IgniteConfiguration cfg : configurations())
+ startGrid(cfg);
+
+ node().cluster().state(ClusterState.ACTIVE);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ CacheConfiguration<?, ?> ccfg = testCacheConfiguration(POI_CACHE_NAME, cacheMode, atomicityMode);
+
+ if (hasNear && atomicityMode != CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT)
+ ccfg.setNearConfiguration(new NearCacheConfiguration<>());
+
+ node().getOrCreateCache(ccfg);
+
+ if (atomicityMode != CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT)
+ grid(IDX_CLI_NEAR_ONLY).getOrCreateNearCache(POI_CACHE_NAME, new NearCacheConfiguration<>());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ node().destroyCache(POI_CACHE_NAME);
+
+ super.afterTest();
+ }
+
+ /** */
+ @Test
+ public void testEnableDynamicIndexing() throws Exception {
+ loadData(node(), 0, NUM_ENTRIES / 2);
+
+ createTable();
+
+ grid(IDX_SRV_CRD).cache(POI_CACHE_NAME).indexReadyFuture().get();
+
+ loadData(node(), NUM_ENTRIES / 2, NUM_ENTRIES);
+
+ for (Ignite ig : G.allGrids()) {
+ assertEquals(NUM_ENTRIES, query(ig, SELECT_ALL_QUERY).size());
+
+ performQueryingIntegrityCheck(ig);
+
+ checkQueryParallelism((IgniteEx)ig, cacheMode);
+ }
+ }
+
+ /** */
+ @SuppressWarnings("ThrowableNotThrown")
+ private void createTable() {
+ if (cacheMode == CacheMode.REPLICATED) {
+ GridTestUtils.assertThrows(log, () -> createTable(node().cache(POI_CACHE_NAME), QUERY_PARALLELISM),
+ IgniteException.class, "Segmented indices are supported for PARTITIONED mode only.");
+
+ createTable(node().cache(POI_CACHE_NAME), 1);
+ }
+ else
+ createTable(node().cache(POI_CACHE_NAME), QUERY_PARALLELISM);
+ }
+
+ /** */
+ private IgniteEx node() {
+ return grid(nodeIdx);
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicEnableIndexingConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicEnableIndexingConcurrentSelfTest.java
new file mode 100644
index 0000000..bdaa1ba
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicEnableIndexingConcurrentSelfTest.java
@@ -0,0 +1,636 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteClientReconnectAbstractTest;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
+import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.transactions.TransactionSerializationException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Test different scnerarions on concurrent enabling indexing.
+ */
+@RunWith(Parameterized.class)
+public class DynamicEnableIndexingConcurrentSelfTest extends DynamicEnableIndexingAbstractTest {
+ /** Test parameters. */
+ @Parameters(name = "cacheMode={0},atomicityMode={1}")
+ public static Iterable<Object[]> params() {
+ CacheMode[] cacheModes = new CacheMode[] {CacheMode.PARTITIONED, CacheMode.REPLICATED};
+
+ CacheAtomicityMode[] atomicityModes = new CacheAtomicityMode[] {
+ CacheAtomicityMode.ATOMIC,
+ CacheAtomicityMode.TRANSACTIONAL,
+ CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT
+ };
+
+ List<Object[]> res = new ArrayList<>();
+ for (CacheMode cacheMode : cacheModes) {
+ for (CacheAtomicityMode atomicityMode : atomicityModes)
+ res.add(new Object[] {cacheMode, atomicityMode});
+ }
+
+ return res;
+ }
+
+ /** Latches to block certain index operations. */
+ private static final ConcurrentHashMap<UUID, T2<CountDownLatch, CountDownLatch>> BLOCKS =
+ new ConcurrentHashMap<>();
+
+ /** Name field index name. */
+ private static final String NAME_FIELD_IDX_NAME = "name_idx";
+
+ /** Large number of entries. */
+ private static final int LARGE_NUM_ENTRIES = 100_000;
+
+ /** */
+ @Parameter(0)
+ public CacheMode cacheMode;
+
+ /** */
+ @Parameter(1)
+ public CacheAtomicityMode atomicityMode;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ GridQueryProcessor.idxCls = null;
+
+ for (T2<CountDownLatch, CountDownLatch> block : BLOCKS.values())
+ block.get1().countDown();
+
+ BLOCKS.clear();
+
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /**
+ * Test pending operation when coordinator change.
+ */
+ @Test
+ public void testCoordinatorChange() throws Exception {
+ // Start servers.
+ IgniteEx srv1 = ignitionStart(serverConfiguration(1));
+ ignitionStart(serverConfiguration(2));
+ ignitionStart(serverConfiguration(3));
+ ignitionStart(serverConfiguration(4));
+
+ // Start client.
+ IgniteEx cli = ignitionStart(clientConfiguration(5));
+ cli.cluster().state(ClusterState.ACTIVE);
+
+ createCache(cli);
+ loadData(cli, 0, NUM_ENTRIES);
+
+ // Test migration between normal servers.
+ UUID id1 = srv1.cluster().localNode().id();
+
+ CountDownLatch idxLatch = blockIndexing(id1);
+
+ IgniteInternalFuture<?> tblFut = enableIndexing(cli);
+
+ idxLatch.await();
+
+ Ignition.stop(srv1.name(), true);
+
+ unblockIndexing(id1);
+
+ tblFut.get();
+
+ for (Ignite g: G.allGrids()) {
+ assertTrue(query(g, SELECT_ALL_QUERY).size() >= 3 * NUM_ENTRIES / 4 );
+
+ performQueryingIntegrityCheck(g);
+
+ checkQueryParallelism((IgniteEx)g, cacheMode);
+ }
+ }
+
+ /** */
+ @Test
+ public void testClientReconnect() throws Exception {
+ // Start servers.
+ IgniteEx srv1 = ignitionStart(serverConfiguration(1));
+ ignitionStart(serverConfiguration(2));
+ ignitionStart(serverConfiguration(3));
+ ignitionStart(serverConfiguration(4));
+
+ // Start client.
+ IgniteEx cli = ignitionStart(clientConfiguration(5));
+ cli.cluster().state(ClusterState.ACTIVE);
+
+ createCache(cli);
+ loadData(cli, 0, NUM_ENTRIES);
+
+ // Reconnect client and enable indexing before client connects.
+ IgniteClientReconnectAbstractTest.reconnectClientNode(log, cli, srv1, () -> {
+ try {
+ enableIndexing(srv1).get();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException("Failed to enable indexing", e);
+ }
+ });
+
+ assertEquals(NUM_ENTRIES, query(cli, SELECT_ALL_QUERY).size());
+
+ for (Ignite g: G.allGrids()) {
+ assertEquals(NUM_ENTRIES, query(g, SELECT_ALL_QUERY).size());
+
+ performQueryingIntegrityCheck(g);
+
+ checkQueryParallelism((IgniteEx)g, cacheMode);
+ }
+ }
+
+ /** */
+ @Test
+ public void testNodeJoinOnPendingOperation() throws Exception {
+ CountDownLatch finishLatch = new CountDownLatch(3);
+
+ IgniteEx srv1 = ignitionStart(serverConfiguration(1), finishLatch);
+ srv1.cluster().state(ClusterState.ACTIVE);
+
+ createCache(srv1);
+ loadData(srv1, 0, NUM_ENTRIES);
+
+ CountDownLatch idxLatch = blockIndexing(srv1);
+
+ IgniteInternalFuture<?> tblFut = enableIndexing(srv1);
+
+ U.await(idxLatch);
+
+ ignitionStart(serverConfiguration(2), finishLatch);
+ ignitionStart(serverConfiguration(3), finishLatch);
+
+ awaitPartitionMapExchange();
+
+ assertFalse(tblFut.isDone());
+
+ unblockIndexing(srv1);
+
+ tblFut.get();
+
+ U.await(finishLatch);
+
+ for (Ignite g: G.allGrids()) {
+ assertEquals(NUM_ENTRIES, query(g, SELECT_ALL_QUERY).size());
+
+ performQueryingIntegrityCheck(g);
+
+ checkQueryParallelism((IgniteEx)g, cacheMode);
+ }
+ }
+
+ /** Test chaining schema operation with enable indexing. */
+ @Test
+ public void testOperationChaining() throws Exception {
+ IgniteEx srv1 = ignitionStart(serverConfiguration(1));
+
+ ignitionStart(serverConfiguration(2));
+ ignitionStart(serverConfiguration(3, true));
+ ignitionStart(clientConfiguration(4));
+
+ srv1.cluster().state(ClusterState.ACTIVE);
+
+ createCache(srv1);
+ loadData(srv1, 0, NUM_ENTRIES);
+
+ CountDownLatch idxLatch = blockIndexing(srv1);
+
+ IgniteInternalFuture<?> tblFut = enableIndexing(srv1);
+
+ QueryIndex idx = new QueryIndex();
+ idx.setName(NAME_FIELD_IDX_NAME.toUpperCase());
+ idx.setFieldNames(Collections.singletonList(NAME_FIELD_NAME.toUpperCase()), true);
+
+ IgniteInternalFuture<?> idxFut1 = srv1.context().query().dynamicIndexCreate(POI_CACHE_NAME, POI_SCHEMA_NAME,
+ POI_TABLE_NAME, idx, false, 0);
+
+ idxLatch.await();
+
+ // Add more nodes.
+ ignitionStart(serverConfiguration(5));
+ ignitionStart(serverConfiguration(6, true));
+ ignitionStart(clientConfiguration(7));
+
+ assertFalse(tblFut.isDone());
+ assertFalse(idxFut1.isDone());
+
+ unblockIndexing(srv1);
+
+ idxFut1.get();
+
+ for (Ignite g: G.allGrids()) {
+ assertEquals(NUM_ENTRIES, query(g, SELECT_ALL_QUERY).size());
+
+ performQueryingIntegrityCheck(g);
+
+ checkQueryParallelism((IgniteEx)g, cacheMode);
+
+ IgniteCache<Object, Object> cache = g.cache(POI_CACHE_NAME);
+
+ assertIndexUsed(cache, "SELECT * FROM " + POI_TABLE_NAME + " WHERE name = 'POI_100'", NAME_FIELD_IDX_NAME);
+
+ List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT " + ID_FIELD_NAME + " FROM " + POI_TABLE_NAME +
+ " WHERE name = 'POI_100'").setSchema(POI_SCHEMA_NAME)).getAll();
+
+ assertEquals(1, res.size());
+ assertEquals(100, res.get(0).get(0));
+ }
+ }
+
+ /** Enable indexing on ongoing rebalance. */
+ @Test
+ public void testConcurrentRebalance() throws Exception {
+ // Start cache and populate it with data.
+ IgniteEx srv1 = ignitionStart(serverConfiguration(1));
+ Ignite srv2 = ignitionStart(serverConfiguration(2));
+ srv1.cluster().state(ClusterState.ACTIVE);
+
+ createCache(srv1);
+ loadData(srv1, 0, LARGE_NUM_ENTRIES);
+
+ // Start index operation in blocked state.
+ CountDownLatch idxLatch1 = blockIndexing(srv1);
+ CountDownLatch idxLatch2 = blockIndexing(srv2);
+
+ IgniteInternalFuture<?> tblFut = enableIndexing(srv1);
+
+ U.await(idxLatch1);
+ U.await(idxLatch2);
+
+ // Start two more nodes and unblock index operation in the middle.
+ ignitionStart(serverConfiguration(3));
+
+ unblockIndexing(srv1);
+ unblockIndexing(srv2);
+
+ ignitionStart(serverConfiguration(4));
+
+ awaitPartitionMapExchange();
+
+ tblFut.get();
+
+ for (Ignite g: G.allGrids()) {
+ assertEquals(LARGE_NUM_ENTRIES, query(g, SELECT_ALL_QUERY).size());
+
+ performQueryingIntegrityCheck(g);
+
+ checkQueryParallelism((IgniteEx)g, cacheMode);
+ }
+ }
+
+ /** Test concurrent put remove when enabling indexing. */
+ @Test
+ public void testConcurrentPutRemove() throws Exception {
+ CountDownLatch finishLatch = new CountDownLatch(4);
+
+ // Start several nodes.
+ IgniteEx srv1 = ignitionStart(serverConfiguration(1), finishLatch);
+ ignitionStart(serverConfiguration(2), finishLatch);
+ ignitionStart(serverConfiguration(3), finishLatch);
+ ignitionStart(serverConfiguration(4), finishLatch);
+
+ srv1.cluster().state(ClusterState.ACTIVE);
+
+ createCache(srv1);
+ loadData(srv1, 0, LARGE_NUM_ENTRIES);
+
+ // Start data change operations from several threads.
+ final AtomicBoolean stopped = new AtomicBoolean();
+ final CountDownLatch iterations = new CountDownLatch(1000);
+
+ IgniteInternalFuture<?> task = multithreadedAsync(() -> {
+ while (!stopped.get()) {
+ Ignite node = grid(ThreadLocalRandom.current().nextInt(1, 5));
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ int i = rnd.nextInt(0, LARGE_NUM_ENTRIES);
+
+ BinaryObject val = node.binary().builder(POI_CLASS_NAME)
+ .setField(NAME_FIELD_NAME, "POI_" + i, String.class)
+ .setField(LATITUDE_FIELD_NAME, rnd.nextDouble(), Double.class)
+ .setField(LONGITUDE_FIELD_NAME, rnd.nextDouble(), Double.class)
+ .build();
+
+ IgniteCache<Object, BinaryObject> cache = node.cache(POI_CACHE_NAME).withKeepBinary();
+
+ try {
+ if (ThreadLocalRandom.current().nextBoolean())
+ cache.put(i, val);
+ else
+ cache.remove(i);
+ }
+ catch (CacheException e) {
+ if (!X.hasCause(e, TransactionSerializationException.class))
+ throw e;
+ }
+ finally {
+ iterations.countDown();
+ }
+ }
+
+ return null;
+ }, 4);
+
+ // Do some work.
+ iterations.await(2, TimeUnit.SECONDS);
+
+ enableIndexing(srv1).get();
+
+ // Stop updates once index is ready.
+ stopped.set(true);
+ task.get();
+
+ finishLatch.await();
+
+ // Perform integrity check.
+ IgniteCache<Object, Object> cache = srv1.cache(POI_CACHE_NAME).withKeepBinary();
+
+ query(srv1, SELECT_ALL_QUERY).forEach(res -> {
+ BinaryObject val = (BinaryObject)cache.get(res.get(0));
+
+ assertNotNull(val);
+
+ assertEquals(val.field(NAME_FIELD_NAME), res.get(1));
+ assertEquals(val.field(LATITUDE_FIELD_NAME), res.get(2));
+ assertEquals(val.field(LONGITUDE_FIELD_NAME), res.get(3));
+ });
+ }
+
+ /** Test concurrent enabling indexing. Only one attempt should succeed. */
+ @Test
+ public void testConcurrentEnableIndexing() throws Exception {
+ // Start several nodes.
+ IgniteEx srv1 = ignitionStart(serverConfiguration(1));
+ ignitionStart(serverConfiguration(2));
+ ignitionStart(clientConfiguration(3));
+ ignitionStart(clientConfiguration(4));
+
+ srv1.cluster().state(ClusterState.ACTIVE);
+
+ createCache(srv1);
+ loadData(srv1, 0, LARGE_NUM_ENTRIES);
+
+ // Start enable indexing from several threads.
+ final AtomicBoolean stopped = new AtomicBoolean();
+ final AtomicInteger success = new AtomicInteger();
+ final CountDownLatch iterations = new CountDownLatch(1000);
+
+ IgniteInternalFuture<?> task = multithreadedAsync(() -> {
+ while (!stopped.get()) {
+ IgniteEx node = grid(ThreadLocalRandom.current().nextInt(1, 4));
+
+ try {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ break;
+ }
+
+ enableIndexing(node).chain((fut) -> {
+ try {
+ fut.get();
+
+ success.incrementAndGet();
+ }
+ catch (IgniteCheckedException e) {
+ assertTrue(e.hasCause(SchemaOperationException.class));
+
+ SchemaOperationException opEx = e.getCause(SchemaOperationException.class);
+
+ assertEquals(SchemaOperationException.CODE_CACHE_ALREADY_INDEXED, opEx.code());
+ assertEquals("Cache is already indexed: " + POI_CACHE_NAME, opEx.getMessage());
+ }
+
+ return null;
+ });
+
+ iterations.countDown();
+ }
+
+ return null;
+ }, 4);
+
+ // Do attempts.
+ iterations.await(2, TimeUnit.SECONDS);
+
+ // Start more server nodes..
+ ignitionStart(serverConfiguration(5));
+ ignitionStart(serverConfiguration(6));
+
+ // Stop task.
+ stopped.set(true);
+ task.get();
+
+ // Check that only one successful attempt.
+ assertEquals(1, success.get());
+
+ awaitPartitionMapExchange();
+
+ for (Ignite g: G.allGrids()) {
+ assertEquals(LARGE_NUM_ENTRIES, query(g, SELECT_ALL_QUERY).size());
+
+ performQueryingIntegrityCheck(g);
+
+ checkQueryParallelism((IgniteEx)g, cacheMode);
+ }
+ }
+
+ /** */
+ private IgniteInternalFuture<?> enableIndexing(IgniteEx node) {
+ Integer parallelism = cacheMode == CacheMode.PARTITIONED ? QUERY_PARALLELISM : null;
+
+ return node.context().query().dynamicAddQueryEntity(POI_CACHE_NAME, POI_SCHEMA_NAME, queryEntity(), parallelism,
+ false);
+ }
+
+ /** */
+ private QueryEntity queryEntity() {
+ LinkedHashMap<String, String> fields = new LinkedHashMap<>();
+ fields.put(ID_FIELD_NAME, Integer.class.getName());
+ fields.put(NAME_FIELD_NAME, String.class.getName());
+ fields.put(LATITUDE_FIELD_NAME, Double.class.getName());
+ fields.put(LONGITUDE_FIELD_NAME, Double.class.getName());
+
+ return new QueryEntity()
+ .setKeyType(Integer.class.getName())
+ .setKeyFieldName(ID_FIELD_NAME)
+ .setValueType(POI_CLASS_NAME)
+ .setTableName(POI_TABLE_NAME)
+ .setFields(fields);
+ }
+
+ /** */
+ private void createCache(IgniteEx node) throws Exception {
+ CacheConfiguration<?, ?> ccfg = testCacheConfiguration(POI_CACHE_NAME, cacheMode, atomicityMode);
+
+ node.context().cache().dynamicStartCache(ccfg, POI_CACHE_NAME, null, true, true, true).get();
+ }
+
+ /** */
+ private static void awaitIndexing(UUID nodeId) {
+ T2<CountDownLatch, CountDownLatch> blocker = BLOCKS.get(nodeId);
+
+ if (blocker != null) {
+ blocker.get2().countDown();
+
+ while (true) {
+ try {
+ blocker.get1().await();
+
+ break;
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+
+ /** */
+ private static CountDownLatch blockIndexing(Ignite node) {
+ UUID nodeId = ((IgniteEx)node).localNode().id();
+
+ return blockIndexing(nodeId);
+ }
+
+ /** */
+ private static CountDownLatch blockIndexing(UUID nodeId) {
+ assertFalse(BLOCKS.containsKey(nodeId));
+
+ CountDownLatch idxLatch = new CountDownLatch(1);
+
+ BLOCKS.put(nodeId, new T2<>(new CountDownLatch(1), idxLatch));
+
+ return idxLatch;
+ }
+
+ /** */
+ private static void unblockIndexing(Ignite node) {
+ UUID nodeId = ((IgniteEx)node).localNode().id();
+
+ unblockIndexing(nodeId);
+ }
+
+ /** */
+ private static void unblockIndexing(UUID nodeId) {
+ T2<CountDownLatch, CountDownLatch> blocker = BLOCKS.remove(nodeId);
+
+ assertNotNull(blocker);
+
+ blocker.get1().countDown();
+ }
+
+ /** */
+ private IgniteEx ignitionStart(IgniteConfiguration cfg) throws Exception {
+ return ignitionStart(cfg, null);
+ }
+
+ /**
+ * Spoof blocking indexing class and start new node.
+ * @param cfg Node configuration.
+ * @param latch Latch to await schema operation finish.
+ * @return New node.
+ * @throws Exception If failed.
+ */
+ private IgniteEx ignitionStart(IgniteConfiguration cfg, final CountDownLatch latch) throws Exception {
+ GridQueryProcessor.idxCls = BlockingIndexing.class;
+
+ IgniteEx node = startGrid(cfg);
+
+ if (latch != null) {
+ node.context().discovery().setCustomEventListener(SchemaFinishDiscoveryMessage.class,
+ new CustomEventListener<SchemaFinishDiscoveryMessage>() {
+ @Override public void onCustomEvent(
+ AffinityTopologyVersion topVer,
+ ClusterNode snd,
+ SchemaFinishDiscoveryMessage msg
+ ) {
+ latch.countDown();
+ }
+ });
+ }
+
+ return node;
+ }
+
+ /**
+ * Blocking indexing processor.
+ */
+ private static class BlockingIndexing extends IgniteH2Indexing {
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<?> rebuildIndexesFromHash(GridCacheContext cctx) {
+ awaitIndexing(ctx.localNodeId());
+
+ return super.rebuildIndexesFromHash(cctx);
+ }
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicTableSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicTableSelfTest.java
index 6a3b72b..3b9a076 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicTableSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicTableSelfTest.java
@@ -65,6 +65,7 @@ import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
import org.apache.ignite.internal.util.GridStringBuilder;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.h2.jdbc.JdbcSQLException;
@@ -326,23 +327,57 @@ public class H2DynamicTableSelfTest extends AbstractSchemaSelfTest {
}
/**
- * Test that attempting to create a cache with a pre-existing name yields an error.
+ * Test creating table over existing cache (enabling query).
+ */
+ @Test
+ public void testCreateTableOnExistingCache() {
+ String cacheName = "new";
+
+ try {
+ client().getOrCreateCache(cacheName);
+
+ doTestCustomNames("new", null, null);
+
+ String createTemplate = "CREATE TABLE \"%s\" (id int primary key, x varchar) WITH " +
+ "\"wrap_key,wrap_value,cache_name=%s\"";
+
+ // Fail to create table with same name.
+ GridTestUtils.assertThrows(null, () -> {
+ execute(client(), String.format(createTemplate, "NameTest", cacheName));
+
+ return null;
+ }, IgniteSQLException.class, "Table already exists: NameTest");
+
+ // Fail to create table with different name on indexed cache.
+ GridTestUtils.assertThrows(null, () -> {
+ execute(client(), String.format(createTemplate, "NameTest1", cacheName));
+
+ return null;
+ }, IgniteSQLException.class, "Cache is already indexed: " + cacheName);
+ }
+ finally {
+ client().destroyCache("new");
+ }
+ }
+
+ /**
+ * Test creating table over existing LOCAL cache fails (enabling query).
* @throws Exception if failed.
*/
@Test
- public void testDuplicateCustomCacheName() throws Exception {
- client().getOrCreateCache("new");
+ public void testCreateTableOnExistingLocalCache() throws Exception {
+ client().getOrCreateCache(new CacheConfiguration<>("local").setCacheMode(CacheMode.LOCAL));
try {
GridTestUtils.assertThrows(null, new Callable<Object>() {
@Override public Object call() throws Exception {
- doTestCustomNames("new", null, null);
+ doTestCustomNames("local", null, null);
return null;
}
- }, IgniteSQLException.class, "Table already exists: NameTest");
+ }, IgniteSQLException.class, "Schema changes are not supported for LOCAL cache");
}
finally {
- client().destroyCache("new");
+ client().destroyCache("local");
}
}
@@ -957,6 +992,63 @@ public class H2DynamicTableSelfTest extends AbstractSchemaSelfTest {
}
/**
+ * Tests that attempt to {@code DROP TABLE} that is enabled dynamically will fail.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDropTableEnabledDynamically() throws Exception {
+ String cacheName = "new";
+ String tableName = "NewTable";
+ String createSql = "CREATE TABLE \"" + tableName + "\" (id int primary key, x varchar) WITH " +
+ "\"wrap_key,wrap_value,cache_name=" + cacheName + "\"";
+
+ try {
+ client().getOrCreateCache(cacheName);
+
+ execute(client(), createSql);
+
+ GridTestUtils.assertThrows(null, () -> {
+ execute("DROP TABLE \"" + tableName + "\"");
+
+ return null;
+ }, IgniteSQLException.class, "Only cache created with CREATE TABLE may be removed with DROP TABLE");
+ }
+ finally {
+ client().destroyCache(cacheName);
+ }
+ }
+
+ /**
+ * Tests that after destroying cache with table enabled dynamically that table also is removed.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testTableEnabledDynamicallyNotExistsIfCacheDestroyed() throws Exception {
+ String cacheName = "new";
+ String tableName = "NewTable";
+ String createSql = "CREATE TABLE \"" + tableName + "\" (id int primary key, x varchar) WITH " +
+ "\"wrap_key,wrap_value,cache_name=" + cacheName + "\"";
+
+ client().getOrCreateCache(cacheName);
+
+ execute(client(), createSql);
+
+ client().destroyCache(cacheName);
+
+ for (Ignite g: G.allGrids()) {
+ IgniteEx node = (IgniteEx)g;
+
+ QueryTypeDescriptorImpl desc = type(node, cacheName, tableName);
+
+ assertNull(desc);
+
+ assertTrue(execute(g, "SELECT * FROM SYS.TABLES WHERE table_name = '" + tableName + "'").isEmpty());
+ }
+ }
+
+ /**
* Test that attempting to {@code DROP TABLE} that does not exist yields an error.
* @throws Exception if failed.
*/
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index 739f915..691b784 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -93,6 +93,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheUpdateSqlQuerySelf
import org.apache.ignite.internal.processors.cache.IgniteCheckClusterStateBeforeExecuteQueryTest;
import org.apache.ignite.internal.processors.cache.IgniteClientReconnectCacheQueriesFailoverTest;
import org.apache.ignite.internal.processors.cache.IgniteCrossCachesJoinsQueryTest;
+import org.apache.ignite.internal.processors.cache.IgniteDynamicEnableIndexingRestoreTest;
import org.apache.ignite.internal.processors.cache.IgniteDynamicSqlRestoreTest;
import org.apache.ignite.internal.processors.cache.IgniteErrorOnRebalanceTest;
import org.apache.ignite.internal.processors.cache.IncorrectQueryEntityTest;
@@ -337,6 +338,7 @@ import org.junit.runners.Suite;
IgniteCacheDuplicateEntityConfigurationSelfTest.class,
IncorrectQueryEntityTest.class,
IgniteDynamicSqlRestoreTest.class,
+ IgniteDynamicEnableIndexingRestoreTest.class,
// Queries tests.
IgniteQueryTableLockAndConnectionPoolLazyModeOnTest.class,
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
index 6396926..d09c081 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
@@ -45,6 +45,8 @@ import org.apache.ignite.internal.processors.cache.index.DynamicColumnsConcurren
import org.apache.ignite.internal.processors.cache.index.DynamicColumnsConcurrentAtomicReplicatedSelfTest;
import org.apache.ignite.internal.processors.cache.index.DynamicColumnsConcurrentTransactionalPartitionedSelfTest;
import org.apache.ignite.internal.processors.cache.index.DynamicColumnsConcurrentTransactionalReplicatedSelfTest;
+import org.apache.ignite.internal.processors.cache.index.DynamicEnableIndexingBasicSelfTest;
+import org.apache.ignite.internal.processors.cache.index.DynamicEnableIndexingConcurrentSelfTest;
import org.apache.ignite.internal.processors.cache.index.DynamicIndexPartitionedAtomicConcurrentSelfTest;
import org.apache.ignite.internal.processors.cache.index.DynamicIndexPartitionedTransactionalConcurrentSelfTest;
import org.apache.ignite.internal.processors.cache.index.DynamicIndexReplicatedAtomicConcurrentSelfTest;
@@ -105,6 +107,9 @@ import org.junit.runners.Suite;
DynamicColumnsConcurrentAtomicReplicatedSelfTest.class,
DynamicColumnsConcurrentTransactionalReplicatedSelfTest.class,
+ DynamicEnableIndexingBasicSelfTest.class,
+ DynamicEnableIndexingConcurrentSelfTest.class,
+
// Distributed joins.
IgniteCacheQueryNodeRestartDistributedJoinSelfTest.class,
IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.class,