You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/04/24 08:21:56 UTC
[38/65] [abbrv] ignite git commit: IGNITE-4997: DDL: Fixed schema state replay on client reconnect. This closes #1845.
IGNITE-4997: DDL: Fixed schema state replay on client reconnect. This closes #1845.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3eb52a8a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3eb52a8a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3eb52a8a
Branch: refs/heads/ignite-5024
Commit: 3eb52a8a4d42add524051a9611b1b7d1a1d17398
Parents: 8ad5a94
Author: Alexander Paschenko <al...@gmail.com>
Authored: Fri Apr 21 11:15:55 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Apr 21 11:15:55 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheProcessor.java | 12 ++
.../processors/query/GridQueryProcessor.java | 173 ++++++++++---------
.../cache/index/AbstractSchemaSelfTest.java | 44 +++--
.../DynamicIndexAbstractConcurrentSelfTest.java | 122 +++++++++++++
.../cache/index/SchemaExchangeSelfTest.java | 57 +++++-
5 files changed, 307 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb52a8a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index da6ebc1..28ef22f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1185,6 +1185,18 @@ public class GridCacheProcessor extends GridProcessorAdapter {
cache.onReconnected();
reconnected.add(cache);
+
+ if (!sysCache) {
+ // Re-create cache structures inside indexing in order to apply recent schema changes.
+ GridCacheContext cctx = cache.context();
+
+ DynamicCacheDescriptor desc = cacheDescriptor(name);
+
+ assert desc != null;
+
+ ctx.query().onCacheStop0(cctx.name());
+ ctx.query().onCacheStart0(cctx, desc.schema());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb52a8a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 8381882..e0dc387 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -652,113 +652,122 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * Create type descriptors from schema and initialize indexing for given cache.<p>
+ * Use with {@link #busyLock} where appropriate.
* @param cctx Cache context.
* @param schema Initial schema.
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings({"deprecation", "ThrowableResultOfMethodCallIgnored"})
- private void initializeCache(GridCacheContext<?, ?> cctx, QuerySchema schema) throws IgniteCheckedException {
- String space = cctx.name();
+ public void onCacheStart0(GridCacheContext<?, ?> cctx, QuerySchema schema)
+ throws IgniteCheckedException {
- // Prepare candidates.
- List<Class<?>> mustDeserializeClss = new ArrayList<>();
+ cctx.shared().database().checkpointReadLock();
- Collection<QueryTypeCandidate> cands = new ArrayList<>();
+ try {
+ synchronized (stateMux) {
+ String space = cctx.name();
- Collection<QueryEntity> qryEntities = schema.entities();
+ // Prepare candidates.
+ List<Class<?>> mustDeserializeClss = new ArrayList<>();
- if (!F.isEmpty(qryEntities)) {
- for (QueryEntity qryEntity : qryEntities) {
- QueryTypeCandidate cand = QueryUtils.typeForQueryEntity(space, cctx, qryEntity, mustDeserializeClss);
+ Collection<QueryTypeCandidate> cands = new ArrayList<>();
- cands.add(cand);
- }
- }
+ Collection<QueryEntity> qryEntities = schema.entities();
- // Ensure that candidates has unique index names. Otherwise we will not be able to apply pending operations.
- Map<String, QueryTypeDescriptorImpl> tblTypMap = new HashMap<>();
- Map<String, QueryTypeDescriptorImpl> idxTypMap = new HashMap<>();
+ if (!F.isEmpty(qryEntities)) {
+ for (QueryEntity qryEntity : qryEntities) {
+ QueryTypeCandidate cand = QueryUtils.typeForQueryEntity(space, cctx, qryEntity,
+ mustDeserializeClss);
- for (QueryTypeCandidate cand : cands) {
- QueryTypeDescriptorImpl desc = cand.descriptor();
+ cands.add(cand);
+ }
+ }
- QueryTypeDescriptorImpl oldDesc = tblTypMap.put(desc.tableName(), desc);
+ // Ensure that candidates has unique index names. Otherwise we will not be able to apply pending operations.
+ Map<String, QueryTypeDescriptorImpl> tblTypMap = new HashMap<>();
+ Map<String, QueryTypeDescriptorImpl> idxTypMap = new HashMap<>();
- if (oldDesc != null)
- throw new IgniteException("Duplicate table name [cache=" + space +
- ", tblName=" + desc.tableName() + ", type1=" + desc.name() + ", type2=" + oldDesc.name() + ']');
+ for (QueryTypeCandidate cand : cands) {
+ QueryTypeDescriptorImpl desc = cand.descriptor();
- for (String idxName : desc.indexes().keySet()) {
- oldDesc = idxTypMap.put(idxName, desc);
+ QueryTypeDescriptorImpl oldDesc = tblTypMap.put(desc.tableName(), desc);
- if (oldDesc != null)
- throw new IgniteException("Duplicate index name [cache=" + space +
- ", idxName=" + idxName + ", type1=" + desc.name() + ", type2=" + oldDesc.name() + ']');
- }
- }
+ if (oldDesc != null)
+ throw new IgniteException("Duplicate table name [cache=" + space +
+ ", tblName=" + desc.tableName() + ", type1=" + desc.name() + ", type2=" + oldDesc.name() + ']');
- // Apply pending operation which could have been completed as no-op at this point. There could be only one
- // in-flight operation for a cache.
- synchronized (stateMux) {
- if (disconnected)
- return;
+ for (String idxName : desc.indexes().keySet()) {
+ oldDesc = idxTypMap.put(idxName, desc);
- for (SchemaOperation op : schemaOps.values()) {
- if (F.eq(op.proposeMessage().deploymentId(), cctx.dynamicDeploymentId())) {
- if (op.started()) {
- SchemaOperationWorker worker = op.manager().worker();
+ if (oldDesc != null)
+ throw new IgniteException("Duplicate index name [cache=" + space +
+ ", idxName=" + idxName + ", type1=" + desc.name() + ", type2=" + oldDesc.name() + ']');
+ }
+ }
- assert !worker.cacheRegistered();
+ // Apply pending operation which could have been completed as no-op at this point.
+ // There could be only one in-flight operation for a cache.
+ for (SchemaOperation op : schemaOps.values()) {
+ if (F.eq(op.proposeMessage().deploymentId(), cctx.dynamicDeploymentId())) {
+ if (op.started()) {
+ SchemaOperationWorker worker = op.manager().worker();
- if (!worker.nop()) {
- IgniteInternalFuture fut = worker.future();
+ assert !worker.cacheRegistered();
- assert fut.isDone();
+ if (!worker.nop()) {
+ IgniteInternalFuture fut = worker.future();
- if (fut.error() == null) {
- SchemaAbstractOperation op0 = op.proposeMessage().operation();
+ assert fut.isDone();
- if (op0 instanceof SchemaIndexCreateOperation) {
- SchemaIndexCreateOperation opCreate = (SchemaIndexCreateOperation)op0;
+ if (fut.error() == null) {
+ SchemaAbstractOperation op0 = op.proposeMessage().operation();
- QueryTypeDescriptorImpl typeDesc = tblTypMap.get(opCreate.tableName());
+ if (op0 instanceof SchemaIndexCreateOperation) {
+ SchemaIndexCreateOperation opCreate = (SchemaIndexCreateOperation) op0;
- assert typeDesc != null;
+ QueryTypeDescriptorImpl typeDesc = tblTypMap.get(opCreate.tableName());
- QueryUtils.processDynamicIndexChange(opCreate.indexName(), opCreate.index(),
- typeDesc);
- }
- else if (op0 instanceof SchemaIndexDropOperation) {
- SchemaIndexDropOperation opDrop = (SchemaIndexDropOperation)op0;
+ assert typeDesc != null;
+
+ QueryUtils.processDynamicIndexChange(opCreate.indexName(), opCreate.index(),
+ typeDesc);
+ }
+ else if (op0 instanceof SchemaIndexDropOperation) {
+ SchemaIndexDropOperation opDrop = (SchemaIndexDropOperation) op0;
- QueryTypeDescriptorImpl typeDesc = idxTypMap.get(opDrop.indexName());
+ QueryTypeDescriptorImpl typeDesc = idxTypMap.get(opDrop.indexName());
- assert typeDesc != null;
+ assert typeDesc != null;
- QueryUtils.processDynamicIndexChange(opDrop.indexName(), null, typeDesc);
+ QueryUtils.processDynamicIndexChange(opDrop.indexName(), null, typeDesc);
+ }
+ else
+ assert false;
}
- else
- assert false;
}
}
+
+ break;
}
+ }
- break;
+ // Ready to register at this point.
+ registerCache0(space, cctx, cands);
+
+ // Warn about possible implicit deserialization.
+ if (!mustDeserializeClss.isEmpty()) {
+ U.warn(log, "Some classes in query configuration cannot be written in binary format " +
+ "because they either implement Externalizable interface or have writeObject/readObject " +
+ "methods. Instances of these classes will be deserialized in order to build indexes. Please " +
+ "ensure that all nodes have these classes in classpath. To enable binary serialization " +
+ "either implement " + Binarylizable.class.getSimpleName() + " interface or set explicit " +
+ "serializer using BinaryTypeConfiguration.setSerializer() method: " + mustDeserializeClss);
}
}
}
-
- // Ready to register at this point.
- registerCache0(space, cctx, cands);
-
- // Warn about possible implicit deserialization.
- if (!mustDeserializeClss.isEmpty()) {
- U.warn(log, "Some classes in query configuration cannot be written in binary format " +
- "because they either implement Externalizable interface or have writeObject/readObject methods. " +
- "Instances of these classes will be deserialized in order to build indexes. Please ensure that " +
- "all nodes have these classes in classpath. To enable binary serialization either implement " +
- Binarylizable.class.getSimpleName() + " interface or set explicit serializer using " +
- "BinaryTypeConfiguration.setSerializer() method: " + mustDeserializeClss);
+ finally {
+ cctx.shared().database().checkpointReadUnlock();
}
}
@@ -780,7 +789,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
schemaOps.clear();
}
- // Complete client futures outside of synchonized block because they may have listeners/chains.
+ // Complete client futures outside of synchronized block because they may have listeners/chains.
for (SchemaOperationClientFuture fut : futs)
fut.onDone(new SchemaOperationException("Client node is disconnected (operation result is unknown)."));
@@ -804,14 +813,10 @@ public class GridQueryProcessor extends GridProcessorAdapter {
if (!busyLock.enterBusy())
return;
- cctx.shared().database().checkpointReadLock();
-
try {
- initializeCache(cctx, schema);
+ onCacheStart0(cctx, schema);
}
finally {
- cctx.shared().database().checkpointReadUnlock();
-
busyLock.leaveBusy();
}
}
@@ -827,7 +832,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
return;
try {
- unregisterCache0(cctx.name());
+ onCacheStop0(cctx.name());
}
finally {
busyLock.leaveBusy();
@@ -1302,7 +1307,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
spaces.add(CU.mask(space));
}
catch (IgniteCheckedException | RuntimeException e) {
- unregisterCache0(space);
+ onCacheStop0(space);
throw e;
}
@@ -1310,12 +1315,14 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
- * Unregister cache.
+ * Unregister cache.<p>
+ * Use with {@link #busyLock} where appropriate.
*
* @param space Space.
*/
- private void unregisterCache0(String space) {
- assert idx != null;
+ public void onCacheStop0(String space) {
+ if (idx == null)
+ return;
synchronized (stateMux) {
// Clear types.
@@ -1464,8 +1471,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
fut.onDone(e);
- if (e instanceof Error)
- throw e;
+ throw e;
}
}
};
@@ -1547,6 +1553,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @return Type descriptor if found.
* @throws IgniteCheckedException If type check failed.
*/
+ @SuppressWarnings("ConstantConditions")
@Nullable private QueryTypeDescriptorImpl typeByValue(CacheObjectContext coctx,
KeyCacheObject key,
CacheObject val,
http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb52a8a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractSchemaSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractSchemaSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractSchemaSelfTest.java
index e228026..19d6f54 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractSchemaSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractSchemaSelfTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
+import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl;
@@ -131,7 +132,7 @@ public class AbstractSchemaSelfTest extends GridCommonAbstractTest {
}
/**
- * Assert index state on all nodes.
+ * Assert index state on all <b>affinity</b> nodes.
*
* @param cacheName Cache name.
* @param tblName Table name.
@@ -140,25 +141,44 @@ public class AbstractSchemaSelfTest extends GridCommonAbstractTest {
*/
protected static void assertIndex(String cacheName, String tblName, String idxName,
IgniteBiTuple<String, Boolean>... fields) {
+ assertIndex(cacheName, false, tblName, idxName, fields);
+ }
+
+ /**
+ * Assert index state on all nodes.
+ *
+ * @param cacheName Cache name.
+ * @param checkNonAffinityNodes Whether existence of {@link GridQueryIndexDescriptor} must be checked on non
+ * affinity nodes as well.
+ * @param tblName Table name.
+ * @param idxName Index name.
+ * @param fields Fields.
+ */
+ protected static void assertIndex(String cacheName, boolean checkNonAffinityNodes, String tblName, String idxName,
+ IgniteBiTuple<String, Boolean>... fields) {
for (Ignite node : Ignition.allGrids())
- assertIndex((IgniteEx)node, cacheName, tblName, idxName, fields);
+ assertIndex(node, checkNonAffinityNodes, cacheName, tblName, idxName, fields);
}
/**
* Assert index state on particular node.
*
* @param node Node.
+ * @param checkNonAffinityNode Whether existence of {@link GridQueryIndexDescriptor} must be checked regardless of
+ * whether this node is affinity node or not.
* @param cacheName Cache name.
* @param tblName Table name.
* @param idxName Index name.
* @param fields Fields.
*/
- protected static void assertIndex(IgniteEx node, String cacheName, String tblName, String idxName,
- IgniteBiTuple<String, Boolean>... fields) {
- assertIndexDescriptor(node, cacheName, tblName, idxName, fields);
+ protected static void assertIndex(Ignite node, boolean checkNonAffinityNode, String cacheName, String tblName,
+ String idxName, IgniteBiTuple<String, Boolean>... fields) {
+ IgniteEx node0 = (IgniteEx)node;
+
+ assertIndexDescriptor(node0, cacheName, tblName, idxName, fields);
- if (affinityNode(node, cacheName)) {
- QueryTypeDescriptorImpl typeDesc = typeExisting(node, cacheName, tblName);
+ if (checkNonAffinityNode || affinityNode(node0, cacheName)) {
+ QueryTypeDescriptorImpl typeDesc = typeExisting(node0, cacheName, tblName);
assertIndex(typeDesc, idxName, fields);
}
@@ -263,11 +283,13 @@ public class AbstractSchemaSelfTest extends GridCommonAbstractTest {
* @param tblName Table name.
* @param idxName Index name.
*/
- protected static void assertNoIndex(IgniteEx node, String cacheName, String tblName, String idxName) {
- assertNoIndexDescriptor(node, cacheName, idxName);
+ protected static void assertNoIndex(Ignite node, String cacheName, String tblName, String idxName) {
+ IgniteEx node0 = (IgniteEx)node;
+
+ assertNoIndexDescriptor(node0, cacheName, idxName);
- if (affinityNode(node, cacheName)) {
- QueryTypeDescriptorImpl typeDesc = typeExisting(node, cacheName, tblName);
+ if (affinityNode(node0, cacheName)) {
+ QueryTypeDescriptorImpl typeDesc = typeExisting(node0, cacheName, tblName);
assertNoIndex(typeDesc, idxName);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb52a8a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java
index d2a2f49..5976615 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.index;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheAtomicityMode;
@@ -28,6 +29,7 @@ import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteClientReconnectAbstractTest;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
@@ -47,6 +49,8 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.ignite.internal.IgniteClientReconnectAbstractTest.TestTcpDiscoverySpi;
+
/**
* Concurrency tests for dynamic index create/drop.
*/
@@ -111,6 +115,11 @@ public abstract class DynamicIndexAbstractConcurrentSelfTest extends DynamicInde
return ccfg.setCacheMode(cacheMode).setAtomicityMode(atomicityMode);
}
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration commonConfiguration(int idx) throws Exception {
+ return super.commonConfiguration(idx).setDiscoverySpi(new TestTcpDiscoverySpi());
+ }
+
/**
* Make sure that coordinator migrates correctly between nodes.
*
@@ -600,6 +609,119 @@ public abstract class DynamicIndexAbstractConcurrentSelfTest extends DynamicInde
}
/**
+ * Make sure that client receives schema changes made while it was disconnected.
+ *
+ * @throws Exception If failed.
+ */
+ public void testClientReconnect() throws Exception {
+ checkClientReconnect(false);
+ }
+
+ /**
+ * Make sure that client receives schema changes made while it was disconnected, even with cache recreation.
+ *
+ * @throws Exception If failed.
+ */
+ public void testClientReconnectWithCacheRestart() throws Exception {
+ checkClientReconnect(true);
+ }
+
+ /**
+ * Make sure that client receives schema changes made while it was disconnected, optionally with cache restart
+ * in the interim.
+ *
+ * @param restartCache Whether cache needs to be recreated during client's absence.
+ * @throws Exception If failed.
+ */
+ private void checkClientReconnect(final boolean restartCache) throws Exception {
+ // Start complex topology.
+ final Ignite srv = Ignition.start(serverConfiguration(1));
+ Ignition.start(serverConfiguration(2));
+ Ignition.start(serverConfiguration(3, true));
+
+ final Ignite cli = Ignition.start(clientConfiguration(4));
+
+ cli.createCache(cacheConfiguration());
+
+ // Check index create.
+ reconnectClientNode(srv, cli, restartCache, new RunnableX() {
+ @Override public void run() throws Exception {
+ final QueryIndex idx = index(IDX_NAME_1, field(FIELD_NAME_1));
+
+ queryProcessor(srv).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, false).get();
+ }
+ });
+
+ assertIndex(cli, true, CACHE_NAME, TBL_NAME, IDX_NAME_1, field(FIELD_NAME_1));
+ assertIndexUsed(IDX_NAME_1, SQL_SIMPLE_FIELD_1, SQL_ARG_1);
+
+ // Check index drop.
+ reconnectClientNode(srv, cli, restartCache, new RunnableX() {
+ @Override public void run() throws Exception {
+ if (!restartCache)
+ queryProcessor(srv).dynamicIndexDrop(CACHE_NAME, IDX_NAME_1, false).get();
+ }
+ });
+
+ assertNoIndex(cli, CACHE_NAME, TBL_NAME, IDX_NAME_1);
+ assertIndexNotUsed(IDX_NAME_1, SQL_SIMPLE_FIELD_1, SQL_ARG_1);
+
+ // Update existing index.
+ QueryIndex idx = index(IDX_NAME_2, field(alias(FIELD_NAME_2)));
+
+ queryProcessor(srv).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, false).get();
+
+ assertIndex(cli, true, CACHE_NAME, TBL_NAME, IDX_NAME_2, field(alias(FIELD_NAME_2)));
+ assertIndexUsed(IDX_NAME_2, SQL_SIMPLE_FIELD_2, SQL_ARG_2);
+
+ reconnectClientNode(srv, cli, restartCache, new RunnableX() {
+ @Override public void run() throws Exception {
+ if (!restartCache)
+ queryProcessor(srv).dynamicIndexDrop(CACHE_NAME, IDX_NAME_2, false).get();
+
+ final QueryIndex idx = index(IDX_NAME_2, field(FIELD_NAME_1), field(alias(FIELD_NAME_2)));
+
+ queryProcessor(srv).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, false);
+ }
+ });
+
+ assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_2, field(FIELD_NAME_1), field(alias(FIELD_NAME_2)));
+ assertIndexUsed(IDX_NAME_2, SQL_COMPOSITE, SQL_ARG_1, SQL_ARG_2);
+ }
+
+ /**
+ * Reconnect the client and run specified actions while it's out.
+ *
+ * @param srvNode Server node.
+ * @param cliNode Client node.
+ * @param restart Whether cache has to be recreated prior to executing required actions.
+ * @param clo Closure to run
+ * @throws Exception If failed.
+ */
+ private void reconnectClientNode(final Ignite srvNode, final Ignite cliNode, final boolean restart,
+ final RunnableX clo) throws Exception {
+ IgniteClientReconnectAbstractTest.reconnectClientNode(log, cliNode, srvNode, new Runnable() {
+ @Override public void run() {
+ if (restart) {
+ srvNode.destroyCache(CACHE_NAME);
+
+ srvNode.getOrCreateCache(cacheConfiguration().setName(CACHE_NAME));
+ }
+
+ try {
+ clo.run();
+ }
+ catch (Exception e) {
+ throw new IgniteException("Test reconnect runnable failed.", e);
+ }
+ }
+ });
+
+ if (restart)
+ cliNode.cache(CACHE_NAME);
+ }
+
+ /**
* Test concurrent node start/stop along with index operations. Nothing should hang.
*
* @throws Exception If failed.
http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb52a8a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SchemaExchangeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SchemaExchangeSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SchemaExchangeSelfTest.java
index 95ad2f1..5a00345 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SchemaExchangeSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SchemaExchangeSelfTest.java
@@ -17,8 +17,11 @@
package org.apache.ignite.internal.processors.cache.index;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
@@ -34,6 +37,9 @@ import org.apache.ignite.testframework.GridTestUtils;
import java.util.Collections;
import java.util.Map;
+import static org.apache.ignite.internal.IgniteClientReconnectAbstractTest.TestTcpDiscoverySpi;
+import static org.apache.ignite.internal.IgniteClientReconnectAbstractTest.reconnectClientNode;
+
/**
* Tests for schema exchange between nodes.
*/
@@ -396,11 +402,11 @@ public class SchemaExchangeSelfTest extends AbstractSchemaSelfTest {
}
/**
- * Test client reconnect.
+ * Test client reconnect after server restart accompanied by schema change.
*
* @throws Exception If failed.
*/
- public void testClientReconnect() throws Exception {
+ public void testServerRestartWithNewTypes() throws Exception {
IgniteEx node1 = start(1, KeyClass.class, ValueClass.class);
assertTypes(node1, ValueClass.class);
@@ -412,11 +418,11 @@ public class SchemaExchangeSelfTest extends AbstractSchemaSelfTest {
stopGrid(1);
- assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
return grid(2).context().clientDisconnected();
}
- }, 10_000L);
+ }, 10_000L));
IgniteFuture reconnFut = null;
@@ -441,6 +447,40 @@ public class SchemaExchangeSelfTest extends AbstractSchemaSelfTest {
}
/**
+ * Test client reconnect.
+ *
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("unchecked")
+ public void testClientReconnect() throws Exception {
+ final IgniteEx node1 = start(1, KeyClass.class, ValueClass.class);
+ assertTypes(node1, ValueClass.class);
+
+ final IgniteEx node2 = startClientNoCache(2);
+ assertTypes(node2);
+
+ node2.cache(CACHE_NAME);
+ assertTypes(node2, ValueClass.class);
+
+ reconnectClientNode(log, node2, node1, new Runnable() {
+ @Override public void run() {
+ assertTrue(node2.context().clientDisconnected());
+
+ final QueryIndex idx = index(IDX_NAME_1, field(FIELD_NAME_1));
+
+ try {
+ queryProcessor(node1).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, false).get();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ });
+
+ assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_1, field(FIELD_NAME_1));
+ }
+
+ /**
* Ensure that only provided types exists for the given cache.
*
* @param node Node.
@@ -449,15 +489,15 @@ public class SchemaExchangeSelfTest extends AbstractSchemaSelfTest {
private static void assertTypes(IgniteEx node, Class... clss) {
Map<String, QueryTypeDescriptorImpl> types = types(node, CACHE_NAME);
- if (clss == null || clss.length == 0)
- assert types.isEmpty();
+ if (F.isEmpty(clss))
+ assertTrue(types.isEmpty());
else {
assertEquals(clss.length, types.size());
for (Class cls : clss) {
String tblName = tableName(cls);
- assert types.containsKey(tblName);
+ assertTrue(types.containsKey(tblName));
}
}
}
@@ -498,6 +538,7 @@ public class SchemaExchangeSelfTest extends AbstractSchemaSelfTest {
cfg.setClientMode(client);
cfg.setLocalHost("127.0.0.1");
cfg.setCacheConfiguration(cacheConfiguration(clss));
+ cfg.setDiscoverySpi(new TestTcpDiscoverySpi());
if (filterNodeName != null && F.eq(name, filterNodeName))
cfg.setUserAttributes(Collections.singletonMap("AFF_NODE", true));
@@ -543,6 +584,8 @@ public class SchemaExchangeSelfTest extends AbstractSchemaSelfTest {
cfg.setClientMode(client);
cfg.setLocalHost("127.0.0.1");
+ cfg.setDiscoverySpi(new TestTcpDiscoverySpi());
+
return (IgniteEx)Ignition.start(cfg);
}