You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by jo...@apache.org on 2019/04/17 08:28:12 UTC
[ignite] branch master updated: IGNITE-1903 Do not deserialize
cache configurations on non-affinity nodes - Fixes #6393.
This is an automated email from the ASF dual-hosted git repository.
jokser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new d47221d IGNITE-1903 Do not deserialize cache configurations on non-affinity nodes - Fixes #6393.
d47221d is described below
commit d47221daa40a0ccc10ed2b9ae16f658065b27993
Author: Pavel Kovalenko <jo...@gmail.com>
AuthorDate: Wed Apr 17 11:26:30 2019 +0300
IGNITE-1903 Do not deserialize cache configurations on non-affinity nodes - Fixes #6393.
Signed-off-by: Pavel Kovalenko <jo...@gmail.com>
---
.../ignite/configuration/CacheConfiguration.java | 4 +
.../configuration/NearCacheConfiguration.java | 1 +
.../ignite/configuration/SerializeSeparately.java | 34 +++
.../org/apache/ignite/internal/IgniteFeatures.java | 5 +-
.../cache/CacheConfigurationEnricher.java | 172 +++++++++++
.../cache/CacheConfigurationEnrichment.java | 89 ++++++
.../cache/CacheConfigurationSplitter.java | 52 ++++
.../cache/CacheConfigurationSplitterImpl.java | 118 ++++++++
.../cache/CacheConfigurationSplitterOldFormat.java | 60 ++++
.../internal/processors/cache/CacheData.java | 15 +-
.../internal/processors/cache/CacheGroupData.java | 24 +-
.../processors/cache/CacheGroupDescriptor.java | 41 ++-
.../cache/CacheJoinNodeDiscoveryData.java | 3 +-
.../internal/processors/cache/CachesRegistry.java | 2 +-
.../processors/cache/ClusterCachesInfo.java | 121 ++++++--
.../cache/DynamicCacheChangeRequest.java | 25 +-
.../processors/cache/DynamicCacheDescriptor.java | 54 +++-
.../processors/cache/GridCacheAdapter.java | 19 +-
.../processors/cache/GridCacheAttributes.java | 34 ++-
.../processors/cache/GridCacheProcessor.java | 186 +++++++++---
.../internal/processors/cache/StoredCacheData.java | 66 ++++
.../cluster/GridClusterStateProcessor.java | 17 +-
.../processors/cache/CacheComparatorTest.java | 4 +-
...eConfigurationSerializationOnDiscoveryTest.java | 336 +++++++++++++++++++++
...heConfigurationSerializationOnExchangeTest.java | 276 +++++++++++++++++
.../CacheStoreUsageMultinodeAbstractTest.java | 3 +-
.../cache/GridCacheAbstractFullApiSelfTest.java | 10 +-
...CacheConfigurationFileConsistencyCheckTest.java | 6 +-
.../testframework/junits/GridAbstractTest.java | 14 +-
.../ignite/testsuites/IgniteCacheTestSuite7.java | 5 +
.../query/IgniteSqlNotNullConstraintTest.java | 7 +
.../Store/CacheStoreSessionTestSharedFactory.cs | 2 +-
...gniteSpringBeanSpringResourceInjectionTest.java | 6 +-
33 files changed, 1697 insertions(+), 114 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index ccbf35b..9b5e0d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -215,6 +215,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
private EvictionPolicy evictPlc;
/** Cache eviction policy factory. */
+ @SerializeSeparately
private Factory evictPlcFactory;
/** */
@@ -227,6 +228,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
private int sqlOnheapCacheMaxSize = DFLT_SQL_ONHEAP_CACHE_MAX_SIZE;
/** Eviction filter. */
+ @SerializeSeparately
private EvictionFilter<?, ?> evictFilter;
/** Eager ttl flag. */
@@ -245,6 +247,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
private CacheWriteSynchronizationMode writeSync;
/** */
+ @SerializeSeparately
private Factory storeFactory;
/** */
@@ -360,6 +363,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
private TopologyValidator topValidator;
/** Cache store session listeners. */
+ @SerializeSeparately
private Factory<? extends CacheStoreSessionListener>[] storeSesLsnrs;
/** Query entities. */
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/NearCacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/NearCacheConfiguration.java
index ae6e98b..322d042 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/NearCacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/NearCacheConfiguration.java
@@ -43,6 +43,7 @@ public class NearCacheConfiguration<K, V> implements Serializable {
private EvictionPolicy<K, V> nearEvictPlc;
/** Near cache eviction policy factory. */
+ @SerializeSeparately
private Factory nearEvictPlcFactory;
/** Default near cache start size. */
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/SerializeSeparately.java b/modules/core/src/main/java/org/apache/ignite/configuration/SerializeSeparately.java
new file mode 100644
index 0000000..6c7df56
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/SerializeSeparately.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.configuration;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * This annotation is needed to mark fields of {@link CacheConfiguration} to serialize them separately
+ * during sending over network or storing to disk.
+ *
+ * Such fields are deserialized only during cache start process.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.FIELD)
+public @interface SerializeSeparately {
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
index 07df7ea..6ab8863 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
@@ -43,7 +43,10 @@ public enum IgniteFeatures {
DATA_PACKET_COMPRESSION(3),
/** Support of different rebalance size for nodes. */
- DIFFERENT_REBALANCE_POOL_SIZE(4);
+ DIFFERENT_REBALANCE_POOL_SIZE(4),
+
+ /** Support of splitted cache configurations to avoid broken deserialization on non-affinity nodes. */
+ SPLITTED_CACHE_CONFIGURATIONS(5);
/**
* Unique feature identifier.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheConfigurationEnricher.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheConfigurationEnricher.java
new file mode 100644
index 0000000..788198a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheConfigurationEnricher.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.lang.reflect.Field;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.SerializeSeparately;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class CacheConfigurationEnricher {
+ /** Marshaller. */
+ private final Marshaller marshaller;
+
+ /** Class loader. */
+ private final ClassLoader clsLdr;
+
+ /**
+ * @param marshaller Marshaller.
+ * @param clsLdr Class loader.
+ */
+ public CacheConfigurationEnricher(Marshaller marshaller, ClassLoader clsLdr) {
+ this.marshaller = marshaller;
+ this.clsLdr = clsLdr;
+ }
+
+ /**
+ * Enriches descriptor cache configuration with stored enrichment.
+ *
+ * @param desc Description.
+ * @param affinityNode Affinity node.
+ */
+ public DynamicCacheDescriptor enrich(DynamicCacheDescriptor desc, boolean affinityNode) {
+ if (CU.isUtilityCache(desc.cacheName()))
+ return desc;
+
+ if (desc.isConfigurationEnriched())
+ return desc;
+
+ CacheConfiguration<?, ?> enrichedCfg = enrich(
+ desc.cacheConfiguration(), desc.cacheConfigurationEnrichment(), affinityNode);
+
+ desc.cacheConfiguration(enrichedCfg);
+
+ desc.configurationEnriched(true);
+
+ return desc;
+ }
+
+ /**
+ * Enriches descriptor cache configuration with stored enrichment.
+ *
+ * @param desc Description.
+ * @param affinityNode Affinity node.
+ */
+ public CacheGroupDescriptor enrich(CacheGroupDescriptor desc, boolean affinityNode) {
+ if (CU.isUtilityCache(desc.cacheOrGroupName()))
+ return desc;
+
+ if (desc.isConfigurationEnriched())
+ return desc;
+
+ CacheConfiguration<?, ?> enrichedCfg = enrich(
+ desc.config(), desc.cacheConfigurationEnrichment(), affinityNode);
+
+ desc.config(enrichedCfg);
+
+ desc.configurationEnriched(true);
+
+ return desc;
+ }
+
+ /**
+ * Enriches cache configuration fields with deserialized values from given @{code enrichment}.
+ *
+ * @param ccfg Cache configuration to enrich.
+ * @param enrichment Cache configuration enrichment.
+ * @param affinityNode {@code true} if enrichment is happened on affinity node.
+ *
+ * @return Enriched cache configuration.
+ */
+ public CacheConfiguration<?, ?> enrich(
+ CacheConfiguration<?, ?> ccfg,
+ @Nullable CacheConfigurationEnrichment enrichment,
+ boolean affinityNode
+ ) {
+ if (enrichment == null)
+ return ccfg;
+
+ CacheConfiguration<?, ?> enrichedCp = new CacheConfiguration<>(ccfg);
+
+ try {
+ for (Field field : CacheConfiguration.class.getDeclaredFields())
+ if (field.getDeclaredAnnotation(SerializeSeparately.class) != null) {
+ if (!affinityNode && skipDeserialization(ccfg, field))
+ continue;
+
+ field.setAccessible(true);
+
+ Object enrichedVal = deserialize(
+ field.getName(), enrichment.getFieldSerializedValue(field.getName()));
+
+ field.set(enrichedCp, enrichedVal);
+ }
+ }
+ catch (Exception e) {
+ throw new IgniteException("Failed to enrich cache configuration [cacheName=" + ccfg.getName() + "]", e);
+ }
+
+ return enrichedCp;
+ }
+
+ /**
+ * @see #enrich(CacheConfiguration, CacheConfigurationEnrichment, boolean).
+ * Does the same thing but without skipping any fields.
+ */
+ public CacheConfiguration<?, ?> enrichFully(
+ CacheConfiguration<?, ?> ccfg,
+ CacheConfigurationEnrichment enrichment
+ ) {
+ return enrich(ccfg, enrichment, true);
+ }
+
+ /**
+ * @param fieldName Field name.
+ */
+ private Object deserialize(String fieldName, byte[] serializedVal) {
+ try {
+ return U.unmarshal(marshaller, serializedVal, clsLdr);
+ }
+ catch (Exception e) {
+ throw new IgniteException("Failed to deserialize field " + fieldName, e);
+ }
+ }
+
+ /**
+ * Skips deserialization for some fields.
+ *
+ * @param ccfg Cache configuration.
+ * @param field Field.
+ *
+ * @return {@code true} if deserialization for given field should be skipped.
+ */
+ private static boolean skipDeserialization(CacheConfiguration<?, ?> ccfg, Field field) {
+ if ("storeFactory".equals(field.getName()) && ccfg.getAtomicityMode() == CacheAtomicityMode.ATOMIC)
+ return true;
+
+ return false;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheConfigurationEnrichment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheConfigurationEnrichment.java
new file mode 100644
index 0000000..4b766de
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheConfigurationEnrichment.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Object that contains serialized values for fields marked with {@link org.apache.ignite.configuration.SerializeSeparately}
+ * in {@link org.apache.ignite.configuration.CacheConfiguration}.
+ * This object is needed to exchange and store shrinked cache configurations to avoid possible {@link ClassNotFoundException} errors
+ * during deserialization on nodes where some specific class may not exist.
+ */
+public class CacheConfigurationEnrichment implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Field name -> Field serialized value. */
+ private final Map<String, byte[]> enrichFields;
+
+ /** Field name -> Field value class name. */
+ private final Map<String, String> fieldClassNames;
+
+ /** Enrichment fields for {@link org.apache.ignite.configuration.NearCacheConfiguration}. */
+ private volatile @Nullable CacheConfigurationEnrichment nearCacheCfgEnrichment;
+
+ /**
+ * @param enrichFields Enrich fields.
+ * @param fieldClassNames Field class names.
+ */
+ public CacheConfigurationEnrichment(
+ Map<String, byte[]> enrichFields,
+ Map<String, String> fieldClassNames
+ ) {
+ this.enrichFields = enrichFields;
+ this.fieldClassNames = fieldClassNames;
+ }
+
+ /**
+ * @param fieldName Field name.
+ */
+ public byte[] getFieldSerializedValue(String fieldName) {
+ return enrichFields.get(fieldName);
+ }
+
+ /**
+ * @param fieldName Field name.
+ */
+ public String getFieldClassName(String fieldName) {
+ return fieldClassNames.get(fieldName);
+ }
+
+ /**
+ * @param nearCacheCfgEnrichment Enrichment configured for {@link org.apache.ignite.configuration.NearCacheConfiguration}.
+ */
+ public void nearCacheConfigurationEnrichment(CacheConfigurationEnrichment nearCacheCfgEnrichment) {
+ this.nearCacheCfgEnrichment = nearCacheCfgEnrichment;
+ }
+
+ /**
+ * @return Enrichment for configured {@link org.apache.ignite.configuration.NearCacheConfiguration}.
+ */
+ public CacheConfigurationEnrichment nearCacheConfigurationEnrichment() {
+ return nearCacheCfgEnrichment;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "CacheConfigurationEnrichment{" +
+ "enrichFields=" + enrichFields +
+ '}';
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheConfigurationSplitter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheConfigurationSplitter.java
new file mode 100644
index 0000000..9956763
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheConfigurationSplitter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.typedef.T2;
+
+/**
+ *
+ */
+public interface CacheConfigurationSplitter {
+ /**
+ *
+ * @param desc Description.
+ */
+ default T2<CacheConfiguration, CacheConfigurationEnrichment> split(CacheGroupDescriptor desc) {
+ if (desc.isConfigurationEnriched())
+ return split(desc.config());
+
+ return new T2<>(desc.config(), desc.cacheConfigurationEnrichment());
+ }
+
+ /**
+ * @param desc Description.
+ */
+ default T2<CacheConfiguration, CacheConfigurationEnrichment> split(DynamicCacheDescriptor desc) {
+ if (desc.isConfigurationEnriched())
+ return split(desc.cacheConfiguration());
+
+ return new T2<>(desc.cacheConfiguration(), desc.cacheConfigurationEnrichment());
+ }
+
+ /**
+ * @param ccfg Cache configuration.
+ */
+ T2<CacheConfiguration, CacheConfigurationEnrichment> split(CacheConfiguration ccfg);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheConfigurationSplitterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheConfigurationSplitterImpl.java
new file mode 100644
index 0000000..9cb3fd6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheConfigurationSplitterImpl.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.configuration.SerializeSeparately;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+
+/**
+ *
+ */
+public class CacheConfigurationSplitterImpl implements CacheConfigurationSplitter {
+ /** Cache configuration to hold default field values. */
+ private static final CacheConfiguration<?, ?> DEFAULT_CACHE_CONFIG = new CacheConfiguration<>();
+
+ /** Near cache configuration to hold default field values. */
+ private static final NearCacheConfiguration<?, ?> DEFAULT_NEAR_CACHE_CONFIG = new NearCacheConfiguration<>();
+
+ /** Marshaller. */
+ private final Marshaller marshaller;
+
+ /**
+ * @param marshaller Marshaller.
+ */
+ public CacheConfigurationSplitterImpl(Marshaller marshaller) {
+ this.marshaller = marshaller;
+ }
+
+ /**
+ * @param ccfg Cache configuration.
+ */
+ @Override public T2<CacheConfiguration, CacheConfigurationEnrichment> split(CacheConfiguration ccfg) {
+ try {
+ CacheConfiguration cfgCp = new CacheConfiguration(ccfg);
+
+ CacheConfigurationEnrichment enrichment = buildEnrichment(
+ CacheConfiguration.class, cfgCp, DEFAULT_CACHE_CONFIG);
+
+ return new T2<>(cfgCp, enrichment);
+ }
+ catch (Exception e) {
+ throw new IgniteException("Failed to split cache configuration", e);
+ }
+ }
+
+ /**
+ * Builds {@link CacheConfigurationEnrichment} from given config.
+ * It extracts all field values to enrichment object replacing values of that fields with default.
+ *
+ * @param cfgCls Configuration class.
+ * @param cfg Configuration to build enrichment from.
+ * @param dfltCfg Default configuration to replace enriched values with default.
+ * @param <T> Configuration class.
+ * @return Enrichment object for given config.
+ * @throws IllegalAccessException If failed.
+ */
+ private <T> CacheConfigurationEnrichment buildEnrichment(
+ Class<T> cfgCls,
+ T cfg,
+ T dfltCfg
+ ) throws IllegalAccessException {
+ Map<String, byte[]> enrichment = new HashMap<>();
+ Map<String, String> fieldClsNames = new HashMap<>();
+
+ for (Field field : cfgCls.getDeclaredFields()) {
+ if (field.getDeclaredAnnotation(SerializeSeparately.class) != null) {
+ field.setAccessible(true);
+
+ Object val = field.get(cfg);
+
+ byte[] serializedVal = serialize(field.getName(), val);
+
+ enrichment.put(field.getName(), serializedVal);
+
+ fieldClsNames.put(field.getName(), val != null ? val.getClass().getName() : null);
+
+ // Replace field in original configuration with default value.
+ field.set(cfg, field.get(dfltCfg));
+ }
+ }
+
+ return new CacheConfigurationEnrichment(enrichment, fieldClsNames);
+ }
+
+ /**
+ * @param val Value.
+ */
+ private byte[] serialize(String fieldName, Object val) {
+ try {
+ return U.marshal(marshaller, val);
+ }
+ catch (Exception e) {
+ throw new IgniteException("Failed to serialize field [fieldName=" + fieldName + ", value=" + val + ']', e);
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheConfigurationSplitterOldFormat.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheConfigurationSplitterOldFormat.java
new file mode 100644
index 0000000..e180db9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheConfigurationSplitterOldFormat.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.typedef.T2;
+
+/**
+ *
+ */
+public class CacheConfigurationSplitterOldFormat implements CacheConfigurationSplitter {
+ /** Enricher to merge cache configuration and enrichment to support old (full) format. */
+ private final CacheConfigurationEnricher enricher;
+
+ /**
+ * @param enricher Enricher.
+ */
+ public CacheConfigurationSplitterOldFormat(CacheConfigurationEnricher enricher) {
+ this.enricher = enricher;
+ }
+
+ /** {@inheritDoc} */
+ @Override public T2<CacheConfiguration, CacheConfigurationEnrichment> split(CacheGroupDescriptor desc) {
+ if (!desc.isConfigurationEnriched())
+ return new T2<>(
+ enricher.enrichFully(desc.config(), desc.cacheConfigurationEnrichment()), null);
+ else
+ return new T2<>(desc.config(), null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public T2<CacheConfiguration, CacheConfigurationEnrichment> split(DynamicCacheDescriptor desc) {
+ if (!desc.isConfigurationEnriched())
+ return new T2<>(
+ enricher.enrichFully(desc.cacheConfiguration(), desc.cacheConfigurationEnrichment()), null);
+ else
+ return new T2<>(desc.cacheConfiguration(), null);
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public T2<CacheConfiguration, CacheConfigurationEnrichment> split(CacheConfiguration ccfg) {
+ return new T2<>(ccfg, null);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
index dee32fa..7760721 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
@@ -64,6 +64,9 @@ public class CacheData implements Serializable {
/** Flags added for future usage. */
private final long flags;
+ /** Cache configuration enrichment. */
+ private final CacheConfigurationEnrichment cacheCfgEnrichment;
+
/**
* @param cacheCfg Cache configuration.
* @param cacheId Cache ID.
@@ -87,7 +90,9 @@ public class CacheData implements Serializable {
boolean staticCfg,
boolean sql,
boolean template,
- long flags) {
+ long flags,
+ CacheConfigurationEnrichment cacheCfgEnrichment
+ ) {
assert cacheCfg != null;
assert rcvdFrom != null : cacheCfg.getName();
assert deploymentId != null : cacheCfg.getName();
@@ -105,6 +110,7 @@ public class CacheData implements Serializable {
this.sql = sql;
this.template = template;
this.flags = flags;
+ this.cacheCfgEnrichment = cacheCfgEnrichment;
}
/**
@@ -184,6 +190,13 @@ public class CacheData implements Serializable {
return flags;
}
+ /**
+ *
+ */
+ public CacheConfigurationEnrichment cacheConfigurationEnrichment() {
+ return cacheCfgEnrichment;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheData.class, this, "cacheName", cacheCfg.getName());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
index 5588cfb..2dacbe2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
@@ -17,6 +17,10 @@
package org.apache.ignite.internal.processors.cache;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -24,11 +28,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
/**
*
*/
@@ -70,6 +69,9 @@ public class CacheGroupData implements Serializable {
/** WAL change requests. */
private final List<WalStateProposeMessage> walChangeReqs;
+ /** Cache configuration enrichment. */
+ private final CacheConfigurationEnrichment cacheCfgEnrichment;
+
/**
* @param cacheCfg Cache configuration.
* @param grpName Group name.
@@ -93,7 +95,9 @@ public class CacheGroupData implements Serializable {
long flags,
boolean persistenceEnabled,
boolean walEnabled,
- List<WalStateProposeMessage> walChangeReqs) {
+ List<WalStateProposeMessage> walChangeReqs,
+ CacheConfigurationEnrichment cacheCfgEnrichment
+ ) {
assert cacheCfg != null;
assert grpId != 0 : cacheCfg.getName();
assert deploymentId != null : cacheCfg.getName();
@@ -109,6 +113,7 @@ public class CacheGroupData implements Serializable {
this.persistenceEnabled = persistenceEnabled;
this.walEnabled = walEnabled;
this.walChangeReqs = walChangeReqs;
+ this.cacheCfgEnrichment = cacheCfgEnrichment;
}
/**
@@ -181,6 +186,13 @@ public class CacheGroupData implements Serializable {
return walChangeReqs;
}
+ /**
+ * @return Cache configuration enrichment.
+ */
+ public CacheConfigurationEnrichment cacheConfigurationEnrichment() {
+ return cacheCfgEnrichment;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheGroupData.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java
index e72de28..2963aea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java
@@ -54,7 +54,7 @@ public class CacheGroupDescriptor {
/** */
@GridToStringExclude
- private final CacheConfiguration<?, ?> cacheCfg;
+ private volatile CacheConfiguration<?, ?> cacheCfg;
/** */
@GridToStringInclude
@@ -72,6 +72,12 @@ public class CacheGroupDescriptor {
/** Pending WAL change requests. */
private final LinkedList<WalStateProposeMessage> walChangeReqs;
+ /** Cache config enrichment. */
+ private final CacheConfigurationEnrichment cacheCfgEnrichment;
+
+ /** Is configuration enriched. */
+ private volatile boolean cacheCfgEnriched;
+
/**
* @param cacheCfg Cache configuration.
* @param grpName Group name.
@@ -95,7 +101,9 @@ public class CacheGroupDescriptor {
Map<String, Integer> caches,
boolean persistenceEnabled,
boolean walEnabled,
- @Nullable Collection<WalStateProposeMessage> walChangeReqs) {
+ @Nullable Collection<WalStateProposeMessage> walChangeReqs,
+ CacheConfigurationEnrichment cacheCfgEnrichment
+ ) {
assert cacheCfg != null;
assert grpId != 0;
@@ -109,6 +117,7 @@ public class CacheGroupDescriptor {
this.persistenceEnabled = persistenceEnabled;
this.walEnabled = walEnabled;
this.walChangeReqs = walChangeReqs == null ? new LinkedList<>() : new LinkedList<>(walChangeReqs);
+ this.cacheCfgEnrichment = cacheCfgEnrichment;
}
/**
@@ -256,6 +265,13 @@ public class CacheGroupDescriptor {
}
/**
+ * @param cacheCfg Cache config.
+ */
+ public void config(CacheConfiguration cacheCfg) {
+ this.cacheCfg = cacheCfg;
+ }
+
+ /**
* @return Group caches.
*/
public Map<String, Integer> caches() {
@@ -308,6 +324,27 @@ public class CacheGroupDescriptor {
return persistenceEnabled;
}
+ /**
+ * @return Cache configuration enrichment.
+ */
+ public CacheConfigurationEnrichment cacheConfigurationEnrichment() {
+ return cacheCfgEnrichment;
+ }
+
+ /**
+ * @return {@code True} if cache configuration is already enriched.
+ */
+ public boolean isConfigurationEnriched() {
+ return cacheCfgEnrichment == null || cacheCfgEnriched;
+ }
+
+ /**
+ * @param cacheCfgEnriched Is configuration enriched.
+ */
+ public void configurationEnriched(boolean cacheCfgEnriched) {
+ this.cacheCfgEnriched = cacheCfgEnriched;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheGroupDescriptor.class, this, "cacheName", cacheCfg.getName());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
index bb0b59b..4ab3266 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
@@ -57,7 +57,8 @@ public class CacheJoinNodeDiscoveryData implements Serializable {
IgniteUuid cacheDeploymentId,
Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches,
Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates,
- boolean startCaches) {
+ boolean startCaches
+ ) {
this.cacheDeploymentId = cacheDeploymentId;
this.caches = caches;
this.templates = templates;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachesRegistry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachesRegistry.java
index d37f69c..3b906ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachesRegistry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachesRegistry.java
@@ -280,7 +280,7 @@ public class CachesRegistry {
*/
private IgniteInternalFuture<?> persistCacheConfigurations(List<DynamicCacheDescriptor> cacheDescriptors) {
List<StoredCacheData> cacheConfigsToPersist = cacheDescriptors.stream()
- .map(DynamicCacheDescriptor::toStoredData)
+ .map(desc -> desc.toStoredData(cctx.cache().splitter()))
.collect(Collectors.toList());
// Pre-create cache work directories if they don't exist.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 84bcca1..fbd650d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -43,9 +43,11 @@ import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.GridCachePluginContext;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
@@ -326,8 +328,8 @@ class ClusterCachesInfo {
@SuppressWarnings("unchecked")
private void checkCache(CacheJoinNodeDiscoveryData.CacheInfo locInfo, CacheData rmtData, UUID rmt)
throws IgniteCheckedException {
- GridCacheAttributes rmtAttr = new GridCacheAttributes(rmtData.cacheConfiguration());
- GridCacheAttributes locAttr = new GridCacheAttributes(locInfo.cacheData().config());
+ GridCacheAttributes rmtAttr = new GridCacheAttributes(rmtData.cacheConfiguration(), rmtData.cacheConfigurationEnrichment());
+ GridCacheAttributes locAttr = new GridCacheAttributes(locInfo.cacheData().config(), locInfo.cacheData().cacheConfigurationEnrichment());
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheMode", "Cache mode",
locAttr.cacheMode(), rmtAttr.cacheMode(), true);
@@ -804,7 +806,9 @@ class ClusterCachesInfo {
false,
false,
req.deploymentId(),
- req.schema());
+ req.schema(),
+ req.cacheConfigurationEnrichment()
+ );
DynamicCacheDescriptor old = registeredTemplates().put(ccfg.getName(), templateDesc);
@@ -923,7 +927,9 @@ class ClusterCachesInfo {
cacheId,
req.initiatingNodeId(),
req.deploymentId(),
- req.encryptionKey());
+ req.encryptionKey(),
+ req.cacheConfigurationEnrichment()
+ );
DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ctx,
ccfg,
@@ -934,7 +940,9 @@ class ClusterCachesInfo {
false,
req.sql(),
req.deploymentId(),
- req.schema());
+ req.schema(),
+ req.cacheConfigurationEnrichment()
+ );
DynamicCacheDescriptor old = registeredCaches.put(ccfg.getName(), startDesc);
@@ -1136,8 +1144,12 @@ class ClusterCachesInfo {
private CacheNodeCommonDiscoveryData collectCommonDiscoveryData() {
Map<Integer, CacheGroupData> cacheGrps = new HashMap<>();
+ CacheConfigurationSplitter cfgSplitter = ctx.cache().backwardCompatibleSplitter();
+
for (CacheGroupDescriptor grpDesc : registeredCacheGrps.values()) {
- CacheGroupData grpData = new CacheGroupData(grpDesc.config(),
+ T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = cfgSplitter.split(grpDesc);
+
+ CacheGroupData grpData = new CacheGroupData(splitCfg.get1(),
grpDesc.groupName(),
grpDesc.groupId(),
grpDesc.receivedFrom(),
@@ -1147,7 +1159,8 @@ class ClusterCachesInfo {
0,
grpDesc.persistenceEnabled(),
grpDesc.walEnabled(),
- grpDesc.walChangeRequests());
+ grpDesc.walChangeRequests(),
+ splitCfg.get2());
cacheGrps.put(grpDesc.groupId(), grpData);
}
@@ -1155,7 +1168,9 @@ class ClusterCachesInfo {
Map<String, CacheData> caches = new HashMap<>();
for (DynamicCacheDescriptor desc : registeredCaches.values()) {
- CacheData cacheData = new CacheData(desc.cacheConfiguration(),
+ T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = cfgSplitter.split(desc);
+
+ CacheData cacheData = new CacheData(splitCfg.get1(),
desc.cacheId(),
desc.groupId(),
desc.cacheType(),
@@ -1165,7 +1180,9 @@ class ClusterCachesInfo {
desc.staticallyConfigured(),
desc.sql(),
false,
- 0);
+ 0,
+ splitCfg.get2()
+ );
caches.put(desc.cacheName(), cacheData);
}
@@ -1173,7 +1190,10 @@ class ClusterCachesInfo {
Map<String, CacheData> templates = new HashMap<>();
for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
- CacheData cacheData = new CacheData(desc.cacheConfiguration(),
+ T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = cfgSplitter.split(desc);
+
+ CacheData cacheData = new CacheData(
+ splitCfg.get1(),
0,
0,
desc.cacheType(),
@@ -1183,7 +1203,9 @@ class ClusterCachesInfo {
desc.staticallyConfigured(),
false,
true,
- 0);
+ 0,
+ splitCfg.get2()
+ );
templates.put(desc.cacheName(), cacheData);
}
@@ -1209,6 +1231,8 @@ class ClusterCachesInfo {
CacheNodeCommonDiscoveryData cachesData = (CacheNodeCommonDiscoveryData)data.commonData();
+ validateNoNewCachesWithNewFormat(cachesData);
+
// CacheGroup configurations that were created from local node configuration.
Map<Integer, CacheGroupDescriptor> locCacheGrps = new HashMap<>(registeredCacheGroups());
@@ -1232,6 +1256,38 @@ class ClusterCachesInfo {
}
/**
+ * Validates that joining node doesn't have newly configured caches
+ * in case when there is no cluster-wide support of SPLITTED_CACHE_CONFIGURATIONS.
+ *
+ * If validation is failed that caches will be destroyed cluster-wide and node joining process will be failed.
+ *
+ * @param clusterWideCacheData Cluster wide cache data.
+ */
+ public void validateNoNewCachesWithNewFormat(CacheNodeCommonDiscoveryData clusterWideCacheData) {
+ IgniteDiscoverySpi spi = (IgniteDiscoverySpi) ctx.discovery().getInjectedDiscoverySpi();
+
+ boolean allowSplitCacheConfigurations = spi.allNodesSupport(IgniteFeatures.SPLITTED_CACHE_CONFIGURATIONS);
+
+ if (!allowSplitCacheConfigurations) {
+ List<String> cachesToDestroy = new ArrayList<>();
+
+ for (DynamicCacheDescriptor cacheDescriptor : registeredCaches().values()) {
+ CacheData clusterCacheData = clusterWideCacheData.caches().get(cacheDescriptor.cacheName());
+
+ // Node spawned new cache.
+ if (clusterCacheData.receivedFrom().equals(cacheDescriptor.receivedFrom()))
+ cachesToDestroy.add(cacheDescriptor.cacheName());
+ }
+
+ if (!cachesToDestroy.isEmpty()) {
+ ctx.cache().dynamicDestroyCaches(cachesToDestroy, false);
+
+ throw new IllegalStateException("Node can't join to cluster in compatibility mode with newly configured caches: " + cachesToDestroy);
+ }
+ }
+ }
+
+ /**
* Validation {@link #registeredCaches} on conflicts.
*
* @return Error message if conflicts was found.
@@ -1301,7 +1357,8 @@ class ClusterCachesInfo {
cacheData.staticallyConfigured(),
cacheData.sql(),
cacheData.deploymentId(),
- new QuerySchema(cacheData.schema().entities())
+ new QuerySchema(cacheData.schema().entities()),
+ cacheData.cacheConfigurationEnrichment()
);
Collection<QueryEntity> localQueryEntities = getLocalQueryEntities(cfg.getName());
@@ -1381,7 +1438,9 @@ class ClusterCachesInfo {
cacheData.staticallyConfigured(),
false,
cacheData.deploymentId(),
- cacheData.schema());
+ cacheData.schema(),
+ cacheData.cacheConfigurationEnrichment()
+ );
registeredTemplates.put(cacheData.cacheConfiguration().getName(), desc);
}
@@ -1406,7 +1465,9 @@ class ClusterCachesInfo {
grpData.caches(),
grpData.persistenceEnabled(),
grpData.walEnabled(),
- grpData.walChangeRequests());
+ grpData.walChangeRequests(),
+ grpData.cacheConfigurationEnrichment()
+ );
if (locCacheGrps.containsKey(grpDesc.groupId())) {
CacheGroupDescriptor locGrpCfg = locCacheGrps.get(grpDesc.groupId());
@@ -1510,7 +1571,9 @@ class ClusterCachesInfo {
desc.staticallyConfigured(),
desc.sql(),
desc.deploymentId(),
- desc.schema().copy());
+ desc.schema().copy(),
+ locCfg.cacheData().cacheConfigurationEnrichment()
+ );
desc0.startTopologyVersion(desc.startTopologyVersion());
desc0.receivedFromStartVersion(desc.receivedFromStartVersion());
@@ -1826,24 +1889,29 @@ class ClusterCachesInfo {
private void registerNewCache(
CacheJoinNodeDiscoveryData joinData,
UUID nodeId,
- CacheJoinNodeDiscoveryData.CacheInfo cacheInfo) {
+ CacheJoinNodeDiscoveryData.CacheInfo cacheInfo
+ ) {
CacheConfiguration<?, ?> cfg = cacheInfo.cacheData().config();
int cacheId = CU.cacheId(cfg.getName());
- CacheGroupDescriptor grpDesc = registerCacheGroup(null,
+ CacheGroupDescriptor grpDesc = registerCacheGroup(
+ null,
null,
cfg,
cacheId,
nodeId,
joinData.cacheDeploymentId(),
- null);
+ null,
+ cacheInfo.cacheData().cacheConfigurationEnrichment()
+ );
ctx.discovery().setCacheFilter(
cacheId,
grpDesc.groupId(),
cfg.getName(),
- cfg.getNearConfiguration() != null);
+ cfg.getNearConfiguration() != null
+ );
DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx,
cfg,
@@ -1854,7 +1922,9 @@ class ClusterCachesInfo {
cacheInfo.isStaticallyConfigured(),
cacheInfo.sql(),
joinData.cacheDeploymentId(),
- new QuerySchema(cacheInfo.cacheData().queryEntities()));
+ new QuerySchema(cacheInfo.cacheData().queryEntities()),
+ cacheInfo.cacheData().cacheConfigurationEnrichment()
+ );
DynamicCacheDescriptor old = registeredCaches.put(cfg.getName(), desc);
@@ -1881,7 +1951,9 @@ class ClusterCachesInfo {
true,
false,
joinData.cacheDeploymentId(),
- new QuerySchema(cacheInfo.cacheData().queryEntities()));
+ new QuerySchema(cacheInfo.cacheData().queryEntities()),
+ cacheInfo.cacheData().cacheConfigurationEnrichment()
+ );
DynamicCacheDescriptor old = registeredTemplates.put(cfg.getName(), desc);
@@ -1961,7 +2033,8 @@ class ClusterCachesInfo {
Integer cacheId,
UUID rcvdFrom,
IgniteUuid deploymentId,
- @Nullable byte[] encKey
+ @Nullable byte[] encKey,
+ CacheConfigurationEnrichment cacheCfgEnrichment
) {
if (startedCacheCfg.getGroupName() != null) {
CacheGroupDescriptor desc = cacheGroupByName(startedCacheCfg.getGroupName());
@@ -1989,7 +2062,9 @@ class ClusterCachesInfo {
caches,
persistent,
persistent,
- null);
+ null,
+ cacheCfgEnrichment
+ );
if (startedCacheCfg.isEncryptionEnabled())
ctx.encryption().beforeCacheGroupStart(grpId, encKey);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index 8128230..3032dc8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.QuerySchema;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -100,6 +101,9 @@ public class DynamicCacheChangeRequest implements Serializable {
/** Encryption key. */
@Nullable private byte[] encKey;
+ /** Cache configuration enrichment. */
+ private CacheConfigurationEnrichment cacheCfgEnrichment;
+
/**
* @param reqId Unique request ID.
* @param cacheName Cache stop name.
@@ -138,7 +142,12 @@ public class DynamicCacheChangeRequest implements Serializable {
DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cfg.getName(), ctx.localNodeId());
req.template(true);
- req.startCacheConfiguration(cfg);
+
+ T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = ctx.cache().backwardCompatibleSplitter().split(cfg);
+
+ req.startCacheConfiguration(splitCfg.get1());
+ req.cacheConfigurationEnrichment(splitCfg.get2());
+
req.schema(new QuerySchema(cfg.getQueryEntities()));
req.deploymentId(IgniteUuid.randomUuid());
@@ -457,6 +466,20 @@ public class DynamicCacheChangeRequest implements Serializable {
return encKey;
}
+ /**
+ * @return Cache configuration enrichment.
+ */
+ public CacheConfigurationEnrichment cacheConfigurationEnrichment() {
+ return cacheCfgEnrichment;
+ }
+
+ /**
+ * @param cacheCfgEnrichment Cache config enrichment.
+ */
+ public void cacheConfigurationEnrichment(CacheConfigurationEnrichment cacheCfgEnrichment) {
+ this.cacheCfgEnrichment = cacheCfgEnrichment;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return "DynamicCacheChangeRequest [cacheName=" + cacheName() +
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 93882a2..a1e1cf1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.query.QuerySchema;
import org.apache.ignite.internal.processors.query.QuerySchemaPatch;
import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -45,7 +46,7 @@ public class DynamicCacheDescriptor {
/** Cache configuration. */
@GridToStringExclude
- private CacheConfiguration cacheCfg;
+ private volatile CacheConfiguration cacheCfg;
/** Statically configured flag. */
private final boolean staticCfg;
@@ -95,6 +96,12 @@ public class DynamicCacheDescriptor {
/** */
private final CacheGroupDescriptor grpDesc;
+ /** Cache config enrichment. */
+ private final @Nullable CacheConfigurationEnrichment cacheCfgEnrichment;
+
+ /** Cache config enriched. */
+ private volatile boolean cacheCfgEnriched;
+
/**
* @param ctx Context.
* @param cacheCfg Cache configuration.
@@ -117,7 +124,9 @@ public class DynamicCacheDescriptor {
boolean staticCfg,
boolean sql,
IgniteUuid deploymentId,
- QuerySchema schema) {
+ QuerySchema schema,
+ @Nullable CacheConfigurationEnrichment cacheCfgEnrichment
+ ) {
assert cacheCfg != null;
assert grpDesc != null || template;
assert schema != null;
@@ -142,6 +151,8 @@ public class DynamicCacheDescriptor {
synchronized (schemaMux) {
this.schema = schema.copy();
}
+
+ this.cacheCfgEnrichment = cacheCfgEnrichment;
}
/**
@@ -221,6 +232,13 @@ public class DynamicCacheDescriptor {
}
/**
+ * @param cacheCfg Cache config.
+ */
+ public void cacheConfiguration(CacheConfiguration cacheCfg) {
+ this.cacheCfg = cacheCfg;
+ }
+
+ /**
* Creates and caches cache object context if needed.
*
* @param proc Object processor.
@@ -378,7 +396,7 @@ public class DynamicCacheDescriptor {
* from page store. Essentially, this method takes from {@link DynamicCacheDescriptor} all that's needed to start
* cache correctly, leaving out everything else.
*/
- public StoredCacheData toStoredData() {
+ public StoredCacheData toStoredData(CacheConfigurationSplitter splitter) {
assert schema != null;
StoredCacheData res = new StoredCacheData(cacheConfiguration());
@@ -386,9 +404,39 @@ public class DynamicCacheDescriptor {
res.queryEntities(schema().entities());
res.sql(sql());
+ if (isConfigurationEnriched()) {
+ T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = splitter.split(this);
+
+ res.config(splitCfg.get1());
+ res.cacheConfigurationEnrichment(splitCfg.get2());
+ }
+ else
+ res.cacheConfigurationEnrichment(cacheCfgEnrichment);
+
return res;
}
+ /**
+ * @return Cache configuration enrichment.
+ */
+ public CacheConfigurationEnrichment cacheConfigurationEnrichment() {
+ return cacheCfgEnrichment;
+ }
+
+ /**
+ * @return {@code True} if configuration is already enriched.
+ */
+ public boolean isConfigurationEnriched() {
+ return cacheCfgEnrichment == null || cacheCfgEnriched;
+ }
+
+ /**
+ * @param cacheCfgEnriched Flag indicates that configuration is enriched.
+ */
+ public void configurationEnriched(boolean cacheCfgEnriched) {
+ this.cacheCfgEnriched = cacheCfgEnriched;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DynamicCacheDescriptor.class, this, "cacheName", U.maskName(cacheCfg.getName()));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index fca8abe..7058733 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -3782,9 +3782,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
for (Object key : keys)
A.notNull(key, "key");
- if (!ctx.store().configured())
- return new GridFinishedFuture<>();
-
//TODO IGNITE-7954
MvccUtils.verifyMvccOperationSupport(ctx, "Load");
@@ -3794,21 +3791,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
- if (replaceExisting) {
- if (ctx.store().isLocal())
- return runLoadKeysCallable(keys, plc, keepBinary, true);
- else {
- return ctx.closures().callLocalSafe(new Callable<Void>() {
- @Override public Void call() throws Exception {
- localLoadAndUpdate(keys);
-
- return null;
- }
- });
- }
- }
- else
- return runLoadKeysCallable(keys, plc, keepBinary, false);
+ return runLoadKeysCallable(keys, plc, keepBinary, replaceExisting);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java
index 230320a..1fc2e64 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java
@@ -46,12 +46,25 @@ public class GridCacheAttributes implements Serializable {
/** Cache configuration. */
private CacheConfiguration ccfg;
+ /** Cache configuration enrichment. */
+ private CacheConfigurationEnrichment enrichment;
+
+
/**
* @param cfg Cache configuration.
*
*/
public GridCacheAttributes(CacheConfiguration cfg) {
- ccfg = cfg;
+ this.ccfg = cfg;
+ }
+
+ /**
+ * @param cfg Cache configuration.
+ *
+ */
+ public GridCacheAttributes(CacheConfiguration cfg, CacheConfigurationEnrichment enrichment) {
+ this.ccfg = cfg;
+ this.enrichment = enrichment;
}
/**
@@ -154,6 +167,9 @@ public class GridCacheAttributes implements Serializable {
* @return Eviction filter class name.
*/
public String evictionFilterClassName() {
+ if (enrichment != null)
+ return enrichment.getFieldClassName("evictFilter");
+
return className(ccfg.getEvictionFilter());
}
@@ -171,6 +187,9 @@ public class GridCacheAttributes implements Serializable {
* @return Eviction policy factory class name.
*/
public String evictionPolicyFactoryClassName() {
+ if (enrichment != null)
+ return enrichment.getFieldClassName("evictPlcFactory");
+
return className(ccfg.getEvictionPolicyFactory());
}
@@ -192,13 +211,24 @@ public class GridCacheAttributes implements Serializable {
* @return Near eviction policy factory class name.
*/
public String nearEvictionPolicyFactoryClassName() {
- return className(ccfg.getEvictionPolicyFactory());
+ NearCacheConfiguration nearCfg = ccfg.getNearConfiguration();
+
+ if (nearCfg == null)
+ return null;
+
+ if (enrichment != null && enrichment.nearCacheConfigurationEnrichment() != null)
+ return enrichment.nearCacheConfigurationEnrichment().getFieldClassName("nearEvictPlcFactory");
+
+ return className(nearCfg.getNearEvictionPolicyFactory());
}
/**
* @return Store class name.
*/
public String storeFactoryClassName() {
+ if (enrichment != null)
+ return enrichment.getFieldClassName("storeFactory");
+
return className(ccfg.getCacheStoreFactory());
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index fb15175..3fe99ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
@@ -76,8 +75,10 @@ import org.apache.ignite.internal.IgniteTransactionsEx;
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.cluster.DetachedClusterNode;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
@@ -155,6 +156,7 @@ import org.apache.ignite.internal.util.lang.IgniteThrowableFunction;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -297,6 +299,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** Node's local cache configurations (both from static configuration and from persistent caches). */
private CacheJoinNodeDiscoveryData localConfigs;
+ /** Cache configuration splitter. */
+ private CacheConfigurationSplitter splitter;
+
+ /** Cache configuration enricher. */
+ private CacheConfigurationEnricher enricher;
+
/**
* @param ctx Kernal context.
*/
@@ -309,6 +317,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
internalCaches = new HashSet<>();
marsh = MarshallerUtils.jdkMarshaller(ctx.igniteInstanceName());
+ splitter = new CacheConfigurationSplitterImpl(marsh);
+ enricher = new CacheConfigurationEnricher(marsh, U.resolveClassLoader(ctx.config()));
}
/**
@@ -578,7 +588,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
"Custom IndexingSpi cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
}
- if (cc.isWriteBehindEnabled()) {
+ if (cc.isWriteBehindEnabled() && ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName())) {
if (cfgStore == null)
throw new IgniteCheckedException("Cannot enable write-behind (writer or store is not provided) " +
"for cache: " + U.maskName(cc.getName()));
@@ -593,11 +603,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
"'writeBehindFlushSize' parameters to 0 for cache: " + U.maskName(cc.getName()));
}
- if (cc.isReadThrough() && cfgStore == null)
+ if (cc.isReadThrough() && cfgStore == null
+ && ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName()))
throw new IgniteCheckedException("Cannot enable read-through (loader or store is not provided) " +
"for cache: " + U.maskName(cc.getName()));
- if (cc.isWriteThrough() && cfgStore == null)
+ if (cc.isWriteThrough() && cfgStore == null
+ && ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName()))
throw new IgniteCheckedException("Cannot enable write-through (writer or store is not provided) " +
"for cache: " + U.maskName(cc.getName()));
@@ -873,6 +885,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
cacheData.sql(sql);
+ T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = splitter().split(cfg);
+
+ cacheData.config(splitCfg.get1());
+ cacheData.cacheConfigurationEnrichment(splitCfg.get2());
+
+ cfg = splitCfg.get1();
+
if (GridCacheUtils.isCacheTemplateName(cacheName))
templates.put(cacheName, new CacheInfo(cacheData, CacheType.USER, false, 0, true));
else {
@@ -940,6 +959,19 @@ public class GridCacheProcessor extends GridProcessorAdapter {
List<String> skippedConfigs = new ArrayList<>();
for (StoredCacheData storedCacheData : storedCaches.values()) {
+ // Backward compatibility for old stored caches data.
+ if (storedCacheData.hasOldCacheConfigurationFormat()) {
+ storedCacheData = new StoredCacheData(storedCacheData);
+
+ T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = splitter().split(storedCacheData.config());
+
+ storedCacheData.config(splitCfg.get1());
+ storedCacheData.cacheConfigurationEnrichment(splitCfg.get2());
+
+ // Overwrite with new format.
+ saveCacheConfiguration(storedCacheData);
+ }
+
String cacheName = storedCacheData.config().getName();
CacheType type = cacheType(cacheName);
@@ -1363,8 +1395,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
public void initQueryStructuresForNotStartedCache(DynamicCacheDescriptor cacheDesc) throws IgniteCheckedException {
QuerySchema schema = cacheDesc.schema() != null ? cacheDesc.schema() : new QuerySchema();
- CacheObjectContext coCtx = cacheDesc.cacheObjectContext(ctx.cacheObjects());
-
GridCacheContextInfo cacheInfo = new GridCacheContextInfo(cacheDesc);
ctx.query().onCacheStart(cacheInfo, schema, cacheDesc.sql());
@@ -2176,7 +2206,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
startCacheInfo,
cacheInfo -> {
prepareCacheStart(
- cacheInfo.getCacheDescriptor().cacheConfiguration(),
cacheInfo.getCacheDescriptor(),
cacheInfo.getReqNearCfg(),
cacheInfo.getExchangeTopVer(),
@@ -2206,7 +2235,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
startCacheInfo,
cacheInfo -> {
GridCacheContext cacheCtx = prepareCacheContext(
- cacheInfo.getCacheDescriptor().cacheConfiguration(),
cacheInfo.getCacheDescriptor(),
cacheInfo.getReqNearCfg(),
cacheInfo.getExchangeTopVer(),
@@ -2288,7 +2316,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
- * @param startCfg Cache configuration to use.
* @param desc Cache descriptor.
* @param reqNearCfg Near configuration if specified for client cache start request.
* @param exchTopVer Current exchange version.
@@ -2297,14 +2324,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException If failed.
*/
public void prepareCacheStart(
- CacheConfiguration startCfg,
DynamicCacheDescriptor desc,
@Nullable NearCacheConfiguration reqNearCfg,
AffinityTopologyVersion exchTopVer,
boolean disabledAfterStart,
boolean clientCache
) throws IgniteCheckedException {
- GridCacheContext cacheCtx = prepareCacheContext(startCfg, desc, reqNearCfg, exchTopVer, disabledAfterStart);
+ GridCacheContext cacheCtx = prepareCacheContext(desc, reqNearCfg, exchTopVer, disabledAfterStart);
if (cacheCtx.isRecoveryMode())
finishRecovery(exchTopVer, cacheCtx);
@@ -2322,7 +2348,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* Preparing cache context to start.
*
- * @param startCfg Cache configuration to use.
* @param desc Cache descriptor.
* @param reqNearCfg Near configuration if specified for client cache start request.
* @param exchTopVer Current exchange version.
@@ -2332,12 +2357,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException if failed.
*/
private GridCacheContext prepareCacheContext(
- CacheConfiguration startCfg,
DynamicCacheDescriptor desc,
@Nullable NearCacheConfiguration reqNearCfg,
AffinityTopologyVersion exchTopVer,
boolean disabledAfterStart
) throws IgniteCheckedException {
+ desc = enricher().enrich(desc,
+ desc.cacheConfiguration().getCacheMode() == LOCAL || isLocalAffinity(desc.cacheConfiguration()));
+
+ CacheConfiguration startCfg = desc.cacheConfiguration();
+
if (caches.containsKey(startCfg.getName())) {
GridCacheAdapter<?, ?> existingCache = caches.get(startCfg.getName());
@@ -2477,7 +2506,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
return true;
}
- if (isLocalAffinity(desc.groupDescriptor().config()))
+ if (isLocalAffinity(desc.cacheConfiguration()))
return true;
ccfg.setNearConfiguration(reqNearCfg);
@@ -2496,7 +2525,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (sharedCtx.pageStore() != null && affNode)
initializationProtector.protect(
desc.groupDescriptor().groupId(),
- () -> sharedCtx.pageStore().initializeForCache(desc.groupDescriptor(), desc.toStoredData())
+ () -> sharedCtx.pageStore().initializeForCache(desc.groupDescriptor(), desc.toStoredData(splitter))
);
}
@@ -2634,6 +2663,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
private GridCacheContext<?, ?> startCacheInRecoveryMode(
DynamicCacheDescriptor desc
) throws IgniteCheckedException {
+ // Only affinity nodes are able to start cache in recovery mode.
+ desc = enricher().enrich(desc, true);
+
CacheConfiguration cfg = desc.cacheConfiguration();
CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg);
@@ -2774,6 +2806,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
AffinityTopologyVersion exchTopVer,
boolean recoveryMode
) throws IgniteCheckedException {
+ desc = enricher().enrich(desc, affNode);
+
CacheConfiguration cfg = new CacheConfiguration(desc.config());
String memPlcName = cfg.getDataRegionName();
@@ -3673,11 +3707,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException If failed.
*/
public CacheConfiguration getConfigFromTemplate(String cacheName) throws IgniteCheckedException {
- CacheConfiguration cfgTemplate = null;
+ DynamicCacheDescriptor cfgTemplate = null;
- CacheConfiguration dfltCacheCfg = null;
+ DynamicCacheDescriptor dfltCacheCfg = null;
- List<CacheConfiguration> wildcardNameCfgs = null;
+ List<DynamicCacheDescriptor> wildcardNameCfgs = null;
for (DynamicCacheDescriptor desc : cachesInfo.registeredTemplates().values()) {
assert desc.template();
@@ -3687,7 +3721,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
assert cfg != null;
if (F.eq(cacheName, cfg.getName())) {
- cfgTemplate = cfg;
+ cfgTemplate = desc;
break;
}
@@ -3698,29 +3732,25 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (wildcardNameCfgs == null)
wildcardNameCfgs = new ArrayList<>();
- wildcardNameCfgs.add(cfg);
+ wildcardNameCfgs.add(desc);
}
else
- dfltCacheCfg = cfg; // Template with name '*'.
+ dfltCacheCfg = desc; // Template with name '*'.
}
}
else if (dfltCacheCfg == null)
- dfltCacheCfg = cfg;
+ dfltCacheCfg = desc;
}
if (cfgTemplate == null && cacheName != null && wildcardNameCfgs != null) {
- Collections.sort(wildcardNameCfgs, new Comparator<CacheConfiguration>() {
- @Override public int compare(CacheConfiguration cfg1, CacheConfiguration cfg2) {
- Integer len1 = cfg1.getName() != null ? cfg1.getName().length() : 0;
- Integer len2 = cfg2.getName() != null ? cfg2.getName().length() : 0;
+ wildcardNameCfgs.sort((a, b) ->
+ Integer.compare(b.cacheConfiguration().getName().length(), a.cacheConfiguration().getName().length()));
- return len2.compareTo(len1);
- }
- });
+ for (DynamicCacheDescriptor desc : wildcardNameCfgs) {
+ String wildcardCacheName = desc.cacheConfiguration().getName();
- for (CacheConfiguration cfg : wildcardNameCfgs) {
- if (cacheName.startsWith(cfg.getName().substring(0, cfg.getName().length() - 1))) {
- cfgTemplate = cfg;
+ if (cacheName.startsWith(wildcardCacheName.substring(0, wildcardCacheName.length() - 1))) {
+ cfgTemplate = desc;
break;
}
@@ -3733,9 +3763,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (cfgTemplate == null)
return null;
- cfgTemplate = cloneCheckSerializable(cfgTemplate);
+ // It's safe to enrich cache configuration here because we requested this cache from current node.
+ CacheConfiguration enrichedTemplate = enricher().enrichFully(
+ cfgTemplate.cacheConfiguration(), cfgTemplate.cacheConfigurationEnrichment());
- CacheConfiguration cfg = new CacheConfiguration(cfgTemplate);
+ enrichedTemplate = cloneCheckSerializable(enrichedTemplate);
+
+ CacheConfiguration cfg = new CacheConfiguration(enrichedTemplate);
cfg.setName(cacheName);
@@ -4271,7 +4305,20 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (sharedCtx.pageStore() != null && !sharedCtx.kernalContext().clientNode() &&
isPersistentCache(desc.cacheConfiguration(), sharedCtx.gridConfig().getDataStorageConfiguration()))
- sharedCtx.pageStore().storeCacheData(desc.toStoredData(), true);
+ sharedCtx.pageStore().storeCacheData(desc.toStoredData(splitter), true);
+ }
+
+ /**
+ * Save cache configuration to persistent store if necessary.
+ *
+ * @param storedCacheData Stored cache data.
+ */
+ public void saveCacheConfiguration(StoredCacheData storedCacheData) throws IgniteCheckedException {
+ assert storedCacheData != null;
+
+ if (sharedCtx.pageStore() != null && !sharedCtx.kernalContext().clientNode() &&
+ isPersistentCache(storedCacheData.config(), sharedCtx.gridConfig().getDataStorageConfiguration()))
+ sharedCtx.pageStore().storeCacheData(storedCacheData, true);
}
/**
@@ -5521,7 +5568,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
req.clientStartOnly(true);
req.deploymentId(desc.deploymentId());
- req.startCacheConfiguration(descCfg);
+
+ T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = backwardCompatibleSplitter().split(desc);
+
+ req.startCacheConfiguration(splitCfg.get1());
+ req.cacheConfigurationEnrichment(splitCfg.get2());
+
req.schema(desc.schema());
}
}
@@ -5530,11 +5582,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
req.deploymentId(IgniteUuid.randomUuid());
- req.startCacheConfiguration(cfg);
+ T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = backwardCompatibleSplitter().split(cfg);
+
+ req.startCacheConfiguration(splitCfg.get1());
+ req.cacheConfigurationEnrichment(splitCfg.get2());
+
+ cfg = splitCfg.get1();
CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg);
- initialize(cfg, cacheObjCtx);
+ initialize(req.startCacheConfiguration(), cacheObjCtx);
if (restartId != null)
req.schema(new QuerySchema(qryEntities == null ? cfg.getQueryEntities() : qryEntities));
@@ -5559,7 +5616,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
req.deploymentId(desc.deploymentId());
- req.startCacheConfiguration(ccfg);
+
+ T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = backwardCompatibleSplitter().split(ccfg);
+
+ req.startCacheConfiguration(splitCfg.get1());
+ req.cacheConfigurationEnrichment(splitCfg.get2());
+
req.schema(desc.schema());
}
@@ -5717,6 +5779,43 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
+ * @param oldFormat Old format.
+ */
+ public CacheConfigurationSplitter splitter(boolean oldFormat) {
+ // Requesting splitter with old format support is rare operation.
+ // It's acceptable to allocate it every time by request.
+ return oldFormat ? new CacheConfigurationSplitterOldFormat(enricher) : splitter;
+ }
+
+ /**
+ * @return By default it returns splitter without old format configuration support.
+ */
+ public CacheConfigurationSplitter splitter() {
+ return splitter(false);
+ }
+
+ /**
+ * If not all nodes in cluster support splitted cache configurations it returns old format splitter.
+ * In other case it returns default splitter.
+ *
+ * @return Cache configuration splitter with or without old format support depending on cluster state.
+ */
+ public CacheConfigurationSplitter backwardCompatibleSplitter() {
+ IgniteDiscoverySpi spi = (IgniteDiscoverySpi) ctx.discovery().getInjectedDiscoverySpi();
+
+ boolean oldFormat = !spi.allNodesSupport(IgniteFeatures.SPLITTED_CACHE_CONFIGURATIONS);
+
+ return splitter(oldFormat);
+ }
+
+ /**
+ * @return Cache configuration enricher.
+ */
+ public CacheConfigurationEnricher enricher() {
+ return enricher;
+ }
+
+ /**
* Recovery lifecycle for caches.
*/
private class CacheRecoveryLifecycle implements MetastorageLifecycleListener, DatabaseLifecycleListener {
@@ -5753,7 +5852,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
@Override public void afterBinaryMemoryRestore(
IgniteCacheDatabaseSharedManager mgr,
GridCacheDatabaseSharedManager.RestoreBinaryState restoreState) throws IgniteCheckedException {
+
+ Object consistentId = ctx.pdsFolderResolver().resolveFolders().consistentId();
+ DetachedClusterNode clusterNode = new DetachedClusterNode(consistentId, ctx.nodeAttributes());
+
for (DynamicCacheDescriptor cacheDescriptor : persistentCaches()) {
+ boolean affinityNode = CU.affinityNode(clusterNode, cacheDescriptor.cacheConfiguration().getNodeFilter());
+
+ if (!affinityNode)
+ continue;
+
startCacheInRecoveryMode(cacheDescriptor);
querySchemas.put(cacheDescriptor.cacheId(), cacheDescriptor.schema().copy());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StoredCacheData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StoredCacheData.java
index 7b8e534..40f24e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StoredCacheData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StoredCacheData.java
@@ -23,6 +23,7 @@ import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
@@ -48,6 +49,9 @@ public class StoredCacheData implements Serializable {
/** SQL flag - {@code true} if cache was created with {@code CREATE TABLE}. */
private boolean sql;
+ /** */
+ private CacheConfigurationEnrichment cacheConfigurationEnrichment;
+
/**
* Constructor.
*
@@ -61,6 +65,16 @@ public class StoredCacheData implements Serializable {
}
/**
+ * @param cacheData Cache data.
+ */
+ public StoredCacheData(StoredCacheData cacheData) {
+ this.ccfg = cacheData.ccfg;
+ this.qryEntities = cacheData.qryEntities;
+ this.sql = cacheData.sql;
+ this.cacheConfigurationEnrichment = cacheData.cacheConfigurationEnrichment;
+ }
+
+ /**
* @param ccfg Cache configuration.
*/
public void config(CacheConfiguration<?, ?> ccfg) {
@@ -104,6 +118,58 @@ public class StoredCacheData implements Serializable {
return this;
}
+ /**
+ * @param ccfgEnrichment Ccfg enrichment.
+ */
+ public StoredCacheData cacheConfigurationEnrichment(CacheConfigurationEnrichment ccfgEnrichment) {
+ this.cacheConfigurationEnrichment = ccfgEnrichment;
+
+ return this;
+ }
+
+ /**
+ *
+ */
+ public CacheConfigurationEnrichment cacheConfigurationEnrichment() {
+ return cacheConfigurationEnrichment;
+ }
+
+ /**
+ *
+ */
+ public boolean hasOldCacheConfigurationFormat() {
+ return cacheConfigurationEnrichment == null;
+ }
+
+ /**
+ *
+ */
+ public StoredCacheData withSplittedCacheConfig(CacheConfigurationSplitter splitter) {
+ if (cacheConfigurationEnrichment != null)
+ return this;
+
+ T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = splitter.split(ccfg);
+
+ ccfg = splitCfg.get1();
+ cacheConfigurationEnrichment = splitCfg.get2();
+
+ return this;
+ }
+
+ /**
+ *
+ */
+ public StoredCacheData withOldCacheConfig(CacheConfigurationEnricher enricher) {
+ if (cacheConfigurationEnrichment == null)
+ return this;
+
+ ccfg = enricher.enrichFully(ccfg, cacheConfigurationEnrichment);
+
+ cacheConfigurationEnrichment = null;
+
+ return this;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(StoredCacheData.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 36f80d1..23901cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -29,6 +29,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCompute;
@@ -40,11 +41,13 @@ import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.DistributedBaselineConfiguration;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -926,8 +929,20 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
try {
Map<String, StoredCacheData> cfgs = ctx.cache().context().pageStore().readCacheConfigurations();
- if (!F.isEmpty(cfgs))
+ if (!F.isEmpty(cfgs)) {
storedCfgs = new ArrayList<>(cfgs.values());
+
+ IgniteDiscoverySpi spi = (IgniteDiscoverySpi) ctx.discovery().getInjectedDiscoverySpi();
+
+ boolean splittedCacheCfgs = spi.allNodesSupport(IgniteFeatures.SPLITTED_CACHE_CONFIGURATIONS);
+
+ storedCfgs = storedCfgs.stream()
+ .map(storedCacheData -> splittedCacheCfgs
+ ? storedCacheData.withSplittedCacheConfig(ctx.cache().splitter())
+ : storedCacheData.withOldCacheConfig(ctx.cache().enricher())
+ )
+ .collect(Collectors.toList());
+ }
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to read stored cache configurations: " + e, e);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheComparatorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheComparatorTest.java
index 94b3a33..2b6e170 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheComparatorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheComparatorTest.java
@@ -35,12 +35,12 @@ public class CacheComparatorTest {
DynamicCacheDescriptor desc1 = new DynamicCacheDescriptor(null,
new CacheConfiguration().setName("1111"), CacheType.DATA_STRUCTURES,
null, true, null, true,
- false, null, new QuerySchema());
+ false, null, new QuerySchema(), null);
DynamicCacheDescriptor desc2 = new DynamicCacheDescriptor(null,
new CacheConfiguration().setName("2222"), CacheType.INTERNAL,
null, true, null, true,
- false, null, new QuerySchema());
+ false, null, new QuerySchema(), null);
assertEquals(-1,
ClusterCachesInfo.CacheComparators.DIRECT.compare(desc1, desc2));
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationSerializationOnDiscoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationSerializationOnDiscoveryTest.java
new file mode 100644
index 0000000..371681c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationSerializationOnDiscoveryTest.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.cache.configuration.FactoryBuilder;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Test suite to check that user-defined parameters (marked as {@link org.apache.ignite.configuration.SerializeSeparately})
+ * for static cache configurations are not explicitly deserialized on non-affinity nodes.
+ */
+@RunWith(Parameterized.class)
+public class CacheConfigurationSerializationOnDiscoveryTest extends GridCommonAbstractTest {
+ /** */
+ @Parameterized.Parameters(name = "Persistence enabled = {0}")
+ public static List<Object[]> parameters() {
+ ArrayList<Object[]> params = new ArrayList<>();
+
+ params.add(new Object[]{false});
+ params.add(new Object[]{true});
+
+ return params;
+ }
+
+ /** Client mode. */
+ private boolean clientMode;
+
+ /** Caches. */
+ private CacheConfiguration[] caches;
+
+ /** Persistence enabled. */
+ @Parameterized.Parameter
+ public boolean persistenceEnabled;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setConsistentId(igniteInstanceName);
+
+ cfg.setClientMode(clientMode);
+
+ if (caches != null)
+ cfg.setCacheConfiguration(caches);
+
+ if (persistenceEnabled)
+ cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration().setPersistenceEnabled(true).setMaxSize(256 * 1024 * 1024))
+ );
+
+ return cfg;
+ }
+
+ /**
+ *
+ */
+ @Before
+ public void before() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /**
+ *
+ */
+ @After
+ public void after() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /**
+ * Creates configuration for cache which affinity belongs only to given node index.
+ *
+ * @param nodeIdx Node index.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration onlyOnNode(int nodeIdx) {
+ return new CacheConfiguration("cache-" + getTestIgniteInstanceName(nodeIdx))
+ .setNodeFilter(new OnlyOneNodeFilter(getTestIgniteInstanceName(nodeIdx)))
+ .setCacheStoreFactory(FactoryBuilder.factoryOf(GridCacheTestStore.class));
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testSerializationForCachesConfiguredOnCoordinator() throws Exception {
+ caches = new CacheConfiguration[] {onlyOnNode(0), onlyOnNode(1), onlyOnNode(2)};
+
+ IgniteEx crd = startGrid(0);
+
+ caches = null;
+
+ startGridsMultiThreaded(1, 2);
+
+ if (persistenceEnabled)
+ crd.cluster().active(true);
+
+ awaitPartitionMapExchange();
+
+ for (Ignite node : G.allGrids())
+ checkCaches((IgniteEx) node);
+
+ if (persistenceEnabled)
+ restartNodesAndCheck();
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testSerializationForCachesConfiguredOnDifferentNodes1() throws Exception {
+ IgniteEx crd = startGrid(0);
+
+ caches = new CacheConfiguration[] {onlyOnNode(0), onlyOnNode(1)};
+
+ startGrid(1);
+
+ caches = new CacheConfiguration[] {onlyOnNode(2)};
+
+ startGrid(2);
+
+ caches = null;
+
+ if (persistenceEnabled)
+ crd.cluster().active(true);
+
+ awaitPartitionMapExchange();
+
+ for (Ignite node : G.allGrids())
+ checkCaches((IgniteEx) node);
+
+ if (persistenceEnabled)
+ restartNodesAndCheck();
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testSerializationForCachesConfiguredOnDifferentNodes2() throws Exception {
+ caches = new CacheConfiguration[] {onlyOnNode(0)};
+
+ IgniteEx crd = startGrid(0);
+
+ caches = new CacheConfiguration[] {onlyOnNode(1)};
+
+ startGrid(1);
+
+ caches = new CacheConfiguration[] {onlyOnNode(2)};
+
+ startGrid(2);
+
+ caches = null;
+
+ if (persistenceEnabled)
+ crd.cluster().active(true);
+
+ awaitPartitionMapExchange();
+
+ for (Ignite node : G.allGrids())
+ checkCaches((IgniteEx) node);
+
+ if (persistenceEnabled)
+ restartNodesAndCheck();
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testSerializationForCachesConfiguredOnDifferentNodes3() throws Exception {
+ caches = new CacheConfiguration[] {onlyOnNode(1)};
+
+ IgniteEx crd = startGrid(0);
+
+ caches = new CacheConfiguration[] {onlyOnNode(2)};
+
+ startGrid(1);
+
+ caches = new CacheConfiguration[] {onlyOnNode(0)};
+
+ startGrid(2);
+
+ caches = null;
+
+ if (persistenceEnabled)
+ crd.cluster().active(true);
+
+ awaitPartitionMapExchange();
+
+ for (Ignite node : G.allGrids())
+ checkCaches((IgniteEx) node);
+
+ if (persistenceEnabled)
+ restartNodesAndCheck();
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testSerializationForCachesOnClientNode() throws Exception {
+ startGrid(0);
+
+ caches = new CacheConfiguration[] {onlyOnNode(0), onlyOnNode(1)};
+
+ startGrid(1);
+
+ caches = new CacheConfiguration[] {onlyOnNode(2)};
+
+ startGrid(2);
+
+ caches = null;
+ clientMode = true;
+
+ IgniteEx clnt = startGrid(3);
+
+ clientMode = false;
+
+ if (persistenceEnabled)
+ clnt.cluster().active(true);
+
+ awaitPartitionMapExchange();
+
+ for (Ignite node : G.allGrids())
+ checkCaches((IgniteEx) node);
+
+ if (persistenceEnabled)
+ restartNodesAndCheck();
+ }
+
+ /**
+ * Restart nodes and check caches.
+ */
+ private void restartNodesAndCheck() throws Exception {
+ stopAllGrids();
+
+ startGridsMultiThreaded(3);
+
+ awaitPartitionMapExchange();
+
+ for (Ignite node : G.allGrids())
+ checkCaches((IgniteEx) node);
+ }
+
+ /**
+ * @param node Node.
+ */
+ private void checkCaches(IgniteEx node) {
+ ClusterNode clusterNode = node.localNode();
+ GridCacheProcessor cacheProcessor = node.context().cache();
+
+ for (DynamicCacheDescriptor cacheDesc : cacheProcessor.cacheDescriptors().values()) {
+ if (CU.isUtilityCache(cacheDesc.cacheName()))
+ continue;
+
+ boolean affinityNode = CU.affinityNode(clusterNode, cacheDesc.cacheConfiguration().getNodeFilter());
+
+ IgniteInternalCache cache = cacheProcessor.cache(cacheDesc.cacheName());
+
+ if (affinityNode) {
+ Assert.assertTrue("Cache is not started " + cacheDesc.cacheName() + ", node " + node.name(), cache != null);
+
+ CacheConfiguration ccfg = cache.configuration();
+
+ Assert.assertTrue("Cache store factory is null " + cacheDesc.cacheName() + ", node " + node.name(), ccfg.getCacheStoreFactory() != null);
+ }
+ else {
+ Assert.assertTrue("Cache is started " + cacheDesc.cacheName() + ", node " + node.name(), cache == null || !cache.context().affinityNode());
+
+ if (cache == null) {
+ Assert.assertTrue("Cache configuration is enriched " + cacheDesc.cacheName() + ", node " + node.name(), !cacheDesc.isConfigurationEnriched());
+ Assert.assertTrue("Cache store factory is not null " + cacheDesc.cacheName() + ", node " + node.name(), cacheDesc.cacheConfiguration().getCacheStoreFactory() == null);
+ }
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ private static class OnlyOneNodeFilter implements IgnitePredicate<ClusterNode> {
+ /** Consistent id. */
+ private final String consistentId;
+
+ /**
+ * @param consistentId Consistent id.
+ */
+ private OnlyOneNodeFilter(String consistentId) {
+ this.consistentId = consistentId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(ClusterNode node) {
+ return node.consistentId().equals(consistentId);
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationSerializationOnExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationSerializationOnExchangeTest.java
new file mode 100644
index 0000000..4b782fc
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationSerializationOnExchangeTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.cache.configuration.FactoryBuilder;
+import com.google.common.collect.Lists;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Test suite to check that user-defined parameters (marked as {@link org.apache.ignite.configuration.SerializeSeparately})
+ * for dynamic cache configurations are not explicitly deserialized on non-affinity nodes.
+ */
+@RunWith(Parameterized.class)
+public class CacheConfigurationSerializationOnExchangeTest extends GridCommonAbstractTest {
+ /** */
+ @Parameterized.Parameters(name = "Persistence enabled = {0}")
+ public static List<Object[]> parameters() {
+ ArrayList<Object[]> params = new ArrayList<>();
+
+ params.add(new Object[]{false});
+ params.add(new Object[]{true});
+
+ return params;
+ }
+
+ /** Client mode. */
+ private boolean clientMode;
+
+ /** Persistence enabled. */
+ @Parameterized.Parameter
+ public boolean persistenceEnabled;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setConsistentId(igniteInstanceName);
+
+ cfg.setClientMode(clientMode);
+
+ if (persistenceEnabled)
+ cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration().setPersistenceEnabled(true).setMaxSize(256 * 1024 * 1024))
+ );
+
+ return cfg;
+ }
+
+ /**
+ *
+ */
+ @Before
+ public void before() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /**
+ *
+ */
+ @After
+ public void after() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /**
+ * Creates configuration for cache which affinity belongs only to given node index.
+ *
+ * @param nodeIdx Node index.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration<?, ?> onlyOnNode(int nodeIdx) {
+ return new CacheConfiguration("cache-" + getTestIgniteInstanceName(nodeIdx))
+ .setNodeFilter(new OnlyOneNodeFilter(getTestIgniteInstanceName(nodeIdx)))
+ .setCacheStoreFactory(FactoryBuilder.factoryOf(GridCacheTestStore.class));
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testSerializationForDynamicCacheStartedOnCoordinator() throws Exception {
+ IgniteEx crd = (IgniteEx) startGridsMultiThreaded(3);
+
+ if (persistenceEnabled)
+ crd.cluster().active(true);
+
+ clientMode = true;
+
+ startGrid(3);
+
+ crd.getOrCreateCaches(Lists.newArrayList(
+ onlyOnNode(0),
+ onlyOnNode(1),
+ onlyOnNode(2)
+ ));
+
+ awaitPartitionMapExchange();
+
+ for (Ignite node : G.allGrids())
+ checkCaches((IgniteEx) node);
+
+ if (persistenceEnabled)
+ restartNodesAndCheck();
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testSerializationForDynamicCacheStartedOnOtherNode() throws Exception {
+ startGridsMultiThreaded(2);
+
+ IgniteEx otherNode = startGrid(2);
+
+ if (persistenceEnabled)
+ otherNode.cluster().active(true);
+
+ clientMode = true;
+
+ startGrid(3);
+
+ otherNode.getOrCreateCaches(Lists.newArrayList(
+ onlyOnNode(0),
+ onlyOnNode(1),
+ onlyOnNode(2)
+ ));
+
+ awaitPartitionMapExchange();
+
+ for (Ignite node : G.allGrids())
+ checkCaches((IgniteEx) node);
+
+ if (persistenceEnabled)
+ restartNodesAndCheck();
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testSerializationForDynamicCacheStartedOnClientNode() throws Exception {
+ IgniteEx crd = (IgniteEx) startGridsMultiThreaded(3);
+
+ if (persistenceEnabled)
+ crd.cluster().active(true);
+
+ clientMode = true;
+
+ IgniteEx clientNode = startGrid(3);
+
+ clientNode.getOrCreateCaches(Lists.newArrayList(
+ onlyOnNode(0),
+ onlyOnNode(1),
+ onlyOnNode(2)
+ ));
+
+ awaitPartitionMapExchange();
+
+ for (Ignite node : G.allGrids())
+ checkCaches((IgniteEx) node);
+
+ if (persistenceEnabled)
+ restartNodesAndCheck();
+ }
+
+ /**
+ * Restart nodes and check caches.
+ */
+ private void restartNodesAndCheck() throws Exception {
+ clientMode = false;
+
+ stopAllGrids();
+
+ startGridsMultiThreaded(3);
+
+ clientMode = true;
+
+ startGrid(3);
+
+ awaitPartitionMapExchange();
+
+ for (Ignite node : G.allGrids())
+ checkCaches((IgniteEx) node);
+ }
+
+ /**
+ * @param node Node.
+ */
+ private void checkCaches(IgniteEx node) {
+ ClusterNode clusterNode = node.localNode();
+ GridCacheProcessor cacheProcessor = node.context().cache();
+
+ for (DynamicCacheDescriptor cacheDesc : cacheProcessor.cacheDescriptors().values()) {
+ if (CU.isUtilityCache(cacheDesc.cacheName()))
+ continue;
+
+ boolean affinityNode = CU.affinityNode(clusterNode, cacheDesc.cacheConfiguration().getNodeFilter());
+
+ IgniteInternalCache cache = cacheProcessor.cache(cacheDesc.cacheName());
+
+ if (affinityNode) {
+ Assert.assertTrue("Cache is not started " + cacheDesc.cacheName() + ", node " + node.name(), cache != null);
+
+ CacheConfiguration ccfg = cache.configuration();
+
+ Assert.assertTrue("Cache store factory is null " + cacheDesc.cacheName() + ", node " + node.name(), ccfg.getCacheStoreFactory() != null);
+ }
+ else {
+ Assert.assertTrue("Cache is started " + cacheDesc.cacheName() + ", node " + node.name(), cache == null || !cache.context().affinityNode());
+
+ if (cache == null) {
+ Assert.assertTrue("Cache configuration is enriched " + cacheDesc.cacheName() + ", node " + node.name(), !cacheDesc.isConfigurationEnriched());
+ Assert.assertTrue("Cache store factory is not null " + cacheDesc.cacheName() + ", node " + node.name(), cacheDesc.cacheConfiguration().getCacheStoreFactory() == null);
+ }
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ private static class OnlyOneNodeFilter implements IgnitePredicate<ClusterNode> {
+ /** Consistent id. */
+ private final String consistentId;
+
+ /**
+ * @param consistentId Consistent id.
+ */
+ private OnlyOneNodeFilter(String consistentId) {
+ this.consistentId = consistentId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(ClusterNode node) {
+ return node.consistentId().equals(consistentId);
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeAbstractTest.java
index ff4fb75..25a9cc6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeAbstractTest.java
@@ -144,7 +144,8 @@ public abstract class CacheStoreUsageMultinodeAbstractTest extends GridCommonAbs
IgniteCache<Object, Object> clientCache = client.cache(DEFAULT_CACHE_NAME);
assertTrue(((IgniteCacheProxy)cache0).context().store().configured());
- assertEquals(clientStore, ((IgniteCacheProxy) clientCache).context().store().configured());
+ if (atomicityMode() != ATOMIC)
+ assertEquals(clientStore, ((IgniteCacheProxy) clientCache).context().store().configured());
List<TransactionConcurrency> tcList = new ArrayList<>();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index a1ddcf8..8de3127 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.processors.cache;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
@@ -49,6 +47,8 @@ import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
@@ -4175,7 +4175,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
*/
@Test
public void testEvictExpired() throws Exception {
- final IgniteCache<String, Integer> cache = jcache();
+ final IgniteCache<String, Integer> cache = jcache(0);
final String key = primaryKeysForCache(cache, 1).get(0);
@@ -4556,7 +4556,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
/**
* @throws Exception In case of error.
*/
- @Test
+ @Test(timeout = 10050000)
public void testLocalEvict() throws Exception {
IgniteCache<String, Integer> cache = jcache();
@@ -4586,7 +4586,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
for (int i = 0; i < gridCount(); i++) {
if (aff.isPrimaryOrBackup(grid(i).cluster().localNode(), key1))
- assertEquals((Integer)1, peek(jcache(i), key1));
+ assertEquals("node name = " + grid(i).name(), (Integer)1, peek(jcache(i), key1));
if (aff.isPrimaryOrBackup(grid(i).cluster().localNode(), key2))
assertEquals((Integer)2, peek(jcache(i), key2));
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheConfigurationFileConsistencyCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheConfigurationFileConsistencyCheckTest.java
index f0bf10f..7272ab3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheConfigurationFileConsistencyCheckTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheConfigurationFileConsistencyCheckTest.java
@@ -195,7 +195,7 @@ public class IgnitePdsCacheConfigurationFileConsistencyCheckTest extends GridCom
FilePageStoreManager pageStore = (FilePageStoreManager) sharedCtx.pageStore();
- StoredCacheData corrData = cacheDescr.toStoredData();
+ StoredCacheData corrData = cacheDescr.toStoredData(ig.context().cache().splitter());
corrData.config().setGroupName(ODD_GROUP_NAME);
@@ -219,7 +219,7 @@ public class IgnitePdsCacheConfigurationFileConsistencyCheckTest extends GridCom
FilePageStoreManager pageStore = (FilePageStoreManager) sharedCtx.pageStore();
- StoredCacheData data = cacheDescr.toStoredData();
+ StoredCacheData data = cacheDescr.toStoredData(ig.context().cache().splitter());
data.config().setGroupName(ODD_GROUP_NAME);
@@ -245,7 +245,7 @@ public class IgnitePdsCacheConfigurationFileConsistencyCheckTest extends GridCom
FilePageStoreManager pageStore = (FilePageStoreManager) sharedCtx.pageStore();
- StoredCacheData data = cacheDescr.toStoredData();
+ StoredCacheData data = cacheDescr.toStoredData(ig.context().cache().splitter());
data.config().setGroupName(ODD_GROUP_NAME);
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 8c059b2..249ecc8 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -1789,6 +1789,7 @@ public abstract class GridAbstractTest extends JUnit3TestLegacySupport {
U.clearClassCache();
MarshallerExclusions.clearCache();
BinaryEnumCache.clear();
+ serializedObj.clear();
if (err!= null)
throw err;
@@ -2148,7 +2149,8 @@ public abstract class GridAbstractTest extends JUnit3TestLegacySupport {
afterTest();
}
finally {
- serializedObj.clear();
+ if (!keepSerializedObjects())
+ serializedObj.clear();
Thread.currentThread().setContextClassLoader(clsLdr);
@@ -2159,6 +2161,16 @@ public abstract class GridAbstractTest extends JUnit3TestLegacySupport {
}
/**
+ * @return If {@code true} serialized objects placed to {@link #serializedObj}
+ * are not cleared after each test execution.
+ *
+ * Setting this flag to true is need when some serialized objects are need to be shared between all tests in class.
+ */
+ protected boolean keepSerializedObjects() {
+ return false;
+ }
+
+ /**
* @return Error handler to process all uncaught exceptions of the test run ({@code null} by default).
*/
protected IgniteClosure<Throwable, Throwable> errorHandler() {
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
index 0f6cdbc..11177df 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
@@ -26,6 +26,8 @@ import org.apache.ignite.internal.processors.authentication.AuthenticationOnNotA
import org.apache.ignite.internal.processors.authentication.AuthenticationProcessorNPEOnStartTest;
import org.apache.ignite.internal.processors.authentication.AuthenticationProcessorNodeRestartTest;
import org.apache.ignite.internal.processors.authentication.AuthenticationProcessorSelfTest;
+import org.apache.ignite.internal.processors.cache.CacheConfigurationSerializationOnDiscoveryTest;
+import org.apache.ignite.internal.processors.cache.CacheConfigurationSerializationOnExchangeTest;
import org.apache.ignite.internal.processors.cache.CacheDataRegionConfigurationTest;
import org.apache.ignite.internal.processors.cache.CacheGroupMetricsMBeanTest;
import org.apache.ignite.internal.processors.cache.CacheMetricsManageTest;
@@ -104,6 +106,9 @@ public class IgniteCacheTestSuite7 {
GridTestUtils.addTestIfNeeded(suite, TransactionIntegrityWithPrimaryIndexCorruptionTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CacheDataLossOnPartitionMoveTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, CacheConfigurationSerializationOnDiscoveryTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, CacheConfigurationSerializationOnExchangeTest.class, ignoredTests);
+
return suite;
}
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlNotNullConstraintTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlNotNullConstraintTest.java
index a419309..3b48222 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlNotNullConstraintTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlNotNullConstraintTest.java
@@ -238,6 +238,13 @@ public class IgniteSqlNotNullConstraintTest extends AbstractIndexingCommonTest {
cleanup();
}
+ /**
+ * We need to keep serialized cache store factories to share them between tests.
+ */
+ @Override protected boolean keepSerializedObjects() {
+ return true;
+ }
+
/** */
@Test
public void testQueryEntityGetSetNotNullFields() throws Exception {
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTestSharedFactory.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTestSharedFactory.cs
index 2af5915..3a47139 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTestSharedFactory.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTestSharedFactory.cs
@@ -41,7 +41,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
get
{
// Shared PlatformDotNetCacheStoreFactory results in a single store instance.
- return 1;
+ return 2;
}
}
}
diff --git a/modules/spring/src/test/java/org/apache/ignite/spring/injection/IgniteSpringBeanSpringResourceInjectionTest.java b/modules/spring/src/test/java/org/apache/ignite/spring/injection/IgniteSpringBeanSpringResourceInjectionTest.java
index a7d65a1..a2c27fd 100644
--- a/modules/spring/src/test/java/org/apache/ignite/spring/injection/IgniteSpringBeanSpringResourceInjectionTest.java
+++ b/modules/spring/src/test/java/org/apache/ignite/spring/injection/IgniteSpringBeanSpringResourceInjectionTest.java
@@ -28,6 +28,8 @@ import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.Ignite;
import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.resources.SpringResource;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceContext;
@@ -191,8 +193,8 @@ public class IgniteSpringBeanSpringResourceInjectionTest extends GridCommonAbstr
new TestSpringResourceInjectedRunnable(SPRING_CFG_LOCATION, BEAN_TO_INJECT_NAME) {
/** {@inheritDoc} */
@Override Integer getInjectedBean() {
- IgniteCacheStoreWithSpringResource cacheStore =
- appCtx.getBean(IgniteCacheStoreWithSpringResource.class);
+ IgniteCacheStoreWithSpringResource cacheStore = (IgniteCacheStoreWithSpringResource)
+ ((IgniteEx) G.allGrids().get(0)).cachex("cache1").context().store().store();
return cacheStore.getInjectedSpringField();
}