You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/10/17 12:27:33 UTC
ignite git commit: IGNITE-9830 Fixed race in binary metadata registration leading to exception on commit - Fixes #4996.
Repository: ignite
Updated Branches:
refs/heads/master d829b67e9 -> a1cb021c0
IGNITE-9830 Fixed race in binary metadata registration leading to exception on commit - Fixes #4996.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a1cb021c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a1cb021c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a1cb021c
Branch: refs/heads/master
Commit: a1cb021c06ffce5da460a8cd4ffe71de2c350b54
Parents: d829b67
Author: Aleksei Scherbakov <al...@gmail.com>
Authored: Wed Oct 17 13:28:20 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Oct 17 14:52:12 2018 +0300
----------------------------------------------------------------------
.../apache/ignite/IgniteSystemProperties.java | 5 +
.../internal/binary/BinaryReaderExImpl.java | 1 -
.../internal/binary/BinarySchemaRegistry.java | 114 +++--
.../ignite/internal/binary/BinaryUtils.java | 28 +-
.../cache/binary/BinaryMetadataTransport.java | 93 +++-
.../binary/CacheObjectBinaryProcessorImpl.java | 219 ++++++++-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 5 +-
.../CachePageWriteLockUnlockTest.java | 2 +
.../transactions/TxRollbackOnTimeoutTest.java | 7 +-
...MetadataConcurrentUpdateWithIndexesTest.java | 439 +++++++++++++++++++
.../IgniteBinaryCacheQueryTestSuite.java | 3 +
11 files changed, 827 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 6d48adf..02ebb25 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -1008,6 +1008,11 @@ public final class IgniteSystemProperties {
public static final String IGNITE_REUSE_MEMORY_ON_DEACTIVATE = "IGNITE_REUSE_MEMORY_ON_DEACTIVATE";
/**
+ * Timeout for waiting schema update if schema was not found for last accepted version.
+ */
+ public static final String IGNITE_WAIT_SCHEMA_UPDATE = "IGNITE_WAIT_SCHEMA_UPDATE";
+
+ /**
* System property to override {@link CacheConfiguration#rebalanceThrottle} configuration property for all caches.
* {@code 0} by default, which means that override is disabled.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java
index 38934f0..601141c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java
@@ -2028,7 +2028,6 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina
for (BinarySchema existingSchema : existingSchemas)
existingSchemaIds.add(existingSchema.schemaId());
-
throw new BinaryObjectException("Cannot find schema for object with compact footer" +
" [typeName=" + type.typeName() +
", typeId=" + typeId +
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/core/src/main/java/org/apache/ignite/internal/binary/BinarySchemaRegistry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinarySchemaRegistry.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinarySchemaRegistry.java
index 91f29b2..f22fc4c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinarySchemaRegistry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinarySchemaRegistry.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.binary;
+import java.util.ArrayList;
+import java.util.List;
import org.jetbrains.annotations.Nullable;
import java.util.HashMap;
@@ -98,75 +100,95 @@ public class BinarySchemaRegistry {
* @param schemaId Schema ID.
* @param schema Schema.
*/
- public void addSchema(int schemaId, BinarySchema schema) {
- synchronized (this) {
- if (inline) {
- // Check if this is already known schema.
- if (schemaId == schemaId1 || schemaId == schemaId2 || schemaId == schemaId3 || schemaId == schemaId4)
- return;
+ public synchronized void addSchema(int schemaId, BinarySchema schema) {
+ if (inline) {
+ // Check if this is already known schema.
+ if (schemaId == schemaId1 || schemaId == schemaId2 || schemaId == schemaId3 || schemaId == schemaId4)
+ return;
- // Try positioning new schema in inline mode.
- if (schemaId1 == EMPTY) {
- schemaId1 = schemaId;
+ // Try positioning new schema in inline mode.
+ if (schemaId1 == EMPTY) {
+ schemaId1 = schemaId;
- schema1 = schema;
+ schema1 = schema;
- inline = true; // Forcing HB edge just in case.
+ inline = true; // Forcing HB edge just in case.
- return;
- }
+ return;
+ }
- if (schemaId2 == EMPTY) {
- schemaId2 = schemaId;
+ if (schemaId2 == EMPTY) {
+ schemaId2 = schemaId;
- schema2 = schema;
+ schema2 = schema;
- inline = true; // Forcing HB edge just in case.
+ inline = true; // Forcing HB edge just in case.
- return;
- }
+ return;
+ }
- if (schemaId3 == EMPTY) {
- schemaId3 = schemaId;
+ if (schemaId3 == EMPTY) {
+ schemaId3 = schemaId;
- schema3 = schema;
+ schema3 = schema;
- inline = true; // Forcing HB edge just in case.
+ inline = true; // Forcing HB edge just in case.
- return;
- }
+ return;
+ }
- if (schemaId4 == EMPTY) {
- schemaId4 = schemaId;
+ if (schemaId4 == EMPTY) {
+ schemaId4 = schemaId;
- schema4 = schema;
+ schema4 = schema;
- inline = true; // Forcing HB edge just in case.
+ inline = true; // Forcing HB edge just in case.
- return;
- }
+ return;
+ }
- // No luck, switching to hash map mode.
- HashMap<Integer, BinarySchema> newSchemas = new HashMap<>();
+ // No luck, switching to hash map mode.
+ HashMap<Integer, BinarySchema> newSchemas = new HashMap<>();
- newSchemas.put(schemaId1, schema1);
- newSchemas.put(schemaId2, schema2);
- newSchemas.put(schemaId3, schema3);
- newSchemas.put(schemaId4, schema4);
+ newSchemas.put(schemaId1, schema1);
+ newSchemas.put(schemaId2, schema2);
+ newSchemas.put(schemaId3, schema3);
+ newSchemas.put(schemaId4, schema4);
- newSchemas.put(schemaId, schema);
+ newSchemas.put(schemaId, schema);
- schemas = newSchemas;
+ schemas = newSchemas;
- inline = false;
- }
- else {
- HashMap<Integer, BinarySchema> newSchemas = new HashMap<>(schemas);
+ inline = false;
+ }
+ else {
+ HashMap<Integer, BinarySchema> newSchemas = new HashMap<>(schemas);
- newSchemas.put(schemaId, schema);
+ newSchemas.put(schemaId, schema);
- schemas = newSchemas;
- }
+ schemas = newSchemas;
}
}
+
+ /**
+ * @return List of known schemas.
+ */
+ public synchronized List<BinarySchema> schemas() {
+ List<BinarySchema> res = new ArrayList<>();
+
+ if (inline) {
+ if (schemaId1 != EMPTY)
+ res.add(schema1);
+ if (schemaId2 != EMPTY)
+ res.add(schema2);
+ if (schemaId3 != EMPTY)
+ res.add(schema3);
+ if (schemaId4 != EMPTY)
+ res.add(schema4);
+ }
+ else
+ res.addAll(schemas.values());
+
+ return res;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
index 553d8e5..77dce56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
@@ -958,10 +958,30 @@ public class BinaryUtils {
* @throws BinaryObjectException If merge failed due to metadata conflict.
*/
public static BinaryMetadata mergeMetadata(@Nullable BinaryMetadata oldMeta, BinaryMetadata newMeta) {
+ return mergeMetadata(oldMeta, newMeta, null);
+ }
+
+ /**
+ * Merge old and new metas.
+ *
+ * @param oldMeta Old meta.
+ * @param newMeta New meta.
+ * @param changedSchemas Set for holding changed schemas.
+ * @return New meta if old meta was null, old meta if no changes detected, merged meta otherwise.
+ * @throws BinaryObjectException If merge failed due to metadata conflict.
+ */
+ public static BinaryMetadata mergeMetadata(@Nullable BinaryMetadata oldMeta, BinaryMetadata newMeta,
+ @Nullable Set<Integer> changedSchemas) {
assert newMeta != null;
- if (oldMeta == null)
+ if (oldMeta == null) {
+ if (changedSchemas != null) {
+ for (BinarySchema schema : newMeta.schemas())
+ changedSchemas.add(schema.schemaId());
+ }
+
return newMeta;
+ }
else {
assert oldMeta.typeId() == newMeta.typeId();
@@ -1036,8 +1056,12 @@ public class BinaryUtils {
Collection<BinarySchema> mergedSchemas = new HashSet<>(oldMeta.schemas());
for (BinarySchema newSchema : newMeta.schemas()) {
- if (mergedSchemas.add(newSchema))
+ if (mergedSchemas.add(newSchema)) {
changed = true;
+
+ if (changedSchemas != null)
+ changedSchemas.add(newSchema.schemaId());
+ }
}
// Return either old meta if no changes detected, or new merged meta.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
index 38450df..1c2f6f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
@@ -16,8 +16,12 @@
*/
package org.apache.ignite.internal.processors.cache.binary;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Map;
import java.util.Queue;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -42,6 +46,7 @@ import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.jetbrains.annotations.Nullable;
@@ -87,6 +92,9 @@ final class BinaryMetadataTransport {
private final ConcurrentMap<Integer, ClientMetadataRequestFuture> clientReqSyncMap = new ConcurrentHashMap<>();
/** */
+ private final ConcurrentMap<SyncKey, GridFutureAdapter<?>> schemaWaitFuts = new ConcurrentHashMap<>();
+
+ /** */
private volatile boolean stopping;
/** */
@@ -207,6 +215,21 @@ final class BinaryMetadataTransport {
}
/**
+ * Await specific schema update.
+ * @param typeId Type id.
+ * @param schemaId Schema id.
+ * @return Future which will be completed when schema is received.
+ */
+ GridFutureAdapter<?> awaitSchemaUpdate(int typeId, int schemaId) {
+ GridFutureAdapter<Object> fut = new GridFutureAdapter<>();
+
+ // Use version for schemaId.
+ GridFutureAdapter<?> oldFut = schemaWaitFuts.putIfAbsent(new SyncKey(typeId, schemaId), fut);
+
+ return oldFut == null ? fut : oldFut;
+ }
+
+ /**
* Allows client node to request latest version of binary metadata for a given typeId from the cluster
* in case client is able to detect that it has obsolete metadata in its local cache.
*
@@ -259,6 +282,13 @@ final class BinaryMetadataTransport {
/** {@inheritDoc} */
@Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, MetadataUpdateProposedMessage msg) {
+ if (log.isDebugEnabled())
+ log.debug("Received MetadataUpdateProposedListener [typeId=" + msg.typeId() +
+ ", typeName=" + msg.metadata().typeName() +
+ ", pendingVer=" + msg.pendingVersion() +
+ ", acceptedVer=" + msg.acceptedVersion() +
+ ", schemasCnt=" + msg.metadata().schemas().size() + ']');
+
int typeId = msg.typeId();
BinaryMetadataHolder holder = metaLocCache.get(typeId);
@@ -277,20 +307,23 @@ final class BinaryMetadataTransport {
acceptedVer = 0;
}
- if (log.isDebugEnabled())
- log.debug("Versions are stamped on coordinator" +
- " [typeId=" + typeId +
- ", pendingVer=" + pendingVer +
- ", acceptedVer=" + acceptedVer + "]"
- );
-
msg.pendingVersion(pendingVer);
msg.acceptedVersion(acceptedVer);
BinaryMetadata locMeta = holder != null ? holder.metadata() : null;
try {
- BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(locMeta, msg.metadata());
+ Set<Integer> changedSchemas = new LinkedHashSet<>();
+
+ BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(locMeta, msg.metadata(), changedSchemas);
+
+ if (log.isDebugEnabled())
+ log.debug("Versions are stamped on coordinator" +
+ " [typeId=" + typeId +
+ ", changedSchemas=" + changedSchemas +
+ ", pendingVer=" + pendingVer +
+ ", acceptedVer=" + acceptedVer + "]"
+ );
msg.metadata(mergedMeta);
}
@@ -358,8 +391,10 @@ final class BinaryMetadataTransport {
if (!msg.rejected()) {
BinaryMetadata locMeta = holder != null ? holder.metadata() : null;
+ Set<Integer> changedSchemas = new LinkedHashSet<>();
+
try {
- BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(locMeta, msg.metadata());
+ BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(locMeta, msg.metadata(), changedSchemas);
BinaryMetadataHolder newHolder = new BinaryMetadataHolder(mergedMeta, pendingVer, acceptedVer);
@@ -382,7 +417,8 @@ final class BinaryMetadataTransport {
}
else {
if (log.isDebugEnabled())
- log.debug("Updated metadata on server node: " + newHolder);
+ log.debug("Updated metadata on server node [holder=" + newHolder +
+ ", changedSchemas=" + changedSchemas + ']');
metaLocCache.put(typeId, newHolder);
}
@@ -463,7 +499,7 @@ final class BinaryMetadataTransport {
if (oldAcceptedVer >= newAcceptedVer) {
if (log.isDebugEnabled())
log.debug("Marking ack as duplicate [holder=" + holder +
- ", newAcceptedVer: " + newAcceptedVer + ']');
+ ", newAcceptedVer=" + newAcceptedVer + ']');
//this is duplicate ack
msg.duplicated(true);
@@ -481,8 +517,26 @@ final class BinaryMetadataTransport {
GridFutureAdapter<MetadataUpdateResult> fut = syncMap.get(new SyncKey(typeId, newAcceptedVer));
+ holder = metaLocCache.get(typeId);
+
if (log.isDebugEnabled())
- log.debug("Completing future " + fut + " for " + metaLocCache.get(typeId));
+ log.debug("Completing future " + fut + " for " + holder);
+
+ if (!schemaWaitFuts.isEmpty()) {
+ Iterator<Map.Entry<SyncKey, GridFutureAdapter<?>>> iter = schemaWaitFuts.entrySet().iterator();
+
+ while (iter.hasNext()) {
+ Map.Entry<SyncKey, GridFutureAdapter<?>> entry = iter.next();
+
+ SyncKey key = entry.getKey();
+
+ if (key.typeId() == typeId && holder.metadata().hasSchema(key.version())) {
+ entry.getValue().onDone();
+
+ iter.remove();
+ }
+ }
+ }
if (fut != null)
fut.onDone(MetadataUpdateResult.createSuccessfulResult());
@@ -527,6 +581,11 @@ final class BinaryMetadataTransport {
void key(SyncKey key) {
this.key = key;
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MetadataUpdateResultFuture.class, this);
+ }
}
/**
@@ -580,6 +639,11 @@ final class BinaryMetadataTransport {
return (typeId == that.typeId) && (ver == that.ver);
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SyncKey.class, this);
+ }
}
/**
@@ -615,7 +679,7 @@ final class BinaryMetadataTransport {
binMetaBytes = U.marshal(ctx, metaHolder);
}
catch (IgniteCheckedException e) {
- U.error(log, "Failed to marshal binary metadata for [typeId: " + typeId + "]", e);
+ U.error(log, "Failed to marshal binary metadata for [typeId=" + typeId + ']', e);
resp.markErrorOnRequest();
}
@@ -670,7 +734,8 @@ final class BinaryMetadataTransport {
do {
oldHolder = metaLocCache.get(typeId);
- if (oldHolder != null && obsoleteUpdate(
+ // typeId metadata cannot be removed after initialization.
+ if (obsoleteUpdate(
oldHolder.pendingVersion(),
oldHolder.acceptedVersion(),
newHolder.pendingVersion(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 4c101b2..137db9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -19,10 +19,14 @@ package org.apache.ignite.internal.processors.cache.binary;
import java.io.File;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -30,6 +34,8 @@ import javax.cache.CacheException;
import org.apache.ignite.IgniteBinary;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.binary.BinaryField;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
@@ -39,8 +45,10 @@ import org.apache.ignite.binary.BinaryTypeConfiguration;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.BinaryConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.UnregisteredBinaryTypeException;
import org.apache.ignite.internal.binary.BinaryContext;
@@ -65,6 +73,7 @@ import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessorImpl;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -76,7 +85,9 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T1;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiClosure;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
@@ -88,7 +99,11 @@ import org.apache.ignite.spi.discovery.IgniteDiscoveryThread;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_TEST_FEATURES_ENABLED;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAIT_SCHEMA_UPDATE;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.BINARY_PROC;
@@ -120,6 +135,12 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
*/
@Nullable private File binaryMetadataFileStoreDir;
+ /** How long to wait for schema if no updates in progress. */
+ private long waitSchemaTimeout = IgniteSystemProperties.getLong(IGNITE_WAIT_SCHEMA_UPDATE, 30_000);
+
+ /** For tests. */
+ public static boolean useTestBinaryCtx = false;
+
/** */
@GridToStringExclude
private IgniteBinary binaries;
@@ -205,7 +226,9 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
BinaryMarshaller bMarsh0 = (BinaryMarshaller)marsh;
- binaryCtx = new BinaryContext(metaHnd, ctx.config(), ctx.log(BinaryContext.class));
+ binaryCtx = useTestBinaryCtx ?
+ new TestBinaryContext(metaHnd, ctx.config(), ctx.log(BinaryContext.class)) :
+ new BinaryContext(metaHnd, ctx.config(), ctx.log(BinaryContext.class));
IgniteUtils.invoke(BinaryMarshaller.class, bMarsh0, "setBinaryContext", binaryCtx, ctx.config());
@@ -452,11 +475,12 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
BinaryMetadata oldMeta = metaHolder != null ? metaHolder.metadata() : null;
- BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0);
+ Set<Integer> changedSchemas = new LinkedHashSet<>();
+
+ BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0, changedSchemas);
- //metadata requested to be added is exactly the same as already presented in the cache
- if (mergedMeta == oldMeta)
- return;
+ if (oldMeta != null && mergedMeta == oldMeta && metaHolder.pendingVersion() == metaHolder.acceptedVersion())
+ return; // Safe to use existing schemas.
if (failIfUnregistered)
throw new UnregisteredBinaryTypeException(
@@ -466,7 +490,24 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
"dev-list.",
typeId, mergedMeta);
- MetadataUpdateResult res = transport.requestMetadataUpdate(mergedMeta).get();
+ long t0 = System.nanoTime();
+
+ GridFutureAdapter<MetadataUpdateResult> fut = transport.requestMetadataUpdate(mergedMeta);
+
+ MetadataUpdateResult res = fut.get();
+
+ if (log.isDebugEnabled()) {
+ IgniteInternalTx tx = ctx.cache().context().tm().tx();
+
+ log.debug("Completed metadata update [typeId=" + typeId +
+ ", typeName=" + newMeta.typeName() +
+ ", changedSchemas=" + changedSchemas +
+ ", waitTime=" + MILLISECONDS.convert(System.nanoTime() - t0, NANOSECONDS) + "ms" +
+ ", holder=" + metaHolder +
+ ", fut=" + fut +
+ ", tx=" + CU.txString(tx) +
+ ']');
+ }
assert res != null;
@@ -541,9 +582,9 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
if (log.isDebugEnabled() && !fut.isDone())
log.debug("Waiting for update for" +
- " [typeId=" + typeId +
- ", pendingVer=" + holder.pendingVersion() +
- ", acceptedVer=" + holder.acceptedVersion() + "]");
+ " [typeId=" + typeId +
+ ", pendingVer=" + holder.pendingVersion() +
+ ", acceptedVer=" + holder.acceptedVersion() + "]");
try {
fut.get();
@@ -565,40 +606,99 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
if (ctx.clientNode()) {
if (holder == null || !holder.metadata().hasSchema(schemaId)) {
+ if (log.isDebugEnabled())
+ log.debug("Waiting for client metadata update" +
+ " [typeId=" + typeId
+ + ", schemaId=" + schemaId
+ + ", pendingVer=" + (holder == null ? "NA" : holder.pendingVersion())
+ + ", acceptedVer=" + (holder == null ? "NA" :holder.acceptedVersion()) + ']');
+
try {
transport.requestUpToDateMetadata(typeId).get();
-
- holder = metadataLocCache.get(typeId);
}
catch (IgniteCheckedException ignored) {
// No-op.
}
+
+ holder = metadataLocCache.get(typeId);
+
+ if (log.isDebugEnabled())
+ log.debug("Finished waiting for client metadata update" +
+ " [typeId=" + typeId
+ + ", schemaId=" + schemaId
+ + ", pendingVer=" + (holder == null ? "NA" : holder.pendingVersion())
+ + ", acceptedVer=" + (holder == null ? "NA" :holder.acceptedVersion()) + ']');
}
}
- else if (holder != null) {
- if (IgniteThread.current() instanceof IgniteDiscoveryThread)
+ else {
+ if (holder != null && IgniteThread.current() instanceof IgniteDiscoveryThread)
return holder.metadata().wrap(binaryCtx);
+ else if (holder != null && (holder.pendingVersion() - holder.acceptedVersion() > 0)) {
+ if (log.isDebugEnabled())
+ log.debug("Waiting for metadata update" +
+ " [typeId=" + typeId
+ + ", schemaId=" + schemaId
+ + ", pendingVer=" + holder.pendingVersion()
+ + ", acceptedVer=" + holder.acceptedVersion() + ']');
- if (holder.pendingVersion() - holder.acceptedVersion() > 0) {
- GridFutureAdapter<MetadataUpdateResult> fut = transport.awaitMetadataUpdate(
- typeId,
- holder.pendingVersion());
+ long t0 = System.nanoTime();
- if (log.isDebugEnabled() && !fut.isDone())
- log.debug("Waiting for update for" +
- " [typeId=" + typeId
- + ", schemaId=" + schemaId
- + ", pendingVer=" + holder.pendingVersion()
- + ", acceptedVer=" + holder.acceptedVersion() + "]");
+ GridFutureAdapter<MetadataUpdateResult> fut = transport.awaitMetadataUpdate(
+ typeId,
+ holder.pendingVersion());
try {
fut.get();
}
+ catch (IgniteCheckedException e) {
+ log.error("Failed to wait for metadata update [typeId=" + typeId + ", schemaId=" + schemaId + ']', e);
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Finished waiting for metadata update" +
+ " [typeId=" + typeId
+ + ", waitTime=" + NANOSECONDS.convert(System.nanoTime() - t0, MILLISECONDS) + "ms"
+ + ", schemaId=" + schemaId
+ + ", pendingVer=" + holder.pendingVersion()
+ + ", acceptedVer=" + holder.acceptedVersion() + ']');
+
+ holder = metadataLocCache.get(typeId);
+ }
+ else if (holder == null || !holder.metadata().hasSchema(schemaId)) {
+ // Last resort waiting.
+ U.warn(log,
+ "Schema is missing while no metadata updates are in progress " +
+ "(will wait for schema update within timeout defined by IGNITE_BINARY_META_UPDATE_TIMEOUT system property)" +
+ " [typeId=" + typeId
+ + ", missingSchemaId=" + schemaId
+ + ", pendingVer=" + (holder == null ? "NA" : holder.pendingVersion())
+ + ", acceptedVer=" + (holder == null ? "NA" : holder.acceptedVersion())
+ + ", binMetaUpdateTimeout=" + waitSchemaTimeout +']');
+
+ long t0 = System.nanoTime();
+
+ GridFutureAdapter<?> fut = transport.awaitSchemaUpdate(typeId, schemaId);
+
+ try {
+ fut.get(waitSchemaTimeout);
+ }
+ catch (IgniteFutureTimeoutCheckedException e) {
+ log.error("Timed out while waiting for schema update [typeId=" + typeId + ", schemaId=" +
+ schemaId + ']');
+ }
catch (IgniteCheckedException ignored) {
// No-op.
}
holder = metadataLocCache.get(typeId);
+
+ if (log.isDebugEnabled() && holder != null && holder.metadata().hasSchema(schemaId))
+ log.debug("Found the schema after wait" +
+ " [typeId=" + typeId
+ + ", waitTime=" + NANOSECONDS.convert(System.nanoTime() - t0, MILLISECONDS) + "ms"
+ + ", schemaId=" + schemaId
+ + ", pendingVer=" + holder.pendingVersion()
+ + ", acceptedVer=" + holder.acceptedVersion() + ']');
}
}
@@ -903,7 +1003,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
if ((res = validateBinaryConfiguration(rmtNode)) != null)
return res;
- return validateBinaryMetadata(rmtNode.id(), (Map<Integer, BinaryMetadataHolder>) discoData.joiningNodeData());
+ return validateBinaryMetadata(rmtNode.id(), (Map<Integer, BinaryMetadataHolder>)discoData.joiningNodeData());
}
/** */
@@ -1070,4 +1170,75 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
public void setBinaryMetadataFileStoreDir(@Nullable File binaryMetadataFileStoreDir) {
this.binaryMetadataFileStoreDir = binaryMetadataFileStoreDir;
}
+
+ /** */
+ public static class TestBinaryContext extends BinaryContext {
+ /** */
+ private List<TestBinaryContextListener> listeners;
+
+ /**
+ * @param metaHnd Meta handler.
+ * @param igniteCfg Ignite config.
+ * @param log Logger.
+ */
+ public TestBinaryContext(BinaryMetadataHandler metaHnd, IgniteConfiguration igniteCfg,
+ IgniteLogger log) {
+ super(metaHnd, igniteCfg, log);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public BinaryType metadata(int typeId) throws BinaryObjectException {
+ BinaryType metadata = super.metadata(typeId);
+
+ if (listeners != null) {
+ for (TestBinaryContextListener listener : listeners)
+ listener.onAfterMetadataRequest(typeId, metadata);
+ }
+
+ return metadata;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void updateMetadata(int typeId, BinaryMetadata meta,
+ boolean failIfUnregistered) throws BinaryObjectException {
+ if (listeners != null) {
+ for (TestBinaryContextListener listener : listeners)
+ listener.onBeforeMetadataUpdate(typeId, meta);
+ }
+
+ super.updateMetadata(typeId, meta, failIfUnregistered);
+ }
+
+ /** */
+ public interface TestBinaryContextListener {
+ /**
+ * @param typeId Type id.
+ * @param type Type.
+ */
+ void onAfterMetadataRequest(int typeId, BinaryType type);
+
+ /**
+ * @param typeId Type id.
+ * @param metadata Metadata.
+ */
+ void onBeforeMetadataUpdate(int typeId, BinaryMetadata metadata);
+ }
+
+ /**
+ * @param lsnr Listener.
+ */
+ public void addListener(TestBinaryContextListener lsnr) {
+ if (listeners == null)
+ listeners = new ArrayList<>();
+
+ if (!listeners.contains(lsnr))
+ listeners.add(lsnr);
+ }
+
+ /** */
+ public void clearAllListener() {
+ if (listeners != null)
+ listeners.clear();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 048abf6..55462ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -423,6 +423,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery
/** */
private IgniteDiscoverySpiInternalListener internalLsnr;
+ /** For test purposes. */
+ private boolean skipAddrsRandomization = false;
+
/**
* Gets current SPI state.
*
@@ -1881,7 +1884,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery
}
}
- if (!res.isEmpty())
+ if (!res.isEmpty() && !skipAddrsRandomization)
Collections.shuffle(res);
return res;
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePageWriteLockUnlockTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePageWriteLockUnlockTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePageWriteLockUnlockTest.java
index 84fd916..41c5882 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePageWriteLockUnlockTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePageWriteLockUnlockTest.java
@@ -100,6 +100,8 @@ public class CachePageWriteLockUnlockTest extends GridCommonAbstractTest {
grid0 = startGrid(0);
+ grid0.cluster().active(true);
+
preloadPartition(grid0, DEFAULT_CACHE_NAME, PARTITION);
Iterator<Cache.Entry<Object, Object>> it = grid0.cache(DEFAULT_CACHE_NAME).iterator();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
index ccf4c8a..ae75caa 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
@@ -819,7 +819,12 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
tx.commit();
}
catch (Throwable t) {
- assertTrue(X.hasCause(t, TransactionTimeoutException.class));
+ boolean timedOut = X.hasCause(t, TransactionTimeoutException.class);
+
+ if (!timedOut)
+ log.error("Got unexpected exception", t);
+
+ assertTrue(timedOut);
}
assertEquals(0, client.cache(CACHE_NAME).size());
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryMetadataConcurrentUpdateWithIndexesTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryMetadataConcurrentUpdateWithIndexesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryMetadataConcurrentUpdateWithIndexesTest.java
new file mode 100644
index 0000000..fed1d7f
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryMetadataConcurrentUpdateWithIndexesTest.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.QueryIndexType;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.binary.BinaryMetadata;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiClosure;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Tests scenario for too early metadata update completion in case of multiple concurrent updates for the same schema.
+ * <p>
+ * Scenario is the following:
+ *
+ * <ul>
+ * <li>Start 5 server nodes; the client joins after the first two and connects to node 2 in topology order (starting from 1).</li>
+ * <li>Start two concurrent transactions from the client node, both producing the same schema update.</li>
+ * <li>Delay the second update until the first update returns to the client with a stamped propose message and
+ * writes the new schema to the local metadata cache.</li>
+ * <li>Unblock second update. It should correctly wait until the metadata is applied on all
+ * nodes or tx will fail on commit.</li>
+ * </ul>
+ */
+public class BinaryMetadataConcurrentUpdateWithIndexesTest extends GridCommonAbstractTest {
+ /** */
+ private static final int FIELDS = 2;
+
+ /** */
+ private static final int MB = 1024 * 1024;
+
+ /** */
+ private static final TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setIncludeEventTypes(EventType.EVTS_DISCOVERY);
+ cfg.setClientMode(igniteInstanceName.startsWith("client"));
+
+ // The private test-only flag is flipped reflectively so resolved addresses are not shuffled.
+ BlockTcpDiscoverySpi discoSpi = new BlockTcpDiscoverySpi();
+ Field skipShuffleFld = U.findField(BlockTcpDiscoverySpi.class, "skipAddrsRandomization");
+
+ assertNotNull(skipShuffleFld);
+
+ skipShuffleFld.set(discoSpi, true);
+
+ cfg.setDiscoverySpi(discoSpi.setIpFinder(ipFinder));
+
+ // Every field s0..s(FIELDS-1) is a String column backed by its own SORTED index.
+ LinkedHashMap<String, String> flds = new LinkedHashMap<>();
+
+ Collection<QueryIndex> idxs = new ArrayList<>(FIELDS);
+
+ for (int i = 0; i < FIELDS; i++) {
+ flds.put("s" + i, "java.lang.String");
+ idxs.add(new QueryIndex("s" + i, QueryIndexType.SORTED));
+ }
+
+ QueryEntity entity = new QueryEntity("java.lang.Integer", "Value");
+
+ entity.setFields(flds);
+ entity.setIndexes(idxs);
+
+ DataRegionConfiguration dfltRegion = new DataRegionConfiguration().setMaxSize(50 * MB);
+
+ cfg.setDataStorageConfiguration(new DataStorageConfiguration().setDefaultDataRegionConfiguration(dfltRegion));
+
+ CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME).
+ setBackups(0).
+ setQueryEntities(Collections.singleton(entity)).
+ setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL).
+ setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC).
+ setCacheMode(CacheMode.PARTITIONED);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ return cfg;
+ }
+
+ /** Flag to start syncing metadata requests. Should skip on exchange. */
+ private volatile boolean syncMeta;
+
+ /** Metadata init latch. Both threads must request initial metadata. */
+ private CountDownLatch initMetaReq = new CountDownLatch(2);
+
+ /** Thread local flag for need of waiting local metadata update. */
+ private ThreadLocal<Boolean> delayMetadataUpdateThreadLoc = new ThreadLocal<>();
+
+ /** Latch for waiting local metadata update. */
+ public static final CountDownLatch localMetaUpdatedLatch = new CountDownLatch(1);
+
+ /** Two client transactions concurrently register the same "Value" schema while propose messages are selectively held back. */
+ public void testMissingSchemaUpdate() throws Exception {
+ // Start order is important.
+ Ignite node0 = startGrid("node0");
+
+ Ignite node1 = startGrid("node1");
+
+ IgniteEx client0 = startGrid("client0");
+
+ CacheObjectBinaryProcessorImpl.TestBinaryContext clientCtx =
+ (CacheObjectBinaryProcessorImpl.TestBinaryContext)((CacheObjectBinaryProcessorImpl)client0.context().
+ cacheObjects()).binaryContext();
+
+ clientCtx.addListener(new CacheObjectBinaryProcessorImpl.TestBinaryContext.TestBinaryContextListener() {
+ @Override public void onAfterMetadataRequest(int typeId, BinaryType type) {
+ if (syncMeta) {
+ try {
+ initMetaReq.countDown(); // Latch of 2: both tx threads must pass here before either proceeds.
+
+ initMetaReq.await();
+ }
+ catch (Exception e) {
+ throw new BinaryObjectException(e);
+ }
+ }
+ }
+
+ @Override public void onBeforeMetadataUpdate(int typeId, BinaryMetadata metadata) {
+ // Delay one of updates until schema is locally updated on propose message.
+ if (delayMetadataUpdateThreadLoc.get() != null)
+ await(localMetaUpdatedLatch, 5000);
+ }
+ });
+
+ Ignite node2 = startGrid("node2");
+
+ Ignite node3 = startGrid("node3");
+
+ startGrid("node4");
+
+ node0.cluster().active(true);
+
+ awaitPartitionMapExchange();
+
+ syncMeta = true; // From here on the listener above synchronizes the metadata requests.
+
+ CountDownLatch clientProposeMsgBlockedLatch = new CountDownLatch(1);
+
+ AtomicBoolean clientWait = new AtomicBoolean();
+ final Object clientMux = new Object();
+
+ AtomicBoolean srvWait = new AtomicBoolean();
+ final Object srvMux = new Object();
+
+ ((BlockTcpDiscoverySpi)node1.configuration().getDiscoverySpi()).setClosure((snd, msg) -> {
+ if (msg instanceof MetadataUpdateProposedMessage) {
+ if (Thread.currentThread().getName().contains("client")) {
+ log.info("Block custom message to client0: [locNode=" + snd + ", msg=" + msg + ']');
+
+ clientProposeMsgBlockedLatch.countDown();
+
+ // Message to client: hold the sending thread until the test sets clientWait.
+ synchronized (clientMux) {
+ while (!clientWait.get())
+ try {
+ clientMux.wait();
+ }
+ catch (InterruptedException e) {
+ fail();
+ }
+ }
+ }
+ }
+
+ return null;
+ });
+
+ ((BlockTcpDiscoverySpi)node2.configuration().getDiscoverySpi()).setClosure((snd, msg) -> {
+ if (msg instanceof MetadataUpdateProposedMessage) {
+ MetadataUpdateProposedMessage msg0 = (MetadataUpdateProposedMessage)msg;
+
+ int pendingVer = U.field(msg0, "pendingVer");
+
+ // Should not block propose messages until they reach coordinator.
+ if (pendingVer == 0)
+ return null;
+
+ log.info("Block custom message to next server: [locNode=" + snd + ", msg=" + msg + ']');
+
+ // Message to next server: hold the sending thread until the test sets srvWait.
+ synchronized (srvMux) {
+ while (!srvWait.get())
+ try {
+ srvMux.wait();
+ }
+ catch (InterruptedException e) {
+ fail();
+ }
+ }
+ }
+
+ return null;
+ });
+
+ Integer key = primaryKey(node3.cache(DEFAULT_CACHE_NAME));
+
+ IgniteInternalFuture fut0 = runAsync(() -> {
+ try (Transaction tx = client0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ client0.cache(DEFAULT_CACHE_NAME).put(key, build(client0, "val", 0));
+
+ tx.commit();
+ }
+ catch (Throwable t) {
+ log.error("err", t); // Failure of the first tx is only logged, it does not fail fut0.
+ }
+
+ });
+
+ // Implements test logic.
+ IgniteInternalFuture fut1 = runAsync(() -> {
+ // Wait for initial metadata received. It should be initial version: pending=0, accepted=0
+ await(initMetaReq, 5000);
+
+ // Wait for blocking proposal message to client node.
+ await(clientProposeMsgBlockedLatch, 5000);
+
+ // Unblock proposal message to client.
+ clientWait.set(true);
+
+ synchronized (clientMux) {
+ clientMux.notify();
+ }
+
+ // Give some time to apply update.
+ doSleep(3000);
+
+ // Unblock second metadata update.
+ localMetaUpdatedLatch.countDown();
+
+ // Give some time for tx to complete (success or fail). fut2 will throw an error if tx has failed on commit.
+ doSleep(3000);
+
+ // Unblock metadata message and allow for correct version acceptance.
+ srvWait.set(true);
+
+ synchronized (srvMux) {
+ srvMux.notify();
+ }
+ });
+
+ IgniteInternalFuture fut2 = runAsync(() -> {
+ delayMetadataUpdateThreadLoc.set(true); // Marks this thread as the one delayed in onBeforeMetadataUpdate.
+
+ try (Transaction tx = client0.transactions().
+ txStart(PESSIMISTIC, REPEATABLE_READ, 0, 1)) {
+ client0.cache(DEFAULT_CACHE_NAME).put(key, build(client0, "val", 0));
+
+ tx.commit();
+ }
+ });
+
+ fut0.get();
+ fut1.get();
+ fut2.get();
+ }
+
+ /**
+ * @param latch Latch to wait on; a RuntimeException is thrown if it has not reached zero after the wait.
+ * @param timeout Maximum time to wait, in milliseconds.
+ */
+ private void await(CountDownLatch latch, long timeout) {
+ try {
+ latch.await(timeout, MILLISECONDS);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ long cnt = latch.getCount();
+
+ if (cnt != 0)
+ throw new RuntimeException("Invalid latch count after wait: " + cnt);
+ }
+
+ /**
+ * Creates a "Value" binary object carrying an int field "i&lt;n&gt;" and a String field "s&lt;n&gt;" per requested index.
+ * @param ignite Node whose binary API supplies the builder.
+ * @param prefix Prefix prepended to every String field value.
+ * @param fields Field indexes to populate; each is asserted to be below FIELDS.
+ */
+ protected BinaryObject build(Ignite ignite, String prefix, int... fields) {
+ BinaryObjectBuilder bldr = ignite.binary().builder("Value");
+
+ for (int i = 0; i < fields.length; i++) {
+ int idx = fields[i];
+ assertTrue(idx < FIELDS);
+ bldr.setField("i" + idx, idx);
+ bldr.setField("s" + idx, prefix + idx);
+ }
+ return bldr.build();
+ }
+
+ /**
+ * Discovery SPI that hands every outgoing custom discovery message to a test closure before writing it to the socket.
+ */
+ protected class BlockTcpDiscoverySpi extends TcpDiscoverySpi {
+ /** Closure called synchronously on the sending thread with the local node and the unwrapped custom message. */
+ private volatile IgniteBiClosure<ClusterNode, DiscoveryCustomMessage, Void> clo;
+
+ /**
+ * @param clo Closure to call before each custom message is written; its return value is ignored.
+ */
+ public void setClosure(IgniteBiClosure<ClusterNode, DiscoveryCustomMessage, Void> clo) {
+ this.clo = clo;
+ }
+
+ /**
+ * @param addr Local node, passed to the closure as its first argument.
+ * @param msg Message about to be written; anything but a custom event message is ignored.
+ */
+ private synchronized void apply(ClusterNode addr, TcpDiscoveryAbstractMessage msg) {
+ if (!(msg instanceof TcpDiscoveryCustomEventMessage))
+ return;
+
+ TcpDiscoveryCustomEventMessage cm = (TcpDiscoveryCustomEventMessage)msg;
+
+ DiscoveryCustomMessage delegate;
+
+ try {
+ DiscoverySpiCustomMessage custMsg = cm.message(marshaller(), U.resolveClassLoader(ignite().configuration()));
+
+ assertNotNull(custMsg);
+
+ delegate = ((CustomMessageWrapper)custMsg).delegate();
+
+ }
+ catch (Throwable throwable) {
+ throw new RuntimeException(throwable);
+ }
+
+ if (clo != null)
+ clo.apply(addr, delegate);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(
+ Socket sock,
+ TcpDiscoveryAbstractMessage msg,
+ byte[] data,
+ long timeout
+ ) throws IOException {
+ if (spiCtx != null)
+ apply(spiCtx.localNode(), msg); // Runs the closure first; the write below happens only after it returns.
+
+ super.writeToSocket(sock, msg, data, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock,
+ OutputStream out,
+ TcpDiscoveryAbstractMessage msg,
+ long timeout) throws IOException, IgniteCheckedException {
+ if (spiCtx != null)
+ apply(spiCtx.localNode(), msg); // Runs the closure first; the write below happens only after it returns.
+
+ super.writeToSocket(sock, out, msg, timeout);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ CacheObjectBinaryProcessorImpl.useTestBinaryCtx = true; // NOTE(review): presumably makes nodes create TestBinaryContext - confirm.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ CacheObjectBinaryProcessorImpl.useTestBinaryCtx = false; // Restore the static flag set in beforeTest().
+
+ stopAllGrids();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index b44ff2d..8a60c7d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -18,6 +18,7 @@
package org.apache.ignite.testsuites;
import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.BinaryMetadataConcurrentUpdateWithIndexesTest;
import org.apache.ignite.internal.processors.cache.BinarySerializationQuerySelfTest;
import org.apache.ignite.internal.processors.cache.BinarySerializationQueryWithReflectiveSerializerSelfTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheBinaryObjectsScanSelfTest;
@@ -42,6 +43,8 @@ public class IgniteBinaryCacheQueryTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheBinaryObjectsScanWithEventsSelfTest.class);
suite.addTestSuite(BigEntryQueryTest.class);
+ suite.addTestSuite(BinaryMetadataConcurrentUpdateWithIndexesTest.class);
+
//Should be adjusted. Not ready to be used with BinaryMarshaller.
//suite.addTestSuite(GridCacheBinarySwapScanQuerySelfTest.class);