You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/01/19 11:59:21 UTC
[30/38] ignite git commit: ignite-1811 Optimized cache 'get' on
affinity node.
ignite-1811 Optimized cache 'get' on affinity node.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/83b2bf5e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/83b2bf5e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/83b2bf5e
Branch: refs/heads/ignite-gg-10837
Commit: 83b2bf5e1f287dc83343945b0e47b83ee7724a8e
Parents: d85616b
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jan 18 18:05:37 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jan 18 18:05:37 2016 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 30 +-
.../processors/cache/GridCacheContext.java | 33 ++
.../dht/CacheDistributedGetFutureAdapter.java | 28 +-
.../dht/GridClientPartitionTopology.java | 2 +
.../dht/GridDhtPartitionTopologyImpl.java | 27 +-
.../dht/GridPartitionedGetFuture.java | 241 ++++++-----
.../dht/GridPartitionedSingleGetFuture.java | 229 ++++++----
.../dht/atomic/GridDhtAtomicCache.java | 26 ++
.../distributed/near/GridNearGetFuture.java | 267 +++++++-----
.../cache/transactions/IgniteTxManager.java | 18 +-
.../internal/TestRecordingCommunicationSpi.java | 157 +++++++
...idCacheConfigurationConsistencySelfTest.java | 58 +--
.../cache/IgniteCacheNearLockValueSelfTest.java | 62 +--
.../cache/IgniteCacheStoreCollectionTest.java | 12 +
...eDynamicCacheStartNoExchangeTimeoutTest.java | 7 +
...ridCachePartitionNotLoadedEventSelfTest.java | 7 +-
.../IgniteCacheAtomicNodeRestartTest.java | 2 +
...niteCacheClientNodeChangingTopologyTest.java | 4 +-
.../distributed/IgniteCacheGetRestartTest.java | 280 ++++++++++++
.../IgniteCacheReadFromBackupTest.java | 427 +++++++++++++++++++
.../IgniteCacheSingleGetMessageTest.java | 88 +---
.../IgniteCrossCacheTxStoreSelfTest.java | 1 +
.../GridCacheDhtPreloadMessageCountTest.java | 62 +--
.../near/GridCacheGetStoreErrorSelfTest.java | 9 +-
.../GridCachePartitionedNodeRestartTest.java | 4 +-
...ePartitionedOptimisticTxNodeRestartTest.java | 4 +-
.../IgniteCacheRestartTestSuite2.java | 3 +
.../testsuites/IgniteCacheTestSuite4.java | 2 +
28 files changed, 1524 insertions(+), 566 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 5d4c386..2582e6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -4540,9 +4540,33 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @return Cached value.
* @throws IgniteCheckedException If failed.
*/
- @Nullable public V get(K key, boolean deserializeBinary)
- throws IgniteCheckedException {
- return getAsync(key, deserializeBinary).get();
+ @Nullable public V get(K key, boolean deserializeBinary) throws IgniteCheckedException {
+ checkJta();
+
+ String taskName = ctx.kernalContext().job().currentTaskName();
+
+ return get(key, taskName, deserializeBinary);
+ }
+
+ /**
+ * @param key Key.
+ * @param taskName Task name.
+ * @param deserializeBinary Deserialize binary flag.
+ * @return Cached value.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected V get(
+ final K key,
+ String taskName,
+ boolean deserializeBinary) throws IgniteCheckedException {
+ return getAsync(key,
+ !ctx.config().isReadFromBackup(),
+ /*skip tx*/false,
+ null,
+ taskName,
+ deserializeBinary,
+ false,
+ /*can remap*/true).get();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index c10ebf3..fc48b9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -111,6 +111,7 @@ import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
/**
* Cache context.
@@ -1434,6 +1435,13 @@ public class GridCacheContext<K, V> implements Externalizable {
}
/**
+ * @return {@code True} if store and read-through mode are enabled in configuration.
+ */
+ public boolean readThroughConfigured() {
+ return store().configured() && cacheCfg.isReadThrough();
+ }
+
+ /**
* @return {@code True} if {@link CacheConfiguration#isLoadPreviousValue()} flag is set.
*/
public boolean loadPreviousValue() {
@@ -1961,6 +1969,31 @@ public class GridCacheContext<K, V> implements Externalizable {
});
}
+ /**
+ * @param part Partition.
+ * @param affNodes Affinity nodes.
+ * @param topVer Topology version.
+ * @return {@code True} if cache 'get' operation is allowed to get entry locally.
+ */
+ public boolean allowFastLocalRead(int part, List<ClusterNode> affNodes, AffinityTopologyVersion topVer) {
+ return affinityNode() && rebalanceEnabled() && hasPartition(part, affNodes, topVer);
+ }
+
+ /**
+ * @param part Partition.
+ * @param affNodes Affinity nodes.
+ * @param topVer Topology version.
+ * @return {@code True} if partition is available locally.
+ */
+ private boolean hasPartition(int part, List<ClusterNode> affNodes, AffinityTopologyVersion topVer) {
+ assert affinityNode();
+
+ GridDhtPartitionTopology top = topology();
+
+ return (top.rebalanceFinished(topVer) && (isReplicated() || affNodes.contains(locNode)))
+ || (top.partitionState(localNodeId(), part) == OWNING);
+ }
+
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
U.writeString(out, gridName());
http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
index c43cce9..40eec63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -24,6 +24,7 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheFuture;
@@ -39,6 +40,7 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS;
import static org.apache.ignite.IgniteSystemProperties.getInteger;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
/**
*
@@ -168,14 +170,11 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun
/**
* Affinity node to send get request to.
*
- * @param key Key to get.
- * @param topVer Topology version.
+ * @param affNodes All affinity nodes.
* @return Affinity node to get key from.
*/
- protected final ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
+ protected final ClusterNode affinityNode(List<ClusterNode> affNodes) {
if (!canRemap) {
- List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer);
-
for (ClusterNode node : affNodes) {
if (cctx.discovery().alive(node))
return node;
@@ -184,6 +183,23 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun
return null;
}
else
- return cctx.affinity().primary(key, topVer);
+ return affNodes.get(0);
+ }
+
+ /**
+ * @param part Partition.
+ * @return {@code True} if partition is in owned state.
+ */
+ protected final boolean partitionOwned(int part) {
+ return cctx.topology().partitionState(cctx.localNodeId(), part) == OWNING;
+ }
+
+ /**
+ * @param topVer Topology version.
+ * @return Exception.
+ */
+ protected final ClusterTopologyServerNotFoundException serverNotFoundError(AffinityTopologyVersion topVer) {
+ return new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+ "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']');
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 8aef5ad..dcfc038 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -882,6 +882,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
+ assert false : "Should not be called on non-affinity node";
+
return false;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index a0709c5..2ab8a12 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -88,7 +88,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
private GridDhtPartitionExchangeId lastExchangeId;
/** */
- private AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
+ private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
/** */
private volatile boolean stopping;
@@ -136,9 +136,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
topReadyFut = null;
- topVer = AffinityTopologyVersion.NONE;
-
rebalancedTopVer = AffinityTopologyVersion.NONE;
+
+ topVer = AffinityTopologyVersion.NONE;
}
finally {
lock.writeLock().unlock();
@@ -223,13 +223,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
this.stopping = stopping;
- topVer = exchId.topologyVersion();
-
updateSeq.setIfGreater(updSeq);
topReadyFut = exchFut;
rebalancedTopVer = AffinityTopologyVersion.NONE;
+
+ topVer = exchId.topologyVersion();
}
finally {
lock.writeLock().unlock();
@@ -238,17 +238,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public AffinityTopologyVersion topologyVersion() {
- lock.readLock().lock();
+ AffinityTopologyVersion topVer = this.topVer;
- try {
- assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer +
- ", cacheName=" + cctx.name() + ']';
+ assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer +
+ ", cacheName=" + cctx.name() + ']';
- return topVer;
- }
- finally {
- lock.readLock().unlock();
- }
+ return topVer;
}
/** {@inheritDoc} */
@@ -1336,7 +1331,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
- return topVer.equals(rebalancedTopVer);
+ AffinityTopologyVersion curTopVer = this.topVer;
+
+ return curTopVer.equals(topVer) && curTopVer.equals(rebalancedTopVer);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 19df1c2..1f2d7c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
@@ -234,15 +235,16 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped,
AffinityTopologyVersion topVer
) {
- if (CU.affinityNodes(cctx, topVer).isEmpty()) {
+ Collection<ClusterNode> cacheNodes = CU.affinityNodes(cctx, topVer);
+
+ if (cacheNodes.isEmpty()) {
onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
"(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']'));
return;
}
- Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings =
- U.newHashMap(CU.affinityNodes(cctx, topVer).size());
+ Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings = U.newHashMap(cacheNodes.size());
final int keysSize = keys.size();
@@ -374,135 +376,160 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
AffinityTopologyVersion topVer,
Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped
) {
- GridDhtCacheAdapter<K, V> colocated = cache();
+ int part = cctx.affinity().partition(key);
- boolean remote = false;
+ List<ClusterNode> affNodes = cctx.affinity().nodes(part, topVer);
- // Allow to get cached value from the local node.
- boolean allowLocRead = (cctx.affinityNode() && !forcePrimary) ||
- cctx.affinity().primary(cctx.localNode(), key, topVer);
+ if (affNodes.isEmpty()) {
+ onDone(serverNotFoundError(topVer));
- while (true) {
- GridCacheEntryEx entry;
+ return false;
+ }
- try {
- if (allowLocRead) {
- try {
- entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
- colocated.peekEx(key);
-
- // If our DHT cache do has value, then we peek it.
- if (entry != null) {
- boolean isNew = entry.isNewLocked();
-
- CacheObject v = null;
- GridCacheVersion ver = null;
-
- if (needVer) {
- T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
- null,
- /*swap*/true,
- /*unmarshal*/true,
- /**update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiryPlc,
- !deserializeBinary);
-
- if (res != null) {
- v = res.get1();
- ver = res.get2();
- }
- }
- else {
- v = entry.innerGet(null,
- /*swap*/true,
- /*read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
- /**update-metrics*/false,
- /*event*/!skipVals,
- /*temporary*/false,
- subjId,
- null,
- taskName,
- expiryPlc,
- !deserializeBinary);
- }
+ boolean fastLocGet = (!forcePrimary || affNodes.get(0).isLocal()) &&
+ cctx.allowFastLocalRead(part, affNodes, topVer);
- colocated.context().evicts().touch(entry, topVer);
+ if (fastLocGet && localGet(key, part, locVals))
+ return false;
- // Entry was not in memory or in swap, so we remove it from cache.
- if (v == null) {
- if (isNew && entry.markObsoleteIfEmpty(ver))
- colocated.removeIfObsolete(key);
- }
- else {
- if (needVer)
- versionedResult(locVals, key, v, ver);
- else
- cctx.addResult(locVals,
- key,
- v,
- skipVals,
- keepCacheObjects,
- deserializeBinary,
- true);
-
- return false;
- }
- }
- }
- catch (GridDhtInvalidPartitionException ignored) {
- // No-op.
- }
- }
+ ClusterNode node = affinityNode(affNodes);
- ClusterNode node = affinityNode(key, topVer);
+ if (node == null) {
+ onDone(serverNotFoundError(topVer));
- if (node == null) {
- onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
- "(all partition nodes left the grid)."));
+ return false;
+ }
- return false;
- }
+ boolean remote = !node.isLocal();
+
+ LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(node);
+
+ if (keys != null && keys.containsKey(key)) {
+ if (REMAP_CNT_UPD.incrementAndGet(this) > MAX_REMAP_CNT) {
+ onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " +
+ MAX_REMAP_CNT + " attempts (key got remapped to the same node) [key=" + key + ", node=" +
+ U.toShortString(node) + ", mappings=" + mapped + ']'));
+
+ return false;
+ }
+ }
+
+ LinkedHashMap<KeyCacheObject, Boolean> old = mappings.get(node);
+
+ if (old == null)
+ mappings.put(node, old = new LinkedHashMap<>(3, 1f));
+
+ old.put(key, false);
+
+ return remote;
+ }
- remote = !node.isLocal();
+ /**
+ * @param key Key.
+ * @param part Partition.
+ * @param locVals Local values.
+ * @return {@code True} if there is no need to further search value.
+ */
+ private boolean localGet(KeyCacheObject key, int part, Map<K, V> locVals) {
+ assert cctx.affinityNode() : this;
- LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(node);
+ GridDhtCacheAdapter<K, V> cache = cache();
- if (keys != null && keys.containsKey(key)) {
- if (REMAP_CNT_UPD.incrementAndGet(this) > MAX_REMAP_CNT) {
- onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " +
- MAX_REMAP_CNT + " attempts (key got remapped to the same node) [key=" + key + ", node=" +
- U.toShortString(node) + ", mappings=" + mapped + ']'));
+ while (true) {
+ GridCacheEntryEx entry;
- return false;
+ try {
+ entry = cache.context().isSwapOrOffheapEnabled() ? cache.entryEx(key) : cache.peekEx(key);
+
+ // If our DHT cache do has value, then we peek it.
+ if (entry != null) {
+ boolean isNew = entry.isNewLocked();
+
+ CacheObject v = null;
+ GridCacheVersion ver = null;
+
+ if (needVer) {
+ T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ null,
+ /*swap*/true,
+ /*unmarshal*/true,
+ /**update-metrics*/false,
+ /*event*/!skipVals,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ !deserializeBinary);
+
+ if (res != null) {
+ v = res.get1();
+ ver = res.get2();
+ }
+ }
+ else {
+ v = entry.innerGet(null,
+ /*swap*/true,
+ /*read-through*/false,
+ /*fail-fast*/true,
+ /*unmarshal*/true,
+ /**update-metrics*/false,
+ /*event*/!skipVals,
+ /*temporary*/false,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ !deserializeBinary);
+ }
+
+ cache.context().evicts().touch(entry, topVer);
+
+ // Entry was not in memory or in swap, so we remove it from cache.
+ if (v == null) {
+ if (isNew && entry.markObsoleteIfEmpty(ver))
+ cache.removeIfObsolete(key);
+ }
+ else {
+ if (needVer)
+ versionedResult(locVals, key, v, ver);
+ else {
+ cctx.addResult(locVals,
+ key,
+ v,
+ skipVals,
+ keepCacheObjects,
+ deserializeBinary,
+ true);
+ }
+
+ return true;
}
}
- LinkedHashMap<KeyCacheObject, Boolean> old = mappings.get(node);
+ boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion());
- if (old == null)
- mappings.put(node, old = new LinkedHashMap<>(3, 1f));
+ // Entry not found, do not continue search if topology did not change and there is no store.
+ if (!cctx.readThroughConfigured() && (topStable || partitionOwned(part))) {
+ if (!skipVals && cctx.config().isStatisticsEnabled())
+ cache.metrics0().onRead(false);
- old.put(key, false);
+ return true;
+ }
- break;
+ return false;
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ // No-op, will retry.
+ }
+ catch (GridDhtInvalidPartitionException ignored) {
+ return false;
}
catch (IgniteCheckedException e) {
onDone(e);
- break;
- }
- catch (GridCacheEntryRemovedException ignored) {
- // No-op, will retry.
+ return true;
}
}
-
- return remote;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 29971fd..0c811ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -58,6 +58,8 @@ import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+
/**
*
*/
@@ -319,105 +321,140 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
* @return Primary node or {@code null} if future was completed.
*/
@Nullable private ClusterNode mapKeyToNode(AffinityTopologyVersion topVer) {
- ClusterNode primary = affinityNode(key, topVer);
+ int part = cctx.affinity().partition(key);
+
+ List<ClusterNode> affNodes = cctx.affinity().nodes(part, topVer);
- if (primary == null) {
- onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
- "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']'));
+ if (affNodes.isEmpty()) {
+ onDone(serverNotFoundError(topVer));
return null;
}
- boolean allowLocRead = (cctx.affinityNode() && !forcePrimary) || primary.isLocal();
-
- if (allowLocRead) {
- GridDhtCacheAdapter colocated = cctx.dht();
-
- while (true) {
- GridCacheEntryEx entry;
-
- try {
- entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
- colocated.peekEx(key);
-
- // If our DHT cache do has value, then we peek it.
- if (entry != null) {
- boolean isNew = entry.isNewLocked();
-
- CacheObject v = null;
- GridCacheVersion ver = null;
-
- if (needVer) {
- T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
- null,
- /*swap*/true,
- /*unmarshal*/true,
- /**update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiryPlc,
- true);
-
- if (res != null) {
- v = res.get1();
- ver = res.get2();
- }
- }
- else {
- v = entry.innerGet(null,
- /*swap*/true,
- /*read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
- /**update-metrics*/false,
- /*event*/!skipVals,
- /*temporary*/false,
- subjId,
- null,
- taskName,
- expiryPlc,
- true);
- }
+ boolean fastLocGet = (!forcePrimary || affNodes.get(0).isLocal()) &&
+ cctx.allowFastLocalRead(part, affNodes, topVer);
- colocated.context().evicts().touch(entry, topVer);
+ if (fastLocGet && localGet(topVer, part))
+ return null;
- // Entry was not in memory or in swap, so we remove it from cache.
- if (v == null) {
- if (isNew && entry.markObsoleteIfEmpty(ver))
- colocated.removeIfObsolete(key);
- }
- else {
- if (!skipVals && cctx.config().isStatisticsEnabled())
- cctx.cache().metrics0().onRead(true);
+ ClusterNode affNode = affinityNode(affNodes);
+
+ if (affNode == null) {
+ onDone(serverNotFoundError(topVer));
+
+ return null;
+ }
+
+ return affNode;
+ }
+
+ /**
+ * @param topVer Topology version.
+ * @param part Partition.
+ * @return {@code True} if future completed.
+ */
+ private boolean localGet(AffinityTopologyVersion topVer, int part) {
+ assert cctx.affinityNode() : this;
+
+ GridDhtCacheAdapter colocated = cctx.dht();
- if (!skipVals)
- setResult(v, ver);
- else
- setSkipValueResult(true, ver);
+ while (true) {
+ GridCacheEntryEx entry;
- return null;
+ try {
+ entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
+ colocated.peekEx(key);
+
+ // If our DHT cache do has value, then we peek it.
+ if (entry != null) {
+ boolean isNew = entry.isNewLocked();
+
+ CacheObject v = null;
+ GridCacheVersion ver = null;
+
+ if (needVer) {
+ T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ null,
+ /*swap*/true,
+ /*unmarshal*/true,
+ /**update-metrics*/false,
+ /*event*/!skipVals,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ true);
+
+ if (res != null) {
+ v = res.get1();
+ ver = res.get2();
}
}
+ else {
+ v = entry.innerGet(null,
+ /*swap*/true,
+ /*read-through*/false,
+ /*fail-fast*/true,
+ /*unmarshal*/true,
+ /**update-metrics*/false,
+ /*event*/!skipVals,
+ /*temporary*/false,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ true);
+ }
- break;
- }
- catch (GridDhtInvalidPartitionException ignored) {
- break;
- }
- catch (IgniteCheckedException e) {
- onDone(e);
+ colocated.context().evicts().touch(entry, topVer);
+
+ // Entry was not in memory or in swap, so we remove it from cache.
+ if (v == null) {
+ if (isNew && entry.markObsoleteIfEmpty(ver))
+ colocated.removeIfObsolete(key);
+ }
+ else {
+ if (!skipVals && cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onRead(true);
+
+ if (!skipVals)
+ setResult(v, ver);
+ else
+ setSkipValueResult(true, ver);
- return null;
+ return true;
+ }
}
- catch (GridCacheEntryRemovedException ignored) {
- // No-op, will retry.
+
+ boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion());
+
+ // Entry not found, complete future with null result if topology did not change and there is no store.
+ if (!cctx.readThroughConfigured() && (topStable || partitionOwned(part))) {
+ if (!skipVals && cctx.config().isStatisticsEnabled())
+ colocated.metrics0().onRead(false);
+
+ if (skipVals)
+ setSkipValueResult(false, null);
+ else
+ setResult(null, null);
+
+ return true;
}
+
+ return false;
}
- }
+ catch (GridCacheEntryRemovedException ignored) {
+ // No-op, will retry.
+ }
+ catch (GridDhtInvalidPartitionException ignored) {
+ return false;
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
- return primary;
+ return true;
+ }
+ }
}
/**
@@ -595,7 +632,7 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
}
else {
if (!keepCacheObjects) {
- Object res = cctx.unwrapBinaryIfNeeded(val, !deserializeBinary && !skipVals);
+ Object res = cctx.unwrapBinaryIfNeeded(val, !deserializeBinary);
onDone(res);
}
@@ -612,16 +649,30 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
}
/**
+ * @param part Partition.
+ * @return {@code True} if partition is in owned state.
+ */
+ private boolean partitionOwned(int part) {
+ return cctx.topology().partitionState(cctx.localNodeId(), part) == OWNING;
+ }
+
+ /**
+ * @param topVer Topology version.
+ * @return Exception.
+ */
+ private ClusterTopologyServerNotFoundException serverNotFoundError(AffinityTopologyVersion topVer) {
+ return new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+ "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']');
+ }
+
+ /**
* Affinity node to send get request to.
*
- * @param key Key to get.
- * @param topVer Topology version.
+ * @param affNodes All affinity nodes.
* @return Affinity node to get key from.
*/
- private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
+ @Nullable private ClusterNode affinityNode(List<ClusterNode> affNodes) {
if (!canRemap) {
- List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer);
-
for (ClusterNode node : affNodes) {
if (cctx.discovery().alive(node))
return node;
@@ -630,7 +681,7 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
return null;
}
else
- return cctx.affinity().primary(key, topVer);
+ return affNodes.get(0);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 393413e..81fd5d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -317,6 +317,32 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/** {@inheritDoc} */
+ @Override protected V get(K key, String taskName, boolean deserializeBinary) throws IgniteCheckedException {
+ ctx.checkSecurity(SecurityPermission.CACHE_READ);
+
+ if (keyCheck)
+ validateCacheKey(key);
+
+ CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+ UUID subjId = ctx.subjectIdPerCall(null, opCtx);
+
+ final ExpiryPolicy expiryPlc = opCtx != null ? opCtx.expiry() : null;
+
+ final boolean skipStore = opCtx != null && opCtx.skipStore();
+
+ return getAsync0(ctx.toCacheKeyObject(key),
+ !ctx.config().isReadFromBackup(),
+ subjId,
+ taskName,
+ deserializeBinary,
+ expiryPlc,
+ false,
+ skipStore,
+ true).get();
+ }
+
+ /** {@inheritDoc} */
@Override protected IgniteInternalFuture<V> getAsync(final K key,
final boolean forcePrimary,
final boolean skipTx,
http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index c547a88..9291001 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
@@ -405,10 +406,20 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped,
Map<KeyCacheObject, GridNearCacheEntry> saved
) {
+ int part = cctx.affinity().partition(key);
+
+ List<ClusterNode> affNodes = cctx.affinity().nodes(part, topVer);
+
+ if (affNodes.isEmpty()) {
+ onDone(serverNotFoundError(topVer));
+
+ return null;
+ }
+
final GridNearCacheAdapter near = cache();
// Allow to get cached value from the local node.
- boolean allowLocRead = !forcePrimary || cctx.affinity().primary(cctx.localNode(), key, topVer);
+ boolean allowLocRead = !forcePrimary || cctx.localNode().equals(affNodes.get(0));
while (true) {
GridNearCacheEntry entry = allowLocRead ? (GridNearCacheEntry)near.peekEx(key) : null;
@@ -456,124 +467,23 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
}
}
- ClusterNode affNode = null;
-
- if (v == null && allowLocRead && cctx.affinityNode()) {
- GridDhtCacheAdapter<K, V> dht = cache().dht();
-
- GridCacheEntryEx dhtEntry = null;
-
- try {
- dhtEntry = dht.context().isSwapOrOffheapEnabled() ? dht.entryEx(key) : dht.peekEx(key);
-
- // If near cache does not have value, then we peek DHT cache.
- if (dhtEntry != null) {
- boolean isNew = dhtEntry.isNewLocked() || !dhtEntry.valid(topVer);
-
- if (needVer) {
- T2<CacheObject, GridCacheVersion> res = dhtEntry.innerGetVersioned(
- null,
- /*swap*/true,
- /*unmarshal*/true,
- /**update-metrics*/false,
- /*event*/!isNear && !skipVals,
- subjId,
- null,
- taskName,
- expiryPlc,
- !deserializeBinary);
-
- if (res != null) {
- v = res.get1();
- ver = res.get2();
- }
- }
- else {
- v = dhtEntry.innerGet(tx,
- /*swap*/true,
- /*read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
- /*update-metrics*/false,
- /*events*/!isNear && !skipVals,
- /*temporary*/false,
- subjId,
- null,
- taskName,
- expiryPlc,
- !deserializeBinary);
- }
-
- // Entry was not in memory or in swap, so we remove it from cache.
- if (v == null && isNew && dhtEntry.markObsoleteIfEmpty(ver))
- dht.removeIfObsolete(key);
- }
-
- if (v != null) {
- if (cctx.cache().configuration().isStatisticsEnabled() && !skipVals)
- near.metrics0().onRead(true);
- }
- else {
- affNode = affinityNode(key, topVer);
-
- if (affNode == null) {
- onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
- "(all partition nodes left the grid)."));
-
- return saved;
- }
-
- if (!affNode.isLocal() && cctx.cache().configuration().isStatisticsEnabled() && !skipVals)
- near.metrics0().onRead(false);
- }
- }
- catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) {
- // No-op.
- }
- finally {
- if (dhtEntry != null && (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))) {
- dht.context().evicts().touch(dhtEntry, topVer);
-
- entry = null;
- }
- }
- }
-
- if (v != null) {
- if (needVer) {
- V val0 = (V)new T2<>(skipVals ? true : v, ver);
+ if (v == null) {
+ boolean fastLocGet = allowLocRead && cctx.allowFastLocalRead(part, affNodes, topVer);
- add(new GridFinishedFuture<>(Collections.singletonMap((K)key, val0)));
- }
- else {
- if (keepCacheObjects) {
- K key0 = (K)key;
- V val0 = (V)(skipVals ? true : v);
+ if (fastLocGet && localDhtGet(key, part, topVer, isNear))
+ break;
- add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
- }
- else {
- K key0 = (K)cctx.unwrapBinaryIfNeeded(key, !deserializeBinary, false);
- V val0 = !skipVals ?
- (V)cctx.unwrapBinaryIfNeeded(v, !deserializeBinary, false) :
- (V)Boolean.TRUE;
+ ClusterNode affNode = affinityNode(affNodes);
- add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
- }
- }
- }
- else {
if (affNode == null) {
- affNode = affinityNode(key, topVer);
-
- if (affNode == null) {
- onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
- "(all partition nodes left the grid)."));
+ onDone(serverNotFoundError(topVer));
- return saved;
- }
+ return saved;
}
+ if (cctx.cache().configuration().isStatisticsEnabled() && !skipVals && !affNode.isLocal())
+ cache().metrics0().onRead(false);
+
LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(affNode);
if (keys != null && keys.containsKey(key)) {
@@ -586,7 +496,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
}
}
- if (!cctx.affinity().localNode(key, topVer)) {
+ if (!affNodes.contains(cctx.localNode())) {
GridNearCacheEntry nearEntry = entry != null ? entry : near.entryExx(key, topVer);
nearEntry.reserveEviction();
@@ -612,6 +522,8 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
old.put(key, addRdr);
}
+ else
+ addResult(key, v, ver);
break;
}
@@ -633,6 +545,135 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
}
/**
+ * @param key Key.
+ * @param part Partition.
+ * @param topVer Topology version.
+ * @param nearRead {@code True} if already tried to read from near cache.
+ * @return {@code True} if there is no need to further search value.
+ */
+ private boolean localDhtGet(KeyCacheObject key,
+ int part,
+ AffinityTopologyVersion topVer,
+ boolean nearRead) {
+ GridDhtCacheAdapter<K, V> dht = cache().dht();
+
+ assert dht.context().affinityNode() : this;
+
+ while (true) {
+ GridCacheEntryEx dhtEntry = null;
+
+ try {
+ dhtEntry = dht.context().isSwapOrOffheapEnabled() ? dht.entryEx(key) : dht.peekEx(key);
+
+ CacheObject v = null;
+
+ // If near cache does not have value, then we peek DHT cache.
+ if (dhtEntry != null) {
+ boolean isNew = dhtEntry.isNewLocked() || !dhtEntry.valid(topVer);
+
+ if (needVer) {
+ T2<CacheObject, GridCacheVersion> res = dhtEntry.innerGetVersioned(
+ null,
+ /*swap*/true,
+ /*unmarshal*/true,
+ /**update-metrics*/false,
+ /*event*/!nearRead && !skipVals,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ !deserializeBinary);
+
+ if (res != null) {
+ v = res.get1();
+ ver = res.get2();
+ }
+ }
+ else {
+ v = dhtEntry.innerGet(tx,
+ /*swap*/true,
+ /*read-through*/false,
+ /*fail-fast*/true,
+ /*unmarshal*/true,
+ /*update-metrics*/false,
+ /*events*/!nearRead && !skipVals,
+ /*temporary*/false,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ !deserializeBinary);
+ }
+
+ // Entry was not in memory or in swap, so we remove it from cache.
+ if (v == null && isNew && dhtEntry.markObsoleteIfEmpty(ver))
+ dht.removeIfObsolete(key);
+ }
+
+ if (v != null) {
+ if (cctx.cache().configuration().isStatisticsEnabled() && !skipVals)
+ cache().metrics0().onRead(true);
+
+ addResult(key, v, ver);
+
+ return true;
+ }
+ else {
+ boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion());
+
+ // Entry not found, do not continue search if topology did not change and there is no store.
+ return !cctx.readThroughConfigured() && (topStable || partitionOwned(part));
+ }
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ // Retry.
+ }
+ catch (GridDhtInvalidPartitionException e) {
+ return false;
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+
+ return false;
+ }
+ finally {
+ if (dhtEntry != null && (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED)))
+ dht.context().evicts().touch(dhtEntry, topVer);
+ }
+ }
+ }
+
+ /**
+ * @param key Key.
+ * @param v Value.
+ * @param ver Version.
+ */
+ @SuppressWarnings("unchecked")
+ private void addResult(KeyCacheObject key, CacheObject v, GridCacheVersion ver) {
+ if (needVer) {
+ V val0 = (V)new T2<>(skipVals ? true : v, ver);
+
+ add(new GridFinishedFuture<>(Collections.singletonMap((K)key, val0)));
+ }
+ else {
+ if (keepCacheObjects) {
+ K key0 = (K)key;
+ V val0 = (V)(skipVals ? true : v);
+
+ add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
+ }
+ else {
+ K key0 = (K)cctx.unwrapBinaryIfNeeded(key, !deserializeBinary, false);
+ V val0 = !skipVals ?
+ (V)cctx.unwrapBinaryIfNeeded(v, !deserializeBinary, false) :
+ (V)Boolean.TRUE;
+
+ add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
+ }
+ }
+ }
+
+ /**
* @return Near cache.
*/
private GridNearCacheAdapter<K, V> cache() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index ca15e20..7a3b8ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -619,17 +619,19 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
return topVer;
}
- for (GridCacheContext cacheCtx : cctx.cache().context().cacheContexts()) {
- if (!cacheCtx.systemTx())
- continue;
+ if (!sysThreadMap.isEmpty()) {
+ for (GridCacheContext cacheCtx : cctx.cache().context().cacheContexts()) {
+ if (!cacheCtx.systemTx())
+ continue;
- tx = sysThreadMap.get(new TxThreadKey(threadId, cacheCtx.cacheId()));
+ tx = sysThreadMap.get(new TxThreadKey(threadId, cacheCtx.cacheId()));
- if (tx != null && tx != ignore) {
- AffinityTopologyVersion topVer = tx.topologyVersionSnapshot();
+ if (tx != null && tx != ignore) {
+ AffinityTopologyVersion topVer = tx.topologyVersionSnapshot();
- if (topVer != null)
- return topVer;
+ if (topVer != null)
+ return topVer;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
new file mode 100644
index 0000000..8a602ad
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_GRID_NAME;
+
+/**
+ *
+ */
+public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
+ /** */
+ private Class<?> recordCls;
+
+ /** */
+ private List<Object> recordedMsgs = new ArrayList<>();
+
+ /** */
+ private List<T2<ClusterNode, GridIoMessage>> blockedMsgs = new ArrayList<>();
+
+ /** */
+ private Map<Class<?>, Set<String>> blockCls = new HashMap<>();
+
+ /** */
+ private IgnitePredicate<GridIoMessage> blockP;
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
+ throws IgniteSpiException {
+ if (msg instanceof GridIoMessage) {
+ GridIoMessage ioMsg = (GridIoMessage)msg;
+
+ Object msg0 = ioMsg.message();
+
+ synchronized (this) {
+ if (recordCls != null && msg0.getClass().equals(recordCls))
+ recordedMsgs.add(msg0);
+
+ boolean block = false;
+
+ if (blockP != null && blockP.apply(ioMsg))
+ block = true;
+ else {
+ Set<String> blockNodes = blockCls.get(msg0.getClass());
+
+ if (blockNodes != null) {
+ String nodeName = (String)node.attributes().get(ATTR_GRID_NAME);
+
+ block = blockNodes.contains(nodeName);
+ }
+ }
+
+ if (block) {
+ blockedMsgs.add(new T2<>(node, ioMsg));
+
+ return;
+ }
+ }
+ }
+
+ super.sendMessage(node, msg, ackC);
+ }
+
+ /**
+ * @param recordCls Message class to record.
+ */
+ public void record(@Nullable Class<?> recordCls) {
+ synchronized (this) {
+ this.recordCls = recordCls;
+ }
+ }
+
+ /**
+ * @return Recorded messages.
+ */
+ public List<Object> recordedMessages() {
+ synchronized (this) {
+ List<Object> msgs = recordedMsgs;
+
+ recordedMsgs = new ArrayList<>();
+
+ return msgs;
+ }
+ }
+
+ /**
+ * @param blockP Message block predicate.
+ */
+ public void blockMessages(IgnitePredicate<GridIoMessage> blockP) {
+ synchronized (this) {
+ this.blockP = blockP;
+ }
+ }
+
+ /**
+ * @param cls Message class.
+ * @param nodeName Node name.
+ */
+ public void blockMessages(Class<?> cls, String nodeName) {
+ synchronized (this) {
+ Set<String> set = blockCls.get(cls);
+
+ if (set == null) {
+ set = new HashSet<>();
+
+ blockCls.put(cls, set);
+ }
+
+ set.add(nodeName);
+ }
+ }
+
+ /**
+ * Stops block messages and sends all already blocked messages.
+ */
+ public void stopBlock() {
+ synchronized (this) {
+ blockCls.clear();
+
+ for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs)
+ super.sendMessage(msg.get1(), msg.get2());
+
+ blockedMsgs.clear();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
index e28e89f..a1f917f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
@@ -19,25 +19,19 @@ package org.apache.ignite.internal.processors.cache;
import java.io.Externalizable;
import java.io.Serializable;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
import java.util.concurrent.Callable;
import javax.cache.Cache;
-import javax.cache.integration.CacheLoaderException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheInterceptorAdapter;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.AffinityNodeIdHashResolver;
-import org.apache.ignite.cache.affinity.fair.FairAffinityFunction;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.eviction.EvictionFilter;
import org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicy;
import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy;
import org.apache.ignite.cache.eviction.random.RandomEvictionPolicy;
-import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DeploymentMode;
@@ -46,7 +40,6 @@ import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -54,7 +47,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridStringLogger;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
@@ -862,49 +854,9 @@ public class GridCacheConfigurationConsistencySelfTest extends GridCommonAbstrac
}, IgniteCheckedException.class, null);
}
- /** */
- private static class TestStore implements CacheStore<Object,Object> {
- /** {@inheritDoc} */
- @Nullable @Override public Object load(Object key) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, @Nullable Object... args) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public Map<Object, Object> loadAll(Iterable<?> keys) throws CacheLoaderException {
- return Collections.emptyMap();
- }
-
- /** {@inheritDoc} */
- @Override public void write(Cache.Entry<?, ?> entry) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void delete(Object key) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void deleteAll(Collection<?> keys) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void sessionEnd(boolean commit) {
- // No-op.
- }
- }
-
+ /**
+ *
+ */
private static class TestRendezvousAffinityFunction extends RendezvousAffinityFunction {
/**
* Empty constructor required by {@link Externalizable}.
@@ -941,6 +893,10 @@ public class GridCacheConfigurationConsistencySelfTest extends GridCommonAbstrac
// No-op, just different class.
}
+ /**
+ *
+ */
private static class TestCacheDefaultAffinityKeyMapper extends GridCacheDefaultAffinityKeyMapper {
+ // No-op, just different class.
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java
index 100acfe..f106fec 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java
@@ -18,22 +18,15 @@
package org.apache.ignite.internal.processors.cache;
import java.util.Collection;
-import java.util.concurrent.ConcurrentLinkedDeque;
import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.spi.IgniteSpiException;
-import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -71,7 +64,11 @@ public class IgniteCacheNearLockValueSelfTest extends GridCommonAbstractTest {
if (getTestGridName(0).equals(gridName))
cfg.setClientMode(true);
- cfg.setCommunicationSpi(new TestCommunicationSpi());
+ TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
+
+ commSpi.record(GridNearLockRequest.class);
+
+ cfg.setCommunicationSpi(commSpi);
return cfg;
}
@@ -88,18 +85,18 @@ public class IgniteCacheNearLockValueSelfTest extends GridCommonAbstractTest {
cache.put("key1", "val1");
for (int i = 0; i < 3; i++) {
- ((TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi()).clear();
- ((TestCommunicationSpi)ignite(1).configuration().getCommunicationSpi()).clear();
-
try (Transaction tx = ignite(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
cache.get("key1");
tx.commit();
}
- TestCommunicationSpi comm = (TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi();
+ TestRecordingCommunicationSpi comm =
+ (TestRecordingCommunicationSpi)ignite(0).configuration().getCommunicationSpi();
+
+ Collection<GridNearLockRequest> reqs = (Collection)comm.recordedMessages();
- assertEquals(1, comm.requests().size());
+ assertEquals(1, reqs.size());
GridCacheAdapter<Object, Object> primary = ((IgniteKernal)grid(1)).internalCache("partitioned");
@@ -107,7 +104,7 @@ public class IgniteCacheNearLockValueSelfTest extends GridCommonAbstractTest {
assertNotNull(dhtEntry);
- GridNearLockRequest req = comm.requests().iterator().next();
+ GridNearLockRequest req = reqs.iterator().next();
assertEquals(dhtEntry.version(), req.dhtVersion(0));
@@ -122,39 +119,4 @@ public class IgniteCacheNearLockValueSelfTest extends GridCommonAbstractTest {
}
}
}
-
- /**
- *
- */
- private static class TestCommunicationSpi extends TcpCommunicationSpi {
- /** */
- private Collection<GridNearLockRequest> reqs = new ConcurrentLinkedDeque<>();
-
- /** {@inheritDoc} */
- @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
- throws IgniteSpiException {
- if (msg instanceof GridIoMessage) {
- GridIoMessage ioMsg = (GridIoMessage)msg;
-
- if (ioMsg.message() instanceof GridNearLockRequest)
- reqs.add((GridNearLockRequest)ioMsg.message());
- }
-
- super.sendMessage(node, msg, ackC);
- }
-
- /**
- * @return Collected requests.
- */
- public Collection<GridNearLockRequest> requests() {
- return reqs;
- }
-
- /**
- *
- */
- public void clear() {
- reqs.clear();
- }
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStoreCollectionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStoreCollectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStoreCollectionTest.java
index 57d57ca..48acdfc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStoreCollectionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStoreCollectionTest.java
@@ -22,29 +22,41 @@ import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
/**
*
*/
public class IgniteCacheStoreCollectionTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
CacheConfiguration<Object, Object> ccfg1 = new CacheConfiguration<>();
ccfg1.setName("cache1");
ccfg1.setAtomicityMode(ATOMIC);
+ ccfg1.setWriteSynchronizationMode(FULL_SYNC);
CacheConfiguration<Object, Object> ccfg2 = new CacheConfiguration<>();
ccfg2.setName("cache2");
ccfg2.setAtomicityMode(TRANSACTIONAL);
+ ccfg2.setWriteSynchronizationMode(FULL_SYNC);
cfg.setCacheConfiguration(ccfg1, ccfg2);
http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartNoExchangeTimeoutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartNoExchangeTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartNoExchangeTimeoutTest.java
index 9acc4b5..ac80d69 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartNoExchangeTimeoutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartNoExchangeTimeoutTest.java
@@ -46,6 +46,7 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
*
@@ -344,6 +345,7 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst
ccfg.setName("cache-1");
ccfg.setAtomicityMode(ATOMIC);
ccfg.setBackups(0);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
res.add(ccfg);
}
@@ -354,6 +356,7 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst
ccfg.setName("cache-2");
ccfg.setAtomicityMode(ATOMIC);
ccfg.setBackups(1);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
res.add(ccfg);
}
@@ -365,6 +368,7 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst
ccfg.setAtomicityMode(ATOMIC);
ccfg.setBackups(1);
ccfg.setAffinity(new FairAffinityFunction());
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
res.add(ccfg);
}
@@ -375,6 +379,7 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst
ccfg.setName("cache-4");
ccfg.setAtomicityMode(TRANSACTIONAL);
ccfg.setBackups(0);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
res.add(ccfg);
}
@@ -385,6 +390,7 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst
ccfg.setName("cache-5");
ccfg.setAtomicityMode(TRANSACTIONAL);
ccfg.setBackups(1);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
res.add(ccfg);
}
@@ -396,6 +402,7 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst
ccfg.setAtomicityMode(TRANSACTIONAL);
ccfg.setBackups(1);
ccfg.setAffinity(new FairAffinityFunction());
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
res.add(ccfg);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
index 5bc779c..6a42752 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
@@ -22,7 +22,6 @@ import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -42,6 +41,9 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.util.TestTcpCommunicationSpi;
import org.eclipse.jetty.util.ConcurrentHashSet;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
/**
*
*/
@@ -76,8 +78,9 @@ public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstract
CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();
- cacheCfg.setCacheMode(CacheMode.PARTITIONED);
+ cacheCfg.setCacheMode(PARTITIONED);
cacheCfg.setBackups(backupCnt);
+ cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
cfg.setCacheConfiguration(cacheCfg);
http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicNodeRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicNodeRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicNodeRestartTest.java
index 327db0e..37ed866 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicNodeRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicNodeRestartTest.java
@@ -31,10 +31,12 @@ public class IgniteCacheAtomicNodeRestartTest extends GridCachePartitionedNodeRe
return ATOMIC;
}
+ /** {@inheritDoc} */
@Override public void testRestartWithPutFourNodesNoBackups() {
fail("https://issues.apache.org/jira/browse/IGNITE-1587");
}
+ /** {@inheritDoc} */
@Override public void testRestartWithPutFourNodesOneBackupsOffheapTiered() {
fail("https://issues.apache.org/jira/browse/IGNITE-1587");
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
index e7657a6..13f2598 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -2010,7 +2010,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
private List<Object> recordedMsgs = new ArrayList<>();
/** {@inheritDoc} */
- @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
throws IgniteSpiException {
if (msg instanceof GridIoMessage) {
Object msg0 = ((GridIoMessage)msg).message();
@@ -2032,7 +2032,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
}
}
- super.sendMessage(node, msg, ackClosure);
+ super.sendMessage(node, msg, ackC);
}
/**