You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/08/17 09:32:48 UTC
[2/2] ignite git commit: IGNITE-8926 Fixed deadlock on metadata
registration and concurrent CQ modification - Fixes #4507.
IGNITE-8926 Fixed deadlock on metadata registration and concurrent CQ modification - Fixes #4507.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3980e1ef
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3980e1ef
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3980e1ef
Branch: refs/heads/master
Commit: 3980e1ef408d1aa57af1d3165e8b471eccca47af
Parents: 94ba157
Author: Ilya Lantukh <il...@gridgain.com>
Authored: Fri Aug 17 12:13:06 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Aug 17 12:32:22 2018 +0300
----------------------------------------------------------------------
.../UnregisteredBinaryTypeException.java | 96 ++++++++++++
.../binary/BinaryCachingMetadataHandler.java | 2 +-
.../internal/binary/BinaryClassDescriptor.java | 9 +-
.../ignite/internal/binary/BinaryContext.java | 11 +-
.../internal/binary/BinaryFieldAccessor.java | 4 +
.../internal/binary/BinaryMetadataHandler.java | 3 +-
.../binary/BinaryNoopMetadataHandler.java | 2 +-
.../internal/binary/BinaryWriterExImpl.java | 13 ++
.../binary/builder/BinaryBuilderSerializer.java | 2 +-
.../binary/builder/BinaryObjectBuilderImpl.java | 9 +-
.../internal/client/thin/ClientBinary.java | 2 +-
.../internal/client/thin/TcpIgniteClient.java | 6 +-
.../cache/CacheInvokeDirectResult.java | 26 ++++
.../processors/cache/CacheInvokeResult.java | 6 +
.../processors/cache/GridCacheMapEntry.java | 21 +++
.../processors/cache/GridCacheReturn.java | 13 +-
.../binary/CacheObjectBinaryProcessor.java | 3 +-
.../binary/CacheObjectBinaryProcessorImpl.java | 17 ++-
.../distributed/dht/GridDhtTxPrepareFuture.java | 14 +-
.../dht/atomic/DhtAtomicUpdateResult.java | 33 +++-
.../dht/atomic/GridDhtAtomicCache.java | 143 ++++++++++--------
.../local/atomic/GridLocalAtomicCache.java | 6 +
.../cache/persistence/tree/BPlusTree.java | 5 +
.../cache/transactions/IgniteTxAdapter.java | 6 +
.../cache/transactions/IgniteTxEntry.java | 6 +
.../transactions/IgniteTxLocalAdapter.java | 6 +
.../platform/PlatformContextImpl.java | 2 +-
.../binary/ClientBinaryTypePutRequest.java | 2 +-
.../org/apache/ignite/thread/IgniteThread.java | 45 +++++-
.../binary/TestCachingMetadataHandler.java | 2 +-
...ataRegistrationInsideEntryProcessorTest.java | 141 +++++++++++++++++
.../cache/IgniteCacheInvokeAbstractTest.java | 150 +++++++++++++++++++
.../ignite/testsuites/IgniteCacheTestSuite.java | 3 +
33 files changed, 711 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/main/java/org/apache/ignite/internal/UnregisteredBinaryTypeException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/UnregisteredBinaryTypeException.java b/modules/core/src/main/java/org/apache/ignite/internal/UnregisteredBinaryTypeException.java
new file mode 100644
index 0000000..f46de12
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/UnregisteredBinaryTypeException.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.binary.BinaryMetadata;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Exception thrown during serialization if binary metadata isn't registered and its registration isn't allowed.
+ */
+public class UnregisteredBinaryTypeException extends IgniteException {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Type ID passed to the constructor. */
+ private final int typeId;
+
+ /** Binary metadata passed to the constructor. */
+ private final BinaryMetadata binaryMetadata;
+
+ /**
+ * @param typeId Type ID.
+ * @param binaryMetadata Binary metadata.
+ */
+ public UnregisteredBinaryTypeException(int typeId, BinaryMetadata binaryMetadata) {
+ this.typeId = typeId;
+ this.binaryMetadata = binaryMetadata;
+ }
+
+ /**
+ * @param msg Error message.
+ * @param typeId Type ID.
+ * @param binaryMetadata Binary metadata.
+ */
+ public UnregisteredBinaryTypeException(String msg, int typeId,
+ BinaryMetadata binaryMetadata) {
+ super(msg);
+ this.typeId = typeId;
+ this.binaryMetadata = binaryMetadata;
+ }
+
+ /**
+ * @param cause Non-null throwable cause.
+ * @param typeId Type ID.
+ * @param binaryMetadata Binary metadata.
+ */
+ public UnregisteredBinaryTypeException(Throwable cause, int typeId,
+ BinaryMetadata binaryMetadata) {
+ super(cause);
+ this.typeId = typeId;
+ this.binaryMetadata = binaryMetadata;
+ }
+
+ /**
+ * @param msg Error message.
+ * @param cause Throwable cause, may be {@code null} (the parameter is annotated {@code @Nullable}).
+ * @param typeId Type ID.
+ * @param binaryMetadata Binary metadata.
+ */
+ public UnregisteredBinaryTypeException(String msg, @Nullable Throwable cause, int typeId,
+ BinaryMetadata binaryMetadata) {
+ super(msg, cause);
+ this.typeId = typeId;
+ this.binaryMetadata = binaryMetadata;
+ }
+
+ /**
+ * @return Type ID given at construction.
+ */
+ public int typeId() {
+ return typeId;
+ }
+
+ /**
+ * @return Binary metadata given at construction.
+ */
+ public BinaryMetadata binaryMetadata() {
+ return binaryMetadata;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryCachingMetadataHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryCachingMetadataHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryCachingMetadataHandler.java
index 535249c..a0559cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryCachingMetadataHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryCachingMetadataHandler.java
@@ -46,7 +46,7 @@ public class BinaryCachingMetadataHandler implements BinaryMetadataHandler {
}
/** {@inheritDoc} */
- @Override public synchronized void addMeta(int typeId, BinaryType type) throws BinaryObjectException {
+ @Override public synchronized void addMeta(int typeId, BinaryType type, boolean failIfUnregistered) throws BinaryObjectException {
synchronized (this) {
BinaryType oldType = metas.put(typeId, type);
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryClassDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryClassDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryClassDescriptor.java
index 106d238..cd32120 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryClassDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryClassDescriptor.java
@@ -40,6 +40,8 @@ import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.binary.BinaryReflectiveSerializer;
import org.apache.ignite.binary.BinarySerializer;
import org.apache.ignite.binary.Binarylizable;
+import org.apache.ignite.internal.UnregisteredClassException;
+import org.apache.ignite.internal.UnregisteredBinaryTypeException;
import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller;
import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
import org.apache.ignite.internal.processors.query.QueryUtils;
@@ -773,7 +775,7 @@ public class BinaryClassDescriptor {
BinaryMetadata meta = new BinaryMetadata(typeId, typeName, collector.meta(),
affKeyFieldName, Collections.singleton(newSchema), false, null);
- ctx.updateMetadata(typeId, meta);
+ ctx.updateMetadata(typeId, meta, writer.failIfUnregistered());
schemaReg.addSchema(newSchema.schemaId(), newSchema);
}
@@ -794,7 +796,7 @@ public class BinaryClassDescriptor {
BinaryMetadata meta = new BinaryMetadata(typeId, typeName, stableFieldsMeta,
affKeyFieldName, Collections.singleton(stableSchema), false, null);
- ctx.updateMetadata(typeId, meta);
+ ctx.updateMetadata(typeId, meta, writer.failIfUnregistered());
schemaReg.addSchema(stableSchema.schemaId(), stableSchema);
@@ -823,6 +825,9 @@ public class BinaryClassDescriptor {
}
}
catch (Exception e) {
+ if (e instanceof UnregisteredBinaryTypeException || e instanceof UnregisteredClassException)
+ throw e;
+
String msg;
if (S.INCLUDE_SENSITIVE && !F.isEmpty(typeName))
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
index 0121570..7885d95 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
@@ -653,7 +653,7 @@ public class BinaryContext {
schemas, desc0.isEnum(),
cls.isEnum() ? enumMap(cls) : null);
- metaHnd.addMeta(desc0.typeId(), meta.wrap(this));
+ metaHnd.addMeta(desc0.typeId(), meta.wrap(this), false);
return desc0;
}
@@ -801,7 +801,7 @@ public class BinaryContext {
if (!deserialize)
metaHnd.addMeta(typeId, new BinaryMetadata(typeId, typeName, desc.fieldsMeta(), affFieldName, null,
- desc.isEnum(), cls.isEnum() ? enumMap(cls) : null).wrap(this));
+ desc.isEnum(), cls.isEnum() ? enumMap(cls) : null).wrap(this), false);
descByCls.put(cls, desc);
@@ -1170,7 +1170,7 @@ public class BinaryContext {
}
metaHnd.addMeta(id,
- new BinaryMetadata(id, typeName, fieldsMeta, affKeyFieldName, null, isEnum, enumMap).wrap(this));
+ new BinaryMetadata(id, typeName, fieldsMeta, affKeyFieldName, null, isEnum, enumMap).wrap(this), false);
}
/**
@@ -1325,10 +1325,11 @@ public class BinaryContext {
/**
* @param typeId Type ID.
* @param meta Meta data.
+ * @param failIfUnregistered Fail if unregistered.
* @throws BinaryObjectException In case of error.
*/
- public void updateMetadata(int typeId, BinaryMetadata meta) throws BinaryObjectException {
- metaHnd.addMeta(typeId, meta.wrap(this));
+ public void updateMetadata(int typeId, BinaryMetadata meta, boolean failIfUnregistered) throws BinaryObjectException {
+ metaHnd.addMeta(typeId, meta.wrap(this), failIfUnregistered);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldAccessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldAccessor.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldAccessor.java
index 3277403..87c4f3e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldAccessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldAccessor.java
@@ -26,6 +26,7 @@ import java.util.Date;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.UnregisteredClassException;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -155,6 +156,9 @@ public abstract class BinaryFieldAccessor {
write0(obj, writer);
}
catch (Exception ex) {
+ if (ex instanceof UnregisteredClassException)
+ throw ex;
+
if (S.INCLUDE_SENSITIVE && !F.isEmpty(name))
throw new BinaryObjectException("Failed to write field [name=" + name + ']', ex);
else
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadataHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadataHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadataHandler.java
index 5df32e7..85ab137 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadataHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadataHandler.java
@@ -30,9 +30,10 @@ public interface BinaryMetadataHandler {
*
* @param typeId Type ID.
* @param meta Metadata.
+ * @param failIfUnregistered Fail if unregistered.
* @throws BinaryObjectException In case of error.
*/
- public void addMeta(int typeId, BinaryType meta) throws BinaryObjectException;
+ public void addMeta(int typeId, BinaryType meta, boolean failIfUnregistered) throws BinaryObjectException;
/**
* Gets meta data for provided type ID.
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryNoopMetadataHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryNoopMetadataHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryNoopMetadataHandler.java
index bbd9311..4ee2428 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryNoopMetadataHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryNoopMetadataHandler.java
@@ -43,7 +43,7 @@ public class BinaryNoopMetadataHandler implements BinaryMetadataHandler {
}
/** {@inheritDoc} */
- @Override public void addMeta(int typeId, BinaryType meta) throws BinaryObjectException {
+ @Override public void addMeta(int typeId, BinaryType meta, boolean failIfUnregistered) throws BinaryObjectException {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java
index 3d93e70..e6efb0c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java
@@ -116,6 +116,13 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje
}
/**
+ * @return Fail if unregistered flag value.
+ */
+ public boolean failIfUnregistered() {
+ return failIfUnregistered;
+ }
+
+ /**
* @param failIfUnregistered Fail if unregistered.
*/
public void failIfUnregistered(boolean failIfUnregistered) {
@@ -503,6 +510,8 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje
else {
BinaryWriterExImpl writer = new BinaryWriterExImpl(ctx, out, schema, handles());
+ writer.failIfUnregistered(failIfUnregistered);
+
writer.marshal(obj);
}
}
@@ -1492,6 +1501,8 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje
else {
BinaryWriterExImpl writer = new BinaryWriterExImpl(ctx, out, schema, null);
+ writer.failIfUnregistered(failIfUnregistered);
+
writer.marshal(obj);
}
}
@@ -1915,6 +1926,8 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje
public BinaryWriterExImpl newWriter(int typeId) {
BinaryWriterExImpl res = new BinaryWriterExImpl(ctx, out, schema, handles());
+ res.failIfUnregistered(failIfUnregistered);
+
res.typeId(typeId);
return res;
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryBuilderSerializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryBuilderSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryBuilderSerializer.java
index 5333cc4..edc80b6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryBuilderSerializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryBuilderSerializer.java
@@ -126,7 +126,7 @@ class BinaryBuilderSerializer {
BinaryMetadata meta = new BinaryMetadata(typeId, typeName, null, null, null, true, enumMap);
- writer.context().updateMetadata(typeId, meta);
+ writer.context().updateMetadata(typeId, meta, writer.failIfUnregistered());
// Need register class for marshaller to be able to deserialize enum value.
writer.context().descriptorForClass(((Enum)val).getDeclaringClass(), false, false);
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryObjectBuilderImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryObjectBuilderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryObjectBuilderImpl.java
index c577e02..abd63cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryObjectBuilderImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryObjectBuilderImpl.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import java.util.Collection;
@@ -174,6 +175,12 @@ public class BinaryObjectBuilderImpl implements BinaryObjectBuilder {
/** {@inheritDoc} */
@Override public BinaryObject build() {
try (BinaryWriterExImpl writer = new BinaryWriterExImpl(ctx)) {
+ Thread curThread = Thread.currentThread();
+
+ if (curThread instanceof IgniteThread)
+ writer.failIfUnregistered(((IgniteThread)curThread).executingEntryProcessor() &&
+ ((IgniteThread)curThread).holdsTopLock());
+
writer.typeId(typeId);
BinaryBuilderSerializer serializationCtx = new BinaryBuilderSerializer();
@@ -360,7 +367,7 @@ public class BinaryObjectBuilderImpl implements BinaryObjectBuilder {
ctx.registerUserClassName(typeId, typeName);
ctx.updateMetadata(typeId, new BinaryMetadata(typeId, typeName, fieldsMeta, affFieldName0,
- Collections.singleton(curSchema), false, null));
+ Collections.singleton(curSchema), false, null), writer.failIfUnregistered());
schemaReg.addSchema(curSchema.schemaId(), curSchema);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientBinary.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientBinary.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientBinary.java
index 8525f5e..4164532 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientBinary.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientBinary.java
@@ -160,7 +160,7 @@ class ClientBinary implements IgniteBinary {
int typeId = ctx.typeId(typeName);
- ctx.updateMetadata(typeId, new BinaryMetadata(typeId, typeName, null, null, null, true, vals));
+ ctx.updateMetadata(typeId, new BinaryMetadata(typeId, typeName, null, null, null, true, vals), false);
return ctx.metadata(typeId);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
index 7beeb79..7a0bc7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
@@ -233,7 +233,7 @@ public class TcpIgniteClient implements IgniteClient {
private final BinaryMetadataHandler cache = BinaryCachingMetadataHandler.create();
/** {@inheritDoc} */
- @Override public void addMeta(int typeId, BinaryType meta) throws BinaryObjectException {
+ @Override public void addMeta(int typeId, BinaryType meta, boolean failIfUnregistered) throws BinaryObjectException {
if (cache.metadata(typeId) == null) {
try {
ch.request(
@@ -246,7 +246,7 @@ public class TcpIgniteClient implements IgniteClient {
}
}
- cache.addMeta(typeId, meta); // merge
+ cache.addMeta(typeId, meta, failIfUnregistered); // merge
}
/** {@inheritDoc} */
@@ -259,7 +259,7 @@ public class TcpIgniteClient implements IgniteClient {
if (meta0 != null) {
meta = new BinaryTypeImpl(marsh.context(), meta0);
- cache.addMeta(typeId, meta);
+ cache.addMeta(typeId, meta, false);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java
index 17f304e..89a0a0ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java
@@ -42,6 +42,10 @@ public class CacheInvokeDirectResult implements Message {
/** */
@GridToStringInclude
+ private transient Object unprepareRes;
+
+ /** */
+ @GridToStringInclude
private CacheObject res;
/** */
@@ -69,6 +73,22 @@ public class CacheInvokeDirectResult implements Message {
}
/**
+ * Constructs a CacheInvokeDirectResult holding an unprepared result, to avoid object marshaling while holding topology locks.
+ *
+ * @param key Key.
+ * @param res Result.
+ * @return a new instance of CacheInvokeDirectResult.
+ */
+ static CacheInvokeDirectResult lazyResult(KeyCacheObject key, Object res) {
+ CacheInvokeDirectResult res0 = new CacheInvokeDirectResult();
+
+ res0.key = key;
+ res0.unprepareRes = res;
+
+ return res0;
+ }
+
+ /**
* @param key Key.
* @param err Exception thrown by {@link EntryProcessor#process(MutableEntry, Object...)}.
*/
@@ -120,6 +140,12 @@ public class CacheInvokeDirectResult implements Message {
}
}
+ if (unprepareRes != null) {
+ res = ctx.toCacheObject(unprepareRes);
+
+ unprepareRes = null;
+ }
+
if (res != null)
res.prepareMarshal(ctx.cacheObjectContext());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeResult.java
index b51c136..2e6d64a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeResult.java
@@ -25,6 +25,9 @@ import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.UnregisteredBinaryTypeException;
+import org.apache.ignite.internal.UnregisteredClassException;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -96,6 +99,9 @@ public class CacheInvokeResult<T> implements EntryProcessorResult<T>, Externaliz
/** {@inheritDoc} */
@Override public T get() throws EntryProcessorException {
if (err != null) {
+ if (err instanceof UnregisteredClassException || err instanceof UnregisteredBinaryTypeException)
+ throw (IgniteException) err;
+
if (err instanceof EntryProcessorException)
throw (EntryProcessorException)err;
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 90d4c4a..d8dcc07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -35,6 +35,8 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.eviction.EvictableEntry;
import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.UnregisteredBinaryTypeException;
+import org.apache.ignite.internal.UnregisteredClassException;
import org.apache.ignite.internal.pagemem.wal.StorageException;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
@@ -81,6 +83,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_EXPIRED;
@@ -1493,6 +1496,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry<>(key, old, version(), keepBinary, this);
+ IgniteThread.onEntryProcessorEntered(false);
+
try {
Object computed = entryProcessor.process(entry, invokeArgs);
@@ -1517,6 +1522,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
invokeRes = CacheInvokeResult.fromError(e);
}
+ finally {
+ IgniteThread.onEntryProcessorLeft();
+ }
if (!entry.modified()) {
if (expiryPlc != null && !readFromStore && hasValueUnlocked())
@@ -1823,6 +1831,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CacheInvokeEntry<Object, Object> entry =
new CacheInvokeEntry<>(key, prevVal, version(), keepBinary, this);
+ IgniteThread.onEntryProcessorEntered(true);
+
try {
entryProcessor.process(entry, invokeArgs);
@@ -1832,6 +1842,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
catch (Exception ignore) {
evtVal = prevVal;
}
+ finally {
+ IgniteThread.onEntryProcessorLeft();
+ }
}
else
evtVal = (CacheObject)writeObj;
@@ -5405,6 +5418,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
private IgniteBiTuple<Object, Exception> runEntryProcessor(CacheInvokeEntry<Object, Object> invokeEntry) {
EntryProcessor<Object, Object, ?> entryProcessor = (EntryProcessor<Object, Object, ?>)writeObj;
+ IgniteThread.onEntryProcessorEntered(true);
+
try {
Object computed = entryProcessor.process(invokeEntry, invokeArgs);
@@ -5422,10 +5437,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
return null;
}
catch (Exception e) {
+ if (e instanceof UnregisteredClassException || e instanceof UnregisteredBinaryTypeException)
+ throw (IgniteException) e;
+
writeObj = invokeEntry.valObj;
return new IgniteBiTuple<>(null, e);
}
+ finally {
+ IgniteThread.onEntryProcessorLeft();
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
index bc85931..530f5b6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
@@ -31,6 +31,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.UnregisteredBinaryTypeException;
import org.apache.ignite.internal.UnregisteredClassException;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -243,9 +244,13 @@ public class GridCacheReturn implements Externalizable, Message {
v = resMap;
}
- // This exception means that we should register class and call EntryProcessor again.
- if (err != null && err instanceof UnregisteredClassException)
- throw (UnregisteredClassException) err;
+ // These exceptions mean that we should register class and call EntryProcessor again.
+ if (err != null) {
+ if (err instanceof UnregisteredClassException)
+ throw (UnregisteredClassException) err;
+ else if (err instanceof UnregisteredBinaryTypeException)
+ throw (UnregisteredBinaryTypeException) err;
+ }
CacheInvokeResult res0 = err == null ? CacheInvokeResult.fromResult(res) : CacheInvokeResult.fromError(err);
@@ -264,7 +269,7 @@ public class GridCacheReturn implements Externalizable, Message {
invokeResCol = new ArrayList<>();
CacheInvokeDirectResult res0 = err == null ?
- new CacheInvokeDirectResult(key, cctx.toCacheObject(res)) : new CacheInvokeDirectResult(key, err);
+ CacheInvokeDirectResult.lazyResult(key, res) : new CacheInvokeDirectResult(key, err);
invokeResCol.add(res0);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessor.java
index c7e2e68..ea8f371 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessor.java
@@ -49,9 +49,10 @@ public interface CacheObjectBinaryProcessor extends IgniteCacheObjectProcessor {
/**
* @param typeId Type ID.
* @param newMeta New meta data.
+ * @param failIfUnregistered Fail if unregistered.
* @throws IgniteException In case of error.
*/
- public void addMeta(int typeId, final BinaryType newMeta) throws IgniteException;
+ public void addMeta(int typeId, final BinaryType newMeta, boolean failIfUnregistered) throws IgniteException;
/**
* Adds metadata locally without triggering discovery exchange.
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 69d1f91..d6b920a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -42,6 +42,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.UnregisteredBinaryTypeException;
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryEnumObjectImpl;
import org.apache.ignite.internal.binary.BinaryFieldMetadata;
@@ -163,7 +164,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
transport = new BinaryMetadataTransport(metadataLocCache, metadataFileStore, ctx, log);
BinaryMetadataHandler metaHnd = new BinaryMetadataHandler() {
- @Override public void addMeta(int typeId, BinaryType newMeta) throws BinaryObjectException {
+ @Override public void addMeta(int typeId, BinaryType newMeta, boolean failIfUnregistered) throws BinaryObjectException {
assert newMeta != null;
assert newMeta instanceof BinaryTypeImpl;
@@ -182,7 +183,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
BinaryMetadata newMeta0 = ((BinaryTypeImpl)newMeta).metadata();
- CacheObjectBinaryProcessorImpl.this.addMeta(typeId, newMeta0.wrap(binaryCtx));
+ CacheObjectBinaryProcessorImpl.this.addMeta(typeId, newMeta0.wrap(binaryCtx), failIfUnregistered);
}
@Override public BinaryType metadata(int typeId) throws BinaryObjectException {
@@ -436,11 +437,11 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
BinaryMetadata meta = new BinaryMetadata(typeId, typeName, fieldTypeIds, affKeyFieldName, null, isEnum,
enumMap);
- binaryCtx.updateMetadata(typeId, meta);
+ binaryCtx.updateMetadata(typeId, meta, false);
}
/** {@inheritDoc} */
- @Override public void addMeta(final int typeId, final BinaryType newMeta) throws BinaryObjectException {
+ @Override public void addMeta(final int typeId, final BinaryType newMeta, boolean failIfUnregistered) throws BinaryObjectException {
assert newMeta != null;
assert newMeta instanceof BinaryTypeImpl;
@@ -457,6 +458,14 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
if (mergedMeta == oldMeta)
return;
+ if (failIfUnregistered)
+ throw new UnregisteredBinaryTypeException(
+ "Attempted to update binary metadata inside a critical synchronization block (will be " +
+ "automatically retried). This exception must not be wrapped to any other exception class. " +
+ "If you encounter this exception outside of EntryProcessor, please report to Apache Ignite " +
+ "dev-list.",
+ typeId, mergedMeta);
+
MetadataUpdateResult res = transport.requestMetadataUpdate(mergedMeta).get();
assert res != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index ba1210e..d8e204a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -87,6 +87,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFutureCancelledException;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED;
@@ -412,10 +413,12 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
txEntry.oldValueOnPrimary(val != null);
for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) {
- CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(key, val,
- txEntry.cached().version(), keepBinary, txEntry.cached());
+ CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(key, val,
+ txEntry.cached().version(), keepBinary, txEntry.cached());
- try {
+ IgniteThread.onEntryProcessorEntered(false);
+
+ try {
EntryProcessor<Object, Object, Object> processor = t.get1();
procRes = processor.process(invokeEntry, t.get2());
@@ -430,8 +433,11 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
break;
}
+ finally {
+ IgniteThread.onEntryProcessorLeft();
+ }
- modified |= invokeEntry.modified();
+ modified |= invokeEntry.modified();
}
if (modified)
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/DhtAtomicUpdateResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/DhtAtomicUpdateResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/DhtAtomicUpdateResult.java
index e7d2b19..15db625 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/DhtAtomicUpdateResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/DhtAtomicUpdateResult.java
@@ -45,6 +45,12 @@ class DhtAtomicUpdateResult {
private IgniteCacheExpiryPolicy expiry;
/**
+ * If batch update was interrupted in the middle, it should be continued from processedEntriesCount to avoid
+ * extra update closure invocation.
+ */
+ private int processedEntriesCount;
+
+ /**
*
*/
DhtAtomicUpdateResult() {
@@ -97,11 +103,20 @@ class DhtAtomicUpdateResult {
/**
* @return Deleted entries.
*/
- Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted() {
+ public Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted() {
return deleted;
}
/**
+ * Sets deleted entries.
+ *
+ * @param deleted Deleted entries.
+ */
+ void deleted(Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted) {
+ this.deleted = deleted;
+ }
+
+ /**
* @return DHT future.
*/
GridDhtAtomicAbstractUpdateFuture dhtFuture() {
@@ -128,4 +143,20 @@ class DhtAtomicUpdateResult {
void dhtFuture(@Nullable GridDhtAtomicAbstractUpdateFuture dhtFut) {
this.dhtFut = dhtFut;
}
+
+ /**
+ * Sets processed entries count.
+ * @param idx processed entries count.
+ */
+ public void processedEntriesCount(int idx) {
+ processedEntriesCount = idx;
+ }
+
+ /**
+ * Returns processed entries count.
+ * @return processed entries count.
+ */
+ public int processedEntriesCount() {
+ return processedEntriesCount;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index c39842e..93b9ac6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -37,6 +37,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.UnregisteredBinaryTypeException;
import org.apache.ignite.internal.UnregisteredClassException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.mem.IgniteOutOfMemoryException;
@@ -110,6 +111,7 @@ import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.security.SecurityPermission;
+import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
@@ -1741,6 +1743,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
+ DhtAtomicUpdateResult updDhtRes = new DhtAtomicUpdateResult();
+
try {
while (true) {
try {
@@ -1769,11 +1773,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
if (!remap) {
- DhtAtomicUpdateResult updRes = update(node, locked, req, res);
+ update(node, locked, req, res, updDhtRes);
- dhtFut = updRes.dhtFuture();
- deleted = updRes.deleted();
- expiry = updRes.expiryPolicy();
+ dhtFut = updDhtRes.dhtFuture();
+ deleted = updDhtRes.deleted();
+ expiry = updDhtRes.expiryPolicy();
}
else
// Should remap all keys.
@@ -1790,7 +1794,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
assert cacheObjProc instanceof CacheObjectBinaryProcessorImpl;
- ((CacheObjectBinaryProcessorImpl)cacheObjProc).binaryContext().descriptorForClass(ex.cls(), false, false);
+ ((CacheObjectBinaryProcessorImpl)cacheObjProc)
+ .binaryContext().descriptorForClass(ex.cls(), false, false);
+ }
+ catch (UnregisteredBinaryTypeException ex) {
+ IgniteCacheObjectProcessor cacheObjProc = ctx.cacheObjects();
+
+ assert cacheObjProc instanceof CacheObjectBinaryProcessorImpl;
+
+ ((CacheObjectBinaryProcessorImpl)cacheObjProc)
+ .binaryContext().updateMetadata(ex.typeId(), ex.binaryMetadata(), false);
}
}
}
@@ -1863,6 +1876,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param locked Entries.
* @param req Request.
* @param res Response.
+ * @param dhtUpdRes DHT update result.
* @return Operation result.
* @throws GridCacheEntryRemovedException If got obsolete entry.
*/
@@ -1870,7 +1884,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ClusterNode node,
List<GridDhtCacheEntry> locked,
GridNearAtomicAbstractUpdateRequest req,
- GridNearAtomicUpdateResponse res)
+ GridNearAtomicUpdateResponse res,
+ DhtAtomicUpdateResult dhtUpdRes)
throws GridCacheEntryRemovedException
{
GridDhtPartitionTopology top = topology();
@@ -1894,14 +1909,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion());
- GridDhtAtomicAbstractUpdateFuture dhtFut = createDhtFuture(ver, req);
+ if (dhtUpdRes.dhtFuture() == null)
+ dhtUpdRes.dhtFuture(createDhtFuture(ver, req));
IgniteCacheExpiryPolicy expiry = expiryPolicy(req.expiry());
GridCacheReturn retVal = null;
- DhtAtomicUpdateResult updRes;
-
if (req.size() > 1 && // Several keys ...
writeThrough() && !req.skipStore() && // and store is enabled ...
!ctx.store().isLocal() && // and this is not local store ...
@@ -1909,40 +1923,39 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
!ctx.dr().receiveEnabled() // and no DR.
) {
// This method can only be used when there are no replicated entries in the batch.
- updRes = updateWithBatch(node,
+ updateWithBatch(node,
hasNear,
req,
res,
locked,
ver,
- dhtFut,
ctx.isDrEnabled(),
taskName,
expiry,
- sndPrevVal);
-
- dhtFut = updRes.dhtFuture();
+ sndPrevVal,
+ dhtUpdRes);
if (req.operation() == TRANSFORM)
- retVal = updRes.returnValue();
+ retVal = dhtUpdRes.returnValue();
}
else {
- updRes = updateSingle(node,
+ updateSingle(node,
hasNear,
req,
res,
locked,
ver,
- dhtFut,
ctx.isDrEnabled(),
taskName,
expiry,
- sndPrevVal);
+ sndPrevVal,
+ dhtUpdRes);
- retVal = updRes.returnValue();
- dhtFut = updRes.dhtFuture();
+ retVal = dhtUpdRes.returnValue();
}
+ GridDhtAtomicAbstractUpdateFuture dhtFut = dhtUpdRes.dhtFuture();
+
if (retVal == null)
retVal = new GridCacheReturn(ctx, node.isLocal(), true, null, true);
@@ -1955,7 +1968,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
&& !dhtFut.isDone()) {
final IgniteRunnable tracker = GridNioBackPressureControl.threadTracker();
- if (tracker != null && tracker instanceof GridNioMessageTracker) {
+ if (tracker instanceof GridNioMessageTracker) {
((GridNioMessageTracker)tracker).onMessageReceived();
dhtFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() {
@@ -1969,9 +1982,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ctx.mvcc().addAtomicFuture(dhtFut.id(), dhtFut);
}
- updRes.expiryPolicy(expiry);
+ dhtUpdRes.expiryPolicy(expiry);
- return updRes;
+ return dhtUpdRes;
}
/**
@@ -1983,27 +1996,26 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param res Update response.
* @param locked Locked entries.
* @param ver Assigned version.
- * @param dhtFut Optional DHT future.
* @param replicate Whether replication is enabled.
* @param taskName Task name.
* @param expiry Expiry policy.
* @param sndPrevVal If {@code true} sends previous value to backups.
- * @return Deleted entries.
+ * @param dhtUpdRes DHT update result.
* @throws GridCacheEntryRemovedException Should not be thrown.
*/
@SuppressWarnings("unchecked")
- private DhtAtomicUpdateResult updateWithBatch(
+ private void updateWithBatch(
final ClusterNode node,
final boolean hasNear,
final GridNearAtomicAbstractUpdateRequest req,
final GridNearAtomicUpdateResponse res,
final List<GridDhtCacheEntry> locked,
final GridCacheVersion ver,
- @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
final boolean replicate,
final String taskName,
@Nullable final IgniteCacheExpiryPolicy expiry,
- final boolean sndPrevVal
+ final boolean sndPrevVal,
+ final DhtAtomicUpdateResult dhtUpdRes
) throws GridCacheEntryRemovedException {
assert !ctx.dr().receiveEnabled(); // Cannot update in batches during DR due to possible conflicts.
assert !req.returnValue() || req.operation() == TRANSFORM; // Should not request return values for putAll.
@@ -2015,7 +2027,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
catch (IgniteCheckedException e) {
res.addFailedKeys(req.keys(), e);
- return new DhtAtomicUpdateResult();
+ return;
}
}
@@ -2029,8 +2041,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
List<CacheObject> writeVals = null;
- DhtAtomicUpdateResult updRes = new DhtAtomicUpdateResult();
-
List<GridDhtCacheEntry> filtered = new ArrayList<>(size);
GridCacheOperation op = req.operation();
@@ -2041,7 +2051,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean intercept = ctx.config().getInterceptor() != null;
- for (int i = 0; i < locked.size(); i++) {
+ for (int i = dhtUpdRes.processedEntriesCount(); i < locked.size(); i++) {
GridDhtCacheEntry entry = locked.get(i);
try {
@@ -2100,6 +2110,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean validation = false;
+ IgniteThread.onEntryProcessorEntered(true);
+
try {
Object computed = entryProcessor.process(invokeEntry, req.invokeArguments());
@@ -2126,6 +2138,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
}
catch (Exception e) {
+ if (e instanceof UnregisteredClassException || e instanceof UnregisteredBinaryTypeException)
+ throw (IgniteException) e;
+
curInvokeRes = CacheInvokeResult.fromError(e);
updated = old;
@@ -2137,6 +2152,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
}
finally {
+ IgniteThread.onEntryProcessorLeft();
+
if (curInvokeRes != null) {
invokeRes.addEntryProcessResult(ctx, entry.key(), invokeEntry.key(), curInvokeRes.result(),
curInvokeRes.error(), req.keepBinary());
@@ -2155,7 +2172,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
// Update previous batch.
if (putMap != null) {
- dhtFut = updatePartialBatch(
+ updatePartialBatch(
hasNear,
firstEntryIdx,
filtered,
@@ -2165,11 +2182,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
putMap,
null,
entryProcessorMap,
- dhtFut,
req,
res,
replicate,
- updRes,
+ dhtUpdRes,
taskName,
expiry,
sndPrevVal);
@@ -2203,7 +2219,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
// Update previous batch.
if (rmvKeys != null) {
- dhtFut = updatePartialBatch(
+ updatePartialBatch(
hasNear,
firstEntryIdx,
filtered,
@@ -2213,11 +2229,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
null,
rmvKeys,
entryProcessorMap,
- dhtFut,
req,
res,
replicate,
- updRes,
+ dhtUpdRes,
taskName,
expiry,
sndPrevVal);
@@ -2327,7 +2342,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
// Store final batch.
if (putMap != null || rmvKeys != null) {
- dhtFut = updatePartialBatch(
+ updatePartialBatch(
hasNear,
firstEntryIdx,
filtered,
@@ -2337,11 +2352,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
putMap,
rmvKeys,
entryProcessorMap,
- dhtFut,
req,
res,
replicate,
- updRes,
+ dhtUpdRes,
taskName,
expiry,
sndPrevVal);
@@ -2349,11 +2363,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
else
assert filtered.isEmpty();
- updRes.dhtFuture(dhtFut);
-
- updRes.returnValue(invokeRes);
-
- return updRes;
+ dhtUpdRes.returnValue(invokeRes);
}
/**
@@ -2416,29 +2426,30 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param res Update response.
* @param locked Locked entries.
* @param ver Assigned update version.
- * @param dhtFut Optional DHT future.
* @param replicate Whether DR is enabled for that cache.
* @param taskName Task name.
* @param expiry Expiry policy.
* @param sndPrevVal If {@code true} sends previous value to backups.
- * @return Return value.
+ * @param dhtUpdRes DHT update result.
* @throws GridCacheEntryRemovedException Should be never thrown.
*/
- private DhtAtomicUpdateResult updateSingle(
+ private void updateSingle(
ClusterNode nearNode,
boolean hasNear,
GridNearAtomicAbstractUpdateRequest req,
GridNearAtomicUpdateResponse res,
List<GridDhtCacheEntry> locked,
GridCacheVersion ver,
- @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
boolean replicate,
String taskName,
@Nullable IgniteCacheExpiryPolicy expiry,
- boolean sndPrevVal
+ boolean sndPrevVal,
+ DhtAtomicUpdateResult dhtUpdRes
) throws GridCacheEntryRemovedException {
- GridCacheReturn retVal = null;
- Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
+ GridCacheReturn retVal = dhtUpdRes.returnValue();
+ GridDhtAtomicAbstractUpdateFuture dhtFut = dhtUpdRes.dhtFuture();
+ Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = dhtUpdRes.deleted();
+
AffinityTopologyVersion topVer = req.topologyVersion();
@@ -2447,7 +2458,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
AffinityAssignment affAssignment = ctx.affinity().assignment(topVer);
// Avoid iterator creation.
- for (int i = 0; i < req.size(); i++) {
+ for (int i = dhtUpdRes.processedEntriesCount(); i < req.size(); i++) {
KeyCacheObject k = req.key(i);
GridCacheOperation op = req.operation();
@@ -2611,9 +2622,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
catch (IgniteCheckedException e) {
res.addFailedKey(k, e);
}
+
+ dhtUpdRes.processedEntriesCount(i + 1);
}
- return new DhtAtomicUpdateResult(retVal, deleted, dhtFut);
+ dhtUpdRes.returnValue(retVal);
+ dhtUpdRes.deleted(deleted);
+ dhtUpdRes.dhtFuture(dhtFut);
}
/**
@@ -2626,18 +2641,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param putMap Values to put.
* @param rmvKeys Keys to remove.
* @param entryProcessorMap Entry processors.
- * @param dhtFut DHT update future if has backups.
* @param req Request.
* @param res Response.
* @param replicate Whether replication is enabled.
- * @param batchRes Batch update result.
+ * @param dhtUpdRes Batch update result.
* @param taskName Task name.
* @param expiry Expiry policy.
* @param sndPrevVal If {@code true} sends previous value to backups.
- * @return Deleted entries.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- @Nullable private GridDhtAtomicAbstractUpdateFuture updatePartialBatch(
+ @Nullable private void updatePartialBatch(
final boolean hasNear,
final int firstEntryIdx,
final List<GridDhtCacheEntry> entries,
@@ -2647,11 +2660,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Nullable final Map<KeyCacheObject, CacheObject> putMap,
@Nullable final Collection<KeyCacheObject> rmvKeys,
@Nullable final Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap,
- @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
final GridNearAtomicAbstractUpdateRequest req,
final GridNearAtomicUpdateResponse res,
final boolean replicate,
- final DhtAtomicUpdateResult batchRes,
+ final DhtAtomicUpdateResult dhtUpdRes,
final String taskName,
@Nullable final IgniteCacheExpiryPolicy expiry,
final boolean sndPrevVal
@@ -2699,6 +2711,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
AffinityAssignment affAssignment = ctx.affinity().assignment(topVer);
+ final GridDhtAtomicAbstractUpdateFuture dhtFut = dhtUpdRes.dhtFuture();
+
// Avoid iterator creation.
for (int i = 0; i < entries.size(); i++) {
GridDhtCacheEntry entry = entries.get(i);
@@ -2778,7 +2792,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
}
- batchRes.addDeleted(entry, updRes, entries);
+ dhtUpdRes.addDeleted(entry, updRes, entries);
if (dhtFut != null) {
@@ -2838,7 +2852,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
e.printStackTrace();
}
+
+ dhtUpdRes.processedEntriesCount(firstEntryIdx + i + 1);
}
+
}
catch (IgniteCheckedException e) {
res.addFailedKeys(putMap != null ? putMap.keySet() : rmvKeys, e);
@@ -2852,8 +2869,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
res.addFailedKeys(failed, storeErr.getCause());
}
-
- return dhtFut;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index b96dbdc..c92daba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -72,6 +72,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.plugin.security.SecurityPermission;
+import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
@@ -1108,6 +1109,8 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
boolean validation = false;
+ IgniteThread.onEntryProcessorEntered(false);
+
try {
Object computed = entryProcessor.process(invokeEntry, invokeArgs);
@@ -1136,6 +1139,9 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
continue;
}
}
+ finally {
+ IgniteThread.onEntryProcessorLeft();
+ }
if (invokeRes != null)
invokeResMap.put((K)entry.key().value(ctx.cacheObjectContext(), false), invokeRes);
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index cb28878..762013e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -30,6 +30,8 @@ import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.UnregisteredBinaryTypeException;
+import org.apache.ignite.internal.UnregisteredClassException;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
@@ -1645,6 +1647,9 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
}
}
}
+ catch (UnregisteredClassException | UnregisteredBinaryTypeException e) {
+ throw e;
+ }
catch (IgniteCheckedException e) {
throw new IgniteCheckedException("Runtime failure on search row: " + row, e);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 6fdb046..e97b31c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -85,6 +85,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionState;
@@ -1607,6 +1608,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
Object procRes = null;
Exception err = null;
+ IgniteThread.onEntryProcessorEntered(true);
+
try {
EntryProcessor<Object, Object, Object> processor = t.get1();
@@ -1619,6 +1622,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
catch (Exception e) {
err = e;
}
+ finally {
+ IgniteThread.onEntryProcessorLeft();
+ }
if (ret != null) {
if (err != null || procRes != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 71c6b65..8e65605 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -52,6 +52,7 @@ import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ;
@@ -762,6 +763,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
Object keyVal = null;
for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : entryProcessors()) {
+ IgniteThread.onEntryProcessorEntered(true);
+
try {
CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(key, keyVal, cacheVal, val,
ver, keepBinary(), cached());
@@ -777,6 +780,9 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
catch (Exception ignore) {
// No-op.
}
+ finally {
+ IgniteThread.onEntryProcessorLeft();
+ }
}
return ctx.toCacheObject(val);
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 6f11a57..0f590ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -76,6 +76,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiClosure;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionDeadlockException;
import org.apache.ignite.transactions.TransactionIsolation;
@@ -1241,6 +1242,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
Object key0 = null;
Object val0 = null;
+ IgniteThread.onEntryProcessorEntered(true);
+
try {
Object res = null;
@@ -1268,6 +1271,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
catch (Exception e) {
ret.addEntryProcessResult(ctx, txEntry.key(), key0, null, e, txEntry.keepBinary());
}
+ finally {
+ IgniteThread.onEntryProcessorLeft();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java
index 9e22f38..4e22ce9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java
@@ -333,7 +333,7 @@ public class PlatformContextImpl implements PlatformContext {
BinaryContext binCtx = cacheObjProc.binaryContext();
for (BinaryMetadata meta : metas)
- binCtx.updateMetadata(meta.typeId(), meta);
+ binCtx.updateMetadata(meta.typeId(), meta, false);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypePutRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypePutRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypePutRequest.java
index 7839d48..64c8d80 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypePutRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypePutRequest.java
@@ -49,7 +49,7 @@ public class ClientBinaryTypePutRequest extends ClientRequest {
@Override public ClientResponse process(ClientConnectionContext ctx) {
BinaryContext binCtx = ((CacheObjectBinaryProcessorImpl) ctx.kernalContext().cacheObjects()).binaryContext();
- binCtx.updateMetadata(meta.typeId(), meta);
+ binCtx.updateMetadata(meta.typeId(), meta, false);
return super.process(ctx);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
index 70b75e3..6f65e0e 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
@@ -56,6 +56,12 @@ public class IgniteThread extends Thread {
/** */
private final byte plc;
+ /** Flag raised by {@link #onEntryProcessorEntered(boolean)} and cleared by {@link #onEntryProcessorLeft()}. */
+ private boolean executingEntryProcessor;
+
+ /** Topology-lock flag recorded by the most recent {@link #onEntryProcessorEntered(boolean)} call on this thread. */
+ private boolean holdsTopLock;
+
/**
* Creates thread with given worker.
*
@@ -158,9 +164,46 @@ public class IgniteThread extends Thread {
}
/**
+ * Reports the flag that {@link #onEntryProcessorEntered(boolean)} raises and
+ * {@link #onEntryProcessorLeft()} clears.
+ *
+ * @return {@code True} if thread is currently executing entry processor.
+ */
+ public boolean executingEntryProcessor() {
+ return executingEntryProcessor;
+ }
+
+ /**
+ * Reports the value passed to the most recent {@link #onEntryProcessorEntered(boolean)} call made
+ * on this thread. Note that {@link #onEntryProcessorLeft()} does not reset this value.
+ *
+ * @return {@code True} if the thread is recorded as holding the topology lock.
+ */
+ public boolean holdsTopLock() {
+ return holdsTopLock;
+ }
+
+ /**
+ * Callback before entry processor execution is started. Marks the current thread as executing
+ * an entry processor and records the supplied topology-lock flag on it. Does nothing when the
+ * current thread is not an {@link IgniteThread}.
+ *
+ * @param holdsTopLock Topology-lock flag to store on the current thread; it is later returned
+ * by {@link #holdsTopLock()}.
+ */
+ public static void onEntryProcessorEntered(boolean holdsTopLock) {
+ Thread curThread = Thread.currentThread();
+
+ if (curThread instanceof IgniteThread) {
+ ((IgniteThread)curThread).executingEntryProcessor = true;
+
+ ((IgniteThread)curThread).holdsTopLock = holdsTopLock;
+ }
+ }
+
+ /**
+ * Callback after entry processor execution is finished. Clears the "executing entry processor"
+ * flag on the current thread; the topology-lock flag is left as it was. Does nothing when the
+ * current thread is not an {@link IgniteThread}.
+ */
+ public static void onEntryProcessorLeft() {
+ Thread curThread = Thread.currentThread();
+
+ if (curThread instanceof IgniteThread)
+ ((IgniteThread)curThread).executingEntryProcessor = false;
+ }
+
+ /**
* @return IgniteThread or {@code null} if current thread is not an instance of IgniteThread.
*/
- public static IgniteThread current(){
+ public static IgniteThread current() {
Thread thread = Thread.currentThread();
return thread.getClass() == IgniteThread.class || thread instanceof IgniteThread ?
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/test/java/org/apache/ignite/internal/binary/TestCachingMetadataHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/TestCachingMetadataHandler.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/TestCachingMetadataHandler.java
index 0870153..c515f81 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/binary/TestCachingMetadataHandler.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/TestCachingMetadataHandler.java
@@ -30,7 +30,7 @@ public class TestCachingMetadataHandler implements BinaryMetadataHandler {
private final ConcurrentHashMap<Integer, BinaryType> metas = new ConcurrentHashMap<>();
/** {@inheritDoc} */
- @Override public void addMeta(int typeId, BinaryType meta) throws BinaryObjectException {
+ @Override public void addMeta(int typeId, BinaryType meta, boolean failIfUnregistered) throws BinaryObjectException {
BinaryType otherType = metas.put(typeId, meta);
if (otherType != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/3980e1ef/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/BinaryMetadataRegistrationInsideEntryProcessorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/BinaryMetadataRegistrationInsideEntryProcessorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/BinaryMetadataRegistrationInsideEntryProcessorTest.java
new file mode 100644
index 0000000..73dae4b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/BinaryMetadataRegistrationInsideEntryProcessorTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Test added with the IGNITE-8926 fix (deadlock on metadata registration). It runs an entry
+ * processor that stores values of the custom types {@link CustomObj} and {@link CustomEnum}
+ * on a two-node grid; presumably their binary metadata is first registered from inside the
+ * entry processor, which is the scenario named by the class.
+ */
+public class BinaryMetadataRegistrationInsideEntryProcessorTest extends GridCommonAbstractTest {
+ /** Name of the cache created by the test. */
+ private static final String CACHE_NAME = "test-cache";
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Uses TCP discovery with a VM IP finder limited to {@code 127.0.0.1:47500..47509} and enables
+ * peer class loading.
+ */
+ @Override protected IgniteConfiguration getConfiguration() {
+ TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder()
+ .setAddresses(Arrays.asList("127.0.0.1:47500..47509"));
+
+ return new IgniteConfiguration()
+ .setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder))
+ .setPeerClassLoadingEnabled(true);
+ }
+
+ /**
+ * Starts two nodes, creates the cache and invokes {@link CustomProcessor} for keys
+ * {@code 0..9999}. If any invocation throws, the value stored under key {@code 1} is read
+ * back, a diagnostic line saying whether the data was saved is printed, and the exception
+ * is rethrown so the test fails.
+ *
+ * @throws Exception If failed.
+ */
+ public void test() throws Exception {
+ Ignite ignite = startGrids(2);
+
+ IgniteCache<Integer, Map<Integer, CustomObj>> cache = ignite.createCache(CACHE_NAME);
+
+ try {
+ for (int i = 0; i < 10_000; i++)
+ cache.invoke(i, new CustomProcessor());
+ }
+ catch (Exception e) {
+ Map<Integer, CustomObj> value = cache.get(1);
+
+ if ((value != null) && (value.get(1) != null) && (value.get(1).getObj() == CustomEnum.ONE))
+ System.out.println("Data was saved.");
+ else
+ System.out.println("Data wasn't saved.");
+
+ throw e;
+ }
+ }
+
+ /**
+ * Entry processor that replaces the entry value with a new single-element map
+ * ({@code 1 -> new CustomObj(CustomEnum.ONE)}) and returns {@code null}.
+ */
+ private static class CustomProcessor implements EntryProcessor<Integer,
+ Map<Integer, CustomObj>, Object> {
+ /** {@inheritDoc} */
+ @Override public Object process(
+ MutableEntry<Integer, Map<Integer, CustomObj>> entry,
+ Object... objects) throws EntryProcessorException {
+ Map<Integer, CustomObj> map = new HashMap<>();
+
+ map.put(1, new CustomObj(CustomEnum.ONE));
+
+ entry.setValue(map);
+
+ return null;
+ }
+ }
+
+ /**
+ * Simple holder for an arbitrary object; used as the cache map's value type.
+ */
+ private static class CustomObj {
+ /** Object. */
+ private final Object obj;
+
+ /**
+ * @param obj Object.
+ */
+ public CustomObj(Object obj) {
+ this.obj = obj;
+ }
+
+ /**
+ * Wraps the given (boxed) int value. Not called anywhere in this test class.
+ *
+ * @param val Value.
+ * @return New holder wrapping {@code val}.
+ */
+ public static CustomObj valueOf(int val) {
+ return new CustomObj(val);
+ }
+
+ /**
+ * @return Wrapped object.
+ */
+ public Object getObj() {
+ return obj;
+ }
+ }
+
+ /**
+ * Enum whose constants carry a payload value; the test stores and checks only {@link #ONE}.
+ */
+ private enum CustomEnum {
+ /** */ONE(1),
+ /** */TWO(2),
+ /** */THREE(3);
+
+ /** Value. Assigned in the constructor and never read within this class. */
+ private final Object val;
+
+ /**
+ * @param val Value.
+ */
+ CustomEnum(Object val) {
+ this.val = val;
+ }
+ }
+
+}
\ No newline at end of file