You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2019/02/19 18:51:14 UTC
[ignite] branch master updated: IGNITE-11313 Fix cluster hangs on
cache invoke with binary objects creation - Fixes #6102.
This is an automated email from the ASF dual-hosted git repository.
dgovorukhin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 0058d3f IGNITE-11313 Fix cluster hangs on cache invoke with binary objects creation - Fixes #6102.
0058d3f is described below
commit 0058d3f896f159a2d809eed91fb338209b2c06a4
Author: ibessonov <be...@gmail.com>
AuthorDate: Tue Feb 19 21:50:54 2019 +0300
IGNITE-11313 Fix cluster hangs on cache invoke with binary objects creation - Fixes #6102.
Signed-off-by: Dmitriy Govorukhin <dm...@gmail.com>
---
.../ignite/internal/MarshallerContextImpl.java | 29 ++++-
.../internal/UnregisteredBinaryTypeException.java | 59 ++++++-----
.../internal/binary/BinaryClassDescriptor.java | 8 +-
.../ignite/internal/binary/BinaryContext.java | 12 ++-
.../internal/binary/BinaryFieldAccessor.java | 6 +-
.../binary/builder/BinaryObjectBuilderImpl.java | 2 +-
.../internal/client/thin/TcpIgniteClient.java | 15 ++-
.../optimized/OptimizedMarshallerUtils.java | 2 +-
.../processors/cache/GridCacheMapEntry.java | 6 +-
.../binary/CacheObjectBinaryProcessorImpl.java | 11 +-
.../distributed/dht/atomic/GridDhtAtomicCache.java | 17 ++-
.../platform/binary/PlatformBinaryProcessor.java | 2 +-
.../binary/ClientBinaryTypeNamePutRequest.java | 3 +-
.../ignite/marshaller/MarshallerContext.java | 26 ++++-
.../org/apache/ignite/thread/IgniteThread.java | 9 ++
.../internal/binary/BinaryMarshallerSelfTest.java | 4 +-
.../GridBinaryMarshallerCtxDisabledSelfTest.java | 6 +-
...tadataRegistrationInsideEntryProcessorTest.java | 118 +++++++++++++++++++++
.../cache/GridCacheEntryMemorySizeSelfTest.java | 6 +-
.../marshaller/MarshallerContextTestImpl.java | 2 +-
20 files changed, 275 insertions(+), 68 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
index a9ac22d..bfe2da0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
@@ -258,9 +258,10 @@ public class MarshallerContextImpl implements MarshallerContext {
/** {@inheritDoc} */
@Override public boolean registerClassName(
- byte platformId,
- int typeId,
- String clsName
+ byte platformId,
+ int typeId,
+ String clsName,
+ boolean failIfUnregistered
) throws IgniteCheckedException {
ConcurrentMap<Integer, MappedName> cache = getCacheFor(platformId);
@@ -276,7 +277,13 @@ public class MarshallerContextImpl implements MarshallerContext {
if (transport.stopping())
return false;
- IgniteInternalFuture<MappingExchangeResult> fut = transport.awaitMappingAcceptance(new MarshallerMappingItem(platformId, typeId, clsName), cache);
+ MarshallerMappingItem item = new MarshallerMappingItem(platformId, typeId, clsName);
+
+ GridFutureAdapter<MappingExchangeResult> fut = transport.awaitMappingAcceptance(item, cache);
+
+ if (failIfUnregistered && !fut.isDone())
+ throw new UnregisteredBinaryTypeException(typeId, fut);
+
MappingExchangeResult res = fut.get();
return convertXchRes(res);
@@ -286,7 +293,13 @@ public class MarshallerContextImpl implements MarshallerContext {
if (transport.stopping())
return false;
- IgniteInternalFuture<MappingExchangeResult> fut = transport.proposeMapping(new MarshallerMappingItem(platformId, typeId, clsName), cache);
+ MarshallerMappingItem item = new MarshallerMappingItem(platformId, typeId, clsName);
+
+ GridFutureAdapter<MappingExchangeResult> fut = transport.proposeMapping(item, cache);
+
+ if (failIfUnregistered && !fut.isDone())
+ throw new UnregisteredBinaryTypeException(typeId, fut);
+
MappingExchangeResult res = fut.get();
return convertXchRes(res);
@@ -294,6 +307,12 @@ public class MarshallerContextImpl implements MarshallerContext {
}
/** {@inheritDoc} */
+ @Override
+ public boolean registerClassName(byte platformId, int typeId, String clsName) {
+ throw new UnsupportedOperationException("registerClassName");
+ }
+
+ /** {@inheritDoc} */
@Override public boolean registerClassNameLocally(byte platformId, int typeId, String clsName)
throws IgniteCheckedException
{
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/UnregisteredBinaryTypeException.java b/modules/core/src/main/java/org/apache/ignite/internal/UnregisteredBinaryTypeException.java
index f46de12..de507be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/UnregisteredBinaryTypeException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/UnregisteredBinaryTypeException.java
@@ -19,65 +19,65 @@ package org.apache.ignite.internal;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.binary.BinaryMetadata;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
/**
* Exception thrown during serialization if binary metadata isn't registered and it's registration isn't allowed.
+ * Used for both binary types and marshalling mappings.
+ * Confusing old class name is preserved for backwards compatibility.
*/
public class UnregisteredBinaryTypeException extends IgniteException {
/** */
private static final long serialVersionUID = 0L;
/** */
+ private static final String MESSAGE =
+ "Attempted to update binary metadata inside a critical synchronization block (will be " +
+ "automatically retried). This exception must not be wrapped to any other exception class. " +
+ "If you encounter this exception outside of EntryProcessor, please report to Apache Ignite " +
+ "dev-list. Debug info [typeId=%d, binaryMetadata=%s, fut=%s]";
+
+ /** */
+ private static String createMessage(int typeId, BinaryMetadata binaryMetadata, GridFutureAdapter<?> fut) {
+ return String.format(MESSAGE, typeId, binaryMetadata, fut);
+ }
+
+ /** */
private final int typeId;
/** */
private final BinaryMetadata binaryMetadata;
+ /** */
+ private final GridFutureAdapter<?> fut;
+
/**
* @param typeId Type ID.
* @param binaryMetadata Binary metadata.
*/
public UnregisteredBinaryTypeException(int typeId, BinaryMetadata binaryMetadata) {
- this.typeId = typeId;
- this.binaryMetadata = binaryMetadata;
+ this(typeId, binaryMetadata, null);
}
/**
- * @param msg Error message.
* @param typeId Type ID.
- * @param binaryMetadata Binary metadata.
+ * @param fut Future to wait in handler.
*/
- public UnregisteredBinaryTypeException(String msg, int typeId,
- BinaryMetadata binaryMetadata) {
- super(msg);
- this.typeId = typeId;
- this.binaryMetadata = binaryMetadata;
+ public UnregisteredBinaryTypeException(int typeId, GridFutureAdapter<?> fut) {
+ this(typeId, null, fut);
}
/**
- * @param cause Non-null throwable cause.
* @param typeId Type ID.
* @param binaryMetadata Binary metadata.
+ * @param fut Future to wait in handler.
*/
- public UnregisteredBinaryTypeException(Throwable cause, int typeId,
- BinaryMetadata binaryMetadata) {
- super(cause);
- this.typeId = typeId;
- this.binaryMetadata = binaryMetadata;
- }
+ private UnregisteredBinaryTypeException(int typeId, BinaryMetadata binaryMetadata, GridFutureAdapter<?> fut) {
+ super(createMessage(typeId, binaryMetadata, fut));
- /**
- * @param msg Error message.
- * @param cause Non-null throwable cause.
- * @param typeId Type ID.
- * @param binaryMetadata Binary metadata.
- */
- public UnregisteredBinaryTypeException(String msg, @Nullable Throwable cause, int typeId,
- BinaryMetadata binaryMetadata) {
- super(msg, cause);
this.typeId = typeId;
this.binaryMetadata = binaryMetadata;
+ this.fut = fut;
}
/**
@@ -93,4 +93,11 @@ public class UnregisteredBinaryTypeException extends IgniteException {
public BinaryMetadata binaryMetadata() {
return binaryMetadata;
}
+
+ /**
+ * @return Future to wait in handler.
+ */
+ public GridFutureAdapter<?> future() {
+ return fut;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryClassDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryClassDescriptor.java
index 73dee2b..b4fd932 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryClassDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryClassDescriptor.java
@@ -40,8 +40,8 @@ import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.binary.BinaryReflectiveSerializer;
import org.apache.ignite.binary.BinarySerializer;
import org.apache.ignite.binary.Binarylizable;
-import org.apache.ignite.internal.UnregisteredClassException;
import org.apache.ignite.internal.UnregisteredBinaryTypeException;
+import org.apache.ignite.internal.UnregisteredClassException;
import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller;
import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
import org.apache.ignite.internal.processors.query.QueryUtils;
@@ -823,10 +823,10 @@ public class BinaryClassDescriptor {
assert false : "Invalid mode: " + mode;
}
}
+ catch (UnregisteredBinaryTypeException | UnregisteredClassException e) {
+ throw e;
+ }
catch (Exception e) {
- if (e instanceof UnregisteredBinaryTypeException || e instanceof UnregisteredClassException)
- throw e;
-
String msg;
if (S.INCLUDE_SENSITIVE && !F.isEmpty(typeName))
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
index 7885d95..c263def 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
@@ -48,7 +48,9 @@ import java.util.jar.JarFile;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.UnregisteredBinaryTypeException;
import org.apache.ignite.internal.UnregisteredClassException;
+import org.apache.ignite.internal.processors.marshaller.MappingExchangeResult;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.binary.BinaryBasicIdMapper;
import org.apache.ignite.binary.BinaryBasicNameMapper;
@@ -781,7 +783,7 @@ public class BinaryContext {
final int typeId = mapper.typeId(clsName);
- registered = registerUserClassName(typeId, cls.getName());
+ registered = registerUserClassName(typeId, cls.getName(), false);
BinarySerializer serializer = serializerForClass(cls);
@@ -819,7 +821,7 @@ public class BinaryContext {
private BinaryClassDescriptor registerUserClassDescriptor(BinaryClassDescriptor desc) {
boolean registered;
- registered = registerUserClassName(desc.typeId(), desc.describedClass().getName());
+ registered = registerUserClassName(desc.typeId(), desc.describedClass().getName(), false);
if (registered) {
BinarySerializer serializer = desc.initialSerializer();
@@ -1191,15 +1193,17 @@ public class BinaryContext {
*
* @param typeId Type ID.
* @param clsName Class Name.
+ * @param failIfUnregistered If {@code true} then throw {@link UnregisteredBinaryTypeException} with
+ * {@link MappingExchangeResult} future instead of synchronously awaiting for its completion.
* @return {@code True} if the mapping was registered successfully.
*/
- public boolean registerUserClassName(int typeId, String clsName) {
+ public boolean registerUserClassName(int typeId, String clsName, boolean failIfUnregistered) {
IgniteCheckedException e = null;
boolean res = false;
try {
- res = marshCtx.registerClassName(JAVA_ID, typeId, clsName);
+ res = marshCtx.registerClassName(JAVA_ID, typeId, clsName, failIfUnregistered);
}
catch (DuplicateTypeIdException dupEx) {
// Ignore if trying to register mapped type name of the already registered class name and vise versa
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldAccessor.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldAccessor.java
index 29f6ef0..bd5ded5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldAccessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldAccessor.java
@@ -156,10 +156,10 @@ public abstract class BinaryFieldAccessor {
try {
write0(obj, writer);
}
+ catch (UnregisteredClassException | UnregisteredBinaryTypeException ex) {
+ throw ex;
+ }
catch (Exception ex) {
- if (ex instanceof UnregisteredClassException || ex instanceof UnregisteredBinaryTypeException)
- throw ex;
-
if (S.INCLUDE_SENSITIVE && !F.isEmpty(name))
throw new BinaryObjectException("Failed to write field [name=" + name + ']', ex);
else
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryObjectBuilderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryObjectBuilderImpl.java
index a21e74b..f2664c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryObjectBuilderImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryObjectBuilderImpl.java
@@ -361,7 +361,7 @@ public class BinaryObjectBuilderImpl implements BinaryObjectBuilder {
if (affFieldName0 == null)
affFieldName0 = ctx.affinityKeyFieldName(typeId);
- ctx.registerUserClassName(typeId, typeName);
+ ctx.registerUserClassName(typeId, typeName, writer.failIfUnregistered());
ctx.updateMetadata(typeId, new BinaryMetadata(typeId, typeName, fieldsMeta, affFieldName0,
Collections.singleton(curSchema), false, null), writer.failIfUnregistered());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
index 5040816..898e447 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
@@ -327,8 +327,12 @@ public class TcpIgniteClient implements IgniteClient {
private Map<Integer, String> cache = new ConcurrentHashMap<>();
/** {@inheritDoc} */
- @Override public boolean registerClassName(byte platformId, int typeId, String clsName)
- throws IgniteCheckedException {
+ @Override public boolean registerClassName(
+ byte platformId,
+ int typeId,
+ String clsName,
+ boolean failIfUnregistered
+ ) throws IgniteCheckedException {
if (platformId != MarshallerPlatformIds.JAVA_ID)
throw new IllegalArgumentException("platformId");
@@ -359,6 +363,13 @@ public class TcpIgniteClient implements IgniteClient {
}
/** {@inheritDoc} */
+ @Override
+ @Deprecated
+ public boolean registerClassName(byte platformId, int typeId, String clsName) throws IgniteCheckedException {
+ return registerClassName(platformId, typeId, clsName, false);
+ }
+
+ /** {@inheritDoc} */
@Override public boolean registerClassNameLocally(byte platformId, int typeId, String clsName) {
if (platformId != MarshallerPlatformIds.JAVA_ID)
throw new IllegalArgumentException("platformId");
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshallerUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshallerUtils.java
index 4626df72..3ede240 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshallerUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshallerUtils.java
@@ -201,7 +201,7 @@ class OptimizedMarshallerUtils {
boolean registered;
try {
- registered = ctx.registerClassName(JAVA_ID, typeId, cls.getName());
+ registered = ctx.registerClassName(JAVA_ID, typeId, cls.getName(), false);
}
catch (Exception e) {
throw new IOException("Failed to register class: " + cls.getName(), e);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index e044a2d..1f56fe81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -6688,10 +6688,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
return null;
}
+ catch (UnregisteredClassException | UnregisteredBinaryTypeException e) {
+ throw e;
+ }
catch (Exception e) {
- if (e instanceof UnregisteredClassException || e instanceof UnregisteredBinaryTypeException)
- throw (IgniteException)e;
-
writeObj = invokeEntry.valObj;
return new IgniteBiTuple<>(null, e);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 5e43dd3..c30da7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -542,18 +542,17 @@ public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter impleme
GridFutureAdapter<MetadataUpdateResult> fut =
transport.awaitMetadataUpdate(typeId, metaHolder.pendingVersion());
+ if (failIfUnregistered && !fut.isDone())
+ throw new UnregisteredBinaryTypeException(typeId, fut);
+
fut.get();
}
+
return;
}
if (failIfUnregistered)
- throw new UnregisteredBinaryTypeException(
- "Attempted to update binary metadata inside a critical synchronization block (will be " +
- "automatically retried). This exception must not be wrapped to any other exception class. " +
- "If you encounter this exception outside of EntryProcessor, please report to Apache Ignite " +
- "dev-list.",
- typeId, mergedMeta);
+ throw new UnregisteredBinaryTypeException(typeId, mergedMeta);
long t0 = System.nanoTime();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 5952617..1da003a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1877,6 +1877,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
.binaryContext().descriptorForClass(ex.cls(), false, false);
}
catch (UnregisteredBinaryTypeException ex) {
+ if (ex.future() != null) {
+ // Wait for the future that couldn't be awaited in place because the
+ // IgniteThread#isForbiddenToRequestBinaryMetadata flag was true. Usually this means
+ // that awaiting the future right there could lead to a deadlock if
+ // continuous queries are used in parallel with an entry processor.
+ ex.future().get();
+
+ // Retry and don't update current binary metadata, because it most likely already exists.
+ continue;
+ }
+
IgniteCacheObjectProcessor cacheObjProc = ctx.cacheObjects();
assert cacheObjProc instanceof CacheObjectBinaryProcessorImpl;
@@ -2216,10 +2227,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ctx.validateKeyAndValue(entry.key(), updated);
}
}
+ catch (UnregisteredClassException | UnregisteredBinaryTypeException e) {
+ throw e;
+ }
catch (Exception e) {
- if (e instanceof UnregisteredClassException || e instanceof UnregisteredBinaryTypeException)
- throw (IgniteException) e;
-
curInvokeRes = CacheInvokeResult.fromError(e);
updated = old;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/binary/PlatformBinaryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/binary/PlatformBinaryProcessor.java
index be43848..76df6a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/binary/PlatformBinaryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/binary/PlatformBinaryProcessor.java
@@ -76,7 +76,7 @@ public class PlatformBinaryProcessor extends PlatformAbstractTarget {
String typeName = reader.readString();
return platformContext().kernalContext().marshallerContext()
- .registerClassName(MarshallerPlatformIds.DOTNET_ID, typeId, typeName)
+ .registerClassName(MarshallerPlatformIds.DOTNET_ID, typeId, typeName, false)
? TRUE : FALSE;
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypeNamePutRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypeNamePutRequest.java
index 3119fbb..1ab93c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypeNamePutRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypeNamePutRequest.java
@@ -54,7 +54,8 @@ public class ClientBinaryTypeNamePutRequest extends ClientRequest {
/** {@inheritDoc} */
@Override public ClientResponse process(ClientConnectionContext ctx) {
try {
- boolean res = ctx.kernalContext().marshallerContext().registerClassName(platformId, typeId, typeName);
+ boolean res = ctx.kernalContext().marshallerContext()
+ .registerClassName(platformId, typeId, typeName, false);
return new ClientBooleanResponse(requestId(), res);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerContext.java b/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerContext.java
index 987e999..f2ac393 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerContext.java
@@ -18,6 +18,7 @@
package org.apache.ignite.marshaller;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.UnregisteredBinaryTypeException;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
@@ -34,10 +35,33 @@ public interface MarshallerContext {
* @param platformId Id of a platform (java, .NET, etc.) to register mapping for.
* @param typeId Type ID.
* @param clsName Class name.
+ * @param failIfUnregistered If {@code true} then throw {@link UnregisteredBinaryTypeException} with
+ * registration future instead of synchronously awaiting for its completion.
* @return {@code True} if mapping was registered successfully.
* @throws IgniteCheckedException In case of error.
*/
- public boolean registerClassName(byte platformId, int typeId, String clsName) throws IgniteCheckedException;
+ public default boolean registerClassName(
+ byte platformId,
+ int typeId,
+ String clsName,
+ boolean failIfUnregistered
+ ) throws IgniteCheckedException {
+ return registerClassName(platformId, typeId, clsName);
+ }
+
+ /**
+ * Same as {@link MarshallerContext#registerClassName(byte, int, java.lang.String, boolean)} but with shortened
+ * parameters list.
+ *
+ * @deprecated Use {@link MarshallerContext#registerClassName(byte, int, java.lang.String, boolean)} instead.
+ * This particular method will be deleted in future releases.
+ */
+ @Deprecated
+ public boolean registerClassName(
+ byte platformId,
+ int typeId,
+ String clsName
+ ) throws IgniteCheckedException;
/**
* Method to register typeId->class name mapping in marshaller context <b>on local node only</b>.
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
index e37e6a7..f12afe3 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
@@ -171,6 +171,15 @@ public class IgniteThread extends Thread {
}
/**
+ * @return {@code True} if the current thread is allowed to request binary metadata, {@code false} if it is forbidden to do so in order to avoid a potential deadlock.
+ */
+ public static boolean currentThreadCanRequestBinaryMetadata() {
+ IgniteThread curThread = current();
+
+ return curThread == null || !curThread.isForbiddenToRequestBinaryMetadata();
+ }
+
+ /**
* Callback before entry processor execution is started.
*/
public static void onEntryProcessorEntered(boolean holdsTopLock) {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
index aabd2c8..e4655dd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
@@ -695,8 +695,8 @@ public class BinaryMarshallerSelfTest extends GridCommonAbstractTest {
@Test
public void testDeclaredBodyEnum() throws Exception {
final MarshallerContextTestImpl ctx = new MarshallerContextTestImpl();
- ctx.registerClassName((byte)0, 1, EnumObject.class.getName());
- ctx.registerClassName((byte)0, 2, DeclaredBodyEnum.class.getName());
+ ctx.registerClassName((byte)0, 1, EnumObject.class.getName(), false);
+ ctx.registerClassName((byte)0, 2, DeclaredBodyEnum.class.getName(), false);
BinaryMarshaller marsh = binaryMarshaller();
marsh.setContext(ctx);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryMarshallerCtxDisabledSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryMarshallerCtxDisabledSelfTest.java
index 0625903..3ea1a2d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryMarshallerCtxDisabledSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryMarshallerCtxDisabledSelfTest.java
@@ -94,9 +94,9 @@ public class GridBinaryMarshallerCtxDisabledSelfTest extends GridCommonAbstractT
private static class MarshallerContextWithNoStorage implements MarshallerContext {
/** {@inheritDoc} */
@Override public boolean registerClassName(
- byte platformId,
- int typeId,
- String clsName
+ byte platformId,
+ int typeId,
+ String clsName
) throws IgniteCheckedException {
return false;
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/BinaryMetadataRegistrationInsideEntryProcessorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/BinaryMetadataRegistrationInsideEntryProcessorTest.java
index 331dd11..dd26e09 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/BinaryMetadataRegistrationInsideEntryProcessorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/BinaryMetadataRegistrationInsideEntryProcessorTest.java
@@ -20,17 +20,37 @@ package org.apache.ignite.internal.processors.cache;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
import org.junit.Test;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
+
/**
*
*/
@@ -48,6 +68,12 @@ public class BinaryMetadataRegistrationInsideEntryProcessorTest extends GridComm
.setPeerClassLoadingEnabled(true);
}
+ /** Stop all grids after each test. */
+ @After
+ public void stopAllGridsAfterTest() {
+ stopAllGrids();
+ }
+
/**
* @throws Exception If failed;
*/
@@ -74,6 +100,98 @@ public class BinaryMetadataRegistrationInsideEntryProcessorTest extends GridComm
}
/**
+ * Continuously executes multiple EntryProcessors while continuous queries are running in parallel.
+ * This used to lead to several deadlocks.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testContinuousQueryAndBinaryObjectBuilder() throws Exception {
+ startGrids(3).cluster().active(true);
+
+ grid(0).createCache(new CacheConfiguration<>()
+ .setName(CACHE_NAME)
+ .setAtomicityMode(ATOMIC)
+ .setBackups(2)
+ .setCacheMode(PARTITIONED)
+ .setWriteSynchronizationMode(FULL_SYNC)
+ .setPartitionLossPolicy(READ_WRITE_SAFE)
+ );
+
+ IgniteEx client1 = startGrid(getConfiguration().setIgniteInstanceName("client1").setClientMode(true));
+ IgniteEx client2 = startGrid(getConfiguration().setIgniteInstanceName("client2").setClientMode(true));
+
+ AtomicBoolean stop = new AtomicBoolean();
+ AtomicInteger keyCntr = new AtomicInteger();
+ AtomicInteger binaryTypeCntr = new AtomicInteger();
+
+ /** */
+ class MyEntryProcessor implements CacheEntryProcessor<Object, Object, Object> {
+ /** Cached int value retrieved from {@code binaryTypeCntr} variable. */
+ private int i;
+
+ /** */
+ public MyEntryProcessor(int i) {
+ this.i = i;
+ }
+
+ /** */
+ @IgniteInstanceResource
+ Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override
+ public Object process(MutableEntry<Object, Object> entry, Object... arguments) throws EntryProcessorException {
+ BinaryObjectBuilder builder = ignite.binary().builder("my_type");
+
+ builder.setField("new_field" + i, i);
+
+ entry.setValue(builder.build());
+
+ return null;
+ }
+ }
+
+ IgniteInternalFuture fut1 = GridTestUtils.runMultiThreadedAsync(() -> {
+ IgniteCache<Object, Object> cache = client1.cache(CACHE_NAME).withKeepBinary();
+
+ while (!stop.get()) {
+ Integer key = keyCntr.getAndIncrement();
+
+ cache.put(key, key);
+
+ cache.invoke(key, new MyEntryProcessor(binaryTypeCntr.get()));
+
+ binaryTypeCntr.incrementAndGet();
+ }
+ }, 8, "writer-thread");
+
+ IgniteInternalFuture fut2 = GridTestUtils.runAsync(() -> {
+ IgniteCache<Object, Object> cache = client2.cache(CACHE_NAME).withKeepBinary();
+
+ while (!stop.get()) {
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setInitialQuery(new ScanQuery<>((key, val) -> true));
+
+ qry.setLocalListener(evts -> {});
+
+ //noinspection EmptyTryBlock
+ try (QueryCursor<Cache.Entry<Object, Object>> cursor = cache.query(qry)) {
+ // No-op.
+ }
+ }
+ });
+
+ doSleep(10_000);
+
+ stop.set(true);
+
+ fut1.get(10, TimeUnit.SECONDS);
+ fut2.get(10, TimeUnit.SECONDS);
+ }
+
+ /**
*
*/
private static class CustomProcessor implements EntryProcessor<Integer,
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryMemorySizeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryMemorySizeSelfTest.java
index c6f74cb..168fced 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryMemorySizeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryMemorySizeSelfTest.java
@@ -140,7 +140,11 @@ public class GridCacheEntryMemorySizeSelfTest extends GridCommonAbstractTest {
Marshaller marsh = createStandaloneBinaryMarshaller();
marsh.setContext(new MarshallerContext() {
- @Override public boolean registerClassName(byte platformId, int typeId, String clsName) {
+ @Override public boolean registerClassName(
+ byte platformId,
+ int typeId,
+ String clsName
+ ) {
return true;
}
diff --git a/modules/core/src/test/java/org/apache/ignite/marshaller/MarshallerContextTestImpl.java b/modules/core/src/test/java/org/apache/ignite/marshaller/MarshallerContextTestImpl.java
index ae057ee..8d98b0d 100644
--- a/modules/core/src/test/java/org/apache/ignite/marshaller/MarshallerContextTestImpl.java
+++ b/modules/core/src/test/java/org/apache/ignite/marshaller/MarshallerContextTestImpl.java
@@ -72,7 +72,7 @@ public class MarshallerContextTestImpl extends MarshallerContextImpl {
}
/** {@inheritDoc} */
- @Override public boolean registerClassName(byte platformId, int typeId, String clsName) throws IgniteCheckedException {
+ @Override public boolean registerClassName(byte platformId, int typeId, String clsName, boolean failIfUnregistered) throws IgniteCheckedException {
if (excluded != null && excluded.contains(clsName))
return false;