You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/04/24 08:21:57 UTC
[39/65] [abbrv] ignite git commit: IgniteCacheAtomicProtocolTest fix
- Fixes #1839.
IgniteCacheAtomicProtocolTest fix - Fixes #1839.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1f382afb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1f382afb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1f382afb
Branch: refs/heads/ignite-5024
Commit: 1f382afba6028bcb4463c3926e2d4e7ba6ffc590
Parents: 3eb52a8
Author: Konstantin Dudkov <kd...@ya.ru>
Authored: Fri Apr 21 11:47:31 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Apr 21 11:47:31 2017 +0300
----------------------------------------------------------------------
.../atomic/IgniteCacheAtomicProtocolTest.java | 175 +++++++++++++++++--
1 file changed, 156 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f382afb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
index 29d67e2..5a6b1c8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
@@ -17,11 +17,14 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
@@ -29,12 +32,18 @@ import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
@@ -44,11 +53,13 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
@@ -357,7 +368,7 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
final IgniteCache<Integer, Integer> nearCache = clientNode.createCache(cacheConfiguration(1, FULL_ASYNC));
- List<Integer> keys = primaryKeys(srv0.cache(TEST_CACHE), putAll ? 3 : 1);
+ List<Integer> keys = getKeysMoved(srv0, TEST_CACHE, putAll ? 3 : 1);
testSpi(clientNode).blockMessages(GridNearAtomicSingleUpdateRequest.class, srv0.name());
testSpi(clientNode).blockMessages(GridNearAtomicFullUpdateRequest.class, srv0.name());
@@ -372,30 +383,18 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
else
nearCache.put(keys.get(0), map.get(keys.get(0)));
- int nodeIdx = 2;
-
Affinity<Object> aff = clientNode.affinity(TEST_CACHE);
- int keysMoved;
-
- do {
- startGrid(nodeIdx);
-
- awaitPartitionMapExchange();
-
- keysMoved = 0;
+ startGrid(2);
- for (Integer key : keys) {
- if (!aff.isPrimary(srv0.cluster().localNode(), key))
- keysMoved++;
- }
+ awaitPartitionMapExchange();
- if (keysMoved == keys.size())
- break;
+ int keysMoved = 0;
- nodeIdx++;
+ for (Integer key : keys) {
+ if (!aff.isPrimary(srv0.cluster().localNode(), key))
+ keysMoved++;
}
- while (nodeIdx < 10);
assertEquals(keys.size(), keysMoved);
@@ -826,6 +825,68 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
}
/**
+ * Return list of keys that are primary for given node on given topology,
+ * but will not be primary after add one new node.
+ *
+ * @param ign Ignite.
+ * @param cacheName Cache name.
+ * @param size Number of keys.
+ * @return List of keys.
+ */
+ private List<Integer> getKeysMoved(Ignite ign, String cacheName, int size) {
+ GridCacheContext<Object, Object> cctx = ((IgniteKernal)ign).context().cache().internalCache(cacheName).context();
+
+ ArrayList<ClusterNode> nodes = new ArrayList<>(ign.cluster().nodes());
+
+ AffinityFunction func = cctx.config().getAffinity();
+
+ AffinityFunctionContext ctx = new GridAffinityFunctionContextImpl(
+ nodes,
+ null,
+ null,
+ new AffinityTopologyVersion(1, 0),
+ cctx.config().getBackups());
+
+ List<List<ClusterNode>> calcAff = func.assignPartitions(ctx);
+
+ String name = getTestIgniteInstanceName(nodes.size());
+
+ nodes.add(new FakeNode(name));
+
+ ctx = new GridAffinityFunctionContextImpl(
+ nodes,
+ null,
+ null,
+ new AffinityTopologyVersion(1, 0),
+ cctx.config().getBackups());
+
+ List<List<ClusterNode>> calcAff2 = func.assignPartitions(ctx);
+
+ Set<Integer> movedParts = new HashSet<>();
+
+ UUID localId = ign.cluster().localNode().id();
+
+ for (int i = 0; i < calcAff.size(); i++) {
+ if (calcAff.get(i).get(0).id().equals(localId) && !calcAff2.get(i).get(0).id().equals(localId))
+ movedParts.add(i);
+ }
+
+ List<Integer> keys = new ArrayList<>();
+
+ for (int i = 0; i < 10000; i++) {
+ int keyPart = func.partition(ign.affinity(cacheName).affinityKey(i));
+
+ if (movedParts.contains(keyPart))
+ keys.add(i);
+
+ if (keys.size() == size)
+ break;
+ }
+
+ return keys;
+ }
+
+ /**
*
*/
public static class SetValueEntryProcessor implements CacheEntryProcessor<Integer, Integer, Object> {
@@ -847,4 +908,80 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
return null;
}
}
+
+ /**
+ *
+ */
+ public static class FakeNode implements ClusterNode {
+ /** */
+ private final String consistendId;
+ /** */
+ private final UUID uuid;
+
+ /** */
+ public FakeNode(String consistendId) {
+ this.consistendId = consistendId;
+ uuid = UUID.randomUUID();
+ }
+
+ /** {@inheritDoc} */
+ @Override public UUID id() {
+ return uuid;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object consistentId() {
+ return consistendId;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public <T> T attribute(String name) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClusterMetrics metrics() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<String, Object> attributes() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<String> addresses() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<String> hostNames() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long order() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteProductVersion version() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isLocal() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isDaemon() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isClient() {
+ return false;
+ }
+ }
}