You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2015/12/24 10:35:20 UTC
[06/14] ignite git commit: ignite-1.5 Fixed hang on metadata update
inside put in atomic cache when topology read lock is held. Also fixed
several test issues.
ignite-1.5 Fixed hang on metadata update inside put in atomic cache when topology read lock is held. Also fixed several test issues.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/53ec76ff
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/53ec76ff
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/53ec76ff
Branch: refs/heads/ignite-1.5.1-2
Commit: 53ec76ffe65d5788fc1ffa32c2fba66222e51dcc
Parents: 66b33bc
Author: sboikov <sb...@gridgain.com>
Authored: Wed Dec 23 15:06:48 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Dec 23 15:06:48 2015 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 39 +-
.../processors/cache/GridCacheAdapter.java | 47 ++-
.../processors/cache/GridCacheProxyImpl.java | 29 ++
.../cache/GridCacheSharedContext.java | 10 +-
.../processors/cache/IgniteCacheProxy.java | 35 ++
.../processors/cache/IgniteInternalCache.java | 26 ++
.../binary/CacheObjectBinaryProcessorImpl.java | 4 +-
.../distributed/dht/GridDhtLockFuture.java | 2 +-
.../dht/atomic/GridDhtAtomicCache.java | 95 +++--
.../dht/atomic/GridNearAtomicUpdateFuture.java | 150 ++++---
.../colocated/GridDhtColocatedLockFuture.java | 11 +-
.../distributed/near/GridNearLockFuture.java | 11 +-
...arOptimisticSerializableTxPrepareFuture.java | 5 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 5 +-
...ridNearOptimisticTxPrepareFutureAdapter.java | 12 +-
.../transactions/IgniteTxLocalAdapter.java | 2 +
.../cache/transactions/IgniteTxManager.java | 61 ++-
.../datastreamer/DataStreamProcessor.java | 12 +-
.../ignite/internal/util/lang/GridFunc.java | 1 +
.../test/config/websession/example-cache.xml | 9 +-
...niteClientReconnectFailoverAbstractTest.java | 3 +-
.../cache/GridCacheAbstractFullApiSelfTest.java | 22 +-
.../cache/GridCacheAbstractSelfTest.java | 3 +-
...yMetadataUpdateChangingTopologySelfTest.java | 7 +-
...niteBinaryMetadataUpdateNodeRestartTest.java | 411 +++++++++++++++++++
.../distributed/IgniteCacheManyClientsTest.java | 2 +
...ContinuousQueryFailoverAbstractSelfTest.java | 128 +++---
...ridCacheContinuousQueryAbstractSelfTest.java | 3 +
.../service/ClosureServiceClientsNodesTest.java | 22 +-
.../GridServiceProcessorStopSelfTest.java | 21 +-
...cpCommunicationSpiMultithreadedSelfTest.java | 21 +
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 14 +-
.../testframework/GridSpiTestContext.java | 18 +-
.../cache/websession/WebSessionFilter.java | 82 ++--
.../cache/websession/WebSessionListener.java | 25 +-
.../internal/websession/WebSessionSelfTest.java | 2 -
36 files changed, 1023 insertions(+), 327 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index bf7c7e4..42f8dae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -666,6 +666,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
*
* @param plc Policy.
* @return Execution pool.
+ * @throws IgniteCheckedException If failed.
*/
private Executor pool(byte plc) throws IgniteCheckedException {
switch (plc) {
@@ -767,6 +768,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param msg Message.
* @param plc Execution policy.
* @param msgC Closure to call when message processing finished.
+ * @throws IgniteCheckedException If failed.
*/
private void processRegularMessage(
final UUID nodeId,
@@ -824,6 +826,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param msg Ordered message.
* @param plc Execution policy.
* @param msgC Closure to call when message processing finished ({@code null} for sync processing).
+ * @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
private void processOrderedMessage(
@@ -1029,7 +1032,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param ordered Ordered flag.
* @param timeout Timeout.
* @param skipOnTimeout Whether message can be skipped on timeout.
- * @param ackClosure Ack closure.
+ * @param ackC Ack closure.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
private void send(
@@ -1041,7 +1044,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
boolean ordered,
long timeout,
boolean skipOnTimeout,
- IgniteInClosure<IgniteException> ackClosure
+ IgniteInClosure<IgniteException> ackC
) throws IgniteCheckedException {
assert node != null;
assert topic != null;
@@ -1062,8 +1065,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
else
processRegularMessage0(ioMsg, locNodeId);
- if (ackClosure != null)
- ackClosure.apply(null);
+ if (ackC != null)
+ ackC.apply(null);
}
else {
if (topicOrd < 0)
@@ -1071,7 +1074,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
try {
if ((CommunicationSpi)getSpi() instanceof TcpCommunicationSpi)
- ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg, ackClosure);
+ ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg, ackC);
else
getSpi().sendMessage(node, ioMsg);
}
@@ -1197,12 +1200,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param topic Topic to send the message to.
* @param msg Message to send.
* @param plc Type of processing.
- * @param ackClosure Ack closure.
+ * @param ackC Ack closure.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
public void send(ClusterNode node, GridTopic topic, Message msg, byte plc,
- IgniteInClosure<IgniteException> ackClosure) throws IgniteCheckedException {
- send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackClosure);
+ IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
+ send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC);
}
/**
@@ -1233,12 +1236,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param topic Topic to send the message to.
* @param msg Message to send.
* @param plc Type of processing.
- * @param ackClosure Ack closure.
+ * @param ackC Ack closure.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- public void send(ClusterNode node, Object topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackClosure)
+ public void send(ClusterNode node, Object topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackC)
throws IgniteCheckedException {
- send(node, topic, -1, msg, plc, false, 0, false, ackClosure);
+ send(node, topic, -1, msg, plc, false, 0, false, ackC);
}
/**
@@ -1280,7 +1283,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param plc Type of processing.
* @param timeout Timeout to keep a message on receiving queue.
* @param skipOnTimeout Whether message can be skipped on timeout.
- * @param ackClosure Ack closure.
+ * @param ackC Ack closure.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
public void sendOrderedMessage(
@@ -1290,11 +1293,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
byte plc,
long timeout,
boolean skipOnTimeout,
- IgniteInClosure<IgniteException> ackClosure
+ IgniteInClosure<IgniteException> ackC
) throws IgniteCheckedException {
assert timeout > 0 || skipOnTimeout;
- send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure);
+ send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC);
}
/**
@@ -1385,6 +1388,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param topic Topic to subscribe to.
* @param p Message predicate.
*/
+ @SuppressWarnings("unchecked")
public void addUserMessageListener(@Nullable final Object topic, @Nullable final IgniteBiPredicate<UUID, ?> p) {
if (p != null) {
try {
@@ -1406,6 +1410,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param topic Topic to unsubscribe from.
* @param p Message predicate.
*/
+ @SuppressWarnings("unchecked")
public void removeUserMessageListener(@Nullable Object topic, IgniteBiPredicate<UUID, ?> p) {
try {
removeMessageListener(TOPIC_COMM_USER,
@@ -1423,7 +1428,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param plc Type of processing.
* @param timeout Timeout to keep a message on receiving queue.
* @param skipOnTimeout Whether message can be skipped on timeout.
- * @param ackClosure Ack closure.
+ * @param ackC Ack closure.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
public void sendOrderedMessage(
@@ -1433,7 +1438,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
byte plc,
long timeout,
boolean skipOnTimeout,
- IgniteInClosure<IgniteException> ackClosure
+ IgniteInClosure<IgniteException> ackC
) throws IgniteCheckedException {
assert timeout > 0 || skipOnTimeout;
@@ -1442,7 +1447,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (node == null)
throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
- send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure);
+ send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index cc4e962..5d4c386 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -2077,8 +2077,32 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
+ @Nullable @Override public <T> EntryProcessorResult<T> invoke(@Nullable AffinityTopologyVersion topVer,
+ K key,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) throws IgniteCheckedException {
+ return invoke0(topVer, key, entryProcessor, args);
+ }
+
+ /** {@inheritDoc} */
@Override public <T> EntryProcessorResult<T> invoke(final K key,
final EntryProcessor<K, V, T> entryProcessor,
+ final Object... args) throws IgniteCheckedException {
+ return invoke0(null, key, entryProcessor, args);
+ }
+
+ /**
+ * @param topVer Locked topology version.
+ * @param key Key.
+ * @param entryProcessor Entry processor.
+ * @param args Entry processor arguments.
+ * @return Invoke result.
+ * @throws IgniteCheckedException If failed.
+ */
+ private <T> EntryProcessorResult<T> invoke0(
+ @Nullable final AffinityTopologyVersion topVer,
+ final K key,
+ final EntryProcessor<K, V, T> entryProcessor,
final Object... args)
throws IgniteCheckedException {
A.notNull(key, "key", entryProcessor, "entryProcessor");
@@ -2089,8 +2113,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return syncOp(new SyncOp<EntryProcessorResult<T>>(true) {
@Nullable @Override public EntryProcessorResult<T> op(IgniteTxLocalAdapter tx)
throws IgniteCheckedException {
- IgniteInternalFuture<GridCacheReturn> fut =
- tx.invokeAsync(ctx, key, (EntryProcessor<K, V, Object>)entryProcessor, args);
+ assert topVer == null || tx.implicit();
+
+ if (topVer != null)
+ tx.topologyVersion(topVer);
+
+ IgniteInternalFuture<GridCacheReturn> fut = tx.invokeAsync(ctx,
+ key,
+ (EntryProcessor<K, V, Object>)entryProcessor,
+ args);
Map<K, EntryProcessorResult<T>> resMap = fut.get().value();
@@ -2324,16 +2355,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
});
}
- /**
- * Tries to put value in cache. Will fail with {@link GridCacheTryPutFailedException}
- * if topology exchange is in progress.
- *
- * @param key Key.
- * @param val value.
- * @return Old value.
- * @throws IgniteCheckedException In case of error.
- */
- @Nullable public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException {
+ /** {@inheritDoc} */
+ @Nullable @Override public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException {
// Supported only in ATOMIC cache.
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index d1d93d8..8ffd273 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -36,6 +36,7 @@ import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityProxy;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -1231,6 +1232,34 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
}
/** {@inheritDoc} */
+ @Nullable @Override public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException {
+ CacheOperationContext prev = gate.enter(opCtx);
+
+ try {
+ return delegate.tryPutIfAbsent(key, val);
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public <T> EntryProcessorResult<T> invoke(
+ AffinityTopologyVersion topVer,
+ K key,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) throws IgniteCheckedException {
+ CacheOperationContext prev = gate.enter(opCtx);
+
+ try {
+ return delegate.invoke(topVer, key, entryProcessor, args);
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void removeAll()
throws IgniteCheckedException {
CacheOperationContext prev = gate.enter(opCtx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 5ed1df9..2221d3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -91,9 +91,6 @@ public class GridCacheSharedContext<K, V> {
/** Tx metrics. */
private volatile TransactionMetricsAdapter txMetrics;
- /** Preloaders start future. */
- private IgniteInternalFuture<Object> preloadersStartFut;
-
/** Store session listeners. */
private Collection<CacheStoreSessionListener> storeSesLsnrs;
@@ -578,12 +575,7 @@ public class GridCacheSharedContext<K, V> {
@Nullable public AffinityTopologyVersion lockedTopologyVersion(IgniteInternalTx ignore) {
long threadId = Thread.currentThread().getId();
- IgniteInternalTx tx = txMgr.anyActiveThreadTx(threadId, ignore);
-
- AffinityTopologyVersion topVer = null;
-
- if (tx != null && tx.topologyVersionSnapshot() != null)
- topVer = tx.topologyVersionSnapshot();
+ AffinityTopologyVersion topVer = txMgr.lockedTopologyVersion(threadId, ignore);
if (topVer == null)
topVer = mvccMgr.lastExplicitLockTopologyVersion(threadId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 271a2cf..27a7587 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.AsyncSupportAdapter;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.query.CacheQuery;
import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
@@ -1483,6 +1484,40 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return invoke(key, (EntryProcessor<K, V, T>)entryProcessor, args);
}
+ /**
+ * @param topVer Locked topology version.
+ * @param key Key.
+ * @param entryProcessor Entry processor.
+ * @param args Arguments.
+ * @return Invoke result.
+ */
+ public <T> T invoke(@Nullable AffinityTopologyVersion topVer,
+ K key,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) {
+ try {
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
+
+ try {
+ if (isAsync())
+ throw new UnsupportedOperationException();
+ else {
+ EntryProcessorResult<T> res = delegate.invoke(topVer, key, entryProcessor, args);
+
+ return res != null ? res.get() : null;
+ }
+ }
+ finally {
+ onLeave(gate, prev);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw cacheException(e);
+ }
+ }
+
/** {@inheritDoc} */
@Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
EntryProcessor<K, V, T> entryProcessor,
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
index 186de68..433290c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
@@ -40,6 +40,7 @@ import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -1863,4 +1864,29 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
* @throws IgniteCheckedException If failed.
*/
public V getTopologySafe(K key) throws IgniteCheckedException;
+
+ /**
+ * Tries to put value in cache. Will fail with {@link GridCacheTryPutFailedException}
+ * if topology exchange is in progress.
+ *
+ * @param key Key.
+ * @param val value.
+ * @return Old value.
+ * @throws IgniteCheckedException In case of error.
+ */
+ @Nullable public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException;
+
+ /**
+ * @param topVer Locked topology version.
+ * @param key Key.
+ * @param entryProcessor Entry processor.
+ * @param args Arguments.
+ * @return Invoke result.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable public <T> EntryProcessorResult<T> invoke(
+ @Nullable AffinityTopologyVersion topVer,
+ K key,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) throws IgniteCheckedException;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index b335179..7586a42 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -489,7 +489,9 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
BinaryMetadata oldMeta = metaDataCache.localPeek(key);
BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0);
- BinaryObjectException err = metaDataCache.invoke(key, new MetadataProcessor(mergedMeta));
+ AffinityTopologyVersion topVer = ctx.cache().context().lockedTopologyVersion(null);
+
+ BinaryObjectException err = metaDataCache.invoke(topVer, key, new MetadataProcessor(mergedMeta));
if (err != null)
throw err;
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index f0d2e15..98711b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -743,7 +743,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
if (tx != null) {
cctx.tm().txContext(tx);
- set = cctx.tm().setTxTopologyHint(tx);
+ set = cctx.tm().setTxTopologyHint(tx.topologyVersionSnapshot());
}
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 481317a..634a9ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -77,6 +77,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSing
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
@@ -1240,7 +1241,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
top.readLock();
try {
- if (topology().stopping()) {
+ if (top.stopping()) {
res.addFailedKeys(keys, new IgniteCheckedException("Failed to perform cache operation " +
"(cache is stopped): " + name()));
@@ -1289,48 +1290,59 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridCacheReturn retVal = null;
- if (keys.size() > 1 && // Several keys ...
- writeThrough() && !req.skipStore() && // and store is enabled ...
- !ctx.store().isLocal() && // and this is not local store ...
- !ctx.dr().receiveEnabled() // and no DR.
- ) {
- // This method can only be used when there are no replicated entries in the batch.
- UpdateBatchResult updRes = updateWithBatch(node,
- hasNear,
- req,
- res,
- locked,
- ver,
- dhtFut,
- completionCb,
- ctx.isDrEnabled(),
- taskName,
- expiry,
- sndPrevVal);
+ IgniteTxManager tm = ctx.tm();
- deleted = updRes.deleted();
- dhtFut = updRes.dhtFuture();
+ // Needed for metadata cache transaction.
+ boolean set = tm.setTxTopologyHint(req.topologyVersion());
- if (req.operation() == TRANSFORM)
- retVal = updRes.invokeResults();
+ try {
+ if (keys.size() > 1 && // Several keys ...
+ writeThrough() && !req.skipStore() && // and store is enabled ...
+ !ctx.store().isLocal() && // and this is not local store ...
+ !ctx.dr().receiveEnabled() // and no DR.
+ ) {
+ // This method can only be used when there are no replicated entries in the batch.
+ UpdateBatchResult updRes = updateWithBatch(node,
+ hasNear,
+ req,
+ res,
+ locked,
+ ver,
+ dhtFut,
+ completionCb,
+ ctx.isDrEnabled(),
+ taskName,
+ expiry,
+ sndPrevVal);
+
+ deleted = updRes.deleted();
+ dhtFut = updRes.dhtFuture();
+
+ if (req.operation() == TRANSFORM)
+ retVal = updRes.invokeResults();
+ }
+ else {
+ UpdateSingleResult updRes = updateSingle(node,
+ hasNear,
+ req,
+ res,
+ locked,
+ ver,
+ dhtFut,
+ completionCb,
+ ctx.isDrEnabled(),
+ taskName,
+ expiry,
+ sndPrevVal);
+
+ retVal = updRes.returnValue();
+ deleted = updRes.deleted();
+ dhtFut = updRes.dhtFuture();
+ }
}
- else {
- UpdateSingleResult updRes = updateSingle(node,
- hasNear,
- req,
- res,
- locked,
- ver,
- dhtFut,
- completionCb,
- ctx.isDrEnabled(),
- taskName,
- expiry,
- sndPrevVal);
-
- retVal = updRes.returnValue();
- deleted = updRes.deleted();
- dhtFut = updRes.dhtFuture();
+ finally {
+ if (set)
+ tm.setTxTopologyHint(null);
}
if (retVal == null)
@@ -2782,8 +2794,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (log.isDebugEnabled())
log.debug("Processing dht atomic update response [nodeId=" + nodeId + ", res=" + res + ']');
- GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().
- atomicFuture(res.futureVersion());
+ GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion());
if (updateFut != null)
updateFut.onResult(nodeId, res);
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index eefdc73..3c86083 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
@@ -47,8 +46,8 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
@@ -288,7 +287,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
// Cannot remap.
remapCnt = 1;
- state.map(topVer);
+ state.map(topVer, null);
}
}
@@ -415,7 +414,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
cache.topology().readUnlock();
}
- state.map(topVer);
+ state.map(topVer, null);
}
/**
@@ -582,7 +581,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
req = mappings != null ? mappings.get(nodeId) : null;
if (req != null && req.response() == null) {
- res = new GridNearAtomicUpdateResponse(cctx.cacheId(), nodeId, req.futureVersion(),
+ res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+ nodeId,
+ req.futureVersion(),
cctx.deploymentEnabled());
ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
@@ -603,6 +604,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
* @param res Response.
* @param nodeErr {@code True} if response was created on node failure.
*/
+ @SuppressWarnings("unchecked")
void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
GridNearAtomicUpdateRequest req;
@@ -774,7 +776,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
return;
}
- IgniteInternalFuture<AffinityTopologyVersion> fut = cctx.affinity().affinityReadyFuture(remapTopVer);
+ IgniteInternalFuture<AffinityTopologyVersion> fut =
+ cctx.shared().exchange().affinityReadyFuture(remapTopVer);
+
+ if (fut == null)
+ fut = new GridFinishedFuture<>(remapTopVer);
fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
@@ -783,7 +789,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
try {
AffinityTopologyVersion topVer = fut.get();
- map(topVer);
+ map(topVer, remapKeys);
}
catch (IgniteCheckedException e) {
onDone(e);
@@ -819,8 +825,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
/**
* @param topVer Topology version.
+ * @param remapKeys Keys to remap.
*/
- void map(AffinityTopologyVersion topVer) {
+ void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
if (F.isEmpty(topNodes)) {
@@ -832,68 +839,78 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
Exception err = null;
GridNearAtomicUpdateRequest singleReq0 = null;
- Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = null;
+ Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null;
int size = keys.size();
- synchronized (this) {
- assert futVer == null : this;
- assert this.topVer == AffinityTopologyVersion.ZERO : this;
+ GridCacheVersion futVer = cctx.versions().next(topVer);
- resCnt = 0;
+ GridCacheVersion updVer;
- this.topVer = topVer;
+ // Assign version on near node in CLOCK ordering mode even if fastMap is false.
+ if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
+ updVer = this.updVer;
- futVer = cctx.versions().next(topVer);
+ if (updVer == null) {
+ updVer = cctx.versions().next(topVer);
- if (storeFuture()) {
- if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this)) {
- assert isDone() : GridNearAtomicUpdateFuture.this;
-
- return;
- }
+ if (log.isDebugEnabled())
+ log.debug("Assigned fast-map version for update on near node: " + updVer);
}
+ }
+ else
+ updVer = null;
- // Assign version on near node in CLOCK ordering mode even if fastMap is false.
- if (updVer == null)
- updVer = cctx.config().getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null;
+ try {
+ if (size == 1 && !fastMap) {
+ assert remapKeys == null || remapKeys.size() == 1;
- if (updVer != null && log.isDebugEnabled())
- log.debug("Assigned fast-map version for update on near node: " + updVer);
+ singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
+ }
+ else {
+ Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes,
+ topVer,
+ futVer,
+ updVer,
+ remapKeys);
+
+ if (pendingMappings.size() == 1)
+ singleReq0 = F.firstValue(pendingMappings);
+ else {
+ if (syncMode == PRIMARY_SYNC) {
+ mappings0 = U.newHashMap(pendingMappings.size());
- try {
- if (size == 1 && !fastMap) {
- assert remapKeys == null || remapKeys.size() == 1;
+ for (GridNearAtomicUpdateRequest req : pendingMappings.values()) {
+ if (req.hasPrimary())
+ mappings0.put(req.nodeId(), req);
+ }
+ }
+ else
+ mappings0 = pendingMappings;
- singleReq0 = singleReq = mapSingleUpdate();
+ assert !mappings0.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this;
}
- else {
- pendingMappings = mapUpdate(topNodes);
+ }
- if (pendingMappings.size() == 1)
- singleReq0 = singleReq = F.firstValue(pendingMappings);
- else {
- if (syncMode == PRIMARY_SYNC) {
- mappings = U.newHashMap(pendingMappings.size());
+ synchronized (this) {
+ assert this.futVer == null : this;
+ assert this.topVer == AffinityTopologyVersion.ZERO : this;
- for (GridNearAtomicUpdateRequest req : pendingMappings.values()) {
- if (req.hasPrimary())
- mappings.put(req.nodeId(), req);
- }
- }
- else
- mappings = new HashMap<>(pendingMappings);
+ this.topVer = topVer;
+ this.updVer = updVer;
+ this.futVer = futVer;
- assert !mappings.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this;
- }
- }
+ resCnt = 0;
- remapKeys = null;
- }
- catch (Exception e) {
- err = e;
+ singleReq = singleReq0;
+ mappings = mappings0;
+
+ this.remapKeys = null;
}
}
+ catch (Exception e) {
+ err = e;
+ }
if (err != null) {
onDone(err);
@@ -901,16 +918,24 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
return;
}
+ if (storeFuture()) {
+ if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this)) {
+ assert isDone() : GridNearAtomicUpdateFuture.this;
+
+ return;
+ }
+ }
+
// Optimize mapping for single key.
if (singleReq0 != null)
mapSingle(singleReq0.nodeId(), singleReq0);
else {
- assert pendingMappings != null;
+ assert mappings0 != null;
if (size == 0)
onDone(new GridCacheReturn(cctx, true, true, null, true));
else
- doUpdate(pendingMappings);
+ doUpdate(mappings0);
}
}
@@ -958,10 +983,18 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
/**
* @param topNodes Cache nodes.
+ * @param topVer Topology version.
+ * @param futVer Future version.
+ * @param updVer Update version.
+ * @param remapKeys Keys to remap.
* @return Mapping.
* @throws Exception If failed.
*/
- private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes) throws Exception {
+ private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
+ AffinityTopologyVersion topVer,
+ GridCacheVersion futVer,
+ @Nullable GridCacheVersion updVer,
+ @Nullable Collection<KeyCacheObject> remapKeys) throws Exception {
Iterator<?> it = null;
if (vals != null)
@@ -999,7 +1032,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
throw new NullPointerException("Null value.");
}
else if (conflictPutVals != null) {
- GridCacheDrInfo conflictPutVal = conflictPutValsIt.next();
+ GridCacheDrInfo conflictPutVal = conflictPutValsIt.next();
val = conflictPutVal.value();
conflictVer = conflictPutVal.version();
@@ -1082,10 +1115,15 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
}
/**
+ * @param topVer Topology version.
+ * @param futVer Future version.
+ * @param updVer Update version.
* @return Request.
* @throws Exception If failed.
*/
- private GridNearAtomicUpdateRequest mapSingleUpdate() throws Exception {
+ private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
+ GridCacheVersion futVer,
+ @Nullable GridCacheVersion updVer) throws Exception {
Object key = F.first(keys);
Object val;
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 22b329c..a5f5286 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -48,7 +48,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLock
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -596,12 +595,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
// If there is another system transaction in progress, use its topology version to prevent deadlock.
- if (topVer == null && tx != null && tx.system()) {
- IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(Thread.currentThread().getId(), tx);
-
- if (tx0 != null)
- topVer = tx0.topologyVersionSnapshot();
- }
+ if (topVer == null && tx != null && tx.system())
+ topVer = cctx.tm().lockedTopologyVersion(Thread.currentThread().getId(), tx);
if (topVer != null && tx != null)
tx.topologyVersion(topVer);
@@ -980,7 +975,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
* @throws IgniteCheckedException If failed.
*/
private void proceedMapping() throws IgniteCheckedException {
- boolean set = tx != null && cctx.shared().tm().setTxTopologyHint(tx);
+ boolean set = tx != null && cctx.shared().tm().setTxTopologyHint(tx.topologyVersionSnapshot());
try {
proceedMapping0();
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 23e0f6b..55c5ab6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -46,7 +46,6 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCa
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -723,12 +722,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
// If there is another system transaction in progress, use its topology version to prevent deadlock.
- if (topVer == null && tx != null && tx.system()) {
- IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(threadId, tx);
-
- if (tx0 != null)
- topVer = tx0.topologyVersionSnapshot();
- }
+ if (topVer == null && tx != null && tx.system())
+ topVer = cctx.tm().lockedTopologyVersion(threadId, tx);
if (topVer != null && tx != null)
tx.topologyVersion(topVer);
@@ -1098,7 +1093,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
* @throws IgniteCheckedException If failed.
*/
private void proceedMapping() throws IgniteCheckedException {
- boolean set = tx != null && cctx.shared().tm().setTxTopologyHint(tx);
+ boolean set = tx != null && cctx.shared().tm().setTxTopologyHint(tx.topologyVersionSnapshot());
try {
proceedMapping0();
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index f52b3fc..37dc564 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -84,7 +84,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
* @param cctx Context.
* @param tx Transaction.
*/
- public GridNearOptimisticSerializableTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal tx) {
+ public GridNearOptimisticSerializableTxPrepareFuture(GridCacheSharedContext cctx,
+ GridNearTxLocal tx) {
super(cctx, tx);
assert tx.optimistic() && tx.serializable() : tx;
@@ -304,7 +305,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
return;
}
- boolean set = cctx.tm().setTxTopologyHint(tx);
+ boolean set = cctx.tm().setTxTopologyHint(tx.topologyVersionSnapshot());
try {
prepare(tx.readEntries(), tx.writeEntries(), remap, topLocked);
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 2ce14af..a9f158a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -72,7 +72,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
* @param cctx Context.
* @param tx Transaction.
*/
- public GridNearOptimisticTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal tx) {
+ public GridNearOptimisticTxPrepareFuture(GridCacheSharedContext cctx,
+ GridNearTxLocal tx) {
super(cctx, tx);
assert tx.optimistic() && !tx.serializable() : tx;
@@ -405,7 +406,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
if (isDone())
return;
- boolean set = cctx.tm().setTxTopologyHint(tx);
+ boolean set = cctx.tm().setTxTopologyHint(tx.topologyVersionSnapshot());
try {
assert !m.empty();
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
index b3eab34..fa7020b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@ -23,7 +23,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -41,7 +40,8 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
* @param cctx Context.
* @param tx Transaction.
*/
- public GridNearOptimisticTxPrepareFutureAdapter(GridCacheSharedContext cctx, GridNearTxLocal tx) {
+ public GridNearOptimisticTxPrepareFutureAdapter(GridCacheSharedContext cctx,
+ GridNearTxLocal tx) {
super(cctx, tx);
assert tx.optimistic() : tx;
@@ -55,12 +55,8 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
// If there is another system transaction in progress, use its topology version to prevent deadlock.
- if (topVer == null && tx != null && tx.system()) {
- IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(threadId, tx);
-
- if (tx0 != null)
- topVer = tx0.topologyVersionSnapshot();
- }
+ if (topVer == null && tx != null && tx.system())
+ topVer = cctx.tm().lockedTopologyVersion(threadId, tx);
if (topVer != null) {
tx.topologyVersion(topVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 720832e..70c79a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -3203,8 +3203,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
/**
+ * @param cacheCtx Cache context.
* @param loadFut Missing keys load future.
* @param ret Future result.
+ * @param keepBinary Keep binary flag.
* @return Future.
*/
private IgniteInternalFuture optimisticPutFuture(
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index d2b803a..d384e4e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -114,8 +114,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
/** Committing transactions. */
private final ThreadLocal<IgniteInternalTx> threadCtx = new ThreadLocal<>();
- /** Transaction which topology version should be used when mapping internal tx. */
- private final ThreadLocal<IgniteInternalTx> txTopology = new ThreadLocal<>();
+ /** Topology version that should be used when mapping internal tx. */
+ private final ThreadLocal<AffinityTopologyVersion> txTop = new ThreadLocal<>();
/** Per-thread transaction map. */
private final ConcurrentMap<Long, IgniteInternalTx> threadMap = newMap();
@@ -130,7 +130,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
private final ConcurrentMap<GridCacheVersion, IgniteInternalTx> nearIdMap = newMap();
/** TX handler. */
- private IgniteTxHandler txHandler;
+ private IgniteTxHandler txHnd;
/** Committed local transactions. */
private final GridBoundedConcurrentOrderedMap<GridCacheVersion, Boolean> completedVersSorted =
@@ -197,7 +197,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
@Override protected void start0() throws IgniteCheckedException {
txFinishSync = new GridCacheTxFinishSync<>(cctx);
- txHandler = new IgniteTxHandler(cctx);
+ txHnd = new IgniteTxHandler(cctx);
}
/** {@inheritDoc} */
@@ -212,7 +212,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @return TX handler.
*/
public IgniteTxHandler txHandler() {
- return txHandler;
+ return txHnd;
}
/**
@@ -607,13 +607,17 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
/**
* @param threadId Thread ID.
* @param ignore Transaction to ignore.
- * @return Any transaction associated with the current thread.
+ * @return Not null topology version if current thread holds lock preventing topology change.
*/
- public IgniteInternalTx anyActiveThreadTx(long threadId, IgniteInternalTx ignore) {
+ @Nullable public AffinityTopologyVersion lockedTopologyVersion(long threadId, IgniteInternalTx ignore) {
IgniteInternalTx tx = threadMap.get(threadId);
- if (tx != null && tx.topologyVersionSnapshot() != null)
- return tx;
+ if (tx != null) {
+ AffinityTopologyVersion topVer = tx.topologyVersionSnapshot();
+
+ if (topVer != null)
+ return topVer;
+ }
for (GridCacheContext cacheCtx : cctx.cache().context().cacheContexts()) {
if (!cacheCtx.systemTx())
@@ -621,22 +625,27 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
tx = sysThreadMap.get(new TxThreadKey(threadId, cacheCtx.cacheId()));
- if (tx != null && tx != ignore && tx.topologyVersionSnapshot() != null)
- return tx;
+ if (tx != null && tx != ignore) {
+ AffinityTopologyVersion topVer = tx.topologyVersionSnapshot();
+
+ if (topVer != null)
+ return topVer;
+ }
}
- return txTopology.get();
+ return txTop.get();
}
/**
- * @param tx Transaction.
+ * @param topVer Locked topology version.
+ * @return {@code True} if topology hint was set.
*/
- public boolean setTxTopologyHint(IgniteInternalTx tx) {
- if (tx == null)
- txTopology.remove();
+ public boolean setTxTopologyHint(@Nullable AffinityTopologyVersion topVer) {
+ if (topVer == null)
+ txTop.remove();
else {
- if (txTopology.get() == null) {
- txTopology.set(tx);
+ if (txTop.get() == null) {
+ txTop.set(topVer);
return true;
}
@@ -1807,8 +1816,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
this.evtNodeId = evtNodeId;
}
- /** {@inheritDoc} */
- @Override public void onTimeout() {
+ /**
+ *
+ */
+ private void onTimeout0() {
try {
cctx.kernalContext().gateway().readLock();
}
@@ -1861,6 +1872,16 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
cctx.kernalContext().gateway().readUnlock();
}
}
+
+ /** {@inheritDoc} */
+ @Override public void onTimeout() {
+ // Should not block timeout thread.
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ onTimeout0();
+ }
+ });
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index a2aab77..da39209 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -62,6 +62,9 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
/** Marshaller. */
private final Marshaller marsh;
+ /** */
+ private byte[] marshErrBytes;
+
/**
* @param ctx Kernal context.
*/
@@ -86,6 +89,9 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
if (ctx.config().isDaemon())
return;
+ marshErrBytes = marsh.marshal(new IgniteCheckedException("Failed to marshal response error, " +
+ "see node log for details."));
+
flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) {
@Override protected void body() throws InterruptedException {
while (!isCancelled()) {
@@ -324,10 +330,10 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
try {
errBytes = err != null ? marsh.marshal(err) : null;
}
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to marshal message.", e);
+ catch (Exception e) {
+ U.error(log, "Failed to marshal error [err=" + err + ", marshErr=" + e + ']', e);
- return;
+ errBytes = marshErrBytes;
}
DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, forceLocDep);
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index 8d5a8e7..8eeca6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -3408,6 +3408,7 @@ public class GridFunc {
* @return First element in given collection for which predicate evaluates to
* {@code true} - or {@code null} if such element cannot be found.
*/
+ @SafeVarargs
@Nullable public static <V> V find(Iterable<? extends V> c, @Nullable V dfltVal,
@Nullable IgnitePredicate<? super V>... p) {
A.notNull(c, "c");
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/config/websession/example-cache.xml
----------------------------------------------------------------------
diff --git a/modules/core/src/test/config/websession/example-cache.xml b/modules/core/src/test/config/websession/example-cache.xml
index d5bfeb7..0cc0e1e 100644
--- a/modules/core/src/test/config/websession/example-cache.xml
+++ b/modules/core/src/test/config/websession/example-cache.xml
@@ -130,14 +130,7 @@
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
- <!--
- Ignite provides several options for automatic discovery that can be used
- instead os static IP based discovery. For information on all options refer
- to our documentation: http://apacheignite.readme.io/docs/cluster-config
- -->
- <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
- <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
- <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
+ <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
index f050c72..7e217b7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
@@ -117,7 +117,8 @@ public abstract class IgniteClientReconnectFailoverAbstractTest extends IgniteCl
}
return null;
- } catch (Throwable e) {
+ }
+ catch (Throwable e) {
log.error("Unexpected error in operation thread: " + e, e);
stop.set(true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 5b294cc..0d9c541 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -3277,9 +3277,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
* @throws Exception If failed.
*/
public void testPeekExpired() throws Exception {
- IgniteCache<String, Integer> c = jcache();
+ final IgniteCache<String, Integer> c = jcache();
- String key = primaryKeysForCache(c, 1).get(0);
+ final String key = primaryKeysForCache(c, 1).get(0);
info("Using key: " + key);
@@ -3295,6 +3295,12 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
Thread.sleep(ttl + 100);
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return peek(c, key) == null;
+ }
+ }, 2000);
+
assert peek(c, key) == null;
assert c.localSize() == 0 : "Cache is not empty.";
@@ -3307,9 +3313,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
*/
public void testPeekExpiredTx() throws Exception {
if (txShouldBeUsed()) {
- IgniteCache<String, Integer> c = jcache();
+ final IgniteCache<String, Integer> c = jcache();
- String key = "1";
+ final String key = "1";
int ttl = 500;
try (Transaction tx = grid(0).transactions().txStart()) {
@@ -3320,9 +3326,13 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
tx.commit();
}
- Thread.sleep(ttl + 100);
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return peek(c, key) == null;
+ }
+ }, 2000);
- assertNull(c.localPeek(key, ONHEAP));
+ assertNull(peek(c, key));
assert c.localSize() == 0;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
index 52fbf4c..b3d1384 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
@@ -416,9 +416,8 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest {
* @param cache Cache projection.
* @param key Key.
* @return Value.
- * @throws Exception If failed.
*/
- @Nullable protected <K, V> V peek(IgniteCache<K, V> cache, K key) throws Exception {
+ @Nullable protected <K, V> V peek(IgniteCache<K, V> cache, K key) {
return offheapTiered(cache) ? cache.localPeek(key, CachePeekMode.SWAP, CachePeekMode.OFFHEAP) :
cache.localPeek(key, CachePeekMode.ONHEAP);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java
index e53650c..c95c586 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java
@@ -152,7 +152,7 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm
private List<Object> recordedMsgs = new ArrayList<>();
/** {@inheritDoc} */
- @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
throws IgniteSpiException {
if (msg instanceof GridIoMessage) {
Object msg0 = ((GridIoMessage)msg).message();
@@ -174,7 +174,7 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm
}
}
- super.sendMessage(node, msg, ackClosure);
+ super.sendMessage(node, msg, ackC);
}
/**
@@ -238,6 +238,9 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm
}
}
+ /**
+ *
+ */
private static class TestValue {
/** Field1. */
private String field1;
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
new file mode 100644
index 0000000..814fb08
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.CacheException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
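+/**
+ * Runs concurrent cache updates with values of many different classes while a node is repeatedly restarted.
+ */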
+public class IgniteBinaryMetadataUpdateNodeRestartTest extends GridCommonAbstractTest {
+ /** IP finder instance shared by all nodes started by this test. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Name of the cache configured with ATOMIC atomicity mode. */
+ private static final String ATOMIC_CACHE = "atomicCache";
+
+ /** Name of the cache configured with TRANSACTIONAL atomicity mode. */
+ private static final String TX_CACHE = "txCache";
+
+ /** Number of server nodes started at the beginning of each test iteration. */
+ private static final int SRVS = 3;
+
+ /** Number of client nodes that stay up during an iteration (started right after the servers). */
+ private static final int CLIENTS = 1;
+
+ /** Client mode flag copied into the configuration of every node configured afterwards. */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); // Every node uses the same IP finder instance.
+
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); // NOTE(review): -1 presumably disables shared memory communication - confirm.
+
+ cfg.setMarshaller(null); // NOTE(review): presumably clears a marshaller set by the base class so the default one is used - confirm.
+
+ CacheConfiguration ccfg1 = cacheConfiguration(TX_CACHE, TRANSACTIONAL);
+ CacheConfiguration ccfg2 = cacheConfiguration(ATOMIC_CACHE, ATOMIC);
+
+ cfg.setCacheConfiguration(ccfg1, ccfg2);
+
+ cfg.setClientMode(client); // The flag is toggled by testNodeRestart() before it starts nodes.
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids(); // Safety net: testNodeRestart() already stops all grids in its finally block.
+
+ super.afterTestsStopped();
+ }
+
+ /**
+ * @param name Cache name.
+ * @param atomicityMode Cache atomicity mode.
+ * @return Configuration of a PARTITIONED cache with one backup and FULL_SYNC write synchronization mode.
+ */
+ private CacheConfiguration cacheConfiguration(String name, CacheAtomicityMode atomicityMode) {
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setCacheMode(PARTITIONED);
+ ccfg.setBackups(1);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setName(name);
+ ccfg.setAtomicityMode(atomicityMode);
+
+ return ccfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNodeRestart() throws Exception {
+ for (int i = 0; i < 10; i++) { // Every iteration starts its own set of nodes and stops them in the finally block.
+ log.info("Iteration: " + i);
+
+ client = false; // Read by getConfiguration(); presumably makes the SRVS nodes below start as servers.
+
+ startGridsMultiThreaded(SRVS);
+
+ client = true; // NOTE(review): stays true, so the node restarted below presumably starts in client mode too - confirm intended.
+
+ startGrid(SRVS);
+
+ final AtomicBoolean stop = new AtomicBoolean(); // Tells the restart thread and the update threads to finish.
+
+ try {
+ IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ while (!stop.get()) {
+ log.info("Start node.");
+
+ startGrid(SRVS + CLIENTS); // First index that is not taken by the nodes started above.
+
+ log.info("Stop node.");
+
+ stopGrid(SRVS + CLIENTS);
+ }
+
+ return null;
+ }
+ }, "restart-thread");
+
+ final AtomicInteger idx = new AtomicInteger();
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ int threadIdx = idx.getAndIncrement();
+
+ int node = threadIdx % (SRVS + CLIENTS); // Spread update threads over the nodes with indexes 0..SRVS.
+
+ Ignite ignite = ignite(node);
+
+ log.info("Started thread: " + ignite.name());
+
+ Thread.currentThread().setName("update-thread-" + threadIdx + "-" + ignite.name());
+
+ IgniteCache<Object, Object> cache1 = ignite.cache(ATOMIC_CACHE);
+ IgniteCache<Object, Object> cache2 = ignite.cache(TX_CACHE);
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ while (!stop.get()) {
+ try {
+ cache1.put(new TestClass1(true), create(rnd.nextInt(20) + 1)); // Class ID in 1..20, the range create() handles.
+
+ cache1.invoke(new TestClass1(true), new TestEntryProcessor(rnd.nextInt(20) + 1));
+
+ cache2.put(new TestClass1(true), create(rnd.nextInt(20) + 1));
+
+ cache2.invoke(new TestClass1(true), new TestEntryProcessor(rnd.nextInt(20) + 1));
+ }
+ catch (CacheException | IgniteException e) { // Update errors are logged and the loop goes on.
+ log.info("Error: " + e);
+
+ if (X.hasCause(e, ClusterTopologyException.class)) {
+ ClusterTopologyException cause = X.cause(e, ClusterTopologyException.class);
+
+ if (cause.retryReadyFuture() != null)
+ cause.retryReadyFuture().get(); // Wait on the future attached to the topology error before the next attempt.
+ }
+ }
+ }
+
+ return null;
+ }
+ }, 10, "update-thread"); // NOTE(review): 10 is presumably the number of update threads - confirm.
+
+ U.sleep(5_000); // Let updates and node restarts overlap for a while before stopping them.
+
+ stop.set(true);
+
+ restartFut.get();
+
+ fut.get();
+ }
+ finally {
+ stop.set(true); // Also reached when the try block fails, so the worker loops still get the stop signal.
+
+ stopAllGrids();
+ }
+ }
+ }
+
+ /**
+ * @param id Class ID, expected to be in range 1..20.
+ * @return New instance of the test class with the given number (TestClass1 .. TestClass20).
+ */
+ private static Object create(int id) {
+ switch (id) {
+ case 1: return new TestClass1(true);
+
+ case 2: return new TestClass2();
+
+ case 3: return new TestClass3();
+
+ case 4: return new TestClass4();
+
+ case 5: return new TestClass5();
+
+ case 6: return new TestClass6();
+
+ case 7: return new TestClass7();
+
+ case 8: return new TestClass8();
+
+ case 9: return new TestClass9();
+
+ case 10: return new TestClass10();
+
+ case 11: return new TestClass11();
+
+ case 12: return new TestClass12();
+
+ case 13: return new TestClass13();
+
+ case 14: return new TestClass14();
+
+ case 15: return new TestClass15();
+
+ case 16: return new TestClass16();
+
+ case 17: return new TestClass17();
+
+ case 18: return new TestClass18();
+
+ case 19: return new TestClass19();
+
+ case 20: return new TestClass20();
+ }
+
+ fail(); // Unknown ID. NOTE(review): fail() presumably always throws, so the return below only satisfies the compiler.
+
+ return null;
+ }
+
+ /**
+ * Entry processor that overwrites the entry value with a new instance of the test class with the given ID.
+ */
+ static class TestEntryProcessor implements CacheEntryProcessor<Object, Object, Object> {
+ /** Class ID passed to create(int) when the processor is applied. */
+ private int id;
+
+ /**
+ * @param id ID of the class to instantiate as the new entry value.
+ */
+ public TestEntryProcessor(int id) {
+ this.id = id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object process(MutableEntry<Object, Object> entry, Object... args) {
+ entry.setValue(create(id));
+
+ return null; // The test ignores the invoke result.
+ }
+ }
+
+ /**
+ * Class with one int field that drives equals/hashCode; used as cache key and, via create(1), as cache value.
+ */
+ static class TestClass1 {
+ /** Either a random number in range [0, 10_000) or 0, see the constructor. */
+ int val;
+
+ /**
+ * @param setVal If {@code true} a random value in range [0, 10_000) is assigned, otherwise {@code 0}.
+ */
+ public TestClass1(boolean setVal) {
+ this.val = setVal ? ThreadLocalRandom.current().nextInt(10_000) : 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o) return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ TestClass1 that = (TestClass1)o;
+
+ return val == that.val;
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return val;
+ }
+ }
+
+ /**
+ * Empty value class, instantiated by create(2).
+ */
+ static class TestClass2 {}
+
+ /**
+ * Empty value class, instantiated by create(3).
+ */
+ static class TestClass3 {}
+
+ /**
+ * Empty value class, instantiated by create(4).
+ */
+ static class TestClass4 {}
+
+ /**
+ * Empty value class, instantiated by create(5).
+ */
+ static class TestClass5 {}
+
+ /**
+ * Empty value class, instantiated by create(6).
+ */
+ static class TestClass6 {}
+
+ /**
+ * Empty value class, instantiated by create(7).
+ */
+ static class TestClass7 {}
+
+ /**
+ * Empty value class, instantiated by create(8).
+ */
+ static class TestClass8 {}
+
+ /**
+ * Empty value class, instantiated by create(9).
+ */
+ static class TestClass9 {}
+
+ /**
+ * Empty value class, instantiated by create(10).
+ */
+ static class TestClass10 {}
+
+ /**
+ * Empty value class, instantiated by create(11).
+ */
+ static class TestClass11 {}
+
+ /**
+ * Empty value class, instantiated by create(12).
+ */
+ static class TestClass12 {}
+
+ /**
+ * Empty value class, instantiated by create(13).
+ */
+ static class TestClass13 {}
+
+ /**
+ * Empty value class, instantiated by create(14).
+ */
+ static class TestClass14 {}
+
+ /**
+ * Empty value class, instantiated by create(15).
+ */
+ static class TestClass15 {}
+
+ /**
+ * Empty value class, instantiated by create(16).
+ */
+ static class TestClass16 {}
+
+ /**
+ * Empty value class, instantiated by create(17).
+ */
+ static class TestClass17 {}
+
+ /**
+ * Empty value class, instantiated by create(18).
+ */
+ static class TestClass18 {}
+
+ /**
+ * Empty value class, instantiated by create(19).
+ */
+ static class TestClass19 {}
+
+ /**
+ * Empty value class, instantiated by create(20).
+ */
+ static class TestClass20 {}
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
index 242b12d..8d4af19 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
@@ -65,6 +65,8 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
+ cfg.setFailureDetectionTimeout(20_000);
+
cfg.setConnectorConfiguration(null);
cfg.setPeerClassLoadingEnabled(false);
cfg.setTimeServerPortRange(200);