You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2015/12/15 12:44:12 UTC
[15/16] ignite git commit: ignite-1.5 Fixed
GridNearAtomicUpdateFuture to not complete the future before the near cache is
updated. Several test fixes.
ignite-1.5 Fixed GridNearAtomicUpdateFuture to not complete the future before the near cache is updated. Several test fixes.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/96feee9f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/96feee9f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/96feee9f
Branch: refs/heads/ignite-1.5.1
Commit: 96feee9f3db9fc792bb1cd7f9d4a72aa28774783
Parents: e45f8ae
Author: sboikov <sb...@gridgain.com>
Authored: Tue Dec 15 14:40:21 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Dec 15 14:40:21 2015 +0300
----------------------------------------------------------------------
.../CacheDataStructuresManager.java | 5 +-
.../distributed/dht/GridDhtCacheAdapter.java | 7 +-
.../dht/GridPartitionedSingleGetFuture.java | 7 +-
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 10 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 36 +++-
.../dht/atomic/GridNearAtomicUpdateRequest.java | 25 +++
.../distributed/near/GridNearAtomicCache.java | 3 +
.../CacheSerializableTransactionsTest.java | 8 +
.../cache/GridCacheAbstractFullApiSelfTest.java | 8 +
...IgniteCacheAtomicPutAllFailoverSelfTest.java | 1 +
...gniteAtomicLongChangingTopologySelfTest.java | 8 +-
...omicMultiNodeP2PDisabledFullApiSelfTest.java | 5 -
...ledFairAffinityMultiNodeFullApiSelfTest.java | 5 -
.../near/NearCacheSyncUpdateTest.java | 167 +++++++++++++++++++
...dTcpCommunicationSpiRecoveryAckSelfTest.java | 30 +++-
...CommunicationRecoveryAckClosureSelfTest.java | 39 +++--
.../testsuites/IgniteCacheTestSuite2.java | 2 +
.../IgniteSpiDiscoverySelfTestSuite.java | 6 +
18 files changed, 322 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
index f56cbf8..47c3dd9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
@@ -39,6 +39,7 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
@@ -493,7 +494,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
true).get();
}
catch (IgniteCheckedException e) {
- if (e.hasCause(ClusterTopologyException.class)) {
+ if (e.hasCause(ClusterTopologyCheckedException.class)) {
if (log.isDebugEnabled())
log.debug("RemoveSetData job failed, will retry: " + e);
@@ -516,7 +517,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
true).get();
}
catch (IgniteCheckedException e) {
- if (e.hasCause(ClusterTopologyException.class)) {
+ if (e.hasCause(ClusterTopologyCheckedException.class)) {
if (log.isDebugEnabled())
log.debug("RemoveSetData job failed, will retry: " + e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 9199e70..9cf8084 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -775,9 +775,14 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
res.setContainsValue();
}
else {
+ AffinityTopologyVersion topVer = ctx.shared().exchange().readyAffinityVersion();
+
+ assert topVer.compareTo(req.topologyVersion()) >= 0 : "Wrong ready topology version for " +
+ "invalid partitions response [topVer=" + topVer + ", req=" + req + ']';
+
res = new GridNearSingleGetResponse(ctx.cacheId(),
req.futureId(),
- ctx.shared().exchange().readyAffinityVersion(),
+ topVer,
null,
true,
req.addDeploymentInfo());
http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index f3f225a..5d0814f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -191,8 +191,6 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
*/
@SuppressWarnings("unchecked")
private void map(AffinityTopologyVersion topVer) {
- this.topVer = topVer;
-
ClusterNode node = mapKeyToNode(topVer);
if (node == null) {
@@ -250,6 +248,9 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
}
else {
synchronized (this) {
+ assert this.node == null;
+
+ this.topVer = topVer;
this.node = node;
}
@@ -325,7 +326,7 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
GridDhtCacheAdapter colocated = cctx.dht();
while (true) {
- GridCacheEntryEx entry = null;
+ GridCacheEntryEx entry;
try {
entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 1f4cb6a..7bee5a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -151,11 +151,13 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
@GridDirectTransient
private boolean onRes;
+ /** */
@GridDirectTransient
private List<Integer> partIds;
+ /** */
@GridDirectTransient
- private List<CacheObject> localPrevVals;
+ private List<CacheObject> locPrevVals;
/** Keep binary flag. */
private boolean keepBinary;
@@ -213,7 +215,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
keys = new ArrayList<>();
partIds = new ArrayList<>();
- localPrevVals = new ArrayList<>();
+ locPrevVals = new ArrayList<>();
if (forceTransformBackups) {
entryProcessors = new ArrayList<>();
@@ -254,7 +256,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
partIds.add(partId);
- localPrevVals.add(prevVal);
+ locPrevVals.add(prevVal);
if (forceTransformBackups) {
assert entryProcessor != null;
@@ -519,7 +521,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
* @return Value.
*/
@Nullable public CacheObject localPreviousValue(int idx) {
- return localPrevVals.get(idx);
+ return locPrevVals.get(idx);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index ba3d546..b384bab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -167,6 +167,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
* @param subjId Subject ID.
* @param taskNameHash Task name hash code.
* @param skipStore Skip store flag.
+ * @param keepBinary Keep binary flag.
* @param remapCnt Maximum number of retries.
* @param waitTopFut If {@code false} does not wait for affinity change future.
*/
@@ -359,7 +360,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
* @param res Update response.
*/
private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
- if (!nearEnabled || !req.hasPrimary())
+ assert nearEnabled;
+
+ if (res.remapKeys() != null || !req.hasPrimary())
return;
GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
@@ -544,6 +547,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
@GridToStringInclude
private Map<UUID, GridNearAtomicUpdateRequest> mappings;
+ /** */
+ private int resCnt;
+
/** Error. */
private CachePartialUpdateCheckedException err;
@@ -583,7 +589,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
else
req = mappings != null ? mappings.get(nodeId) : null;
- if (req != null) {
+ if (req != null && req.response() == null) {
res = new GridNearAtomicUpdateResponse(cctx.cacheId(), nodeId, req.futureVersion(),
cctx.deploymentEnabled());
@@ -632,10 +638,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
rcvAll = true;
}
else {
- req = mappings != null ? mappings.remove(nodeId) : null;
+ req = mappings != null ? mappings.get(nodeId) : null;
+
+ if (req != null && req.onResponse(res)) {
+ resCnt++;
- if (req != null)
- rcvAll = mappings.isEmpty();
+ rcvAll = mappings.size() == resCnt;
+ }
else
return;
}
@@ -731,8 +740,19 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
return;
}
- if (!nodeErr && res.remapKeys() == null)
- updateNear(req, res);
+ if (rcvAll && nearEnabled) {
+ if (mappings != null) {
+ for (GridNearAtomicUpdateRequest req0 : mappings.values()) {
+ GridNearAtomicUpdateResponse res0 = req0.response();
+
+ assert res0 != null : req0;
+
+ updateNear(req0, res0);
+ }
+ }
+ else if (!nodeErr)
+ updateNear(req, res);
+ }
if (remapTopVer != null) {
if (fut0 != null)
@@ -828,6 +848,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
assert futVer == null : this;
assert this.topVer == AffinityTopologyVersion.ZERO : this;
+ resCnt = 0;
+
this.topVer = topVer;
futVer = cctx.versions().next(topVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index c24ad34..7c0aba5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -154,6 +154,10 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
/** Keep binary flag. */
private boolean keepBinary;
+ /** */
+ @GridDirectTransient
+ private GridNearAtomicUpdateResponse res;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -544,6 +548,27 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
return hasPrimary;
}
+ /**
+ * @param res Response.
+ * @return {@code True} if current response was {@code null}.
+ */
+ public boolean onResponse(GridNearAtomicUpdateResponse res) {
+ if (this.res == null) {
+ this.res = res;
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * @return Response.
+ */
+ @Nullable public GridNearAtomicUpdateResponse response() {
+ return res;
+ }
+
/** {@inheritDoc}
* @param ctx*/
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 06898cd..a2d5adb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -130,6 +130,9 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
GridNearAtomicUpdateRequest req,
GridNearAtomicUpdateResponse res
) {
+ if (F.size(res.failedKeys()) == req.keys().size())
+ return;
+
/*
* Choose value to be stored in near cache: first check key is not in failed and not in skipped list,
* then check if value was generated on primary node, if not then use value sent in request.
http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
index ae64bb4..f4533f2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
@@ -2981,6 +2981,8 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
IgniteInternalFuture<?> restartFut = restart ? restartFuture(stop, null) : null;
+ final long stopTime = U.currentTimeMillis() + getTestTimeout() - 30_000;
+
for (int i = 0; i < 30; i++) {
final AtomicInteger cntr = new AtomicInteger();
@@ -3007,6 +3009,9 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
barrier.await();
for (int i = 0; i < 1000; i++) {
+ if (i % 100 == 0 && U.currentTimeMillis() > stopTime)
+ break;
+
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
Integer val = cache.get(key);
@@ -3036,6 +3041,9 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
assertTrue(cntr.get() > 0);
checkValue(key, cntr.get(), cacheName, restart);
+
+ if (U.currentTimeMillis() > stopTime)
+ break;
}
stop.set(true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index b984afa..5b294cc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -120,6 +120,9 @@ import static org.apache.ignite.transactions.TransactionState.COMMITTED;
*/
@SuppressWarnings("TransientFieldInNonSerializableClass")
public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstractSelfTest {
+ /** Test timeout */
+ private static final long TEST_TIMEOUT = 60 * 1000;
+
/** */
public static final CacheEntryProcessor<String, Integer, String> ERR_PROCESSOR =
new CacheEntryProcessor<String, Integer, String>() {
@@ -166,6 +169,11 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
private Map<String, CacheConfiguration[]> cacheCfgMap;
/** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return TEST_TIMEOUT;
+ }
+
+ /** {@inheritDoc} */
@Override protected int gridCount() {
return 1;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPutAllFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPutAllFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPutAllFailoverSelfTest.java
index b3464b8..3f9fc5c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPutAllFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPutAllFailoverSelfTest.java
@@ -30,6 +30,7 @@ public class IgniteCacheAtomicPutAllFailoverSelfTest extends GridCachePutAllFail
return ATOMIC;
}
+ /** {@inheritDoc} */
@Override public void testPutAllFailoverColocatedNearEnabledTwoBackupsOffheapTieredSwap(){
fail("https://issues.apache.org/jira/browse/IGNITE-1584");
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
index 74cb9ef..f14dc5a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
@@ -399,8 +399,14 @@ public class IgniteAtomicLongChangingTopologySelfTest extends GridCommonAbstract
/**
* @param i Node index.
+ * @param startLatch Thread start latch.
+ * @param run Run flag.
+ * @throws Exception If failed.
+ * @return Threads future.
*/
- private IgniteInternalFuture<?> startNodeAndCreaterThread(final int i, final CountDownLatch startLatch, final AtomicBoolean run)
+ private IgniteInternalFuture<?> startNodeAndCreaterThread(final int i,
+ final CountDownLatch startLatch,
+ final AtomicBoolean run)
throws Exception {
return multithreadedAsync(new Runnable() {
@Override public void run() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicMultiNodeP2PDisabledFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicMultiNodeP2PDisabledFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicMultiNodeP2PDisabledFullApiSelfTest.java
index c468cc2..d4efff3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicMultiNodeP2PDisabledFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicMultiNodeP2PDisabledFullApiSelfTest.java
@@ -65,9 +65,4 @@ public class GridCacheAtomicMultiNodeP2PDisabledFullApiSelfTest
return ccfg;
}
-
- /** {@inheritDoc} */
- @Override public void testWithSkipStore() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-1809");
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicNearEnabledFairAffinityMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicNearEnabledFairAffinityMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicNearEnabledFairAffinityMultiNodeFullApiSelfTest.java
index e4784f2..64943e6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicNearEnabledFairAffinityMultiNodeFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicNearEnabledFairAffinityMultiNodeFullApiSelfTest.java
@@ -33,9 +33,4 @@ public class GridCacheAtomicNearEnabledFairAffinityMultiNodeFullApiSelfTest
return cfg;
}
-
- /** {@inheritDoc} */
- @Override public void testWithSkipStore(){
- fail("https://issues.apache.org/jira/browse/IGNITE-1582");
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NearCacheSyncUpdateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NearCacheSyncUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NearCacheSyncUpdateTest.java
new file mode 100644
index 0000000..bbdf50e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NearCacheSyncUpdateTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class NearCacheSyncUpdateTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGridsMultiThreaded(3);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNearCacheSyncUpdateAtomic1() throws Exception {
+ nearCacheSyncUpdateTx(ATOMIC, CLOCK);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNearCacheSyncUpdateAtomic2() throws Exception {
+ nearCacheSyncUpdateTx(ATOMIC, PRIMARY);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNearCacheSyncUpdateTx() throws Exception {
+ nearCacheSyncUpdateTx(TRANSACTIONAL, null);
+ }
+
+ /**
+ * @param atomicityMode Atomicity mode.
+ * @param writeOrderMode Write order mode.
+ * @throws Exception If failed.
+ */
+ private void nearCacheSyncUpdateTx(CacheAtomicityMode atomicityMode,
+ CacheAtomicWriteOrderMode writeOrderMode) throws Exception {
+ final IgniteCache<Integer, Integer> cache =
+ ignite(0).createCache(cacheConfiguration(atomicityMode, writeOrderMode));
+
+ try {
+ final AtomicInteger idx = new AtomicInteger();
+
+ final int KEYS_PER_THREAD = 5000;
+
+ GridTestUtils.runMultiThreaded(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ int idx0 = idx.getAndIncrement();
+
+ int startKey = KEYS_PER_THREAD * idx0;
+
+ for (int i = startKey; i < startKey + KEYS_PER_THREAD; i++) {
+ cache.put(i, i);
+
+ assertEquals(i, (Object)cache.localPeek(i));
+
+ cache.remove(i);
+
+ assertNull(cache.get(i));
+ }
+
+ final int BATCH_SIZE = 50;
+
+ Map<Integer, Integer> map = new TreeMap<>();
+
+ for (int i = startKey; i < startKey + KEYS_PER_THREAD; i++) {
+ map.put(i, i);
+
+ if (map.size() == BATCH_SIZE) {
+ cache.putAll(map);
+
+ for (Integer key : map.keySet())
+ assertEquals(key, cache.localPeek(key));
+
+ cache.removeAll(map.keySet());
+
+ for (Integer key : map.keySet())
+ assertNull(cache.get(key));
+
+ map.clear();
+ }
+ }
+
+ return null;
+ }
+ }, 10, "update-thread");
+ }
+ finally {
+ ignite(0).destroyCache(null);
+ }
+ }
+
+ /**
+ * @param atomicityMode Atomicity mode.
+ * @param writeOrderMode Write order mode.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration<Integer, Integer> cacheConfiguration(CacheAtomicityMode atomicityMode,
+ CacheAtomicWriteOrderMode writeOrderMode) {
+ CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+
+ ccfg.setCacheMode(PARTITIONED);
+ ccfg.setBackups(1);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setAtomicityMode(atomicityMode);
+ ccfg.setAtomicWriteOrderMode(writeOrderMode);
+ ccfg.setNearConfiguration(new NearCacheConfiguration<Integer, Integer>());
+
+ return ccfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index a709cc4..38e3d98 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiAdapter;
+import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.CommunicationListener;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.communication.GridTestMessage;
@@ -133,6 +134,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
* @param msgPerIter Messages per iteration.
* @throws Exception If failed.
*/
+ @SuppressWarnings("unchecked")
private void checkAck(int ackCnt, int idleTimeout, int msgPerIter) throws Exception {
createSpis(ackCnt, idleTimeout, TcpCommunicationSpi.DFLT_MSG_QUEUE_LIMIT);
@@ -196,8 +198,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
final TestListener lsnr = (TestListener)spi.getListener();
GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override
- public boolean apply() {
+ @Override public boolean apply() {
return lsnr.rcvCnt.get() >= expMsgs0;
}
}, 5000);
@@ -247,6 +248,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
/**
* @throws Exception If failed.
*/
+ @SuppressWarnings("unchecked")
private void checkOverflow() throws Exception {
TcpCommunicationSpi spi0 = spis.get(0);
TcpCommunicationSpi spi1 = spis.get(1);
@@ -266,8 +268,20 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
final GridNioSession ses0 = communicationSession(spi0);
- for (int i = 0; i < 150; i++)
- spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0));
+ int sentMsgs = 1;
+
+ for (int i = 0; i < 150; i++) {
+ try {
+ spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0));
+
+ sentMsgs++;
+ }
+ catch (IgniteSpiException e) {
+ log.info("Send error [err=" + e + ", sentMsgs=" + sentMsgs + ']');
+
+ break;
+ }
+ }
// Wait when session is closed because of queue overflow.
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@@ -283,13 +297,12 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
for (int i = 0; i < 100; i++)
spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0));
- final int expMsgs = 251;
+ final int expMsgs = sentMsgs + 100;
final TestListener lsnr = (TestListener)spi1.getListener();
GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override
- public boolean apply() {
+ @Override public boolean apply() {
return lsnr.rcvCnt.get() >= expMsgs;
}
}, 5000);
@@ -307,8 +320,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
final GridNioServer srv = U.field(spi, "nioSrvr");
GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override
- public boolean apply() {
+ @Override public boolean apply() {
Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
return !sessions.isEmpty();
http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
index fd2d91a..7521f2e 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
@@ -40,6 +40,7 @@ import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiAdapter;
+import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.CommunicationListener;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.communication.GridTestMessage;
@@ -135,6 +136,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
* @param msgPerIter Messages per iteration.
* @throws Exception If failed.
*/
+ @SuppressWarnings("unchecked")
private void checkAck(int ackCnt, int idleTimeout, int msgPerIter) throws Exception {
createSpis(ackCnt, idleTimeout, TcpCommunicationSpi.DFLT_MSG_QUEUE_LIMIT);
@@ -154,7 +156,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
final AtomicInteger ackMsgs = new AtomicInteger(0);
- IgniteInClosure<IgniteException> ackClosure = new CI1<IgniteException>() {
+ IgniteInClosure<IgniteException> ackC = new CI1<IgniteException>() {
@Override public void apply(IgniteException o) {
assert o == null;
@@ -163,9 +165,9 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
};
for (int j = 0; j < msgPerIter; j++) {
- spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure);
+ spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackC);
- spi1.sendMessage(node0, new GridTestMessage(node1.id(), ++msgId, 0), ackClosure);
+ spi1.sendMessage(node0, new GridTestMessage(node1.id(), ++msgId, 0), ackC);
}
expMsgs += msgPerIter;
@@ -207,8 +209,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
final TestListener lsnr = (TestListener)spi.getListener();
GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override
- public boolean apply() {
+ @Override public boolean apply() {
return lsnr.rcvCnt.get() >= expMsgs0;
}
}, 5000);
@@ -260,6 +261,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
/**
* @throws Exception If failed.
*/
+ @SuppressWarnings("unchecked")
private void checkOverflow() throws Exception {
TcpCommunicationSpi spi0 = spis.get(0);
TcpCommunicationSpi spi1 = spis.get(1);
@@ -271,7 +273,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
final AtomicInteger ackMsgs = new AtomicInteger(0);
- IgniteInClosure<IgniteException> ackClosure = new CI1<IgniteException>() {
+ IgniteInClosure<IgniteException> ackC = new CI1<IgniteException>() {
@Override public void apply(IgniteException o) {
assert o == null;
@@ -282,15 +284,27 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
int msgId = 0;
// Send message to establish connection.
- spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure);
+ spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackC);
// Prevent node1 from send
GridTestUtils.setFieldValue(srv1, "skipWrite", true);
final GridNioSession ses0 = communicationSession(spi0);
- for (int i = 0; i < 150; i++)
- spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure);
+ int sentMsgs = 1;
+
+ for (int i = 0; i < 150; i++) {
+ try {
+ spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackC);
+
+ sentMsgs++;
+ }
+ catch (IgniteSpiException e) {
+ log.info("Send error [err=" + e + ", sentMsgs=" + sentMsgs + ']');
+
+ break;
+ }
+ }
// Wait when session is closed because of queue overflow.
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@@ -304,9 +318,9 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
GridTestUtils.setFieldValue(srv1, "skipWrite", false);
for (int i = 0; i < 100; i++)
- spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure);
+ spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackC);
- final int expMsgs = 251;
+ final int expMsgs = sentMsgs + 100;
final TestListener lsnr = (TestListener)spi1.getListener();
@@ -335,8 +349,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
final GridNioServer srv = U.field(spi, "nioSrvr");
GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override
- public boolean apply() {
+ @Override public boolean apply() {
Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
return !sessions.isEmpty();
http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index c94931e..cadcba7 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -106,6 +106,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePar
import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedTxTimeoutSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheRendezvousAffinityClientSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridPartitionedBackupLoadSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.near.NearCacheSyncUpdateTest;
import org.apache.ignite.internal.processors.cache.distributed.near.NoneRebalanceModeSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedEvictionSelfTest;
import org.apache.ignite.internal.processors.cache.local.GridCacheLocalAtomicBasicStoreSelfTest;
@@ -246,6 +247,7 @@ public class IgniteCacheTestSuite2 extends TestSuite {
suite.addTest(new TestSuite(CrossCacheTxRandomOperationsTest.class));
suite.addTest(new TestSuite(IgniteDynamicCacheAndNodeStop.class));
suite.addTest(new TestSuite(CacheLockReleaseNodeLeaveTest.class));
+ suite.addTest(new TestSuite(NearCacheSyncUpdateTest.class));
return suite;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index af86fbb..af7eb7e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -39,6 +39,9 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.jdbc.TcpDiscoveryJdbcIpFinde
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinderSelfTest;
import org.apache.ignite.spi.discovery.tcp.ipfinder.sharedfs.TcpDiscoverySharedFsIpFinderSelfTest;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinderSelfTest;
+import org.apache.ignite.testframework.GridTestUtils;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_OVERRIDE_MCAST_GRP;
/**
* Test suite for all discovery spi implementations.
@@ -49,6 +52,9 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite {
* @throws Exception If failed.
*/
public static TestSuite suite() throws Exception {
+ System.setProperty(IGNITE_OVERRIDE_MCAST_GRP,
+ GridTestUtils.getNextMulticastGroup(IgniteSpiDiscoverySelfTestSuite.class));
+
TestSuite suite = new TestSuite("Ignite Discovery SPI Test Suite");
// Tcp.