You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2015/05/27 18:13:37 UTC
[18/50] [abbrv] incubator-ignite git commit: # ignite-23
# ignite-23
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e4e54bad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e4e54bad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e4e54bad
Branch: refs/heads/ignite-943
Commit: e4e54bad1481a64e6b270231158d2d299441eb9d
Parents: ff17caf
Author: sboikov <sb...@gridgain.com>
Authored: Mon May 25 10:00:46 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon May 25 11:12:32 2015 +0300
----------------------------------------------------------------------
.../dht/atomic/GridDhtAtomicCache.java | 8 +-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 10 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 6 +-
.../dht/atomic/GridNearAtomicUpdateRequest.java | 112 ++++++-----
.../colocated/GridDhtColocatedLockFuture.java | 3 +
...niteCacheClientNodeChangingTopologyTest.java | 184 +++++++++++++++++--
6 files changed, 258 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e4e54bad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 7fe847a..c6a2bf7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1023,8 +1023,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
IgniteCacheExpiryPolicy expiry = null;
- boolean clientReq = false;
-
try {
// If batch store update is enabled, we need to lock all entries.
// First, need to acquire locks on cache entries, then check filter.
@@ -1052,13 +1050,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return;
}
- clientReq = CU.clientNode(node);
-
// Do not check topology version for CLOCK versioning since
// partition exchange will wait for near update future.
// Also do not check topology version if topology was locked on near node by
// external transaction or explicit lock.
- if ((req.fastMap() && !clientReq) || req.topologyLocked() ||
+ if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() ||
!needRemap(req.topologyVersion(), topology().topologyVersion(), req.keys())) {
boolean hasNear = ctx.discovery().cacheNearNode(node, name());
@@ -1161,7 +1157,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
}
catch (GridDhtInvalidPartitionException ignore) {
- assert !req.fastMap() || clientReq : req;
+ assert !req.fastMap() || req.clientRequest() : req;
if (log.isDebugEnabled())
log.debug("Caught invalid partition exception for cache entry (will remap update request): " + req);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e4e54bad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 40ab104..ff8454e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -86,6 +86,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
/** Future keys. */
private Collection<KeyCacheObject> keys;
+ /** */
+ private boolean waitForExchange;
+
/**
* @param cctx Cache context.
* @param completionCb Callback to invoke when future is completed.
@@ -113,6 +116,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
log = U.logger(cctx.kernalContext(), logRef, GridDhtAtomicUpdateFuture.class);
keys = new ArrayList<>(updateReq.keys().size());
+
+ boolean topLocked = updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest());
+
+ waitForExchange = !topLocked;
}
/** {@inheritDoc} */
@@ -164,8 +171,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
/** {@inheritDoc} */
@Override public boolean waitForPartitionExchange() {
- // Wait dht update futures in PRIMARY mode.
- return cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
+ return waitForExchange;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e4e54bad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 82659ca..50c3d56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -614,7 +614,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
filter,
subjId,
taskNameHash,
- skipStore);
+ skipStore,
+ cctx.kernalContext().clientNode());
req.addUpdateEntry(cacheKey,
val,
@@ -755,7 +756,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
filter,
subjId,
taskNameHash,
- skipStore);
+ skipStore,
+ cctx.kernalContext().clientNode());
pendingMappings.put(nodeId, mapped);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e4e54bad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index a96a666..86c5ab8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -132,6 +132,9 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
/** Skip write-through to a persistent storage. */
private boolean skipStore;
+ /** */
+ private boolean clientReq;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -148,6 +151,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
* @param fastMap Fast map scheme flag.
* @param updateVer Update version set if fast map is performed.
* @param topVer Topology version.
+ * @param topLocked Topology locked flag.
* @param syncMode Synchronization mode.
* @param op Cache update operation.
* @param retval Return value required flag.
@@ -157,6 +161,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
* @param subjId Subject ID.
* @param taskNameHash Task name hash code.
* @param skipStore Skip write-through to a persistent storage.
+ * @param clientReq Client node request flag.
*/
public GridNearAtomicUpdateRequest(
int cacheId,
@@ -174,7 +179,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
@Nullable CacheEntryPredicate[] filter,
@Nullable UUID subjId,
int taskNameHash,
- boolean skipStore
+ boolean skipStore,
+ boolean clientReq
) {
this.cacheId = cacheId;
this.nodeId = nodeId;
@@ -193,6 +199,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
this.subjId = subjId;
this.taskNameHash = taskNameHash;
this.skipStore = skipStore;
+ this.clientReq = clientReq;
keys = new ArrayList<>();
}
@@ -266,6 +273,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
}
/**
+ * @return {@code True} if request sent from client node.
+ */
+ public boolean clientRequest() {
+ return clientReq;
+ }
+
+ /**
* @return Cache write synchronization mode.
*/
public CacheWriteSynchronizationMode writeSynchronizationMode() {
@@ -574,126 +588,132 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
switch (writer.state()) {
case 3:
- if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
+ if (!writer.writeBoolean("clientReq", clientReq))
return false;
writer.incrementState();
case 4:
- if (!writer.writeMessage("conflictTtls", conflictTtls))
+ if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
return false;
writer.incrementState();
case 5:
- if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("conflictTtls", conflictTtls))
return false;
writer.incrementState();
case 6:
- if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
+ if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 7:
- if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
+ if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
return false;
writer.incrementState();
case 8:
- if (!writer.writeBoolean("fastMap", fastMap))
+ if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
return false;
writer.incrementState();
case 9:
- if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
+ if (!writer.writeBoolean("fastMap", fastMap))
return false;
writer.incrementState();
case 10:
- if (!writer.writeMessage("futVer", futVer))
+ if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 11:
- if (!writer.writeBoolean("hasPrimary", hasPrimary))
+ if (!writer.writeMessage("futVer", futVer))
return false;
writer.incrementState();
case 12:
- if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
+ if (!writer.writeBoolean("hasPrimary", hasPrimary))
return false;
writer.incrementState();
case 13:
- if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
+ if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
return false;
writer.incrementState();
case 14:
- if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
+ if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 15:
- if (!writer.writeBoolean("retval", retval))
+ if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
return false;
writer.incrementState();
case 16:
- if (!writer.writeBoolean("skipStore", skipStore))
+ if (!writer.writeBoolean("retval", retval))
return false;
writer.incrementState();
case 17:
- if (!writer.writeUuid("subjId", subjId))
+ if (!writer.writeBoolean("skipStore", skipStore))
return false;
writer.incrementState();
case 18:
- if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+ if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
case 19:
- if (!writer.writeInt("taskNameHash", taskNameHash))
+ if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
return false;
writer.incrementState();
case 20:
- if (!writer.writeBoolean("topLocked", topLocked))
+ if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
case 21:
- if (!writer.writeMessage("topVer", topVer))
+ if (!writer.writeBoolean("topLocked", topLocked))
return false;
writer.incrementState();
case 22:
- if (!writer.writeMessage("updateVer", updateVer))
+ if (!writer.writeMessage("topVer", topVer))
return false;
writer.incrementState();
case 23:
+ if (!writer.writeMessage("updateVer", updateVer))
+ return false;
+
+ writer.incrementState();
+
+ case 24:
if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
return false;
@@ -716,7 +736,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
switch (reader.state()) {
case 3:
- conflictExpireTimes = reader.readMessage("conflictExpireTimes");
+ clientReq = reader.readBoolean("clientReq");
if (!reader.isLastRead())
return false;
@@ -724,7 +744,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 4:
- conflictTtls = reader.readMessage("conflictTtls");
+ conflictExpireTimes = reader.readMessage("conflictExpireTimes");
if (!reader.isLastRead())
return false;
@@ -732,7 +752,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 5:
- conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
+ conflictTtls = reader.readMessage("conflictTtls");
if (!reader.isLastRead())
return false;
@@ -740,7 +760,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 6:
- entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
+ conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -748,7 +768,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 7:
- expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
+ entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
if (!reader.isLastRead())
return false;
@@ -756,7 +776,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 8:
- fastMap = reader.readBoolean("fastMap");
+ expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
if (!reader.isLastRead())
return false;
@@ -764,7 +784,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 9:
- filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
+ fastMap = reader.readBoolean("fastMap");
if (!reader.isLastRead())
return false;
@@ -772,7 +792,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 10:
- futVer = reader.readMessage("futVer");
+ filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
if (!reader.isLastRead())
return false;
@@ -780,7 +800,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 11:
- hasPrimary = reader.readBoolean("hasPrimary");
+ futVer = reader.readMessage("futVer");
if (!reader.isLastRead())
return false;
@@ -788,7 +808,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 12:
- invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
+ hasPrimary = reader.readBoolean("hasPrimary");
if (!reader.isLastRead())
return false;
@@ -796,7 +816,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 13:
- keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
+ invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
if (!reader.isLastRead())
return false;
@@ -804,6 +824,14 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 14:
+ keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 15:
byte opOrd;
opOrd = reader.readByte("op");
@@ -815,7 +843,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 15:
+ case 16:
retval = reader.readBoolean("retval");
if (!reader.isLastRead())
@@ -823,7 +851,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 16:
+ case 17:
skipStore = reader.readBoolean("skipStore");
if (!reader.isLastRead())
@@ -831,7 +859,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 17:
+ case 18:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
@@ -839,7 +867,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 18:
+ case 19:
byte syncModeOrd;
syncModeOrd = reader.readByte("syncMode");
@@ -851,7 +879,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 19:
+ case 20:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -859,7 +887,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 20:
+ case 21:
topLocked = reader.readBoolean("topLocked");
if (!reader.isLastRead())
@@ -867,7 +895,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 21:
+ case 22:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -875,7 +903,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 22:
+ case 23:
updateVer = reader.readMessage("updateVer");
if (!reader.isLastRead())
@@ -883,7 +911,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 23:
+ case 24:
vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -903,7 +931,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 24;
+ return 25;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e4e54bad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 500495a..cc8f064 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -341,6 +341,9 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
else if (log.isDebugEnabled())
log.debug("Transaction was not marked rollback-only while locks were not acquired: " + tx);
}
+
+ for (KeyCacheObject key : GridDhtColocatedLockFuture.this.keys)
+ cctx.mvcc().removeExplicitLock(threadId, key, lockVer);
}
cctx.mvcc().recheckPendingLocks();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e4e54bad/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
index c01ef6f..4603aaf 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -24,8 +24,11 @@ import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.plugin.extensions.communication.*;
@@ -38,6 +41,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.apache.ignite.testframework.*;
import org.apache.ignite.testframework.junits.common.*;
import org.apache.ignite.transactions.*;
+import org.eclipse.jetty.util.*;
import org.jetbrains.annotations.*;
import javax.cache.*;
@@ -96,21 +100,53 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
* @throws Exception If failed.
*/
public void testAtomicPutAllClockMode() throws Exception {
- atomicPutAll(CLOCK);
+ atomicPut(CLOCK, true, null);
}
/**
* @throws Exception If failed.
*/
public void testAtomicPutAllPrimaryMode() throws Exception {
- atomicPutAll(PRIMARY);
+ atomicPut(PRIMARY, true, null);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicPutAllNearEnabledClockMode() throws Exception {
+ atomicPut(CLOCK, true, new NearCacheConfiguration());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicPutAllNearEnabledPrimaryMode() throws Exception {
+ atomicPut(PRIMARY, true, new NearCacheConfiguration());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicPutClockMode() throws Exception {
+ atomicPut(CLOCK, false, null);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicPutPrimaryMode() throws Exception {
+ atomicPut(PRIMARY, false, null);
}
/**
* @param writeOrder Write order.
+ * @param putAll If {@code true} executes putAll.
+ * @param nearCfg Near cache configuration.
* @throws Exception If failed.
*/
- private void atomicPutAll(CacheAtomicWriteOrderMode writeOrder) throws Exception {
+ private void atomicPut(CacheAtomicWriteOrderMode writeOrder,
+ final boolean putAll,
+ @Nullable NearCacheConfiguration nearCfg) throws Exception {
ccfg = new CacheConfiguration();
ccfg.setCacheMode(PARTITIONED);
@@ -123,15 +159,21 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
IgniteEx ignite0 = startGrid(0);
IgniteEx ignite1 = startGrid(1);
+ ccfg.setNearConfiguration(nearCfg);
+
client = true;
+ ccfg.setNearConfiguration(null);
+
Ignite ignite2 = startGrid(2);
assertTrue(ignite2.configuration().isClientMode());
final Map<Integer, Integer> map = new HashMap<>();
- for (int i = 0; i < 100; i++)
+ final int KEYS = putAll ? 100 : 1;
+
+ for (int i = 0; i < KEYS; i++)
map.put(i, i);
TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi();
@@ -148,7 +190,10 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
@Override public Object call() throws Exception {
Thread.currentThread().setName("put-thread");
- cache.putAll(map);
+ if (putAll)
+ cache.putAll(map);
+ else
+ cache.put(0, 0);
return null;
}
@@ -172,7 +217,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
map.clear();
- for (int i = 0; i < 100; i++)
+ for (int i = 0; i < KEYS; i++)
map.put(i, i + 1);
// Block messages requests for single node.
@@ -182,7 +227,10 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
@Override public Object call() throws Exception {
Thread.currentThread().setName("put-thread");
- cache.putAll(map);
+ if (putAll)
+ cache.putAll(map);
+ else
+ cache.put(0, 1);
return null;
}
@@ -202,10 +250,13 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
checkData(map, cache, 4);
- for (int i = 0; i < 100; i++)
+ for (int i = 0; i < KEYS; i++)
map.put(i, i + 2);
- cache.putAll(map);
+ if (putAll)
+ cache.putAll(map);
+ else
+ cache.put(0, 2);
checkData(map, cache, 4);
}
@@ -401,7 +452,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
/**
* @throws Exception If failed.
*/
- public void _testLock() throws Exception {
+ public void testLock() throws Exception {
lock(null);
}
@@ -740,6 +791,72 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
}
/**
+ * @throws Exception If failed.
+ */
+ public void testLockFromClientBlocksExchange() throws Exception {
+ ccfg = new CacheConfiguration();
+
+ ccfg.setCacheMode(PARTITIONED);
+ ccfg.setBackups(1);
+ ccfg.setAtomicityMode(TRANSACTIONAL);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setRebalanceMode(SYNC);
+
+ startGrid(0);
+ startGrid(1);
+
+ client = true;
+
+ Ignite ignite2 = startGrid(2);
+
+ IgniteCache<Integer, Integer> cache = ignite2.cache(null);
+
+ Lock lock = cache.lock(0);
+
+ lock.lock();
+
+ IgniteInternalFuture<?> startFut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ client = false;
+
+ startGrid(3);
+
+ return null;
+ }
+ });
+
+ U.sleep(2000);
+
+ assertFalse(startFut.isDone());
+
+ AffinityTopologyVersion ver = new AffinityTopologyVersion(4);
+
+ List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+ U.sleep(2000);
+
+ for (int i = 0; i < 3; i++) {
+ Ignite ignite = ignite(i);
+
+ IgniteInternalFuture<?> fut =
+ ((IgniteKernal)ignite).context().cache().context().exchange().affinityReadyFuture(ver);
+
+ assertNotNull(fut);
+
+ assertFalse(fut.isDone());
+
+ futs.add(fut);
+ }
+
+ lock.unlock();
+
+ for (IgniteInternalFuture<?> fut : futs)
+ fut.get(10_000);
+
+ startFut.get(10_000);
+ }
+
+ /**
* @param map Expected data.
* @param clientCache Client cache.
* @param expNodes Expected nodes number.
@@ -764,11 +881,35 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
for (Map.Entry<Integer, Integer> e : map.entrySet()) {
Integer key = e.getKey();
+ GridCacheVersion ver = null;
+
for (Ignite node : nodes) {
IgniteCache<Integer, Integer> cache = node.cache(null);
- if (aff.isPrimaryOrBackup(node.cluster().localNode(), key) || node == nearCacheNode)
+ boolean affNode = aff.isPrimaryOrBackup(node.cluster().localNode(), key);
+
+ if (affNode || node == nearCacheNode) {
assertEquals("Unexpected value for " + node.name(), e.getValue(), cache.localPeek(key));
+
+ GridCacheAdapter cache0 = ((IgniteKernal)node).internalCache(null);
+
+ if (affNode && cache0.isNear())
+ cache0 = ((GridNearCacheAdapter)cache0).dht();
+
+ GridCacheEntryEx entry = cache0.peekEx(key);
+
+ assertNotNull("No entry [node=" + node.name() + ", key=" + key + ']', entry);
+
+ GridCacheVersion ver0 = entry instanceof GridNearCacheEntry ?
+ ((GridNearCacheEntry)entry).dhtVersion() : entry.version();
+
+ assertNotNull("Null version [node=" + node.name() + ", key=" + key + ']', ver0);
+
+ if (ver == null)
+ ver = ver0;
+ else
+ assertEquals(ver0, ver);
+ }
else
assertNull("Unexpected non-null value for " + node.name(), cache.localPeek(key));
}
@@ -779,6 +920,9 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
return false;
}
+ catch (Exception e) {
+ fail("Unexpected exception: " + e);
+ }
return true;
}
@@ -860,6 +1004,8 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
final int THREADS = CLIENT_CNT * 3;
+ final ConcurrentHashSet<Integer> putKeys = new ConcurrentHashSet<>();
+
try {
GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
@@ -887,8 +1033,11 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
while (!stop.get()) {
TreeMap<Integer, Integer> map = new TreeMap<>();
- for (int i = 0; i < 100; i++)
- map.put(rnd.nextInt(0, 1000), i);
+ for (int i = 0; i < 100; i++) {
+ Integer key = rnd.nextInt(0, 1000);
+
+ map.put(key, key);
+ }
try {
if (useTx) {
@@ -904,6 +1053,8 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
}
else
cache.putAll(map);
+
+ putKeys.addAll(map.keySet());
}
catch (CacheException | IgniteException e) {
log.info("Update failed, ignore: " + e);
@@ -1002,6 +1153,13 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
for (IgniteInternalFuture<?> fut : futs)
fut.get();
+
+ Map<Integer, Integer> map = new HashMap<>();
+
+ for (Integer key : putKeys)
+ map.put(key, key);
+
+ checkData(map, grid(SRV_CNT).cache(null), SRV_CNT + CLIENT_CNT);
}
/**