You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/04/17 15:59:05 UTC
[10/23] ignite git commit: IGNITE-8048 Store dynamic indexes to cache
data on node join - Fixes #3719.
IGNITE-8048 Store dynamic indexes to cache data on node join - Fixes #3719.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bbc439b8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bbc439b8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bbc439b8
Branch: refs/heads/ignite-7708
Commit: bbc439b892a145a0b50b7b5dfd8c989d9868a1e1
Parents: e5c3f89
Author: Anton Kalashnikov <ka...@yandex.ru>
Authored: Tue Apr 17 10:30:52 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Apr 17 10:30:52 2018 +0300
----------------------------------------------------------------------
.../org/apache/ignite/cache/QueryEntity.java | 177 ++++++-
.../apache/ignite/cache/QueryEntityPatch.java | 118 +++++
.../cache/CacheJoinNodeDiscoveryData.java | 15 +-
.../processors/cache/ClusterCachesInfo.java | 428 +++++++++++----
.../cache/DynamicCacheDescriptor.java | 28 +
.../processors/cache/GridCacheProcessor.java | 94 +++-
.../cluster/GridClusterStateProcessor.java | 8 +-
.../internal/processors/query/QueryField.java | 10 +
.../internal/processors/query/QuerySchema.java | 84 ++-
.../processors/query/QuerySchemaPatch.java | 96 ++++
...erActivateDeactivateTestWithPersistence.java | 18 +-
.../cache/IgniteDynamicSqlRestoreTest.java | 529 +++++++++++++++++++
...ynamicColumnsAbstractConcurrentSelfTest.java | 3 +-
.../IgniteCacheQuerySelfTestSuite.java | 2 +
14 files changed, 1481 insertions(+), 129 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbc439b8/modules/core/src/main/java/org/apache/ignite/cache/QueryEntity.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/QueryEntity.java b/modules/core/src/main/java/org/apache/ignite/cache/QueryEntity.java
index 976bd67..37a7f15 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/QueryEntity.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/QueryEntity.java
@@ -17,6 +17,7 @@
package org.apache.ignite.cache;
+import javax.cache.CacheException;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.math.BigDecimal;
@@ -27,23 +28,29 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import javax.cache.CacheException;
+import java.util.UUID;
import org.apache.ignite.cache.query.annotations.QueryGroupIndex;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cache.query.annotations.QueryTextField;
import org.apache.ignite.internal.processors.cache.query.QueryEntityClassProperty;
import org.apache.ignite.internal.processors.cache.query.QueryEntityTypeDescriptor;
import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
+import org.apache.ignite.internal.processors.query.QueryField;
import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaAlterTableAddColumnOperation;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexCreateOperation;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static java.util.Collections.unmodifiableMap;
@@ -153,6 +160,172 @@ public class QueryEntity implements Serializable {
}
/**
+ * Makes a query entity patch. The patch can only add properties to the entity and can't remove them.
+ * In other words, the patch contains only add operations (e.g. add column, create index) and no remove ones.
+ * If any compared property differs between this entity and the target, a conflict patch carrying the
+ * collected differences is returned instead of operations.
+ *
+ * @param target Query entity to which this entity should be expanded; {@code null} yields an empty patch.
+ * @return Patch which contains operations for expanding this entity.
+ */
+ @NotNull public QueryEntityPatch makePatch(QueryEntity target) {
+ if (target == null)
+ return QueryEntityPatch.empty();
+
+ StringBuilder conflicts = new StringBuilder();
+
+ checkEquals(conflicts, "keyType", keyType, target.keyType);
+ checkEquals(conflicts, "valType", valType, target.valType);
+ checkEquals(conflicts, "keyFieldName", keyFieldName, target.keyFieldName);
+ checkEquals(conflicts, "valueFieldName", valueFieldName, target.valueFieldName);
+ checkEquals(conflicts, "tableName", tableName, target.tableName);
+
+ List<QueryField> queryFieldsToAdd = checkFields(target, conflicts);
+
+ Collection<QueryIndex> indexesToAdd = checkIndexes(target, conflicts);
+
+ if (conflicts.length() != 0)
+ return QueryEntityPatch.conflict(tableName + " conflict: \n" + conflicts.toString());
+
+ Collection<SchemaAbstractOperation> patchOperations = new ArrayList<>();
+
+ if (!queryFieldsToAdd.isEmpty()) // All new columns go into a single add-column operation.
+ patchOperations.add(new SchemaAlterTableAddColumnOperation(
+ UUID.randomUUID(),
+ null,
+ null,
+ tableName,
+ queryFieldsToAdd,
+ true,
+ true
+ ));
+
+ if (!indexesToAdd.isEmpty()) {
+ for (QueryIndex index : indexesToAdd) { // One create-index operation per new index.
+ patchOperations.add(new SchemaIndexCreateOperation(
+ UUID.randomUUID(),
+ null,
+ null,
+ tableName,
+ index,
+ true,
+ 0
+ ));
+ }
+ }
+
+ return QueryEntityPatch.patch(patchOperations);
+ }
+
+ /**
+ * Compares local indexes with target indexes. An index that exists on both sides under the same name
+ * is compared for equality, and any difference is appended to {@code conflicts}.
+ *
+ * @param target Query entity for check.
+ * @param conflicts Storage of conflicts.
+ * @return Indexes which exist in target and do not exist in local.
+ * @throws IllegalStateException If two local indexes have the same name.
+ */
+ @NotNull private Collection<QueryIndex> checkIndexes(QueryEntity target, StringBuilder conflicts) {
+ HashSet<QueryIndex> indexesToAdd = new HashSet<>();
+
+ Map<String, QueryIndex> currentIndexes = new HashMap<>();
+
+ for (QueryIndex index : getIndexes()) {
+ if (currentIndexes.put(index.getName(), index) != null)
+ throw new IllegalStateException("Duplicate key");
+ }
+
+ for (QueryIndex queryIndex : target.getIndexes()) {
+ if(currentIndexes.containsKey(queryIndex.getName())) {
+ checkEquals(
+ conflicts,
+ "index " + queryIndex.getName(),
+ currentIndexes.get(queryIndex.getName()),
+ queryIndex
+ );
+ }
+ else
+ indexesToAdd.add(queryIndex);
+ }
+ return indexesToAdd;
+ }
+
+ /**
+ * Compares local entity fields with target entity fields. For a field that exists on both sides,
+ * its type, not-null flag and default value are compared and any difference is appended to
+ * {@code conflicts}. A field that exists only in target is returned as a field to add.
+ *
+ * @param target Query entity for check.
+ * @param conflicts Storage of conflicts.
+ * @return Fields which exist in target and do not exist in local.
+ */
+ private List<QueryField> checkFields(QueryEntity target, StringBuilder conflicts) {
+ List<QueryField> queryFieldsToAdd = new ArrayList<>();
+
+ for (Map.Entry<String, String> targetField : target.getFields().entrySet()) {
+ String targetFieldName = targetField.getKey();
+ String targetFieldType = targetField.getValue();
+
+ if (getFields().containsKey(targetFieldName)) {
+ checkEquals(
+ conflicts,
+ "fieldType of " + targetFieldName,
+ getFields().get(targetFieldName),
+ targetFieldType
+ );
+
+ checkEquals(
+ conflicts,
+ "nullable of " + targetFieldName,
+ contains(getNotNullFields(), targetFieldName),
+ contains(target.getNotNullFields(), targetFieldName)
+ );
+
+ checkEquals(
+ conflicts,
+ "default value of " + targetFieldName,
+ getFromMap(getDefaultFieldValues(), targetFieldName),
+ getFromMap(target.getDefaultFieldValues(), targetFieldName)
+ );
+ }
+ else {
+ queryFieldsToAdd.add(new QueryField(
+ targetFieldName,
+ targetFieldType,
+ !contains(target.getNotNullFields(),targetFieldName),
+ getFromMap(target.getDefaultFieldValues(), targetFieldName)
+ ));
+ }
+ }
+
+ return queryFieldsToAdd;
+ }
+
+ /**
+ * Null-safe containment check.
+ *
+ * @param collection Collection to check, may be {@code null}.
+ * @param elementToCheck Element to look for in the collection.
+ * @return {@code true} if the collection is not {@code null} and contains elementToCheck.
+ */
+ private static boolean contains(Collection<String> collection, String elementToCheck) {
+ return collection != null && collection.contains(elementToCheck);
+ }
+
+ /**
+ * Null-safe map lookup.
+ *
+ * @param sourceMap Map to read from, may be {@code null}.
+ * @param key Key to look up.
+ * @return Value from sourceMap for the given key, or {@code null} if the map is {@code null}.
+ */
+ private static Object getFromMap(Map<String, Object> sourceMap, String key) {
+ return sourceMap == null ? null : sourceMap.get(key);
+ }
+
+ /**
+ * Compares two objects and appends a formatted line to conflicts if they are not equal.
+ *
+ * @param conflicts Storage of conflicts which forms the resulting error message.
+ * @param name Name of the compared property.
+ * @param local Local object.
+ * @param received Received object.
+ */
+ private void checkEquals(StringBuilder conflicts, String name, Object local, Object received) {
+ if (!Objects.equals(local, received))
+ conflicts.append(String.format("%s is different: local=%s, received=%s\n", name, local, received));
+ }
+
+ /**
* Gets key type for this query pair.
*
* @return Key type.
@@ -319,7 +492,7 @@ public class QueryEntity implements Serializable {
*
* @return Collection of index entities.
*/
- public Collection<QueryIndex> getIndexes() {
+ @NotNull public Collection<QueryIndex> getIndexes() {
return idxs == null ? Collections.<QueryIndex>emptyList() : idxs;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbc439b8/modules/core/src/main/java/org/apache/ignite/cache/QueryEntityPatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/QueryEntityPatch.java b/modules/core/src/main/java/org/apache/ignite/cache/QueryEntityPatch.java
new file mode 100644
index 0000000..38e1b2a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/QueryEntityPatch.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache;
+
+import java.util.Collection;
+import java.util.Objects;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Query entity patch which contains {@link SchemaAbstractOperation} operations for changing a query entity.
+ * This patch can only add properties to the entity and can't remove them.
+ * In other words, the patch contains only add operations
+ * (e.g.:
+ * {@link org.apache.ignite.internal.processors.query.schema.operation.SchemaAlterTableAddColumnOperation},
+ * {@link org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexCreateOperation}
+ * ) and no remove ones.
+ *
+ * It contains only add operations because at the moment we don't have a history of schema operations
+ * and from the current state we can't tell whether some property was already deleted or has not been added yet.
+ */
+public class QueryEntityPatch {
+ /** Empty query entity patch. */
+ private static final QueryEntityPatch EMPTY_QUERY_ENTITY_PATCH = new QueryEntityPatch(null, null);
+
+ /** Message which describes conflicts found while creating this patch. */
+ private String conflictsMessage;
+
+ /** Operations for modification of the query entity. */
+ private Collection<SchemaAbstractOperation> patchOperations;
+
+ /**
+ * Creates a patch.
+ *
+ * @param conflictsMessage Conflicts message, {@code null} if there is no conflict.
+ * @param patchOperations Operations for modification, may be {@code null}.
+ */
+ private QueryEntityPatch(String conflictsMessage, Collection<SchemaAbstractOperation> patchOperations) {
+ this.conflictsMessage = conflictsMessage;
+ this.patchOperations = patchOperations;
+ }
+
+ /**
+ * Builder method for a patch with conflicts. The created patch carries no operations.
+ *
+ * @param conflicts Conflicts.
+ * @return Query entity patch with conflicts.
+ */
+ public static QueryEntityPatch conflict(String conflicts) {
+ return new QueryEntityPatch(conflicts, null);
+ }
+
+ /**
+ * Builder method for an empty patch. Always returns the same shared instance.
+ *
+ * @return Query entity patch.
+ */
+ public static QueryEntityPatch empty() {
+ return EMPTY_QUERY_ENTITY_PATCH;
+ }
+
+ /**
+ * Builder method for a patch with operations.
+ *
+ * @param patchOperations Operations for modification.
+ * @return Query entity patch which contains {@link SchemaAbstractOperation} operations for changing query entity.
+ */
+ public static QueryEntityPatch patch(Collection<SchemaAbstractOperation> patchOperations) {
+ return new QueryEntityPatch(null, patchOperations);
+ }
+
+ /**
+ * Checks for a conflict in this patch.
+ *
+ * @return {@code true} if the patch has a conflict, i.e. the conflicts message is not {@code null}.
+ */
+ public boolean hasConflict() {
+ return conflictsMessage != null;
+ }
+
+ /**
+ * @return {@code true} if the patch has no operations ({@code null} or empty collection) and can't be applied.
+ */
+ public boolean isEmpty() {
+ return patchOperations == null || patchOperations.isEmpty();
+ }
+
+ /**
+ * @return Conflicts message; {@code null} means there is no conflict (see {@link #hasConflict()}).
+ */
+ public String getConflictsMessage() {
+ return conflictsMessage;
+ }
+
+ /**
+ * @return Patch operations for applying; {@code null} for the empty patch and for conflict patches.
+ */
+ public Collection<SchemaAbstractOperation> getPatchOperations() {
+ return patchOperations;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(QueryEntityPatch.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbc439b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
index 6d2688c..a3902de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
@@ -112,17 +112,23 @@ public class CacheJoinNodeDiscoveryData implements Serializable {
/** Flags added for future usage. */
private final long flags;
+ /** Statically configured flag. */
+ private final boolean staticallyConfigured;
+
/**
* @param cacheData Cache data.
* @param cacheType Cache type.
* @param sql SQL flag - {@code true} if cache was created with {@code CREATE TABLE}.
* @param flags Flags (for future usage).
+ * @param staticallyConfigured {@code true} if the cache was configured by static config, {@code false} otherwise.
*/
- public CacheInfo(StoredCacheData cacheData, CacheType cacheType, boolean sql, long flags) {
+ public CacheInfo(StoredCacheData cacheData, CacheType cacheType, boolean sql, long flags,
+ boolean staticallyConfigured) {
this.cacheData = cacheData;
this.cacheType = cacheType;
this.sql = sql;
this.flags = flags;
+ this.staticallyConfigured = staticallyConfigured;
}
/**
@@ -146,6 +152,13 @@ public class CacheJoinNodeDiscoveryData implements Serializable {
return sql;
}
+ /**
+ * @return {@code true} if the cache was configured by static config, {@code false} otherwise.
+ */
+ public boolean isStaticallyConfigured() {
+ return staticallyConfigured;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheInfo.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbc439b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 2b2fb55..975617e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -35,31 +35,35 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheExistsException;
+import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.GridCachePluginContext;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.processors.query.QuerySchema;
+import org.apache.ignite.internal.processors.query.QuerySchemaPatch;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.lang.GridFunc;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.CachePluginContext;
import org.apache.ignite.plugin.CachePluginProvider;
import org.apache.ignite.plugin.PluginProvider;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
-import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheMode.LOCAL;
@@ -71,6 +75,9 @@ import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType
* Logic related to cache discovery data processing.
*/
class ClusterCachesInfo {
+ /** Version since which merge of config is supported. */
+ private static final IgniteProductVersion V_MERGE_CONFIG_SINCE = IgniteProductVersion.fromString("2.5.0");
+
/** */
private final GridKernalContext ctx;
@@ -987,54 +994,77 @@ class ClusterCachesInfo {
// CacheGroup configurations that were created from local node configuration.
Map<Integer, CacheGroupDescriptor> locCacheGrps = new HashMap<>(registeredCacheGroups());
- // Replace locally registered data with actual data received from cluster.
- registeredCaches.clear();
- registeredCacheGrps.clear();
- ctx.discovery().cleanCachesAndGroups();
+ //Replace locally registered data with actual data received from cluster.
+ cleanCachesAndGroups();
- for (CacheGroupData grpData : cachesData.cacheGroups().values()) {
- CacheGroupDescriptor grpDesc = new CacheGroupDescriptor(
- grpData.config(),
- grpData.groupName(),
- grpData.groupId(),
- grpData.receivedFrom(),
- grpData.startTopologyVersion(),
- grpData.deploymentId(),
- grpData.caches(),
- grpData.persistenceEnabled(),
- grpData.walEnabled(),
- grpData.walChangeRequests());
+ registerReceivedCacheGroups(cachesData, locCacheGrps);
- if (locCacheGrps.containsKey(grpDesc.groupId())) {
- CacheGroupDescriptor locGrpCfg = locCacheGrps.get(grpDesc.groupId());
+ registerReceivedCacheTemplates(cachesData);
- grpDesc.mergeWith(locGrpCfg);
- }
+ registerReceivedCaches(cachesData);
- CacheGroupDescriptor old = registeredCacheGrps.put(grpDesc.groupId(), grpDesc);
+ addReceivedClientNodesToDiscovery(cachesData);
- assert old == null : old;
+ String conflictErr = validateRegisteredCaches();
- ctx.discovery().addCacheGroup(grpDesc,
- grpData.config().getNodeFilter(),
- grpData.config().getCacheMode());
+ gridData = new GridData(joinDiscoData, cachesData, conflictErr);
+
+ if (cachesOnDisconnect == null || cachesOnDisconnect.clusterActive())
+ initStartCachesForLocalJoin(false, disconnectedState());
+ }
+
+ /**
+ * Validates caches from {@code joinDiscoData} against {@link #registeredCaches}. Every cache from the
+ * join data that is not registered is passed to {@code checkCacheConflict}; the check stops at the
+ * first conflict found.
+ *
+ * @return Error message if a conflict was found, {@code null} otherwise.
+ */
+ @Nullable private String validateRegisteredCaches() {
+ String conflictErr = null;
+
+ if (joinDiscoData != null) {
+ for (Map.Entry<String, CacheJoinNodeDiscoveryData.CacheInfo> e : joinDiscoData.caches().entrySet()) {
+ if (!registeredCaches.containsKey(e.getKey())) {
+ conflictErr = checkCacheConflict(e.getValue().cacheData().config());
+
+ if (conflictErr != null) {
+ conflictErr = "Failed to start configured cache due to conflict with started caches. " +
+ conflictErr;
+
+ break;
+ }
+ }
+ }
}
- for (CacheData cacheData : cachesData.templates().values()) {
- DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
- ctx,
- cacheData.cacheConfiguration(),
- cacheData.cacheType(),
- null,
- true,
- cacheData.receivedFrom(),
- cacheData.staticallyConfigured(),
- false,
- cacheData.deploymentId(),
- cacheData.schema());
+ return conflictErr;
+ }
- registeredTemplates.put(cacheData.cacheConfiguration().getName(), desc);
+ /**
+ * Adds received client nodes to discovery. Does nothing if the received client nodes map is empty;
+ * otherwise every (cache name, node id, flag) entry of the map is passed to discovery.
+ *
+ * @param cachesData Data received from cluster.
+ */
+ private void addReceivedClientNodesToDiscovery(CacheNodeCommonDiscoveryData cachesData) {
+ if (!F.isEmpty(cachesData.clientNodesMap())) {
+ for (Map.Entry<String, Map<UUID, Boolean>> entry : cachesData.clientNodesMap().entrySet()) {
+ String cacheName = entry.getKey();
+
+ for (Map.Entry<UUID, Boolean> tup : entry.getValue().entrySet())
+ ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue());
+ }
}
+ }
+
+ /**
+ * Register caches received from cluster.
+ *
+ * @param cachesData Data received from cluster.
+ */
+ private void registerReceivedCaches(CacheNodeCommonDiscoveryData cachesData) {
+ Map<DynamicCacheDescriptor, QuerySchemaPatch> patchesToApply = new HashMap<>();
+ Collection<DynamicCacheDescriptor> cachesToSave = new HashSet<>();
+
+ boolean hasSchemaPatchConflict = false;
for (CacheData cacheData : cachesData.caches().values()) {
CacheGroupDescriptor grpDesc = registeredCacheGrps.get(cacheData.groupId());
@@ -1053,7 +1083,22 @@ class ClusterCachesInfo {
cacheData.staticallyConfigured(),
cacheData.sql(),
cacheData.deploymentId(),
- cacheData.schema());
+ new QuerySchema(cacheData.schema().entities())
+ );
+
+ Collection<QueryEntity> localQueryEntities = getLocalQueryEntities(cfg.getName());
+
+ QuerySchemaPatch schemaPatch = desc.makeSchemaPatch(localQueryEntities);
+
+ if (schemaPatch.hasConflicts()) {
+ hasSchemaPatchConflict = true;
+
+ log.warning("Skipping apply patch because conflicts : " + schemaPatch.getConflictsMessage());
+ }
+ else if (!schemaPatch.isEmpty())
+ patchesToApply.put(desc, schemaPatch);
+ else if (!GridFunc.eqNotOrdered(desc.schema().entities(), localQueryEntities))
+ cachesToSave.add(desc); //received config is different of local config - need to resave
desc.receivedOnDiscovery(true);
@@ -1066,36 +1111,140 @@ class ClusterCachesInfo {
cfg.getNearConfiguration() != null);
}
- if (!F.isEmpty(cachesData.clientNodesMap())) {
- for (Map.Entry<String, Map<UUID, Boolean>> entry : cachesData.clientNodesMap().entrySet()) {
- String cacheName = entry.getKey();
+ updateRegisteredCachesIfNeeded(patchesToApply, cachesToSave, hasSchemaPatchConflict);
+ }
- for (Map.Entry<UUID, Boolean> tup : entry.getValue().entrySet())
- ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue());
+ /**
+ * Merges the config or resaves it if needed. Nothing is done if a schema patch conflict was found
+ * or if config merge is not supported (as reported by {@code isMergeConfigSupports} for the local node).
+ * Otherwise:
+ * - cluster inactive and patches present: each patch is applied (the descriptor is saved when
+ * {@code applySchemaPatch} returns {@code true}), then all {@code cachesToSave} are saved;
+ * - no patches: all {@code cachesToSave} are saved regardless of cluster state;
+ * - cluster active and patches present: nothing is applied or saved.
+ *
+ * @param patchesToApply Patches which need to be applied.
+ * @param cachesToSave Caches which need to be resaved.
+ * @param hasSchemaPatchConflict {@code true} if a conflict was found while making patches.
+ */
+ private void updateRegisteredCachesIfNeeded(Map<DynamicCacheDescriptor, QuerySchemaPatch> patchesToApply,
+ Collection<DynamicCacheDescriptor> cachesToSave, boolean hasSchemaPatchConflict) {
+ // Skip config merge if at least one conflict was found.
+ if (!hasSchemaPatchConflict && isMergeConfigSupports(ctx.discovery().localNode())) {
+ boolean isClusterActive = ctx.state().clusterState().active();
+
+ // Patches are applied only while the cluster is inactive.
+ if (!isClusterActive && !patchesToApply.isEmpty()) {
+ for (Map.Entry<DynamicCacheDescriptor, QuerySchemaPatch> entry : patchesToApply.entrySet()) {
+ if (entry.getKey().applySchemaPatch(entry.getValue()))
+ saveCacheConfiguration(entry.getKey());
+ }
+
+ for (DynamicCacheDescriptor descriptor : cachesToSave) {
+ saveCacheConfiguration(descriptor);
+ }
+ }
+ else if (patchesToApply.isEmpty()) {
+ for (DynamicCacheDescriptor descriptor : cachesToSave) {
+ saveCacheConfiguration(descriptor);
+ }
}
}
+ }
- String conflictErr = null;
+ /**
+ * Registers cache templates received from the cluster: a descriptor is created for every received
+ * template and put into {@code registeredTemplates} under the cache name from its configuration.
+ *
+ * @param cachesData Data received from cluster.
+ */
+ private void registerReceivedCacheTemplates(CacheNodeCommonDiscoveryData cachesData) {
+ for (CacheData cacheData : cachesData.templates().values()) {
+ DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
+ ctx,
+ cacheData.cacheConfiguration(),
+ cacheData.cacheType(),
+ null,
+ true,
+ cacheData.receivedFrom(),
+ cacheData.staticallyConfigured(),
+ false,
+ cacheData.deploymentId(),
+ cacheData.schema());
- if (joinDiscoData != null) {
- for (Map.Entry<String, CacheJoinNodeDiscoveryData.CacheInfo> e : joinDiscoData.caches().entrySet()) {
- if (!registeredCaches.containsKey(e.getKey())) {
- conflictErr = checkCacheConflict(e.getValue().cacheData().config());
+ registeredTemplates.put(cacheData.cacheConfiguration().getName(), desc);
+ }
+ }
- if (conflictErr != null) {
- conflictErr = "Failed to start configured cache due to conflict with started caches. " +
- conflictErr;
+ /**
+ * Registers cache groups received from the cluster. For every received group a descriptor is created,
+ * merged with the local descriptor of the same group id (if one exists in {@code locCacheGrps}),
+ * put into {@code registeredCacheGrps} and added to discovery. A group id is expected not to be
+ * registered yet (checked by an assertion).
+ *
+ * @param cachesData Data received from cluster.
+ * @param locCacheGrps Current local cache groups.
+ */
+ private void registerReceivedCacheGroups(CacheNodeCommonDiscoveryData cachesData,
+ Map<Integer, CacheGroupDescriptor> locCacheGrps) {
+ for (CacheGroupData grpData : cachesData.cacheGroups().values()) {
+ CacheGroupDescriptor grpDesc = new CacheGroupDescriptor(
+ grpData.config(),
+ grpData.groupName(),
+ grpData.groupId(),
+ grpData.receivedFrom(),
+ grpData.startTopologyVersion(),
+ grpData.deploymentId(),
+ grpData.caches(),
+ grpData.persistenceEnabled(),
+ grpData.walEnabled(),
+ grpData.walChangeRequests());
- break;
- }
- }
+ if (locCacheGrps.containsKey(grpDesc.groupId())) {
+ CacheGroupDescriptor locGrpCfg = locCacheGrps.get(grpDesc.groupId());
+
+ grpDesc.mergeWith(locGrpCfg);
}
+
+ CacheGroupDescriptor old = registeredCacheGrps.put(grpDesc.groupId(), grpDesc);
+
+ assert old == null : old;
+
+ ctx.discovery().addCacheGroup(grpDesc,
+ grpData.config().getNodeFilter(),
+ grpData.config().getCacheMode());
}
+ }
- gridData = new GridData(joinDiscoData, cachesData, conflictErr);
+ /**
+ * Cleans locally registered caches and cache groups, both in this class and in discovery.
+ */
+ private void cleanCachesAndGroups() {
+ registeredCaches.clear();
+ registeredCacheGrps.clear();
+ ctx.discovery().cleanCachesAndGroups();
+ }
- if (cachesOnDisconnect == null || cachesOnDisconnect.clusterActive())
- initStartCachesForLocalJoin(false, disconnectedState());
+ /**
+ * Saves the dynamic cache descriptor on disk. A failure to save is logged as an error
+ * and is not propagated to the caller.
+ *
+ * @param desc Cache to save.
+ */
+ private void saveCacheConfiguration(DynamicCacheDescriptor desc) {
+ try {
+ ctx.cache().saveCacheConfiguration(desc);
+ }
+ catch (IgniteCheckedException e) {
+ log.error("Error while saving cache configuration to disk, cfg = " + desc.cacheConfiguration(), e);
+ }
+ }
+
+ /**
+ * Gets the query entities of the given cache from the local join data.
+ *
+ * @param cacheName Cache for which query entities will be returned.
+ * @return Local query entities, or an empty list if there is no join data
+ * or the join data has no cache with this name.
+ */
+ private Collection<QueryEntity> getLocalQueryEntities(String cacheName) {
+ if (joinDiscoData == null)
+ return Collections.emptyList();
+
+ CacheJoinNodeDiscoveryData.CacheInfo cacheInfo = joinDiscoData.caches().get(cacheName);
+
+ if (cacheInfo == null)
+ return Collections.emptyList();
+
+ return cacheInfo.cacheData().queryEntities();
}
/**
@@ -1144,7 +1293,7 @@ class ClusterCachesInfo {
desc.staticallyConfigured(),
desc.sql(),
desc.deploymentId(),
- new QuerySchema(locCfg.cacheData().queryEntities()));
+ desc.schema().copy());
desc0.startTopologyVersion(desc.startTopologyVersion());
desc0.receivedFromStartVersion(desc.receivedFromStartVersion());
@@ -1385,26 +1534,14 @@ class ClusterCachesInfo {
* @return Configuration conflict error.
*/
private String processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId, boolean locJoin) {
- for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.templates().values()) {
- CacheConfiguration<?, ?> cfg = cacheInfo.cacheData().config();
+ registerNewCacheTemplates(joinData, nodeId);
- if (!registeredTemplates.containsKey(cfg.getName())) {
- DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx,
- cfg,
- cacheInfo.cacheType(),
- null,
- true,
- nodeId,
- true,
- false,
- joinData.cacheDeploymentId(),
- new QuerySchema(cacheInfo.cacheData().queryEntities()));
+ Map<DynamicCacheDescriptor, QuerySchemaPatch> patchesToApply = new HashMap<>();
- DynamicCacheDescriptor old = registeredTemplates.put(cfg.getName(), desc);
+ boolean hasSchemaPatchConflict = false;
+ boolean active = ctx.state().clusterState().active();
- assert old == null : old;
- }
- }
+ boolean isMergeConfigSupport = isMergeConfigSupports(null);
for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.caches().values()) {
CacheConfiguration<?, ?> cfg = cacheInfo.cacheData().config();
@@ -1421,49 +1558,138 @@ class ClusterCachesInfo {
continue;
}
- int cacheId = CU.cacheId(cfg.getName());
+ registerNewCache(joinData, nodeId, cacheInfo);
+ }
+ else if (!active && isMergeConfigSupport) {
+ DynamicCacheDescriptor desc = registeredCaches.get(cfg.getName());
+
+ QuerySchemaPatch schemaPatch = desc.makeSchemaPatch(cacheInfo.cacheData().queryEntities());
- CacheGroupDescriptor grpDesc = registerCacheGroup(null,
- null,
- cfg,
- cacheId,
+ if (schemaPatch.hasConflicts()) {
+ hasSchemaPatchConflict = true;
+
+ log.error("Error during making schema patch : " + schemaPatch.getConflictsMessage());
+ }
+ else if (!schemaPatch.isEmpty() && !hasSchemaPatchConflict)
+ patchesToApply.put(desc, schemaPatch);
+ }
+
+ ctx.discovery().addClientNode(cfg.getName(), nodeId, cfg.getNearConfiguration() != null);
+ }
+
+ //If a conflict was detected, we don't merge the config and keep the existing one.
+ if (!hasSchemaPatchConflict && !patchesToApply.isEmpty())
+ for(Map.Entry<DynamicCacheDescriptor, QuerySchemaPatch> entry: patchesToApply.entrySet()){
+ if (entry.getKey().applySchemaPatch(entry.getValue()))
+ saveCacheConfiguration(entry.getKey());
+ }
+
+ if (joinData.startCaches()) {
+ for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ ctx.discovery().addClientNode(desc.cacheName(),
nodeId,
- joinData.cacheDeploymentId());
+ desc.cacheConfiguration().getNearConfiguration() != null);
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Register new cache received from joining node.
+ *
+ * @param joinData Data from joining node.
+ * @param nodeId Joining node id.
+ * @param cacheInfo Cache info of new node.
+ */
+ private void registerNewCache(
+ CacheJoinNodeDiscoveryData joinData,
+ UUID nodeId,
+ CacheJoinNodeDiscoveryData.CacheInfo cacheInfo) {
+ CacheConfiguration<?, ?> cfg = cacheInfo.cacheData().config();
+
+ int cacheId = CU.cacheId(cfg.getName());
+
+ CacheGroupDescriptor grpDesc = registerCacheGroup(null,
+ null,
+ cfg,
+ cacheId,
+ nodeId,
+ joinData.cacheDeploymentId());
+
+ ctx.discovery().setCacheFilter(
+ cacheId,
+ grpDesc.groupId(),
+ cfg.getName(),
+ cfg.getNearConfiguration() != null);
+
+ DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx,
+ cfg,
+ cacheInfo.cacheType(),
+ grpDesc,
+ false,
+ nodeId,
+ cacheInfo.isStaticallyConfigured(),
+ cacheInfo.sql(),
+ joinData.cacheDeploymentId(),
+ new QuerySchema(cacheInfo.cacheData().queryEntities()));
+
+ DynamicCacheDescriptor old = registeredCaches.put(cfg.getName(), desc);
- ctx.discovery().setCacheFilter(
- cacheId,
- grpDesc.groupId(),
- cfg.getName(),
- cfg.getNearConfiguration() != null);
+ assert old == null : old;
+ }
+
+ /**
+ * Register new cache templates received from joining node.
+ *
+ * @param joinData Data from joining node.
+ * @param nodeId Joining node id.
+ */
+ private void registerNewCacheTemplates(CacheJoinNodeDiscoveryData joinData, UUID nodeId) {
+ for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.templates().values()) {
+ CacheConfiguration<?, ?> cfg = cacheInfo.cacheData().config();
+ if (!registeredTemplates.containsKey(cfg.getName())) {
DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx,
cfg,
cacheInfo.cacheType(),
- grpDesc,
- false,
+ null,
+ true,
nodeId,
true,
- cacheInfo.sql(),
+ false,
joinData.cacheDeploymentId(),
new QuerySchema(cacheInfo.cacheData().queryEntities()));
- DynamicCacheDescriptor old = registeredCaches.put(cfg.getName(), desc);
+ DynamicCacheDescriptor old = registeredTemplates.put(cfg.getName(), desc);
assert old == null : old;
}
-
- ctx.discovery().addClientNode(cfg.getName(), nodeId, cfg.getNearConfiguration() != null);
}
+ }
- if (joinData.startCaches()) {
- for (DynamicCacheDescriptor desc : registeredCaches.values()) {
- ctx.discovery().addClientNode(desc.cacheName(),
- nodeId,
- desc.cacheConfiguration().getNearConfiguration() != null);
- }
+ /**
+ * @return {@code true} if grid supports merge of config and {@code false} otherwise.
+ */
+ public boolean isMergeConfigSupports(ClusterNode joiningNode) {
+ DiscoCache discoCache = ctx.discovery().discoCache();
+
+ if (discoCache == null)
+ return true;
+
+ if (joiningNode != null && joiningNode.version().compareToIgnoreTimestamp(V_MERGE_CONFIG_SINCE) < 0)
+ return false;
+
+ Collection<ClusterNode> nodes = discoCache.allNodes();
+
+ for (ClusterNode node : nodes) {
+ IgniteProductVersion version = node.version();
+
+ if (version.compareToIgnoreTimestamp(V_MERGE_CONFIG_SINCE) < 0)
+ return false;
}
- return null;
+ return true;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbc439b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index cad8414..93882a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -17,14 +17,17 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.Collection;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
import org.apache.ignite.internal.processors.query.QuerySchema;
+import org.apache.ignite.internal.processors.query.QuerySchemaPatch;
import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -346,6 +349,31 @@ public class DynamicCacheDescriptor {
}
/**
+ * Make schema patch for this cache.
+ *
+ * @param target Query entity list which current schema should be expanded to.
+ * @return Patch which contains operations for expanding schema of this cache.
+ * @see QuerySchemaPatch
+ */
+ public QuerySchemaPatch makeSchemaPatch(Collection<QueryEntity> target) {
+ synchronized (schemaMux) {
+ return schema.makePatch(target);
+ }
+ }
+
+ /**
+ * Apply query schema patch for changing current schema.
+ *
+ * @param patch patch to apply.
+ * @return {@code true} if the patch was applied successfully and {@code false} otherwise.
+ */
+ public boolean applySchemaPatch(QuerySchemaPatch patch) {
+ synchronized (schemaMux) {
+ return schema.applyPatch(patch);
+ }
+ }
+
+ /**
* Form a {@link StoredCacheData} with all data to correctly restore cache params when its configuration is read
* from page store. Essentially, this method takes from {@link DynamicCacheDescriptor} all that's needed to start
* cache correctly, leaving out everything else.
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbc439b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 3aa6603..36edd72 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -117,6 +117,7 @@ import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
import org.apache.ignite.internal.processors.plugin.CachePluginManager;
import org.apache.ignite.internal.processors.query.QuerySchema;
+import org.apache.ignite.internal.processors.query.QuerySchemaPatch;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.schema.SchemaExchangeWorkerTask;
import org.apache.ignite.internal.processors.query.schema.SchemaNodeLeaveExchangeWorkerTask;
@@ -183,6 +184,14 @@ import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearE
*/
@SuppressWarnings({"unchecked", "TypeMayBeWeakened", "deprecation"})
public class GridCacheProcessor extends GridProcessorAdapter {
+ /** Template of the message about conflicts during configuration merge. */
+ private static final String MERGE_OF_CONFIG_CONFLICTS_MESSAGE =
+ "Conflicts during configuration merge for cache '%s' : \n%s";
+
+ /** Template of the message that a node join failed because it requires a merge of config. */
+ private static final String MERGE_OF_CONFIG_REQUIRED_MESSAGE = "Failed to join node to the active cluster " +
+ "(the config of the cache '%s' has to be merged which is impossible on active grid). " +
+ "Deactivate grid and retry node join or clean the joining node.";
/** */
private final boolean startClientCaches =
IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_START_CACHES_ON_JOIN, false);
@@ -742,15 +751,29 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (cacheType != CacheType.USER && cfg.getDataRegionName() == null)
cfg.setDataRegionName(sharedCtx.database().systemDateRegionName());
- if (!cacheType.userCache())
- stopSeq.addLast(cacheName);
- else
- stopSeq.addFirst(cacheName);
-
- caches.put(cacheName, new CacheJoinNodeDiscoveryData.CacheInfo(cacheData, cacheType, cacheData.sql(), 0));
+ addStoredCache(caches, cacheData, cacheName, cacheType, true);
}
else
- templates.put(cacheName, new CacheJoinNodeDiscoveryData.CacheInfo(cacheData, CacheType.USER, false, 0));
+ templates.put(cacheName, new CacheInfo(cacheData, CacheType.USER, false, 0, true));
+ }
+
+ /**
+ * Add stored cache data to caches storage.
+ *
+ * @param caches Cache storage.
+ * @param cacheData Cache data to add.
+ * @param cacheName Cache name.
+ * @param cacheType Cache type.
+ * @param isStaticalyConfigured Statically configured flag.
+ */
+ private void addStoredCache(Map<String, CacheInfo> caches, StoredCacheData cacheData, String cacheName,
+ CacheType cacheType, boolean isStaticalyConfigured) {
+ if (!cacheType.userCache())
+ stopSeq.addLast(cacheName);
+ else
+ stopSeq.addFirst(cacheName);
+
+ caches.put(cacheName, new CacheInfo(cacheData, cacheType, cacheData.sql(), 0, isStaticalyConfigured));
}
/**
@@ -774,6 +797,19 @@ public class GridCacheProcessor extends GridProcessorAdapter {
addCacheOnJoin(cfg, false, caches, templates);
}
+
+ if (CU.isPersistenceEnabled(ctx.config()) && ctx.cache().context().pageStore() != null) {
+ Map<String, StoredCacheData> storedCaches = ctx.cache().context().pageStore().readCacheConfigurations();
+
+ if (!F.isEmpty(storedCaches))
+ for (StoredCacheData storedCacheData : storedCaches.values()) {
+ String cacheName = storedCacheData.config().getName();
+
+ //Ignore a stored cache if it was already added by static config (static config has higher priority).
+ if (!caches.containsKey(cacheName))
+ addStoredCache(caches, storedCacheData, cacheName, cacheType(cacheName), false);
+ }
+ }
}
/**
@@ -2439,6 +2475,50 @@ public class GridCacheProcessor extends GridProcessorAdapter {
sharedCtx.walState().onCachesInfoCollected();
}
+ /** {@inheritDoc} */
+ @Nullable @Override public IgniteNodeValidationResult validateNode(
+ ClusterNode node, JoiningNodeDiscoveryData discoData
+ ) {
+ if(!cachesInfo.isMergeConfigSupports(node))
+ return null;
+
+ if (discoData.hasJoiningNodeData() && discoData.joiningNodeData() instanceof CacheJoinNodeDiscoveryData) {
+ CacheJoinNodeDiscoveryData nodeData = (CacheJoinNodeDiscoveryData)discoData.joiningNodeData();
+
+ boolean isGridActive = ctx.state().clusterState().active();
+
+ StringBuilder errorMessage = new StringBuilder();
+
+ for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : nodeData.caches().values()) {
+ DynamicCacheDescriptor localDesc = cacheDescriptor(cacheInfo.cacheData().config().getName());
+
+ if (localDesc == null)
+ continue;
+
+ QuerySchemaPatch schemaPatch = localDesc.makeSchemaPatch(cacheInfo.cacheData().queryEntities());
+
+ if (schemaPatch.hasConflicts() || (isGridActive && !schemaPatch.isEmpty())) {
+ if (errorMessage.length() > 0)
+ errorMessage.append("\n");
+
+ if (schemaPatch.hasConflicts())
+ errorMessage.append(String.format(MERGE_OF_CONFIG_CONFLICTS_MESSAGE,
+ localDesc.cacheName(), schemaPatch.getConflictsMessage()));
+ else
+ errorMessage.append(String.format(MERGE_OF_CONFIG_REQUIRED_MESSAGE, localDesc.cacheName()));
+ }
+ }
+
+ if (errorMessage.length() > 0) {
+ String msg = errorMessage.toString();
+
+ return new IgniteNodeValidationResult(node.id(), msg, msg);
+ }
+ }
+
+ return null;
+ }
+
/**
* @param msg Message.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbc439b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 81a5b4e..2700a20 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -1035,7 +1035,13 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
", client=" + ctx.clientNode() +
", daemon" + ctx.isDaemon() + "]");
}
- IgniteCompute comp = ((ClusterGroupAdapter)ctx.cluster().get().forServers()).compute();
+
+ ClusterGroupAdapter clusterGroupAdapter = (ClusterGroupAdapter)ctx.cluster().get().forServers();
+
+ if (F.isEmpty(clusterGroupAdapter.nodes()))
+ return false;
+
+ IgniteCompute comp = clusterGroupAdapter.compute();
return comp.call(new IgniteCallable<Boolean>() {
@IgniteInstanceResource
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbc439b8/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryField.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryField.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryField.java
index 882d816..d68a6cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryField.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryField.java
@@ -61,6 +61,16 @@ public class QueryField implements Serializable {
* @param nullable Nullable flag.
* @param dfltValue Default value.
*/
+ public QueryField(String name, String typeName, boolean nullable, Object dfltValue) {
+ this(name, typeName, nullable, dfltValue, -1, -1);
+ }
+
+ /**
+ * @param name Field name.
+ * @param typeName Class name for this field's values.
+ * @param nullable Nullable flag.
+ * @param dfltValue Default value.
+ */
public QueryField(String name, String typeName, boolean nullable, Object dfltValue, int precision, int scale) {
this.name = name;
this.typeName = typeName;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbc439b8/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchema.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchema.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchema.java
index 5cbae29..569a02e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchema.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchema.java
@@ -20,12 +20,14 @@ package org.apache.ignite.internal.processors.query;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
-
import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryEntityPatch;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage;
import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
@@ -85,14 +87,90 @@ public class QuerySchema implements Serializable {
}
/**
+ * Make query schema patch.
+ *
+ * @param target Query entity list to which current schema should be expanded.
+ * @return Patch to achieve entity which is a result of merging current one and target.
+ * @see QuerySchemaPatch
+ */
+ public QuerySchemaPatch makePatch(Collection<QueryEntity> target) {
+ synchronized (mux) {
+ Map<String, QueryEntity> localEntities = new HashMap<>();
+
+ for (QueryEntity entity : entities) {
+ if (localEntities.put(entity.getTableName(), entity) != null)
+ throw new IllegalStateException("Duplicate key");
+ }
+
+ Collection<SchemaAbstractOperation> patchOperations = new ArrayList<>();
+ Collection<QueryEntity> entityToAdd = new ArrayList<>();
+
+ StringBuilder conflicts = new StringBuilder();
+
+ for (QueryEntity queryEntity : target) {
+ if (localEntities.containsKey(queryEntity.getTableName())) {
+ QueryEntity localEntity = localEntities.get(queryEntity.getTableName());
+
+ QueryEntityPatch entityPatch = localEntity.makePatch(queryEntity);
+
+ if (entityPatch.hasConflict()) {
+ if (conflicts.length() > 0)
+ conflicts.append("\n");
+
+ conflicts.append(entityPatch.getConflictsMessage());
+ }
+
+ if (!entityPatch.isEmpty())
+ patchOperations.addAll(entityPatch.getPatchOperations());
+ }
+ else
+ entityToAdd.add(QueryUtils.copy(queryEntity));
+ }
+
+ return new QuerySchemaPatch(patchOperations, entityToAdd, conflicts.toString());
+ }
+ }
+
+ /**
+ * Apply query schema patch for changing this schema.
+ *
+ * @param patch Patch to apply.
+ * @return {@code true} if the patch was applied successfully and {@code false} otherwise.
+ */
+ public boolean applyPatch(QuerySchemaPatch patch) {
+ synchronized (mux) {
+ if (patch.hasConflicts())
+ return false;
+
+ if (patch.isEmpty())
+ return true;
+
+ for (SchemaAbstractOperation operation : patch.getPatchOperations()) {
+ finish(operation);
+ }
+
+ entities.addAll(patch.getEntityToAdd());
+
+ return true;
+ }
+ }
+
+ /**
* Process finish message.
*
* @param msg Message.
*/
public void finish(SchemaFinishDiscoveryMessage msg) {
- synchronized (mux) {
- SchemaAbstractOperation op = msg.operation();
+ finish(msg.operation());
+ }
+ /**
+ * Process operation.
+ *
+ * @param op Operation to handle.
+ */
+ public void finish(SchemaAbstractOperation op) {
+ synchronized (mux) {
if (op instanceof SchemaIndexCreateOperation) {
SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation)op;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbc439b8/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchemaPatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchemaPatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchemaPatch.java
new file mode 100644
index 0000000..68beb04
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchemaPatch.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.Collection;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryEntityPatch;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Query schema patch which contains {@link SchemaAbstractOperation} operations for changing query entities.
+ * This patch is a high-level patch over {@link org.apache.ignite.cache.QueryEntityPatch}:
+ * it has operations for all {@link QueryEntity} instances in the schema
+ * and also contains the {@link QueryEntity} instances to be added to the schema as a whole.
+ *
+ * @see org.apache.ignite.cache.QueryEntityPatch
+ */
+public class QuerySchemaPatch {
+ /** Message which describes conflicts found while creating this patch. */
+ private String conflictsMessage;
+
+ /** Operations for modification query entity. */
+ private Collection<SchemaAbstractOperation> patchOperations;
+
+ /** Entities which should be added by whole. */
+ private Collection<QueryEntity> entityToAdd;
+
+ /**
+ * Create patch.
+ */
+ public QuerySchemaPatch(
+ @NotNull Collection<SchemaAbstractOperation> patchOperations,
+ @NotNull Collection<QueryEntity> entityToAdd,
+ String conflictsMessage) {
+ this.patchOperations = patchOperations;
+ this.entityToAdd = entityToAdd;
+ this.conflictsMessage = conflictsMessage;
+ }
+
+ /**
+ * @return {@code true} if patch has conflict.
+ */
+ public boolean hasConflicts() {
+ return conflictsMessage != null && !conflictsMessage.isEmpty();
+ }
+
+ /**
+ * @return Conflicts message.
+ */
+ public String getConflictsMessage() {
+ return conflictsMessage;
+ }
+
+ /**
+ * @return {@code true} if patch is empty and there is nothing to apply.
+ */
+ public boolean isEmpty() {
+ return patchOperations.isEmpty() && entityToAdd.isEmpty();
+ }
+
+ /**
+ * @return Patch operations for applying.
+ */
+ @NotNull public Collection<SchemaAbstractOperation> getPatchOperations() {
+ return patchOperations;
+ }
+
+ /**
+ * @return Entities which should be added by whole.
+ */
+ @NotNull public Collection<QueryEntity> getEntityToAdd() {
+ return entityToAdd;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(QuerySchemaPatch.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbc439b8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java
index 58511ee..8bae136 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java
@@ -23,12 +23,13 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.testframework.GridTestUtils;
@@ -253,22 +254,15 @@ public class IgniteClusterActivateDeactivateTestWithPersistence extends IgniteCl
ccfg.setGroupName(DEFAULT_CACHE_NAME);
- ccfgs = new CacheConfiguration[]{ccfg};
-
- startGrids(SRVS);
+ ccfgs = new CacheConfiguration[] {ccfg};
try {
- ignite(0).active(true);
+ startGrids(SRVS);
fail();
}
- catch (IgniteException e) {
- // Expected error.
+ catch (IgniteCheckedException e) {
+ assertTrue(X.getCause(e).getMessage().contains("Failed to start configured cache."));
}
-
- for (int i = 0; i < SRVS; i++)
- assertFalse(ignite(i).active());
-
- checkNoCaches(SRVS);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbc439b8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicSqlRestoreTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicSqlRestoreTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicSqlRestoreTest.java
new file mode 100644
index 0000000..f7dc7b4
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicSqlRestoreTest.java
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ *
+ */
+public class IgniteDynamicSqlRestoreTest extends GridCommonAbstractTest implements Serializable {
+
+ public static final String TEST_CACHE_NAME = "test";
+ public static final String TEST_INDEX_OBJECT = "TestIndexObject";
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setAutoActivationEnabled(false);
+
+ DataStorageConfiguration memCfg = new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration().setMaxSize(200 * 1024 * 1024).setPersistenceEnabled(true))
+ .setWalMode(WALMode.LOG_ONLY);
+
+ cfg.setDataStorageConfiguration(memCfg);
+
+ cfg.setConsistentId(gridName);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testMergeChangedConfigOnCoordinator() throws Exception {
+ {
+ //given: two started nodes with test table
+ Ignite ig = startGrid(0);
+ startGrid(1);
+
+ ig.cluster().active(true);
+
+ IgniteCache cache = ig.getOrCreateCache(getTestTableConfiguration());
+
+ fillTestData(ig);
+
+ //when: stop one node and create indexes on other node
+ stopGrid(1);
+
+ cache.query(new SqlFieldsQuery("create index myindexa on TestIndexObject(a)")).getAll();
+ cache.query(new SqlFieldsQuery("create index myindexb on TestIndexObject(b)")).getAll();
+ cache.query(new SqlFieldsQuery("alter table TestIndexObject add column (c int)")).getAll();
+
+ //and: stop all grid
+ stopAllGrids();
+ }
+
+ {
+ //and: start cluster from node without index
+ IgniteEx ig = startGrid(1);
+ startGrid(0);
+
+ ig.cluster().active(true);
+
+ //and: change data
+ try (IgniteDataStreamer<Object, Object> s = ig.dataStreamer(TEST_CACHE_NAME)) {
+ s.allowOverwrite(true);
+ for (int i = 0; i < 5_000; i++)
+ s.addData(i, null);
+ }
+
+ stopAllGrids();
+ }
+
+ {
+ //when: start node from first node
+ IgniteEx ig0 = startGrid(0);
+ IgniteEx ig1 = startGrid(1);
+
+ ig0.cluster().active(true);
+
+ //then: everything is ok
+ try (IgniteDataStreamer<Object, Object> s = ig1.dataStreamer(TEST_CACHE_NAME)) {
+ s.allowOverwrite(true);
+ for (int i = 0; i < 50_000; i++) {
+ BinaryObject bo = ig1.binary().builder(TEST_INDEX_OBJECT)
+ .setField("a", i, Object.class)
+ .setField("b", String.valueOf(i), Object.class)
+ .setField("c", i, Object.class)
+ .build();
+
+ s.addData(i, bo);
+ }
+ }
+
+ IgniteCache<Object, Object> cache = ig1.cache(TEST_CACHE_NAME);
+
+ assertThat(doExplainPlan(cache, "explain select * from TestIndexObject where a > 5"), containsString("myindexa"));
+ assertFalse(cache.query(new SqlFieldsQuery("SELECT a,b,c FROM TestIndexObject limit 1")).getAll().isEmpty());
+ }
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testTakeConfigFromJoiningNodeOnInactiveGrid() throws Exception {
+ {
+ //given: two started nodes with test table
+ Ignite ig = startGrid(0);
+ startGrid(1);
+
+ ig.cluster().active(true);
+
+ IgniteCache cache = ig.getOrCreateCache(getTestTableConfiguration());
+
+ fillTestData(ig);
+
+ stopGrid(1);
+
+ cache.query(new SqlFieldsQuery("create index myindexa on TestIndexObject(a)")).getAll();
+ cache.query(new SqlFieldsQuery("create index myindexb on TestIndexObject(b)")).getAll();
+ cache.query(new SqlFieldsQuery("alter table TestIndexObject add column (c int)")).getAll();
+
+ stopAllGrids();
+ }
+
+ {
+ //and: start cluster from node without cache
+ IgniteEx ig = startGrid(1);
+ startGrid(0);
+
+ ig.cluster().active(true);
+
+ //then: config for cache was applied successfully
+ IgniteCache<Object, Object> cache = ig.cache(TEST_CACHE_NAME);
+
+ assertThat(doExplainPlan(cache, "explain select * from TestIndexObject where a > 5"), containsString("myindexa"));
+ assertFalse(cache.query(new SqlFieldsQuery("SELECT a,b,c FROM TestIndexObject limit 1")).getAll().isEmpty());
+ }
+ }
+
+    /**
+     * Creates two indexes and adds a column while node 1 is stopped, then restarts both nodes (node 1 first),
+     * activates and stops the cluster again. Finally only node 1 is started and the test verifies that the
+     * query plan uses {@code myindexa} and that columns {@code a}, {@code b} and {@code c} can be selected,
+     * i.e. that node 1 on its own now has the schema changes it originally missed.
+     *
+     * @throws Exception if failed.
+     */
+    public void testResaveConfigAfterMerge() throws Exception {
+        {
+            //given: two started nodes with test table
+            Ignite ig = startGrid(0);
+            startGrid(1);
+
+            ig.cluster().active(true);
+
+            IgniteCache<Object, Object> cache = ig.getOrCreateCache(getTestTableConfiguration());
+
+            fillTestData(ig);
+
+            //and: node 1 is stopped, so the DDL below runs while only node 0 is alive
+            stopGrid(1);
+
+            cache.query(new SqlFieldsQuery("create index myindexa on TestIndexObject(a)")).getAll();
+            cache.query(new SqlFieldsQuery("create index myindexb on TestIndexObject(b)")).getAll();
+            cache.query(new SqlFieldsQuery("alter table TestIndexObject add column (c int)")).getAll();
+
+            stopAllGrids();
+        }
+
+        {
+            //when: the cluster is restarted beginning with node 1, activated and stopped again
+            IgniteEx ig = startGrid(1);
+            startGrid(0);
+
+            ig.cluster().active(true);
+
+            stopAllGrids();
+        }
+
+        {
+            //then: start only one node which originally was without index
+            IgniteEx ig = startGrid(1);
+
+            ig.cluster().active(true);
+
+            IgniteCache<Object, Object> cache = ig.cache(TEST_CACHE_NAME);
+
+            assertThat(doExplainPlan(cache, "explain select * from TestIndexObject where a > 5"), containsString("myindexa"));
+            assertFalse(cache.query(new SqlFieldsQuery("SELECT a,b,c FROM TestIndexObject limit 1")).getAll().isEmpty());
+        }
+    }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testMergeChangedConfigOnInactiveGrid() throws Exception {
+ {
+ //given: two started nodes with test table
+ Ignite ig = startGrid(0);
+ startGrid(1);
+
+ ig.cluster().active(true);
+ LinkedHashMap<String, String> fields = new LinkedHashMap<>();
+ fields.put("A", "java.lang.Integer");
+ fields.put("B", "java.lang.String");
+
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(TEST_CACHE_NAME);
+
+ ccfg.setQueryEntities(Arrays.asList(
+ new QueryEntity()
+ .setKeyType("java.lang.Integer")
+ .setValueType("TestIndexObject")
+ .setFields(fields)
+ ));
+
+ IgniteCache cache = ig.getOrCreateCache(ccfg);
+
+ fillTestData(ig);
+
+ cache.query(new SqlFieldsQuery("create index myindexb on TestIndexObject(b)")).getAll();
+
+ //and: stop one node and create index on other node
+ stopGrid(1);
+
+ cache.query(new SqlFieldsQuery("create index myindexa on TestIndexObject(a)")).getAll();
+ cache.query(new SqlFieldsQuery("drop index myindexb")).getAll();
+ cache.query(new SqlFieldsQuery("alter table TestIndexObject drop column b")).getAll();
+
+ //and: stop all grid
+ stopAllGrids();
+ }
+
+ {
+ //and: start cluster
+ IgniteEx ig0 = startGrid(0);
+ IgniteEx ig1 = startGrid(1);
+
+ ig0.cluster().active(true);
+
+ //then: config should be merged
+ try (IgniteDataStreamer<Object, Object> s = ig1.dataStreamer(TEST_CACHE_NAME)) {
+ s.allowOverwrite(true);
+ for (int i = 0; i < 5_000; i++) {
+ BinaryObject bo = ig1.binary().builder("TestIndexObject")
+ .setField("a", i, Object.class)
+ .setField("b", String.valueOf(i), Object.class)
+ .build();
+
+ s.addData(i, bo);
+ }
+ }
+ IgniteCache<Object, Object> cache = ig1.cache(TEST_CACHE_NAME);
+
+ //then: index "myindexa" and column "b" restored from node "1"
+ assertThat(doExplainPlan(cache, "explain select * from TestIndexObject where a > 5"), containsString("myindexa"));
+ assertThat(doExplainPlan(cache, "explain select * from TestIndexObject where b > 5"), containsString("myindexb"));
+ assertFalse(cache.query(new SqlFieldsQuery("SELECT a,b FROM TestIndexObject limit 1")).getAll().isEmpty());
+ }
+ }
+
+    /**
+     * Creates two indexes and adds a column while node 1 is stopped, then restarts node 0, activates the
+     * cluster and only afterwards starts node 1. The test reloads data through node 1, waits for the cache's
+     * index-ready future and verifies that the query plan uses {@code myindexa} and that columns {@code a},
+     * {@code b} and {@code c} can be selected.
+     *
+     * @throws Exception if failed.
+     */
+    public void testTakeChangedConfigOnActiveGrid() throws Exception {
+        {
+            //given: two started nodes with test table
+            Ignite ig = startGrid(0);
+            startGrid(1);
+
+            ig.cluster().active(true);
+
+            IgniteCache<Object, Object> cache = ig.getOrCreateCache(getTestTableConfiguration());
+
+            fillTestData(ig);
+
+            //stop one node and create index on other node
+            stopGrid(1);
+
+            cache.query(new SqlFieldsQuery("create index myindexa on TestIndexObject(a)")).getAll();
+            cache.query(new SqlFieldsQuery("create index myindexb on TestIndexObject(b)")).getAll();
+            cache.query(new SqlFieldsQuery("alter table TestIndexObject add column (c int)")).getAll();
+
+            stopAllGrids();
+        }
+
+        {
+            //and: node 0 is started and activated before node 1 joins
+            IgniteEx ig0 = startGrid(0);
+
+            ig0.cluster().active(true);
+
+            IgniteEx ig1 = startGrid(1);
+
+            //then: config should be merged
+            try (IgniteDataStreamer<Object, Object> s = ig1.dataStreamer(TEST_CACHE_NAME)) {
+                s.allowOverwrite(true);
+
+                for (int i = 0; i < 5_000; i++) {
+                    BinaryObject bo = ig1.binary().builder("TestIndexObject")
+                        .setField("a", i, Object.class)
+                        .setField("b", String.valueOf(i), Object.class)
+                        .setField("c", i, Object.class)
+                        .build();
+
+                    s.addData(i, bo);
+                }
+            }
+
+            IgniteCache<Object, Object> cache = ig1.getOrCreateCache(TEST_CACHE_NAME);
+
+            //wait for the index-ready future before inspecting the plan
+            cache.indexReadyFuture().get();
+
+            assertThat(doExplainPlan(cache, "explain select * from TestIndexObject where a > 5"), containsString("myindexa"));
+            assertFalse(cache.query(new SqlFieldsQuery("SELECT a,b,c FROM TestIndexObject limit 1")).getAll().isEmpty());
+        }
+    }
+
+    /**
+     * Creates {@code myindexa} on column {@code a} with both nodes alive, then stops node 1 and, on node 0
+     * only, re-creates column {@code b} as {@code int} and re-creates {@code myindexa} on column {@code b}.
+     * After a full restart node 1 is expected to fail to join node 0 with an {@link IgniteSpiException}
+     * whose message reports both the differing field type and the differing index.
+     *
+     * @throws Exception if failed.
+     */
+    public void testFailJoiningNodeBecauseDifferentSql() throws Exception {
+        {
+            //given: two started nodes with test table
+            Ignite ig = startGrid(0);
+            startGrid(1);
+
+            ig.cluster().active(true);
+
+            IgniteCache<Object, Object> cache = ig.getOrCreateCache(getTestTableConfiguration());
+
+            cache.query(new SqlFieldsQuery("create index myindexa on TestIndexObject(a)")).getAll();
+
+            //stop one node and create index on other node
+            stopGrid(1);
+
+            cache.query(new SqlFieldsQuery("drop index myindexa")).getAll();
+            cache.query(new SqlFieldsQuery("alter table TestIndexObject drop column b")).getAll();
+            cache.query(new SqlFieldsQuery("alter table TestIndexObject add column (b int)")).getAll();
+            cache.query(new SqlFieldsQuery("create index myindexa on TestIndexObject(b)")).getAll();
+
+            //and: stopped all grid
+            stopAllGrids();
+        }
+
+        {
+            //and: start cluster
+            startGrid(0);
+
+            try {
+                startGrid(1);
+
+                fail("Node should start with fail");
+            }
+            catch (Exception e) {
+                IgniteSpiException spiEx = X.cause(e, IgniteSpiException.class);
+
+                //an unrelated failure has no IgniteSpiException in its chain: rethrow it as is
+                //instead of hiding it behind a NullPointerException
+                if (spiEx == null)
+                    throw e;
+
+                String cause = spiEx.getMessage();
+
+                assertThat(cause, containsString("fieldType of B is different"));
+                assertThat(cause, containsString("index MYINDEXA is different"));
+            }
+        }
+    }
+
+    /**
+     * Creates {@code myindexa} with {@code INLINE_SIZE 1000} with both nodes alive, then stops node 1 and
+     * re-creates the index on node 0 with {@code INLINE_SIZE 2000}. After a full restart node 1 is expected
+     * to fail to join node 0 with an {@link IgniteSpiException} whose message reports the differing index.
+     *
+     * @throws Exception if failed.
+     */
+    public void testFailJoiningNodeBecauseFieldInlineSizeIsDifferent() throws Exception {
+        {
+            //given: two started nodes with test table
+            Ignite ig = startGrid(0);
+            startGrid(1);
+
+            ig.cluster().active(true);
+
+            IgniteCache<Object, Object> cache = ig.getOrCreateCache(getTestTableConfiguration());
+
+            cache.query(new SqlFieldsQuery("create index myindexa on TestIndexObject(a) INLINE_SIZE 1000")).getAll();
+
+            //stop one node and create index on other node
+            stopGrid(1);
+
+            cache.query(new SqlFieldsQuery("drop index myindexa")).getAll();
+            cache.query(new SqlFieldsQuery("create index myindexa on TestIndexObject(a) INLINE_SIZE 2000")).getAll();
+
+            //and: stopped all grid
+            stopAllGrids();
+        }
+
+        {
+            //and: start cluster
+            startGrid(0);
+
+            try {
+                startGrid(1);
+
+                fail("Node should start with fail");
+            }
+            catch (Exception e) {
+                IgniteSpiException spiEx = X.cause(e, IgniteSpiException.class);
+
+                //an unrelated failure has no IgniteSpiException in its chain: rethrow it as is
+                //instead of hiding it behind a NullPointerException
+                if (spiEx == null)
+                    throw e;
+
+                assertThat(spiEx.getMessage(), containsString("index MYINDEXA is different"));
+            }
+        }
+    }
+
+    /**
+     * Creates two indexes on node 0 while node 1 is stopped, then stops node 0, starts node 1 alone and
+     * activates it. Node 0 is then expected to fail to join with an {@link IgniteSpiException} whose message
+     * contains "Failed to join node to the active cluster".
+     *
+     * @throws Exception if failed.
+     */
+    public void testFailJoiningNodeBecauseNeedConfigUpdateOnActiveGrid() throws Exception {
+        {
+            //given: two started nodes with test table
+            startGrid(0);
+            startGrid(1);
+
+            CacheConfiguration<Object, Object> ccfg = getTestTableConfiguration();
+
+            Ignite ig = ignite(0);
+
+            ig.cluster().active(true);
+
+            IgniteCache<Object, Object> cache = ig.getOrCreateCache(ccfg);
+
+            fillTestData(ig);
+
+            //and: node 1 is stopped, so the indexes below are created while only node 0 is alive
+            stopGrid(1);
+
+            cache.query(new SqlFieldsQuery("create index myindexa on TestIndexObject(a)")).getAll();
+            cache.query(new SqlFieldsQuery("create index myindexb on TestIndexObject(b)")).getAll();
+
+            stopGrid(0);
+        }
+
+        {
+            //when: node 1, which missed the index creation, is started alone and activated
+            IgniteEx ig = startGrid(1);
+
+            ig.cluster().active(true);
+
+            //then: node 0 is rejected when it tries to join the already active cluster
+            try {
+                startGrid(0);
+
+                fail("Node should start with fail");
+            }
+            catch (Exception e) {
+                IgniteSpiException spiEx = X.cause(e, IgniteSpiException.class);
+
+                //an unrelated failure has no IgniteSpiException in its chain: rethrow it as is
+                //instead of hiding it behind a NullPointerException
+                if (spiEx == null)
+                    throw e;
+
+                assertThat(spiEx.getMessage(), containsString("Failed to join node to the active cluster"));
+            }
+        }
+    }
+
+    /**
+     * Executes the given SQL statement on the cache and returns the first column of the first result row
+     * as a lower-cased string.
+     * <p>
+     * Lower-casing uses {@link java.util.Locale#ROOT} so that callers matching ASCII index names such as
+     * {@code "myindexa"} do not depend on the JVM default locale (a Turkish locale lower-cases {@code 'I'}
+     * to a dotless i).
+     *
+     * @param cache cache to run the statement on.
+     * @param sql statement to execute; callers pass {@code explain ...} queries.
+     * @return result of explain plan
+     */
+    @NotNull private String doExplainPlan(IgniteCache<Object, Object> cache, String sql) {
+        return cache.query(new SqlFieldsQuery(sql)).getAll().get(0).get(0).toString().toLowerCase(java.util.Locale.ROOT);
+    }
+
+    /**
+     * Streams 50 000 binary {@code TestIndexObject} entries into the test cache. Entry {@code k} is stored
+     * under key {@code k} with field {@code a} set to {@code k} and field {@code b} set to its decimal string.
+     *
+     * @param ig node used to obtain the data streamer and the binary object builder.
+     */
+    private void fillTestData(Ignite ig) {
+        try (IgniteDataStreamer<Object, Object> streamer = ig.dataStreamer(TEST_CACHE_NAME)) {
+            int key = 0;
+
+            while (key < 50_000) {
+                streamer.addData(key, ig.binary().builder("TestIndexObject")
+                    .setField("a", key, Object.class)
+                    .setField("b", String.valueOf(key), Object.class)
+                    .build());
+
+                key++;
+            }
+        }
+    }
+
+    /**
+     * Builds the configuration of the test cache: a single query entity with an {@code Integer} key, value
+     * type {@code TestIndexObject}, an {@code Integer} field {@code a} and a {@code String} field {@code B}.
+     *
+     * @return cache configuration with test table
+     */
+    @NotNull private CacheConfiguration<Object, Object> getTestTableConfiguration() {
+        LinkedHashMap<String, String> cols = new LinkedHashMap<>();
+
+        cols.put("a", "java.lang.Integer");
+        cols.put("B", "java.lang.String");
+
+        QueryEntity entity = new QueryEntity();
+
+        entity.setKeyType("java.lang.Integer");
+        entity.setValueType("TestIndexObject");
+        entity.setFields(cols);
+
+        return new CacheConfiguration<Object, Object>(TEST_CACHE_NAME)
+            .setQueryEntities(Collections.singletonList(entity));
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbc439b8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
index dcb3722..3f09062 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
@@ -315,7 +315,7 @@ public abstract class DynamicColumnsAbstractConcurrentSelfTest extends DynamicCo
* @throws Exception If failed.
*/
private void checkNodeJoinOnPendingOperation(boolean addOrRemove) throws Exception {
- CountDownLatch finishLatch = new CountDownLatch(4);
+ CountDownLatch finishLatch = new CountDownLatch(3);
IgniteEx srv1 = ignitionStart(serverConfiguration(1), finishLatch);
@@ -334,7 +334,6 @@ public abstract class DynamicColumnsAbstractConcurrentSelfTest extends DynamicCo
ignitionStart(serverConfiguration(2), finishLatch);
ignitionStart(serverConfiguration(3, true), finishLatch);
- ignitionStart(clientConfiguration(4), finishLatch);
assertFalse(idxFut.isDone());
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbc439b8/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 619e7cf..68ff465 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -74,6 +74,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheSqlQueryErrorSelfT
import org.apache.ignite.internal.processors.cache.IgniteCacheUpdateSqlQuerySelfTest;
import org.apache.ignite.internal.processors.cache.IgniteCheckClusterStateBeforeExecuteQueryTest;
import org.apache.ignite.internal.processors.cache.IgniteCrossCachesJoinsQueryTest;
+import org.apache.ignite.internal.processors.cache.IgniteDynamicSqlRestoreTest;
import org.apache.ignite.internal.processors.cache.IncorrectQueryEntityTest;
import org.apache.ignite.internal.processors.cache.QueryEntityCaseMismatchTest;
import org.apache.ignite.internal.processors.cache.SqlFieldsQuerySelfTest;
@@ -230,6 +231,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
// Config.
suite.addTestSuite(IgniteCacheDuplicateEntityConfigurationSelfTest.class);
suite.addTestSuite(IncorrectQueryEntityTest.class);
+ suite.addTestSuite(IgniteDynamicSqlRestoreTest.class);
// Queries tests.
suite.addTestSuite(LazyQuerySelfTest.class);