You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/04/18 14:13:19 UTC
[2/8] ignite git commit: IGNITE-4565: Implemented CREATE INDEX and
DROP INDEX. This closes #1773. This closes #1804.
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java
new file mode 100644
index 0000000..d2a2f49
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java
@@ -0,0 +1,921 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
+import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.jetbrains.annotations.Nullable;
+
+import javax.cache.Cache;
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Concurrency tests for dynamic index create/drop.
+ */
+@SuppressWarnings("unchecked")
+public abstract class DynamicIndexAbstractConcurrentSelfTest extends DynamicIndexAbstractSelfTest {
+ /** Test duration. */
+ private static final long TEST_DUR = 10_000L;
+
+ /** Large cache size. */
+ private static final int LARGE_CACHE_SIZE = 100_000;
+
+ /** Latches to block certain index operations. */
+ private static final ConcurrentHashMap<UUID, T2<CountDownLatch, AtomicBoolean>> BLOCKS = new ConcurrentHashMap<>();
+
+ /** Cache mode. */
+ private final CacheMode cacheMode;
+
+ /** Atomicity mode. */
+ private final CacheAtomicityMode atomicityMode;
+
+ /**
+ * Constructor.
+ *
+ * @param cacheMode Cache mode.
+ * @param atomicityMode Atomicity mode.
+ */
+ protected DynamicIndexAbstractConcurrentSelfTest(CacheMode cacheMode, CacheAtomicityMode atomicityMode) {
+ this.cacheMode = cacheMode;
+ this.atomicityMode = atomicityMode;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ GridQueryProcessor.idxCls = BlockingIndexing.class;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ GridQueryProcessor.idxCls = null;
+
+ for (T2<CountDownLatch, AtomicBoolean> block : BLOCKS.values())
+ block.get1().countDown();
+
+ BLOCKS.clear();
+
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 5 * 60 * 1000L;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheConfiguration<KeyClass, ValueClass> cacheConfiguration() {
+ CacheConfiguration<KeyClass, ValueClass> ccfg = super.cacheConfiguration();
+
+ return ccfg.setCacheMode(cacheMode).setAtomicityMode(atomicityMode);
+ }
+
+ /**
+ * Make sure that coordinator migrates correctly between nodes.
+ *
+ * @throws Exception If failed.
+ */
+ public void testCoordinatorChange() throws Exception {
+ // Start servers.
+ Ignite srv1 = Ignition.start(serverConfiguration(1));
+ Ignite srv2 = Ignition.start(serverConfiguration(2));
+ Ignition.start(serverConfiguration(3, true));
+ Ignition.start(serverConfiguration(4));
+
+ UUID srv1Id = srv1.cluster().localNode().id();
+ UUID srv2Id = srv2.cluster().localNode().id();
+
+ // Start client which will execute operations.
+ Ignite cli = Ignition.start(clientConfiguration(5));
+
+ cli.getOrCreateCache(cacheConfiguration());
+
+ put(srv1, 0, KEY_AFTER);
+
+ // Test migration between normal servers.
+ blockIndexing(srv1Id);
+
+ QueryIndex idx1 = index(IDX_NAME_1, field(FIELD_NAME_1));
+
+ IgniteInternalFuture<?> idxFut1 = queryProcessor(cli).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx1, false);
+
+ Thread.sleep(100);
+
+ //srv1.close();
+ Ignition.stop(srv1.name(), true);
+
+ unblockIndexing(srv1Id);
+
+ idxFut1.get();
+
+ assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_1, field(FIELD_NAME_1));
+ assertIndexUsed(IDX_NAME_1, SQL_SIMPLE_FIELD_1, SQL_ARG_1);
+ assertSqlSimpleData(SQL_SIMPLE_FIELD_1, KEY_AFTER - SQL_ARG_1);
+
+ // Test migration from normal server to non-affinity server.
+ blockIndexing(srv2Id);
+
+ QueryIndex idx2 = index(IDX_NAME_2, field(alias(FIELD_NAME_2)));
+
+ IgniteInternalFuture<?> idxFut2 = queryProcessor(cli).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx2, false);
+
+ Thread.sleep(100);
+
+ //srv2.close();
+ Ignition.stop(srv2.name(), true);
+
+ unblockIndexing(srv2Id);
+
+ idxFut2.get();
+
+ assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_2, field(alias(FIELD_NAME_2)));
+ assertIndexUsed(IDX_NAME_2, SQL_SIMPLE_FIELD_2, SQL_ARG_1);
+ assertSqlSimpleData(SQL_SIMPLE_FIELD_2, KEY_AFTER - SQL_ARG_1);
+ }
+
+ /**
+ * Test operations join.
+ *
+ * @throws Exception If failed.
+ */
+ public void testOperationChaining() throws Exception {
+ Ignite srv1 = Ignition.start(serverConfiguration(1));
+
+ Ignition.start(serverConfiguration(2));
+ Ignition.start(serverConfiguration(3, true));
+ Ignition.start(clientConfiguration(4));
+
+ srv1.getOrCreateCache(cacheConfiguration());
+
+ blockIndexing(srv1);
+
+ QueryIndex idx1 = index(IDX_NAME_1, field(FIELD_NAME_1));
+ QueryIndex idx2 = index(IDX_NAME_2, field(alias(FIELD_NAME_2)));
+
+ IgniteInternalFuture<?> idxFut1 = queryProcessor(srv1).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx1, false);
+ IgniteInternalFuture<?> idxFut2 = queryProcessor(srv1).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx2, false);
+
+ // Start even more nodes of different flavors
+ Ignition.start(serverConfiguration(5));
+ Ignition.start(serverConfiguration(6, true));
+ Ignition.start(clientConfiguration(7));
+
+ assert !idxFut1.isDone();
+ assert !idxFut2.isDone();
+
+ unblockIndexing(srv1);
+
+ idxFut1.get();
+ idxFut2.get();
+
+ assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_1, field(FIELD_NAME_1));
+ assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_2, field(alias(FIELD_NAME_2)));
+
+ Thread.sleep(100);
+
+ put(srv1, 0, KEY_AFTER);
+
+ assertIndexUsed(IDX_NAME_1, SQL_SIMPLE_FIELD_1, SQL_ARG_1);
+ assertIndexUsed(IDX_NAME_2, SQL_SIMPLE_FIELD_2, SQL_ARG_1);
+
+ assertSqlSimpleData(SQL_SIMPLE_FIELD_1, KEY_AFTER - SQL_ARG_1);
+ assertSqlSimpleData(SQL_SIMPLE_FIELD_2, KEY_AFTER - SQL_ARG_1);
+ }
+
+ /**
+ * Test node join on pending operation.
+ *
+ * @throws Exception If failed.
+ */
+ public void testNodeJoinOnPendingOperation() throws Exception {
+ Ignite srv1 = Ignition.start(serverConfiguration(1));
+
+ srv1.getOrCreateCache(cacheConfiguration());
+
+ blockIndexing(srv1);
+
+ QueryIndex idx = index(IDX_NAME_1, field(FIELD_NAME_1));
+
+ IgniteInternalFuture<?> idxFut = queryProcessor(srv1).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, false);
+
+ Ignition.start(serverConfiguration(2));
+ Ignition.start(serverConfiguration(3, true));
+ Ignition.start(clientConfiguration(4));
+
+ assert !idxFut.isDone();
+
+ unblockIndexing(srv1);
+
+ idxFut.get();
+
+ Thread.sleep(100L);
+
+ assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_1, field(FIELD_NAME_1));
+
+ put(srv1, 0, KEY_AFTER);
+
+ assertIndexUsed(IDX_NAME_1, SQL_SIMPLE_FIELD_1, SQL_ARG_1);
+ assertSqlSimpleData(SQL_SIMPLE_FIELD_1, KEY_AFTER - SQL_ARG_1);
+ }
+
+ /**
+ * PUT/REMOVE data from cache and build index concurrently.
+ *
+ * @throws Exception If failed,
+ */
+ public void testConcurrentPutRemove() throws Exception {
+ // Start several nodes.
+ Ignite srv1 = Ignition.start(serverConfiguration(1));
+ Ignition.start(serverConfiguration(2));
+ Ignition.start(serverConfiguration(3));
+ Ignition.start(serverConfiguration(4));
+
+ awaitPartitionMapExchange();
+
+ IgniteCache<BinaryObject, BinaryObject> cache = srv1.createCache(cacheConfiguration()).withKeepBinary();
+
+ // Start data change operations from several threads.
+ final AtomicBoolean stopped = new AtomicBoolean();
+
+ IgniteInternalFuture updateFut = multithreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ while (!stopped.get()) {
+ Ignite node = grid(ThreadLocalRandom.current().nextInt(1, 5));
+
+ int key = ThreadLocalRandom.current().nextInt(0, LARGE_CACHE_SIZE);
+ int val = ThreadLocalRandom.current().nextInt();
+
+ BinaryObject keyObj = key(node, key);
+
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ BinaryObject valObj = value(node, val);
+
+ node.cache(CACHE_NAME).put(keyObj, valObj);
+ }
+ else
+ node.cache(CACHE_NAME).remove(keyObj);
+ }
+
+ return null;
+ }
+ }, 4);
+
+ // Let some to arrive.
+ Thread.sleep(500L);
+
+ // Create index.
+ QueryIndex idx = index(IDX_NAME_1, field(FIELD_NAME_1));
+
+ queryProcessor(srv1).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, false).get();
+
+ // Stop updates once index is ready.
+ stopped.set(true);
+
+ updateFut.get();
+
+ // Make sure index is there.
+ assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_1, field(FIELD_NAME_1));
+ assertIndexUsed(IDX_NAME_1, SQL_SIMPLE_FIELD_1, SQL_ARG_1);
+
+ // Get expected values.
+ Map<Long, Long> expKeys = new HashMap<>();
+
+ for (int i = 0; i < LARGE_CACHE_SIZE; i++) {
+ BinaryObject val = cache.get(key(srv1, i));
+
+ if (val != null) {
+ long fieldVal = val.field(FIELD_NAME_1);
+
+ if (fieldVal >= SQL_ARG_1)
+ expKeys.put((long)i, fieldVal);
+ }
+ }
+
+ // Validate query result.
+ for (Ignite node : Ignition.allGrids()) {
+ IgniteCache<BinaryObject, BinaryObject> nodeCache = node.cache(CACHE_NAME).withKeepBinary();
+
+ SqlQuery qry = new SqlQuery(tableName(ValueClass.class), SQL_SIMPLE_FIELD_1).setArgs(SQL_ARG_1);
+
+ List<Cache.Entry<BinaryObject, BinaryObject>> res = nodeCache.query(qry).getAll();
+
+ assertEquals("Cache size mismatch [exp=" + expKeys.size() + ", actual=" + res.size() + ']',
+ expKeys.size(), res.size());
+
+ for (Cache.Entry<BinaryObject, BinaryObject> entry : res) {
+ long key = entry.getKey().field(FIELD_KEY);
+ Long fieldVal = entry.getValue().field(FIELD_NAME_1);
+
+ assertTrue("Expected key is not in result set: " + key, expKeys.containsKey(key));
+
+ assertEquals("Unexpected value [key=" + key + ", expVal=" + expKeys.get(key) +
+ ", actualVal=" + fieldVal + ']', expKeys.get(key), fieldVal);
+ }
+
+ }
+ }
+
+ /**
+ * Test index consistency on re-balance.
+ *
+ * @throws Exception If failed.
+ */
+ public void testConcurrentRebalance() throws Exception {
+ // Start cache and populate it with data.
+ Ignite srv1 = Ignition.start(serverConfiguration(1));
+ Ignite srv2 = Ignition.start(serverConfiguration(2));
+
+ srv1.createCache(cacheConfiguration());
+
+ awaitPartitionMapExchange();
+
+ put(srv1, 0, LARGE_CACHE_SIZE);
+
+ // Start index operation in blocked state.
+ blockIndexing(srv1);
+ blockIndexing(srv2);
+
+ QueryIndex idx = index(IDX_NAME_1, field(FIELD_NAME_1));
+
+ final IgniteInternalFuture<?> idxFut =
+ queryProcessor(srv1).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, false);
+
+ Thread.sleep(100);
+
+ // Start two more nodes and unblock index operation in the middle.
+ Ignition.start(serverConfiguration(3));
+
+ unblockIndexing(srv1);
+ unblockIndexing(srv2);
+
+ Ignition.start(serverConfiguration(4));
+
+ awaitPartitionMapExchange();
+
+ // Validate index state.
+ idxFut.get();
+
+ assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_1, field(FIELD_NAME_1));
+
+ assertIndexUsed(IDX_NAME_1, SQL_SIMPLE_FIELD_1, SQL_ARG_1);
+ assertSqlSimpleData(SQL_SIMPLE_FIELD_1, LARGE_CACHE_SIZE - SQL_ARG_1);
+ }
+
+ /**
+ * Check what happen in case cache is destroyed before operation is started.
+ *
+ * @throws Exception If failed.
+ */
+ public void testConcurrentCacheDestroy() throws Exception {
+ // Start complex topology.
+ Ignite srv1 = Ignition.start(serverConfiguration(1));
+
+ Ignition.start(serverConfiguration(2));
+ Ignition.start(serverConfiguration(3, true));
+
+ Ignite cli = Ignition.start(clientConfiguration(4));
+
+ // Start cache and populate it with data.
+ IgniteCache cache = cli.getOrCreateCache(cacheConfiguration());
+
+ put(cli, KEY_AFTER);
+
+ // Start index operation and block it on coordinator.
+ blockIndexing(srv1);
+
+ QueryIndex idx = index(IDX_NAME_1, field(FIELD_NAME_1));
+
+ final IgniteInternalFuture<?> idxFut =
+ queryProcessor(srv1).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, false);
+
+ Thread.sleep(100);
+
+ // Destroy cache.
+ cache.destroy();
+
+ // Unblock indexing and see what happens.
+ unblockIndexing(srv1);
+
+ try {
+ idxFut.get();
+
+ fail("Exception has not been thrown.");
+ }
+ catch (SchemaOperationException e) {
+ // No-op.
+ }
+ }
+
+ /**
+ * Make sure that contended operations on the same index from different nodes do not hang.
+ *
+ * @throws Exception If failed.
+ */
+ public void testConcurrentOperationsMultithreaded() throws Exception {
+ // Start complex topology.
+ Ignition.start(serverConfiguration(1));
+ Ignition.start(serverConfiguration(2));
+ Ignition.start(serverConfiguration(3, true));
+
+ Ignite cli = Ignition.start(clientConfiguration(4));
+
+ cli.createCache(cacheConfiguration());
+
+ final AtomicBoolean stopped = new AtomicBoolean();
+
+ // Start several threads which will mess around indexes.
+ final QueryIndex idx = index(IDX_NAME_1, field(FIELD_NAME_1));
+
+ IgniteInternalFuture idxFut = multithreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ boolean exists = false;
+
+ while (!stopped.get()) {
+ Ignite node = grid(ThreadLocalRandom.current().nextInt(1, 5));
+
+ IgniteInternalFuture fut;
+
+ if (exists) {
+ fut = queryProcessor(node).dynamicIndexDrop(CACHE_NAME, IDX_NAME_1, true);
+
+ exists = false;
+ }
+ else {
+ fut = queryProcessor(node).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, true);
+
+ exists = true;
+ }
+
+ try {
+ fut.get();
+ }
+ catch (SchemaOperationException e) {
+ // No-op.
+ }
+ catch (Exception e) {
+ fail("Unexpected exception: " + e);
+ }
+ }
+
+ return null;
+ }
+ }, 8);
+
+ Thread.sleep(TEST_DUR);
+
+ stopped.set(true);
+
+ // Make sure nothing hanged.
+ idxFut.get();
+
+ queryProcessor(cli).dynamicIndexDrop(CACHE_NAME, IDX_NAME_1, true).get();
+ queryProcessor(cli).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, true).get();
+
+ assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_1, field(FIELD_NAME_1));
+
+ put(cli, 0, KEY_AFTER);
+
+ assertIndexUsed(IDX_NAME_1, SQL_SIMPLE_FIELD_1, SQL_ARG_1);
+ assertSqlSimpleData(SQL_SIMPLE_FIELD_1, KEY_AFTER - SQL_ARG_1);
+ }
+
+ /**
+ * Make sure that contended operations on the same index from different nodes do not hang when we issue both
+ * CREATE/DROP and SELECT statements.
+ *
+ * @throws Exception If failed.
+ */
+ public void testQueryConsistencyMultithreaded() throws Exception {
+ // Start complex topology.
+ Ignition.start(serverConfiguration(1));
+ Ignition.start(serverConfiguration(2));
+ Ignition.start(serverConfiguration(3, true));
+
+ Ignite cli = Ignition.start(clientConfiguration(4));
+
+ cli.createCache(cacheConfiguration());
+
+ put(cli, 0, KEY_AFTER);
+
+ final AtomicBoolean stopped = new AtomicBoolean();
+
+ // Thread which will mess around indexes.
+ final QueryIndex idx = index(IDX_NAME_1, field(FIELD_NAME_1));
+
+ IgniteInternalFuture idxFut = multithreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ boolean exists = false;
+
+ while (!stopped.get()) {
+ Ignite node = grid(ThreadLocalRandom.current().nextInt(1, 5));
+
+ IgniteInternalFuture fut;
+
+ if (exists) {
+ fut = queryProcessor(node).dynamicIndexDrop(CACHE_NAME, IDX_NAME_1, true);
+
+ exists = false;
+ }
+ else {
+ fut = queryProcessor(node).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, true);
+
+ exists = true;
+ }
+
+ try {
+ fut.get();
+ }
+ catch (SchemaOperationException e) {
+ // No-op.
+ }
+ catch (Exception e) {
+ fail("Unexpected exception: " + e);
+ }
+ }
+
+ return null;
+ }
+ }, 1);
+
+ IgniteInternalFuture qryFut = multithreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ while (!stopped.get()) {
+ Ignite node = grid(ThreadLocalRandom.current().nextInt(1, 5));
+
+ assertSqlSimpleData(node, SQL_SIMPLE_FIELD_1, KEY_AFTER - SQL_ARG_1);
+ }
+
+ return null;
+ }
+ }, 8);
+
+ Thread.sleep(TEST_DUR);
+
+ stopped.set(true);
+
+ // Make sure nothing hanged.
+ idxFut.get();
+ qryFut.get();
+ }
+
+ /**
+ * Test concurrent node start/stop along with index operations. Nothing should hang.
+ *
+ * @throws Exception If failed.
+ */
+ public void testConcurrentOperationsAndNodeStartStopMultithreaded() throws Exception {
+ // Start several stable nodes.
+ Ignition.start(serverConfiguration(1));
+ Ignition.start(serverConfiguration(2));
+ Ignition.start(serverConfiguration(3, true));
+
+ final Ignite cli = Ignition.start(clientConfiguration(4));
+
+ cli.createCache(cacheConfiguration());
+
+ final AtomicBoolean stopped = new AtomicBoolean();
+
+ // Start node start/stop worker.
+ final AtomicInteger nodeIdx = new AtomicInteger(4);
+
+ IgniteInternalFuture startStopFut = multithreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ boolean exists = false;
+
+ int lastIdx = 0;
+
+ while (!stopped.get()) {
+ if (exists) {
+ stopGrid(lastIdx);
+
+ exists = false;
+ }
+ else {
+ lastIdx = nodeIdx.incrementAndGet();
+
+ IgniteConfiguration cfg;
+
+ switch (ThreadLocalRandom.current().nextInt(0, 3)) {
+ case 1:
+ cfg = serverConfiguration(lastIdx, false);
+
+ break;
+
+ case 2:
+
+ cfg = serverConfiguration(lastIdx, true);
+
+ break;
+
+ default:
+ cfg = clientConfiguration(lastIdx);
+ }
+
+ Ignition.start(cfg);
+
+ exists = true;
+ }
+
+ Thread.sleep(ThreadLocalRandom.current().nextLong(500L, 1500L));
+ }
+
+ return null;
+ }
+ }, 1);
+
+ // Start several threads which will mess around indexes.
+ final QueryIndex idx = index(IDX_NAME_1, field(FIELD_NAME_1));
+
+ IgniteInternalFuture idxFut = multithreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ boolean exists = false;
+
+ while (!stopped.get()) {
+ Ignite node = grid(ThreadLocalRandom.current().nextInt(1, 5));
+
+ IgniteInternalFuture fut;
+
+ if (exists) {
+ fut = queryProcessor(node).dynamicIndexDrop(CACHE_NAME, IDX_NAME_1, true);
+
+ exists = false;
+ }
+ else {
+ fut = queryProcessor(node).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, true);
+
+ exists = true;
+ }
+
+ try {
+ fut.get();
+ }
+ catch (SchemaOperationException e) {
+ // No-op.
+ }
+ catch (Exception e) {
+ fail("Unexpected exception: " + e);
+ }
+ }
+
+ return null;
+ }
+ }, 1);
+
+ Thread.sleep(TEST_DUR);
+
+ stopped.set(true);
+
+ // Make sure nothing hanged.
+ startStopFut.get();
+ idxFut.get();
+
+ // Make sure cache is operational at this point.
+ cli.getOrCreateCache(cacheConfiguration());
+
+ queryProcessor(cli).dynamicIndexDrop(CACHE_NAME, IDX_NAME_1, true).get();
+ queryProcessor(cli).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, true).get();
+
+ assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_1, field(FIELD_NAME_1));
+
+ put(cli, 0, KEY_AFTER);
+
+ assertIndexUsed(IDX_NAME_1, SQL_SIMPLE_FIELD_1, SQL_ARG_1);
+ assertSqlSimpleData(SQL_SIMPLE_FIELD_1, KEY_AFTER - SQL_ARG_1);
+ }
+
+ /**
+ * Multithreaded cache start/stop along with index operations. Nothing should hang.
+ *
+ * @throws Exception If failed.
+ */
+ public void testConcurrentOperationsAndCacheStartStopMultithreaded() throws Exception {
+ // Start complex topology.
+ Ignition.start(serverConfiguration(1));
+ Ignition.start(serverConfiguration(2));
+ Ignition.start(serverConfiguration(3, true));
+
+ Ignite cli = Ignition.start(clientConfiguration(4));
+
+ final AtomicBoolean stopped = new AtomicBoolean();
+
+ // Start cache create/destroy worker.
+ IgniteInternalFuture startStopFut = multithreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ boolean exists = false;
+
+ while (!stopped.get()) {
+ Ignite node = grid(ThreadLocalRandom.current().nextInt(1, 5));
+
+ if (exists) {
+ node.destroyCache(CACHE_NAME);
+
+ exists = false;
+ }
+ else {
+ node.createCache(cacheConfiguration());
+
+ exists = true;
+ }
+
+ Thread.sleep(ThreadLocalRandom.current().nextLong(200L, 400L));
+ }
+
+ return null;
+ }
+ }, 1);
+
+ // Start several threads which will mess around indexes.
+ final QueryIndex idx = index(IDX_NAME_1, field(FIELD_NAME_1));
+
+ IgniteInternalFuture idxFut = multithreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ boolean exists = false;
+
+ while (!stopped.get()) {
+ Ignite node = grid(ThreadLocalRandom.current().nextInt(1, 5));
+
+ IgniteInternalFuture fut;
+
+ if (exists) {
+ fut = queryProcessor(node).dynamicIndexDrop(CACHE_NAME, IDX_NAME_1, true);
+
+ exists = false;
+ }
+ else {
+ fut = queryProcessor(node).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, true);
+
+ exists = true;
+ }
+
+ try {
+ fut.get();
+ }
+ catch (SchemaOperationException e) {
+ // No-op.
+ }
+ catch (Exception e) {
+ fail("Unexpected exception: " + e);
+ }
+ }
+
+ return null;
+ }
+ }, 8);
+
+ Thread.sleep(TEST_DUR);
+
+ stopped.set(true);
+
+ // Make sure nothing hanged.
+ startStopFut.get();
+ idxFut.get();
+
+ // Make sure cache is operational at this point.
+ cli.getOrCreateCache(cacheConfiguration());
+
+ queryProcessor(cli).dynamicIndexDrop(CACHE_NAME, IDX_NAME_1, true).get();
+ queryProcessor(cli).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, true).get();
+
+ assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_1, field(FIELD_NAME_1));
+
+ put(cli, 0, KEY_AFTER);
+
+ assertIndexUsed(IDX_NAME_1, SQL_SIMPLE_FIELD_1, SQL_ARG_1);
+ assertSqlSimpleData(SQL_SIMPLE_FIELD_1, KEY_AFTER - SQL_ARG_1);
+ }
+
+ /**
+ * Block indexing.
+ *
+ * @param node Node.
+ */
+ @SuppressWarnings("SuspiciousMethodCalls")
+ private static void blockIndexing(Ignite node) {
+ UUID nodeId = ((IgniteEx)node).localNode().id();
+
+ blockIndexing(nodeId);
+ }
+
+ /**
+ * Block indexing.
+ *
+ * @param nodeId Node.
+ */
+ @SuppressWarnings("SuspiciousMethodCalls")
+ private static void blockIndexing(UUID nodeId) {
+ assertFalse(BLOCKS.contains(nodeId));
+
+ BLOCKS.put(nodeId, new T2<>(new CountDownLatch(1), new AtomicBoolean()));
+ }
+
+ /**
+ * Unblock indexing.
+ *
+ * @param node Node.
+ */
+ private static void unblockIndexing(Ignite node) {
+ UUID nodeId = ((IgniteEx)node).localNode().id();
+
+ unblockIndexing(nodeId);
+ }
+
+ /**
+ * Unblock indexing.
+ *
+ * @param nodeId Node ID.
+ */
+ private static void unblockIndexing(UUID nodeId) {
+ T2<CountDownLatch, AtomicBoolean> blocker = BLOCKS.remove(nodeId);
+
+ assertNotNull(blocker);
+
+ blocker.get1().countDown();
+ }
+
+ /**
+ * Await indexing.
+ *
+ * @param nodeId Node ID.
+ */
+ private static void awaitIndexing(UUID nodeId) {
+ T2<CountDownLatch, AtomicBoolean> blocker = BLOCKS.get(nodeId);
+
+ if (blocker != null) {
+ assertTrue(blocker.get2().compareAndSet(false, true));
+
+ while (true) {
+ try {
+ blocker.get1().await();
+
+ break;
+ }
+ catch (InterruptedException e) {
+ // No-op.
+ }
+ }
+ }
+ }
+
+ /**
+ * Blocking indexing processor.
+ */
+ private static class BlockingIndexing extends IgniteH2Indexing {
+ /** {@inheritDoc} */
+ @Override public void dynamicIndexCreate(@Nullable String spaceName, String tblName,
+ QueryIndexDescriptorImpl idxDesc, boolean ifNotExists, SchemaIndexCacheVisitor cacheVisitor)
+ throws IgniteCheckedException {
+ awaitIndexing(ctx.localNodeId());
+
+ super.dynamicIndexCreate(spaceName, tblName, idxDesc, ifNotExists, cacheVisitor);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void dynamicIndexDrop(@Nullable String spaceName, String idxName, boolean ifExists)
+ throws IgniteCheckedException{
+ awaitIndexing(ctx.localNodeId());
+
+ super.dynamicIndexDrop(spaceName, idxName, ifExists);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractSelfTest.java
new file mode 100644
index 0000000..e52e0d3
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractSelfTest.java
@@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+
+import javax.cache.Cache;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Tests for dynamic index creation.
+ */
+@SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
+public abstract class DynamicIndexAbstractSelfTest extends AbstractSchemaSelfTest {
+ /** Attribute to filter node out of cache data nodes. */
+ protected static final String ATTR_FILTERED = "FILTERED";
+
+ /** Key range limit for "before" step. */
+ protected static final int KEY_BEFORE = 100;
+
+ /** Key range limit for "after" step. */
+ protected static final int KEY_AFTER = 200;
+
+ /** SQL to check index on the field 1. */
+ protected static final String SQL_SIMPLE_FIELD_1 = "SELECT * FROM " + TBL_NAME + " WHERE " + FIELD_NAME_1 + " >= ?";
+
+ /** SQL to check composite index */
+ protected static final String SQL_COMPOSITE = "SELECT * FROM " + TBL_NAME + " WHERE " + FIELD_NAME_1 +
+ " >= ? AND " + alias(FIELD_NAME_2) + " >= ?";
+
+ /** SQL to check index on the field 2. */
+ protected static final String SQL_SIMPLE_FIELD_2 =
+ "SELECT * FROM " + TBL_NAME + " WHERE " + alias(FIELD_NAME_2) + " >= ?";
+
+ /** Argument for simple SQL (1). */
+ protected static final int SQL_ARG_1 = 40;
+
+ /** Argument for simple SQL (2). */
+ protected static final int SQL_ARG_2 = 80;
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /**
+ * Create server configuration.
+ *
+ * @param idx Index.
+ * @return Configuration.
+ * @throws Exception If failed.
+ */
+ protected IgniteConfiguration serverConfiguration(int idx) throws Exception {
+ return serverConfiguration(idx, false);
+ }
+
+ /**
+ * Create server configuration.
+ *
+ * @param idx Index.
+ * @param filter Whether to filter the node out of cache.
+ * @return Configuration.
+ * @throws Exception If failed.
+ */
+ protected IgniteConfiguration serverConfiguration(int idx, boolean filter) throws Exception {
+ IgniteConfiguration cfg = commonConfiguration(idx);
+
+ if (filter)
+ cfg.setUserAttributes(Collections.singletonMap(ATTR_FILTERED, true));
+
+ return cfg;
+ }
+
+ /**
+ * Create client configuration.
+ *
+ * @param idx Index.
+ * @return Configuration.
+ * @throws Exception If failed.
+ */
+ protected IgniteConfiguration clientConfiguration(int idx) throws Exception {
+ return commonConfiguration(idx).setClientMode(true);
+ }
+
+ /**
+ * Create common node configuration.
+ *
+ * @param idx Index.
+ * @return Configuration.
+ * @throws Exception If failed.
+ */
+ protected IgniteConfiguration commonConfiguration(int idx) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(getTestIgniteInstanceName(idx));
+
+ cfg.setDiscoverySpi(new TcpDiscoverySpi());
+
+ cfg.setMarshaller(new BinaryMarshaller());
+
+ return optimize(cfg);
+ }
+
+ /**
+ * @return Default cache configuration.
+ */
+ protected CacheConfiguration<KeyClass, ValueClass> cacheConfiguration() {
+ CacheConfiguration ccfg = new CacheConfiguration().setName(CACHE_NAME);
+
+ QueryEntity entity = new QueryEntity();
+
+ entity.setKeyType(KeyClass.class.getName());
+ entity.setValueType(ValueClass.class.getName());
+
+ entity.addQueryField(FIELD_KEY, Long.class.getName(), null);
+ entity.addQueryField(FIELD_NAME_1, Long.class.getName(), null);
+ entity.addQueryField(FIELD_NAME_2, Long.class.getName(), null);
+
+ entity.setKeyFields(Collections.singleton(FIELD_KEY));
+
+ entity.setAliases(Collections.singletonMap(FIELD_NAME_2, alias(FIELD_NAME_2)));
+
+ ccfg.setQueryEntities(Collections.singletonList(entity));
+
+ ccfg.setNodeFilter(new NodeFilter());
+
+ ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ ccfg.setBackups(1);
+
+ return ccfg;
+ }
+
+ /**
+ * Ensure that schema exception is thrown.
+ *
+ * @param r Runnable.
+ * @param expCode Error code.
+ */
+ protected static void assertSchemaException(RunnableX r, int expCode) {
+ try {
+ r.run();
+ }
+ catch (SchemaOperationException e) {
+ assertEquals("Unexpected error code [expected=" + expCode + ", actual=" + e.code() + ']',
+ expCode, e.code());
+
+ return;
+ }
+ catch (Exception e) {
+ fail("Unexpected exception: " + e);
+ }
+
+ fail(SchemaOperationException.class.getSimpleName() + " is not thrown.");
+ }
+
+ /**
+ * Ensure index is used in plan.
+ *
+ * @param idxName Index name.
+ * @param sql SQL.
+ * @param args Arguments.
+ */
+ protected static void assertIndexUsed(String idxName, String sql, Object... args) {
+ for (Ignite node : Ignition.allGrids())
+ assertIndexUsed((IgniteEx)node, idxName, sql, args);
+ }
+
+ /**
+ * Ensure index is used in plan.
+ *
+ * @param node Node.
+ * @param idxName Index name.
+ * @param sql SQL.
+ * @param args Arguments.
+ */
+ protected static void assertIndexUsed(IgniteEx node, String idxName, String sql, Object... args) {
+ SqlFieldsQuery qry = new SqlFieldsQuery("EXPLAIN " + sql);
+
+ if (args != null && args.length > 0)
+ qry.setArgs(args);
+
+ String plan = (String)node.cache(CACHE_NAME).query(qry).getAll().get(0).get(0);
+
+ assertTrue("Index is not used: " + plan, plan.toLowerCase().contains(idxName.toLowerCase()));
+ }
+
+ /**
+ * Ensure index is not used in plan.
+ *
+ * @param idxName Index name.
+ * @param sql SQL.
+ * @param args Arguments.
+ */
+ protected static void assertIndexNotUsed(String idxName, String sql, Object... args) {
+ for (Ignite node : Ignition.allGrids())
+ assertIndexNotUsed((IgniteEx)node, idxName, sql, args);
+ }
+
+ /**
+ * Ensure index is not used in plan.
+ *
+ * @param node Node.
+ * @param idxName Index name.
+ * @param sql SQL.
+ * @param args Arguments.
+ */
+ protected static void assertIndexNotUsed(IgniteEx node, String idxName, String sql, Object... args) {
+ SqlFieldsQuery qry = new SqlFieldsQuery("EXPLAIN " + sql);
+
+ if (args != null && args.length > 0)
+ qry.setArgs(args);
+
+ String plan = (String)node.cache(CACHE_NAME).query(qry).getAll().get(0).get(0);
+
+ assertFalse("Index is used: " + plan, plan.contains(idxName));
+ }
+
+ /**
+ * Create key object.
+ *
+ * @param ignite Ignite instance.
+ * @param id ID.
+ * @return Key object.
+ */
+ protected static BinaryObject key(Ignite ignite, long id) {
+ return ignite.binary().builder(KeyClass.class.getName()).setField(FIELD_KEY, id).build();
+ }
+
+ /**
+ * Create value object.
+ *
+ * @param ignite Ignite instance.
+ * @param id ID.
+ * @return Value object.
+ */
+ protected static BinaryObject value(Ignite ignite, long id) {
+ return ignite.binary().builder(ValueClass.class.getName())
+ .setField(FIELD_NAME_1, id)
+ .setField(FIELD_NAME_2, id)
+ .build();
+ }
+
+ /**
+ * Create key/value entry for the given key.
+ *
+ * @param ignite Ignite instance.
+ * @param id ID.
+ * @return Entry.
+ */
+ protected static T2<BinaryObject, BinaryObject> entry(Ignite ignite, long id) {
+ return new T2<>(key(ignite, id), value(ignite, id));
+ }
+
+ /**
+ * Get common cache.
+ *
+ * @param node Node.
+ * @return Cache.
+ */
+ protected static IgniteCache<BinaryObject, BinaryObject> cache(Ignite node) {
+ return node.cache(CACHE_NAME).withKeepBinary();
+ }
+
+ /**
+ * Get key.
+ *
+ * @param node Node.
+ * @param id ID.
+ */
+ protected static BinaryObject get(Ignite node, int id) {
+ BinaryObject key = key(node, id);
+
+ return cache(node).get(key);
+ }
+
+ /**
+ * Put key range.
+ *
+ * @param node Node.
+ * @param from From key.
+ * @param to To key.
+ */
+ protected static void put(Ignite node, int from, int to) {
+ try (IgniteDataStreamer streamer = node.dataStreamer(CACHE_NAME)) {
+ streamer.allowOverwrite(true);
+ streamer.keepBinary(true);
+
+ for (int i = from; i < to; i++) {
+ BinaryObject key = key(node, i);
+ BinaryObject val = value(node, i);
+
+ streamer.addData(key, val);
+ }
+
+ streamer.flush();
+ }
+ }
+
+ /**
+ * Put key to cache.
+ *
+ * @param node Node.
+ * @param id ID.
+ */
+ protected static void put(Ignite node, long id) {
+ BinaryObject key = key(node, id);
+ BinaryObject val = value(node, id);
+
+ cache(node).put(key, val);
+ }
+
+ /**
+ * Remove key range.
+ *
+ * @param node Node.
+ * @param from From key.
+ * @param to To key.
+ */
+ protected static void remove(Ignite node, int from, int to) {
+ for (int i = from; i < to; i++)
+ remove(node, i);
+ }
+
+ /**
+ * Remove key form cache.
+ *
+ * @param node Node.
+ * @param id ID.
+ */
+ protected static void remove(Ignite node, long id) {
+ BinaryObject key = key(node, id);
+
+ cache(node).remove(key);
+ }
+
+ /**
+ * @return Random string.
+ */
+ protected static String randomString() {
+ return UUID.randomUUID().toString();
+ }
+
+ /**
+ * Assert SQL simple data state.
+ *
+ * @param sql SQL query.
+ * @param expSize Expected size.
+ */
+ protected static void assertSqlSimpleData(String sql, int expSize) {
+ for (Ignite node : Ignition.allGrids())
+ assertSqlSimpleData(node, sql, expSize);
+ }
+
+ /**
+ * Assert SQL simple data state.
+ *
+ * @param node Node.
+ * @param sql SQL query.
+ * @param expSize Expected size.
+ */
+ protected static void assertSqlSimpleData(Ignite node, String sql, int expSize) {
+ SqlQuery qry = new SqlQuery(tableName(ValueClass.class), sql).setArgs(SQL_ARG_1);
+
+ List<Cache.Entry<BinaryObject, BinaryObject>> res = node.cache(CACHE_NAME).withKeepBinary().query(qry).getAll();
+
+ Set<Long> ids = new HashSet<>();
+
+ for (Cache.Entry<BinaryObject, BinaryObject> entry : res) {
+ long id = entry.getKey().field(FIELD_KEY);
+
+ long field1 = entry.getValue().field(FIELD_NAME_1);
+ long field2 = entry.getValue().field(FIELD_NAME_2);
+
+ assertTrue(field1 >= SQL_ARG_1);
+
+ assertEquals(id, field1);
+ assertEquals(id, field2);
+
+ assertTrue(ids.add(id));
+ }
+
+ assertEquals("Size mismatch [node=" + node.name() + ", exp=" + expSize + ", actual=" + res.size() +
+ ", ids=" + ids + ']', expSize, res.size());
+ }
+
+ /**
+ * Assert SQL simple data state.
+ *
+ * @param node Node.
+ * @param sql SQL query.
+ * @param expSize Expected size.
+ */
+ protected static void assertSqlCompositeData(Ignite node, String sql, int expSize) {
+ SqlQuery qry = new SqlQuery(tableName(ValueClass.class), sql).setArgs(SQL_ARG_1, SQL_ARG_2);
+
+ List<Cache.Entry<BinaryObject, BinaryObject>> res = node.cache(CACHE_NAME).withKeepBinary().query(qry).getAll();
+
+ Set<Long> ids = new HashSet<>();
+
+ for (Cache.Entry<BinaryObject, BinaryObject> entry : res) {
+ long id = entry.getKey().field(FIELD_KEY);
+
+ long field1 = entry.getValue().field(FIELD_NAME_1);
+ long field2 = entry.getValue().field(FIELD_NAME_2);
+
+ assertTrue(field1 >= SQL_ARG_2);
+
+ assertEquals(id, field1);
+ assertEquals(id, field2);
+
+ assertTrue(ids.add(id));
+ }
+
+ assertEquals("Size mismatch [exp=" + expSize + ", actual=" + res.size() + ", ids=" + ids + ']',
+ expSize, res.size());
+ }
+
+ /**
+ * Node filter.
+ */
+ protected static class NodeFilter implements IgnitePredicate<ClusterNode>, Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(ClusterNode node) {
+ return node.attribute(ATTR_FILTERED) == null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexClientBasicSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexClientBasicSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexClientBasicSelfTest.java
new file mode 100644
index 0000000..10f4f85
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexClientBasicSelfTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+/**
+ * Test dynamic schema operations from client node.
+ */
+public class DynamicIndexClientBasicSelfTest extends DynamicIndexAbstractBasicSelfTest {
+ /** {@inheritDoc} */
+ @Override protected int nodeIndex() {
+ return IDX_CLI;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexPartitionedAtomicConcurrentSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexPartitionedAtomicConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexPartitionedAtomicConcurrentSelfTest.java
new file mode 100644
index 0000000..497ec39
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexPartitionedAtomicConcurrentSelfTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ * Concurrency tests for dynamic index create/drop for PARTITIONED/ATOMIC cache.
+ */
+public class DynamicIndexPartitionedAtomicConcurrentSelfTest extends DynamicIndexAbstractConcurrentSelfTest {
+ /**
+ * Constructor.
+ */
+ public DynamicIndexPartitionedAtomicConcurrentSelfTest() {
+ super(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexPartitionedTransactionalConcurrentSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexPartitionedTransactionalConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexPartitionedTransactionalConcurrentSelfTest.java
new file mode 100644
index 0000000..fed0149
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexPartitionedTransactionalConcurrentSelfTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ * Concurrency tests for dynamic index create/drop for PARTITIONED/TRANSACTIONAL cache.
+ */
+public class DynamicIndexPartitionedTransactionalConcurrentSelfTest extends DynamicIndexAbstractConcurrentSelfTest {
+ /**
+ * Constructor.
+ */
+ public DynamicIndexPartitionedTransactionalConcurrentSelfTest() {
+ super(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexReplicatedAtomicConcurrentSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexReplicatedAtomicConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexReplicatedAtomicConcurrentSelfTest.java
new file mode 100644
index 0000000..2c6c9a9
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexReplicatedAtomicConcurrentSelfTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ * Concurrency tests for dynamic index create/drop for REPLICATED/ATOMIC cache.
+ */
+public class DynamicIndexReplicatedAtomicConcurrentSelfTest extends DynamicIndexAbstractConcurrentSelfTest {
+ /**
+ * Constructor.
+ */
+ public DynamicIndexReplicatedAtomicConcurrentSelfTest() {
+ super(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexReplicatedTransactionalConcurrentSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexReplicatedTransactionalConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexReplicatedTransactionalConcurrentSelfTest.java
new file mode 100644
index 0000000..9dc92a4
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexReplicatedTransactionalConcurrentSelfTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ * Concurrency tests for dynamic index create/drop for REPLICATED/TRANSACTIONAL cache.
+ */
+public class DynamicIndexReplicatedTransactionalConcurrentSelfTest extends DynamicIndexAbstractConcurrentSelfTest {
+ /**
+ * Constructor.
+ */
+ public DynamicIndexReplicatedTransactionalConcurrentSelfTest() {
+ super(CacheMode.REPLICATED, CacheAtomicityMode.TRANSACTIONAL);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerBasicSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerBasicSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerBasicSelfTest.java
new file mode 100644
index 0000000..c014229
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerBasicSelfTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+/**
+ * Test dynamic schema operations from non-coordinator node.
+ */
+public class DynamicIndexServerBasicSelfTest extends DynamicIndexAbstractBasicSelfTest {
+ /** {@inheritDoc} */
+ @Override protected int nodeIndex() {
+ return IDX_SRV_NON_CRD;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerCoordinatorBasicSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerCoordinatorBasicSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerCoordinatorBasicSelfTest.java
new file mode 100644
index 0000000..7427a4c
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerCoordinatorBasicSelfTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+/**
+ * Test dynamic schema operations from coordinator node.
+ */
+public class DynamicIndexServerCoordinatorBasicSelfTest extends DynamicIndexAbstractBasicSelfTest {
+ /** {@inheritDoc} */
+ @Override protected int nodeIndex() {
+ return IDX_SRV_CRD;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerNodeFIlterBasicSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerNodeFIlterBasicSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerNodeFIlterBasicSelfTest.java
new file mode 100644
index 0000000..b8acd1d
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerNodeFIlterBasicSelfTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+/**
+ * Test dynamic schema operations from server node which do not pass node filter.
+ */
+public class DynamicIndexServerNodeFIlterBasicSelfTest extends DynamicIndexAbstractBasicSelfTest {
+ /** {@inheritDoc} */
+ @Override protected int nodeIndex() {
+ return IDX_SRV_FILTERED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerNodeFilterCoordinatorBasicSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerNodeFilterCoordinatorBasicSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerNodeFilterCoordinatorBasicSelfTest.java
new file mode 100644
index 0000000..e297fe1
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerNodeFilterCoordinatorBasicSelfTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/**
+ * Test dynamic schema operations from server node which do not pass node filter and which is coordinator.
+ */
+public class DynamicIndexServerNodeFilterCoordinatorBasicSelfTest extends DynamicIndexServerCoordinatorBasicSelfTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration serverCoordinatorConfiguration(int idx) throws Exception {
+ return serverConfiguration(idx, true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAbstractSelfTest.java
new file mode 100644
index 0000000..cf563cc
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAbstractSelfTest.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Test that checks indexes handling on H2 side.
+ */
+public abstract class H2DynamicIndexAbstractSelfTest extends AbstractSchemaSelfTest {
+ /** Client node index. */
+ private final static int CLIENT = 2;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ for (IgniteConfiguration cfg : configurations())
+ Ignition.start(cfg);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ client().getOrCreateCache(cacheConfiguration());
+
+ assertNoIndex(CACHE_NAME, TBL_NAME, IDX_NAME_1);
+
+ IgniteCache<KeyClass, ValueClass> cache = client().cache(CACHE_NAME);
+
+ cache.put(new KeyClass(1), new ValueClass("val1"));
+ cache.put(new KeyClass(2), new ValueClass("val2"));
+ cache.put(new KeyClass(3), new ValueClass("val3"));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ client().destroyCache(CACHE_NAME);
+
+ super.afterTest();
+ }
+
+ /**
+ * Test that after index creation index is used by queries.
+ */
+ public void testCreateIndex() throws Exception {
+ IgniteCache<KeyClass, ValueClass> cache = cache();
+
+ assertSize(3);
+
+ cache.query(new SqlFieldsQuery("CREATE INDEX \"" + IDX_NAME_1 + "\" ON \"" + TBL_NAME + "\"(\""
+ + FIELD_NAME_1 + "\" ASC)")).getAll();
+
+ // Test that local queries on all nodes use new index.
+ for (int i = 0 ; i < 4; i++) {
+ List<List<?>> locRes = ignite(i).cache("cache").query(new SqlFieldsQuery("explain select \"id\" from " +
+ "\"cache\".\"ValueClass\" where \"field1\" = 'A'").setLocal(true)).getAll();
+
+ assertEquals(F.asList(
+ Collections.singletonList("SELECT\n" +
+ " \"id\"\n" +
+ "FROM \"cache\".\"ValueClass\"\n" +
+ " /* \"cache\".\"idx_1\": \"field1\" = 'A' */\n" +
+ "WHERE \"field1\" = 'A'")
+ ), locRes);
+ }
+
+ assertSize(3);
+
+ cache.remove(new KeyClass(2));
+
+ assertSize(2);
+
+ cache.put(new KeyClass(4), new ValueClass("someVal"));
+
+ assertSize(3);
+ }
+
+ /**
+ * Test that creating an index with duplicate name yields an error.
+ */
+ public void testCreateIndexWithDuplicateName() {
+ final IgniteCache<KeyClass, ValueClass> cache = cache();
+
+ cache.query(new SqlFieldsQuery("CREATE INDEX \"" + IDX_NAME_1 + "\" ON \"" + TBL_NAME + "\"(\""
+ + FIELD_NAME_1 + "\" ASC)"));
+
+ assertSqlException(new RunnableX() {
+ @Override public void run() throws Exception {
+ cache.query(new SqlFieldsQuery("CREATE INDEX \"" + IDX_NAME_1 + "\" ON \"" + TBL_NAME + "\"(\"id\" ASC)"));
+ }
+ }, IgniteQueryErrorCode.INDEX_ALREADY_EXISTS);
+ }
+
+ /**
+ * Test that creating an index with duplicate name does not yield an error with {@code IF NOT EXISTS}.
+ */
+ public void testCreateIndexIfNotExists() {
+ final IgniteCache<KeyClass, ValueClass> cache = cache();
+
+ cache.query(new SqlFieldsQuery("CREATE INDEX \"" + IDX_NAME_1 + "\" ON \"" + TBL_NAME + "\"(\""
+ + FIELD_NAME_1 + "\" ASC)"));
+
+ cache.query(new SqlFieldsQuery("CREATE INDEX IF NOT EXISTS \"" + IDX_NAME_1 + "\" ON \"" + TBL_NAME +
+ "\"(\"id\" ASC)"));
+ }
+
+ /**
+ * Test that after index drop there are no attempts to use it, and data state remains intact.
+ */
+ public void testDropIndex() {
+ IgniteCache<KeyClass, ValueClass> cache = cache();
+
+ assertSize(3);
+
+ cache.query(new SqlFieldsQuery("CREATE INDEX \"" + IDX_NAME_1 + "\" ON \"" + TBL_NAME + "\"(\""
+ + FIELD_NAME_1 + "\" ASC)"));
+
+ assertSize(3);
+
+ cache.query(new SqlFieldsQuery("DROP INDEX \"" + IDX_NAME_1 + "\""));
+
+ // Test that no local queries on all nodes use new index.
+ for (int i = 0 ; i < 4; i++) {
+ List<List<?>> locRes = ignite(i).cache("cache").query(new SqlFieldsQuery("explain select \"id\" from " +
+ "\"cache\".\"ValueClass\" where \"field1\" = 'A'").setLocal(true)).getAll();
+
+ assertEquals(F.asList(
+ Collections.singletonList("SELECT\n" +
+ " \"id\"\n" +
+ "FROM \"cache\".\"ValueClass\"\n" +
+ " /* \"cache\".\"ValueClass\".__SCAN_ */\n" +
+ "WHERE \"field1\" = 'A'")
+ ), locRes);
+ }
+
+ assertSize(3);
+ }
+
+ /**
+ * Test that dropping a non-existent index yields an error.
+ */
+ public void testDropMissingIndex() {
+ final IgniteCache<KeyClass, ValueClass> cache = cache();
+
+ assertSqlException(new RunnableX() {
+ @Override public void run() throws Exception {
+ cache.query(new SqlFieldsQuery("DROP INDEX \"" + IDX_NAME_1 + "\""));
+ }
+ }, IgniteQueryErrorCode.INDEX_NOT_FOUND);
+ }
+
+ /**
+ * Test that dropping a non-existent index does not yield an error with {@code IF EXISTS}.
+ */
+ public void testDropMissingIndexIfExists() {
+ final IgniteCache<KeyClass, ValueClass> cache = cache();
+
+ cache.query(new SqlFieldsQuery("DROP INDEX IF EXISTS \"" + IDX_NAME_1 + "\""));
+ }
+
+ /**
+ * Test that changes in cache affect index, and vice versa.
+ */
+ public void testIndexState() {
+ IgniteCache<KeyClass, ValueClass> cache = cache();
+
+ assertColumnValues("val1", "val2", "val3");
+
+ cache.query(new SqlFieldsQuery("CREATE INDEX \"" + IDX_NAME_1 + "\" ON \"" + TBL_NAME + "\"(\""
+ + FIELD_NAME_1 + "\" ASC)"));
+
+ assertColumnValues("val1", "val2", "val3");
+
+ cache.remove(new KeyClass(2));
+
+ assertColumnValues("val1", "val3");
+
+ cache.put(new KeyClass(0), new ValueClass("someVal"));
+
+ assertColumnValues("someVal", "val1", "val3");
+
+ cache.query(new SqlFieldsQuery("DROP INDEX \"" + IDX_NAME_1 + "\""));
+
+ assertColumnValues("someVal", "val1", "val3");
+ }
+
+ /**
+ * Check that values of {@code field1} match what we expect.
+ * @param vals Expected values.
+ */
+ private void assertColumnValues(String... vals) {
+ List<List<?>> expRes = new ArrayList<>(vals.length);
+
+ for (String v : vals)
+ expRes.add(Collections.singletonList(v));
+
+ assertEquals(expRes, cache().query(new SqlFieldsQuery("SELECT \"" + FIELD_NAME_1 + "\" FROM \"" + TBL_NAME +
+ "\" ORDER BY \"id\""))
+ .getAll());
+ }
+
+ /**
+ * Do a {@code SELECT COUNT(*)} query to check index state correctness.
+ * @param expSize Expected number of items in table.
+ */
+ private void assertSize(long expSize) {
+ assertEquals(expSize, cache().size());
+
+ assertEquals(expSize, cache().query(new SqlFieldsQuery("SELECT COUNT(*) from \"ValueClass\""))
+ .getAll().get(0).get(0));
+ }
+
+ /**
+ * Get configurations to be used in test.
+ *
+ * @return Configurations.
+ * @throws Exception If failed.
+ */
+ private List<IgniteConfiguration> configurations() throws Exception {
+ return Arrays.asList(
+ serverConfiguration(0),
+ serverConfiguration(1),
+ clientConfiguration(2),
+ serverConfiguration(3)
+ );
+ }
+
+ /**
+ * @return Client node.
+ */
+ private Ignite client() {
+ return ignite(CLIENT);
+ }
+
+ /**
+ * @return Cache.
+ */
+ private IgniteCache<KeyClass, ValueClass> cache() {
+ return client().cache(CACHE_NAME);
+ }
+
+ /**
+ * Create server configuration.
+ *
+ * @param idx Index.
+ * @return Configuration.
+ * @throws Exception If failed.
+ */
+ private IgniteConfiguration serverConfiguration(int idx) throws Exception {
+ return commonConfiguration(idx);
+ }
+
+ /**
+ * Create client configuration.
+ *
+ * @param idx Index.
+ * @return Configuration.
+ * @throws Exception If failed.
+ */
+ private IgniteConfiguration clientConfiguration(int idx) throws Exception {
+ return commonConfiguration(idx).setClientMode(true);
+ }
+
+ /**
+ * Create common node configuration.
+ *
+ * @param idx Index.
+ * @return Configuration.
+ * @throws Exception If failed.
+ */
+ private IgniteConfiguration commonConfiguration(int idx) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(getTestIgniteInstanceName(idx));
+
+ cfg.setMarshaller(new BinaryMarshaller());
+
+ return optimize(cfg);
+ }
+
+ /**
+ * @return Default cache configuration.
+ */
+ private CacheConfiguration cacheConfiguration() {
+ CacheConfiguration<KeyClass, ValueClass> ccfg = new CacheConfiguration<KeyClass, ValueClass>()
+ .setName(CACHE_NAME);
+
+ QueryEntity entity = new QueryEntity();
+
+ entity.setKeyType(KeyClass.class.getName());
+ entity.setValueType(ValueClass.class.getName());
+
+ entity.addQueryField("id", Long.class.getName(), null);
+ entity.addQueryField(FIELD_NAME_1, String.class.getName(), null);
+ entity.addQueryField(FIELD_NAME_2, String.class.getName(), null);
+
+ entity.setKeyFields(Collections.singleton("id"));
+
+ entity.setAliases(Collections.singletonMap(FIELD_NAME_2, alias(FIELD_NAME_2)));
+
+ ccfg.setQueryEntities(Collections.singletonList(entity));
+
+ ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ ccfg.setSqlEscapeAll(true);
+ ccfg.setAtomicityMode(atomicityMode());
+ ccfg.setCacheMode(cacheMode());
+
+ if (nearCache())
+ ccfg.setNearConfiguration(new NearCacheConfiguration<KeyClass, ValueClass>());
+
+ return ccfg;
+ }
+
+ /**
+ * @return Cache mode to use.
+ */
+ protected abstract CacheMode cacheMode();
+
+ /**
+ * @return Cache atomicity mode to use.
+ */
+ protected abstract CacheAtomicityMode atomicityMode();
+
+ /**
+ * @return Whether to use near cache.
+ */
+ protected abstract boolean nearCache();
+
+ /**
+ * Ensure that SQL exception is thrown.
+ *
+ * @param r Runnable.
+ * @param expCode Error code.
+ */
+ private static void assertSqlException(DynamicIndexAbstractBasicSelfTest.RunnableX r, int expCode) {
+ try {
+ try {
+ r.run();
+ }
+ catch (CacheException e) {
+ if (e.getCause() != null)
+ throw (Exception)e.getCause();
+ else
+ throw e;
+ }
+ }
+ catch (IgniteSQLException e) {
+ assertEquals("Unexpected error code [expected=" + expCode + ", actual=" + e.statusCode() + ']',
+ expCode, e.statusCode());
+
+ return;
+ }
+ catch (Exception e) {
+ fail("Unexpected exception: " + e);
+ }
+
+ fail(IgniteSQLException.class.getSimpleName() + " is not thrown.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAtomicPartitionedNearSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAtomicPartitionedNearSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAtomicPartitionedNearSelfTest.java
new file mode 100644
index 0000000..96a7c14
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAtomicPartitionedNearSelfTest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+/** */
+public class H2DynamicIndexAtomicPartitionedNearSelfTest extends H2DynamicIndexAtomicPartitionedSelfTest {
+ /** {@inheritDoc} */
+ @Override protected boolean nearCache() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAtomicPartitionedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAtomicPartitionedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAtomicPartitionedSelfTest.java
new file mode 100644
index 0000000..0a4c48c
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAtomicPartitionedSelfTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/** */
+public class H2DynamicIndexAtomicPartitionedSelfTest extends H2DynamicIndexAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return CacheMode.PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.ATOMIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean nearCache() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAtomicReplicatedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAtomicReplicatedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAtomicReplicatedSelfTest.java
new file mode 100644
index 0000000..fc9f9e7
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAtomicReplicatedSelfTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/** */
+public class H2DynamicIndexAtomicReplicatedSelfTest extends H2DynamicIndexAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return CacheMode.REPLICATED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.ATOMIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean nearCache() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexTransactionalPartitionedNearSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexTransactionalPartitionedNearSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexTransactionalPartitionedNearSelfTest.java
new file mode 100644
index 0000000..e8c4fb2
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexTransactionalPartitionedNearSelfTest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+/** */
+public class H2DynamicIndexTransactionalPartitionedNearSelfTest extends H2DynamicIndexTransactionalPartitionedSelfTest {
+ /** {@inheritDoc} */
+ @Override protected boolean nearCache() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexTransactionalPartitionedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexTransactionalPartitionedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexTransactionalPartitionedSelfTest.java
new file mode 100644
index 0000000..ad61412
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexTransactionalPartitionedSelfTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/** */
+public class H2DynamicIndexTransactionalPartitionedSelfTest extends H2DynamicIndexAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return CacheMode.PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.TRANSACTIONAL;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean nearCache() {
+ return false;
+ }
+}