You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2019/03/25 15:57:11 UTC
[ignite] branch master updated: IGNITE-11541 Static cache
configuration should not override persisted configuration - Fixes #6318.
This is an automated email from the ASF dual-hosted git repository.
agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 2580c6a IGNITE-11541 Static cache configuration should not override persisted configuration - Fixes #6318.
2580c6a is described below
commit 2580c6a158b5af936582d51f174a2bcbdbe0a477
Author: Alexey Goncharuk <al...@gmail.com>
AuthorDate: Mon Mar 25 18:53:22 2019 +0300
IGNITE-11541 Static cache configuration should not override persisted configuration - Fixes #6318.
---
.../org/apache/ignite/IgniteSystemProperties.java | 9 +-
.../processors/cache/GridCacheProcessor.java | 44 ++-
.../processors/query/GridQueryProcessor.java | 23 ++
.../processors/cache/CacheMetricsManageTest.java | 32 +-
.../StaticCacheDdlKeepStaticConfigurationTest.java | 28 ++
.../processors/cache/index/StaticCacheDdlTest.java | 402 +++++++++++++++++++++
6 files changed, 516 insertions(+), 22 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 59892d8..3097a05 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -973,7 +973,7 @@ public final class IgniteSystemProperties {
*/
public static final String IGNITE_DUMP_THREADS_ON_FAILURE = "IGNITE_DUMP_THREADS_ON_FAILURE";
- /**
+ /**
* Throttling timeout in millis which avoid excessive PendingTree access on unwind if there is nothing to clean yet.
*
* Default is 500 ms.
@@ -1161,6 +1161,13 @@ public final class IgniteSystemProperties {
public static final String CHECKPOINT_PARALLEL_SORT_THRESHOLD = "CHECKPOINT_PARALLEL_SORT_THRESHOLD";
/**
+ * Keep static cache configuration even if stored cache data differs from the static config. When this property
+ * is set, static cache configuration will override persisted configuration. DDL operations are not allowed
+ * when this system property is set.
+ */
+ public static final String IGNITE_KEEP_STATIC_CACHE_CONFIGURATION = "IGNITE_KEEP_STATIC_CACHE_CONFIGURATION";
+
+ /**
* Enforces singleton.
*/
private IgniteSystemProperties() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 40454fb..7bed0f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -227,6 +227,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
private final boolean IGNITE_ALLOW_START_CACHES_IN_PARALLEL =
IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL, true);
+ /** */
+ private final boolean keepStaticCacheConfiguration = IgniteSystemProperties.getBoolean(
+ IgniteSystemProperties.IGNITE_KEEP_STATIC_CACHE_CONFIGURATION);
+
/** Shared cache context. */
private GridCacheSharedContext<?, ?> sharedCtx;
@@ -895,10 +899,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
*/
private void addStoredCache(Map<String, CacheInfo> caches, StoredCacheData cacheData, String cacheName,
CacheType cacheType, boolean isStaticalyConfigured) {
- if (!cacheType.userCache())
- stopSeq.addLast(cacheName);
- else
- stopSeq.addFirst(cacheName);
+ if (!caches.containsKey(cacheName)) {
+ if (!cacheType.userCache())
+ stopSeq.addLast(cacheName);
+ else
+ stopSeq.addFirst(cacheName);
+ }
caches.put(cacheName, new CacheInfo(cacheData, cacheType, cacheData.sql(), 0, isStaticalyConfigured));
}
@@ -929,19 +935,37 @@ public class GridCacheProcessor extends GridProcessorAdapter {
Map<String, StoredCacheData> storedCaches = ctx.cache().context().pageStore().readCacheConfigurations();
if (!F.isEmpty(storedCaches)) {
+ List<String> skippedConfigs = new ArrayList<>();
+
for (StoredCacheData storedCacheData : storedCaches.values()) {
String cacheName = storedCacheData.config().getName();
- //Ignore stored caches if it already added by static config(static config has higher priority).
+ CacheType type = cacheType(cacheName);
+
if (!caches.containsKey(cacheName))
- addStoredCache(caches, storedCacheData, cacheName, cacheType(cacheName), false);
+ // No static cache - add the configuration.
+ addStoredCache(caches, storedCacheData, cacheName, type, false);
else {
+ // A static cache with the same name already exists.
CacheConfiguration cfg = caches.get(cacheName).cacheData().config();
CacheConfiguration cfgFromStore = storedCacheData.config();
validateCacheConfigurationOnRestore(cfg, cfgFromStore);
+
+ if (!keepStaticCacheConfiguration) {
+ addStoredCache(caches, storedCacheData, cacheName, type, false);
+
+ if (type == CacheType.USER)
+ skippedConfigs.add(cacheName);
+ }
}
}
+
+ if (!F.isEmpty(skippedConfigs))
+ U.warn(log, "Static configuration for the following caches will be ignored because a persistent " +
+ "cache with the same name already exist (see " +
+ "https://apacheignite.readme.io/docs/cache-configuration for more information): " +
+ skippedConfigs);
}
}
}
@@ -4685,6 +4709,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
+ * @return Keep static cache configuration flag. If {@code true}, static cache configuration will override
+ * configuration persisted on disk.
+ */
+ public boolean keepStaticCacheConfiguration() {
+ return keepStaticCacheConfiguration;
+ }
+
+ /**
* @param name Cache name.
* @param <K> type of keys.
* @param <V> type of values.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 33c0a02..dd0f2bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -39,6 +39,7 @@ import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.Binarylizable;
import org.apache.ignite.cache.CacheAtomicityMode;
@@ -384,6 +385,17 @@ public class GridQueryProcessor extends GridProcessorAdapter {
msg.onError(new SchemaOperationException("Schema changes are not supported for LOCAL cache."));
}
+ else if (failOnStaticCacheSchemaChanges(cacheDesc)) {
+ // Do not allow any schema changes when keep static cache configuration flag is set.
+ if (log.isDebugEnabled())
+ log.debug("Received schema propose discovery message, but cache is statically configured " +
+ "and " + IgniteSystemProperties.IGNITE_KEEP_STATIC_CACHE_CONFIGURATION +
+ " flag is set (will report error) [opId=" + opId + ", msg=" + msg + ']');
+
+ msg.onError(new SchemaOperationException("Schema changes are not supported for statically " +
+ "configured cache when " + IgniteSystemProperties.IGNITE_KEEP_STATIC_CACHE_CONFIGURATION +
+ " flag is set."));
+ }
else {
// Preserve deployment ID so that we can distinguish between different caches with the same name.
if (msg.deploymentId() == null)
@@ -408,6 +420,17 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * @param cacheDesc Cache descriptor to check.
+ * @return {@code true} if cache is statically configured, IGNITE_KEEP_STATIC_CACHE_CONFIGURATION system property
+ * is set and cache is persistent, {@code false} otherwise.
+ */
+ private boolean failOnStaticCacheSchemaChanges(DynamicCacheDescriptor cacheDesc) {
+ return cacheDesc.staticallyConfigured() &&
+ ctx.cache().keepStaticCacheConfiguration() &&
+ cacheDesc.groupDescriptor().persistenceEnabled();
+ }
+
+ /**
* Process schema propose message from discovery thread (or from cache start routine).
*
* @param msg Message.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsManageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsManageTest.java
index 423b703..c18ccce 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsManageTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsManageTest.java
@@ -57,6 +57,7 @@ import org.junit.Test;
/**
*
*/
+@SuppressWarnings("unchecked")
public class CacheMetricsManageTest extends GridCommonAbstractTest {
/** */
private static final String CACHE1 = "cache1";
@@ -71,7 +72,7 @@ public class CacheMetricsManageTest extends GridCommonAbstractTest {
private static final long WAIT_CONDITION_TIMEOUT = 3_000L;
/** Persistence. */
- private boolean persistence = false;
+ private boolean persistence;
/**
* @throws Exception If failed.
@@ -101,7 +102,7 @@ public class CacheMetricsManageTest extends GridCommonAbstractTest {
final CacheManager mgr1 = Caching.getCachingProvider().getCacheManager();
final CacheManager mgr2 = Caching.getCachingProvider().getCacheManager();
- CacheConfiguration cfg1 = new CacheConfiguration()
+ CacheConfiguration<Object, Object> cfg1 = new CacheConfiguration<>()
.setName(CACHE1)
.setGroupName(GROUP)
.setCacheMode(CacheMode.PARTITIONED)
@@ -109,7 +110,7 @@ public class CacheMetricsManageTest extends GridCommonAbstractTest {
mgr1.createCache(CACHE1, cfg1);
- CacheConfiguration cfg2 = new CacheConfiguration(cfg1)
+ CacheConfiguration<Object, Object> cfg2 = new CacheConfiguration<>(cfg1)
.setName(CACHE2)
.setStatisticsEnabled(true);
@@ -141,9 +142,9 @@ public class CacheMetricsManageTest extends GridCommonAbstractTest {
Ignite ig1 = startGrid(1);
startGrid(2);
- IgniteCache<?, ?> cache1 = ig1.cache(CACHE1);
+ IgniteCache<Object, Object> cache1 = ig1.cache(CACHE1);
- CacheConfiguration cacheCfg2 = new CacheConfiguration(cache1.getConfiguration(CacheConfiguration.class));
+ CacheConfiguration<Object, Object> cacheCfg2 = new CacheConfiguration<Object, Object>(cache1.getConfiguration(CacheConfiguration.class));
cacheCfg2.setName(CACHE2);
cacheCfg2.setStatisticsEnabled(true);
@@ -170,7 +171,7 @@ public class CacheMetricsManageTest extends GridCommonAbstractTest {
IgniteCache<?, ?> cache1 = grid(0).cache(CACHE1);
- CacheConfiguration cacheCfg2 = new CacheConfiguration(cache1.getConfiguration(CacheConfiguration.class));
+ CacheConfiguration<Object, Object> cacheCfg2 = new CacheConfiguration<Object, Object>(cache1.getConfiguration(CacheConfiguration.class));
cacheCfg2.setName(CACHE2);
@@ -201,7 +202,7 @@ public class CacheMetricsManageTest extends GridCommonAbstractTest {
barrier.await();
- ignite.cluster().enableStatistics(Arrays.asList(CACHE1), true);
+ ignite.cluster().enableStatistics(Collections.singletonList(CACHE1), true);
assertCachesStatisticsMode(true, false);
}
@@ -265,12 +266,12 @@ public class CacheMetricsManageTest extends GridCommonAbstractTest {
public void testClusterApiClearStatistics() throws Exception {
startGrids(3);
- IgniteCache<?, ?> cache = grid(0).cache(CACHE1);
+ IgniteCache<Object, Object> cache = grid(0).cache(CACHE1);
cache.enableStatistics(true);
grid(0).getOrCreateCache(
- new CacheConfiguration(cache.getConfiguration(CacheConfiguration.class)).setName(CACHE2)
+ new CacheConfiguration<Object, Object>(cache.getConfiguration(CacheConfiguration.class)).setName(CACHE2)
).enableStatistics(true);
Collection<String> cacheNames = Arrays.asList(CACHE1, CACHE2);
@@ -310,6 +311,7 @@ public class CacheMetricsManageTest extends GridCommonAbstractTest {
for (String cacheName : cacheNames) {
for (Ignite ig : G.allGrids()) {
assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @SuppressWarnings("ErrorNotRethrown")
@Override public boolean apply() {
boolean res = true;
@@ -377,10 +379,10 @@ public class CacheMetricsManageTest extends GridCommonAbstractTest {
Ignite ig1 = startGrid(1);
Ignite ig2 = startGrid(2);
- ig1.active(true);
+ ig1.cluster().active(true);
- CacheConfiguration ccfg = ig1.cache(CACHE1).getConfiguration(CacheConfiguration.class);
- CacheConfiguration cacheCfg2 = new CacheConfiguration(ccfg);
+ CacheConfiguration<Object, Object> ccfg = ig1.cache(CACHE1).getConfiguration(CacheConfiguration.class);
+ CacheConfiguration<Object, Object> cacheCfg2 = new CacheConfiguration<>(ccfg);
cacheCfg2.setName(CACHE2);
cacheCfg2.setStatisticsEnabled(true);
@@ -447,13 +449,13 @@ public class CacheMetricsManageTest extends GridCommonAbstractTest {
ig1 = startGrid(1);
- ig1.active(true);
+ ig1.cluster().active(true);
ig1.getOrCreateCache(cacheCfg2.setStatisticsEnabled(false));
if (persistence)
- // cache1 - from local configuration, cache2 - restored from pds
- assertCachesStatisticsMode(false, true);
+ // Both caches restored from pds.
+ assertCachesStatisticsMode(true, true);
else
assertCachesStatisticsMode(false, false);
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/StaticCacheDdlKeepStaticConfigurationTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/StaticCacheDdlKeepStaticConfigurationTest.java
new file mode 100644
index 0000000..a1a4ab0
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/StaticCacheDdlKeepStaticConfigurationTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+
+/**
+ *
+ */
+@WithSystemProperty(key = IgniteSystemProperties.IGNITE_KEEP_STATIC_CACHE_CONFIGURATION, value = "true")
+public class StaticCacheDdlKeepStaticConfigurationTest extends StaticCacheDdlTest {
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/StaticCacheDdlTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/StaticCacheDdlTest.java
new file mode 100644
index 0000000..08d5f52
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/StaticCacheDdlTest.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class StaticCacheDdlTest extends GridCommonAbstractTest {
+ /** */
+ private static final String PERSISTENT_CACHE_NAME = "PERSISTENTCACHE";
+
+ /** */
+ private static final String MEMORY_CACHE_NAME = "MEMORYCACHE";
+
+ /** */
+ private static final String TABLE_NAME = "PERSONS";
+
+ /** */
+ public static final String PERSISTENT_REGION_NAME = "PERSISTENT_REGION_NAME";
+
+ /** */
+ public static final String MEMORY_REGION_NAME = "MEMORY_REGION_NAME";
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Before
+ public void clearPersistence() throws Exception {
+ cleanPersistenceDir();
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @After
+ public void cleanup() throws Exception {
+ cleanPersistenceDir();
+ }
+
+ /**
+ * @return {@code true} if static cache config is ignored by Ignite.
+ */
+ protected boolean ignoreStaticConfig() {
+ return !IgniteSystemProperties.getBoolean(
+ IgniteSystemProperties.IGNITE_KEEP_STATIC_CACHE_CONFIGURATION);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testAddColumn() throws Exception {
+ String fieldName = "new_field";
+
+ int size = 10;
+
+ try (Ignite ignite = startGrid(0)) {
+ ignite.cluster().active(true);
+
+ insertData(ignite, PERSISTENT_CACHE_NAME, size);
+ insertData(ignite, MEMORY_CACHE_NAME, size);
+
+ checkField(ignite, PERSISTENT_CACHE_NAME, fieldName, false);
+ checkField(ignite, MEMORY_CACHE_NAME, fieldName, false);
+
+ addColumn(ignite, PERSISTENT_CACHE_NAME, fieldName);
+ addColumn(ignite, MEMORY_CACHE_NAME, fieldName);
+
+ checkTableSize(ignite, PERSISTENT_CACHE_NAME, size);
+ checkTableSize(ignite, MEMORY_CACHE_NAME, size);
+
+ checkField(ignite, PERSISTENT_CACHE_NAME, fieldName, ignoreStaticConfig());
+ checkField(ignite, MEMORY_CACHE_NAME, fieldName, true);
+ }
+
+ // Check the column after restarting the node.
+ try (Ignite ignite = startGrid(0)) {
+ checkTableSize(ignite, PERSISTENT_CACHE_NAME, size);
+
+ checkField(ignite, PERSISTENT_CACHE_NAME, fieldName, ignoreStaticConfig());
+ }
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Ignore("https://issues.apache.org/jira/browse/IGNITE-11604")
+ @Test
+ public void testDropColumn() throws Exception {
+ String fieldName = "field_to_drop";
+
+ int size = 10;
+
+ try (Ignite ignite = startGrid(0)) {
+ ignite.cluster().active(true);
+
+ insertData(ignite, PERSISTENT_CACHE_NAME, size);
+ insertData(ignite, MEMORY_CACHE_NAME, size);
+
+ checkField(ignite, PERSISTENT_CACHE_NAME, fieldName, true);
+ checkField(ignite, MEMORY_CACHE_NAME, fieldName, true);
+
+ dropColumn(ignite, PERSISTENT_CACHE_NAME, fieldName);
+ dropColumn(ignite, MEMORY_CACHE_NAME, fieldName);
+
+ checkField(ignite, PERSISTENT_CACHE_NAME, fieldName, !ignoreStaticConfig());
+ checkField(ignite, MEMORY_CACHE_NAME, fieldName, false);
+ }
+
+ // Check the column after restarting the node.
+ try (Ignite ignite = startGrid(0)) {
+ checkField(ignite, PERSISTENT_CACHE_NAME, fieldName, !ignoreStaticConfig());
+ }
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testAddIndex() throws Exception {
+ String fieldName = "some_field";
+
+ String idxName = TABLE_NAME + "_" + fieldName.toUpperCase() + "_IDX";
+
+ int size = 10;
+
+ try (Ignite ignite = startGrid(0)) {
+ ignite.cluster().active(true);
+
+ insertData(ignite, PERSISTENT_CACHE_NAME, size);
+ insertData(ignite, MEMORY_CACHE_NAME, size);
+
+ checkIndex(ignite, PERSISTENT_CACHE_NAME, idxName, fieldName, false);
+ checkIndex(ignite, MEMORY_CACHE_NAME, idxName, fieldName, false);
+
+ addIndex(ignite, PERSISTENT_CACHE_NAME, idxName, fieldName);
+ addIndex(ignite, MEMORY_CACHE_NAME, idxName, fieldName);
+
+ checkIndex(ignite, PERSISTENT_CACHE_NAME, idxName, fieldName, ignoreStaticConfig());
+ checkIndex(ignite, MEMORY_CACHE_NAME, idxName, fieldName, true);
+ }
+
+ // Check the index after restarting the node.
+ try (Ignite ignite = startGrid(0)) {
+ checkIndex(ignite, PERSISTENT_CACHE_NAME, idxName, fieldName, ignoreStaticConfig());
+ }
+ }
+
+ /**
+ * This method checks if an index with the given name exists.
+ *
+ * @param ignite Ignite instance to check on.
+ * @param idxName Index name.
+ * @param fieldName Field name to check index on.
+ * @param shouldExist Should exist flag.
+ */
+ private void checkIndex(Ignite ignite, String cacheName, String idxName, String fieldName, boolean shouldExist) {
+ SqlFieldsQuery q = new SqlFieldsQuery(
+ "EXPLAIN SELECT * FROM " + cacheName + "." + TABLE_NAME + " WHERE " + fieldName + " = ?" ).setArgs("");
+
+ boolean exists = false;
+
+ try (FieldsQueryCursor<List<?>> cursor = ignite.cache(cacheName).query(q)) {
+ for (List<?> row : cursor) {
+ if (row.toString().contains(idxName)) {
+ exists = true;
+
+ break;
+ }
+ }
+ }
+
+ Assert.assertEquals("Check index (" + idxName + ") exists", shouldExist, exists);
+ }
+
+ /**
+ * @param ignite Ignite instance.
+ * @param fieldName Field name to check.
+ * @param shouldExist Should exist flag.
+ */
+ private void checkField(Ignite ignite, String cacheName, String fieldName, boolean shouldExist) {
+ SqlFieldsQuery q = new SqlFieldsQuery(
+ "SELECT * FROM " + cacheName + "." + TABLE_NAME + " LIMIT 1 OFFSET 0" );
+
+ boolean exists = false;
+
+ try (FieldsQueryCursor<List<?>> cursor = ignite.cache(cacheName).query(q)) {
+ consume(cursor);
+
+ for (int i = 0, cols = cursor.getColumnsCount(); i < cols; i++) {
+ if (cursor.getFieldName(i).equals(fieldName.toUpperCase())) {
+ exists = true;
+
+ break;
+ }
+ }
+ }
+
+ Assert.assertEquals("Check field (" + fieldName + ") exists", shouldExist, exists);
+ }
+
+ /**
+ * @param ignite Ignite instance.
+ * @param expSize Expected size.
+ */
+ private void checkTableSize(Ignite ignite, String cacheName, int expSize) {
+ SqlFieldsQuery q = new SqlFieldsQuery(
+ "SELECT * FROM " + cacheName + "." + TABLE_NAME );
+
+ try (QueryCursor<List<?>> cursor = ignite.cache(cacheName).query(q)) {
+ int actualSize = 0;
+
+ for (List<?> ignore : cursor)
+ actualSize++;
+
+ Assert.assertEquals("Check result set size", expSize, actualSize);
+ }
+ }
+
+ /**
+ * @param ignite Ignite instance.
+ * @param size Number of rows to add.
+ */
+ private void insertData(Ignite ignite, String cacheName, int size) {
+ int rows = 0;
+
+ for (int i = 0; i < size; i++) {
+ SqlFieldsQuery q = new SqlFieldsQuery(
+ "INSERT INTO " + cacheName + "." + TABLE_NAME + "(id, name) VALUES(?, ?)").
+ setArgs(i, UUID.randomUUID().toString());
+
+ try (QueryCursor<List<?>> cursor = ignite.cache(cacheName).query(q)) {
+ for (List<?> ignore : cursor)
+ rows++;
+ }
+ }
+
+ info(rows + " rows processed");
+ }
+
+ /**
+ * @param ignite Ignite instance.
+ * @param idxName Index name to add.
+ * @param fieldName Field name to add index on.
+ */
+ private void addIndex(Ignite ignite, String cacheName, String idxName, String fieldName) {
+ SqlFieldsQuery q = new SqlFieldsQuery("CREATE INDEX " + idxName + " ON " + cacheName + "." + TABLE_NAME +
+ "(" + fieldName + ")");
+
+ try {
+ try (QueryCursor<List<?>> cursor = ignite.cache(cacheName).query(q)) {
+ consume(cursor);
+ }
+ }
+ catch (CacheException e) {
+ assertTrue("Unexpected exception: " + e.getMessage(),
+ cacheName.equalsIgnoreCase(PERSISTENT_CACHE_NAME) && !ignoreStaticConfig());
+ }
+ }
+
+ /**
+ * @param ignite Ignite instance.
+ * @param fieldName Field name to add.
+ */
+ private void addColumn(Ignite ignite, String cacheName, String fieldName) {
+ SqlFieldsQuery q = new SqlFieldsQuery("ALTER TABLE " + cacheName + "." + TABLE_NAME + " " +
+ "ADD COLUMN " + fieldName + " VARCHAR");
+
+ try {
+ try (QueryCursor<List<?>> cursor = ignite.cache(cacheName).query(q)) {
+ consume(cursor);
+ }
+ }
+ catch (CacheException e) {
+ assertTrue("Unexpected exception: " + e.getMessage(),
+ cacheName.equalsIgnoreCase(PERSISTENT_CACHE_NAME) && !ignoreStaticConfig());
+ }
+ }
+
+ /**
+ * @param ignite Ignite instance.
+ * @param fieldName Field name to drop.
+ */
+ private void dropColumn(Ignite ignite, String cacheName, String fieldName) {
+ SqlFieldsQuery q = new SqlFieldsQuery("ALTER TABLE " + cacheName + "." + TABLE_NAME + " " +
+ "DROP COLUMN " + fieldName);
+
+ try {
+ try (QueryCursor<List<?>> cursor = ignite.cache(cacheName).query(q)) {
+ consume(cursor);
+ }
+ }
+ catch (CacheException e) {
+ assertTrue("Unexpected exception: " + e.getMessage(),
+ cacheName.equals(PERSISTENT_CACHE_NAME) && !ignoreStaticConfig());
+ }
+ }
+
+ /**
+ * @param cur Cursor to consume.
+ */
+ private void consume(QueryCursor<List<?>> cur) {
+ int rows = 0;
+
+ for (List<?> ignore : cur)
+ rows++;
+
+ info(rows + " rows processed");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ DataStorageConfiguration dataStorageConfiguration = new DataStorageConfiguration();
+
+ dataStorageConfiguration
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration()
+ .setName(PERSISTENT_REGION_NAME)
+ .setInitialSize(100L * 1024 * 1024)
+ .setMaxSize(1024L * 1024 * 1024)
+ .setPersistenceEnabled(true))
+ .setDataRegionConfigurations(
+ new DataRegionConfiguration()
+ .setName(MEMORY_REGION_NAME)
+ .setInitialSize(100L * 1024 * 1024)
+ .setMaxSize(1024L * 1024 * 1024)
+ .setPersistenceEnabled(false));
+
+ dataStorageConfiguration.setCheckpointFrequency(5000);
+
+ return cfg.setCacheConfiguration(
+ getCacheConfig(PERSISTENT_CACHE_NAME, PERSISTENT_REGION_NAME),
+ getCacheConfig(MEMORY_CACHE_NAME, MEMORY_REGION_NAME))
+ .setDataStorageConfiguration(dataStorageConfiguration);
+ }
+
+ /**
+ * @return Static cache configuration.
+ */
+ private CacheConfiguration getCacheConfig(String cacheName, String regionName) {
+ Set<String> keyFields = new HashSet<>(Collections.singletonList("id"));
+
+ LinkedHashMap<String, String> fields = new LinkedHashMap<>();
+ fields.put("id", Integer.class.getName());
+ fields.put("name", String.class.getName());
+ fields.put("field_to_drop", String.class.getName());
+ fields.put("some_field", String.class.getName());
+
+
+ QueryEntity qryEntity = new QueryEntity()
+ .setTableName(TABLE_NAME)
+ .setKeyType("CUSTOM_SQL_KEY_TYPE") // Replace by Integer to reproduce "Key is missing from query"
+ .setValueType("CUSTOM_SQL_VALUE_TYPE")
+ .setKeyFields(keyFields)
+ .setFields(fields);
+
+ return new CacheConfiguration(cacheName)
+ .setQueryEntities(Collections.singletonList(qryEntity))
+ .setDataRegionName(regionName);
+ }
+}