You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/29 14:46:24 UTC
[17/50] incubator-ignite git commit: # ignite-23
# ignite-23
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/00eadd62
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/00eadd62
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/00eadd62
Branch: refs/heads/ignite-929
Commit: 00eadd62ea451922e9fe3396085f5f003f36c32e
Parents: e4e54ba
Author: sboikov <sb...@gridgain.com>
Authored: Mon May 25 13:34:13 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon May 25 13:34:13 2015 +0300
----------------------------------------------------------------------
.../dht/atomic/GridNearAtomicUpdateFuture.java | 49 +++++++++---
...niteCacheClientNodeChangingTopologyTest.java | 78 ++++++++++++++++++++
2 files changed, 115 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00eadd62/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 50c3d56..64a4882 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -143,6 +143,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
/** Skip store flag. */
private final boolean skipStore;
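+ /** Set when a fast-map remap is postponed until all pending mappings are removed; reset once new mappings are created. */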
+ private boolean fastMapRemap;
+
/**
* @param cctx Cache context.
* @param cache Cache instance.
@@ -345,7 +348,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
if (res.remapKeys() != null) {
assert !fastMap || cctx.kernalContext().clientNode();
- Collection<?> remapKeys = fastMap && cctx.kernalContext().clientNode() ? null : res.remapKeys();
+ Collection<KeyCacheObject> remapKeys = fastMap ? null : res.remapKeys();
mapOnTopology(remapKeys, true, nodeId, true);
@@ -456,8 +459,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
else {
if (waitTopFut) {
fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override
- public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
mapOnTopology(keys, remap, oldNodeId, waitTopFut);
}
});
@@ -478,15 +480,29 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
/**
* Checks if future is ready to be completed.
*/
- private synchronized void checkComplete() {
- if ((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty()) {
- CachePartialUpdateCheckedException err0 = err;
+ private void checkComplete() {
+ boolean remap = false;
- if (err0 != null)
- onDone(err0);
- else
- onDone(opRes);
+ synchronized (this) {
+ if ((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty()) {
+ CachePartialUpdateCheckedException err0 = err;
+
+ if (err0 != null)
+ onDone(err0);
+ else {
+ if (fastMapRemap) {
+ assert cctx.kernalContext().clientNode();
+
+ remap = true;
+ }
+ else
+ onDone(opRes);
+ }
+ }
}
+
+ if (remap)
+ mapOnTopology(null, true, null, true);
}
/**
@@ -500,7 +516,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
@Nullable Collection<?> remapKeys,
boolean remap,
@Nullable UUID oldNodeId) {
- assert oldNodeId == null || remap;
+ assert oldNodeId == null || remap || fastMapRemap;
Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
@@ -652,9 +668,16 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
// Must do this in synchronized block because we need to atomically remove and add mapping.
// Otherwise checkComplete() may see empty intermediate state.
synchronized (this) {
- if (remap)
+ if (oldNodeId != null)
removeMapping(oldNodeId);
+ // For fastMap mode wait for all responses before remapping.
+ if (remap && fastMap && !mappings.isEmpty()) {
+ fastMapRemap = true;
+
+ return;
+ }
+
// Create mappings first, then send messages.
for (Object key : keys) {
if (key == null) {
@@ -772,6 +795,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
i++;
}
}
+
+ fastMapRemap = false;
}
if ((single == null || single) && pendingMappings.size() == 1) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00eadd62/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
index 4603aaf..5a5a648 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -264,6 +264,84 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
/**
* @throws Exception If failed.
*/
+ public void testAtomicGetAndPutClockMode() throws Exception {
+ atomicGetAndPut(CLOCK);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicGetAndPutPrimaryMode() throws Exception {
+ atomicGetAndPut(PRIMARY);
+ }
+
+ /**
+ * @param writeOrder Atomic write order mode to set on the cache configuration.
+ * @throws Exception If failed.
+ */
+ private void atomicGetAndPut(CacheAtomicWriteOrderMode writeOrder) throws Exception {
+ ccfg = new CacheConfiguration();
+
+ ccfg.setCacheMode(PARTITIONED);
+ ccfg.setBackups(1);
+ ccfg.setAtomicityMode(ATOMIC);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setAtomicWriteOrderMode(writeOrder);
+ ccfg.setRebalanceMode(SYNC);
+
+ IgniteEx ignite0 = startGrid(0);
+ IgniteEx ignite1 = startGrid(1);
+
+ client = true;
+
+ ignite0.cache(null).put(0, 0);
+
+ Ignite ignite2 = startGrid(2);
+
+ assertTrue(ignite2.configuration().isClientMode());
+
+ final Map<Integer, Integer> map = new HashMap<>();
+
+ map.put(0, 1);
+
+ TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi();
+
+ // Block near atomic update requests sent to both server nodes.
+ spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id());
+ spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite1.localNode().id());
+
+ final IgniteCache<Integer, Integer> cache = ignite2.cache(null);
+
+ assertEquals(writeOrder, cache.getConfiguration(CacheConfiguration.class).getAtomicWriteOrderMode());
+
+ IgniteInternalFuture<Integer> putFut = GridTestUtils.runAsync(new Callable<Integer>() {
+ @Override public Integer call() throws Exception {
+ Thread.currentThread().setName("put-thread");
+
+ return cache.getAndPut(0, 1);
+ }
+ });
+
+ assertFalse(putFut.isDone());
+
+ client = false;
+
+ startGrid(3);
+
+ log.info("Stop block.");
+
+ spi.stopBlock();
+
+ Integer old = putFut.get();
+
+ checkData(map, cache, 4);
+
+ assertEquals((Object)0, old);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testTxPutAll() throws Exception {
ccfg = new CacheConfiguration();