You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/04/24 08:21:57 UTC

[39/65] [abbrv] ignite git commit: IgniteCacheAtomicProtocolTest fix - Fixes #1839.

IgniteCacheAtomicProtocolTest fix - Fixes #1839.

Signed-off-by: Alexey Goncharuk <al...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1f382afb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1f382afb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1f382afb

Branch: refs/heads/ignite-5024
Commit: 1f382afba6028bcb4463c3926e2d4e7ba6ffc590
Parents: 3eb52a8
Author: Konstantin Dudkov <kd...@ya.ru>
Authored: Fri Apr 21 11:47:31 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Apr 21 11:47:31 2017 +0300

----------------------------------------------------------------------
 .../atomic/IgniteCacheAtomicProtocolTest.java   | 175 +++++++++++++++++--
 1 file changed, 156 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1f382afb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
index 29d67e2..5a6b1c8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
@@ -17,11 +17,14 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
@@ -29,12 +32,18 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+import org.apache.ignite.cluster.ClusterMetrics;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
@@ -44,11 +53,13 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
@@ -357,7 +368,7 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
 
         final IgniteCache<Integer, Integer> nearCache = clientNode.createCache(cacheConfiguration(1, FULL_ASYNC));
 
-        List<Integer> keys = primaryKeys(srv0.cache(TEST_CACHE), putAll ? 3 : 1);
+        List<Integer> keys = getKeysMoved(srv0, TEST_CACHE, putAll ? 3 : 1);
 
         testSpi(clientNode).blockMessages(GridNearAtomicSingleUpdateRequest.class, srv0.name());
         testSpi(clientNode).blockMessages(GridNearAtomicFullUpdateRequest.class, srv0.name());
@@ -372,30 +383,18 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
         else
             nearCache.put(keys.get(0), map.get(keys.get(0)));
 
-        int nodeIdx = 2;
-
         Affinity<Object> aff = clientNode.affinity(TEST_CACHE);
 
-        int keysMoved;
-
-        do {
-            startGrid(nodeIdx);
-
-            awaitPartitionMapExchange();
-
-            keysMoved = 0;
+        startGrid(2);
 
-            for (Integer key : keys) {
-                if (!aff.isPrimary(srv0.cluster().localNode(), key))
-                    keysMoved++;
-            }
+        awaitPartitionMapExchange();
 
-            if (keysMoved == keys.size())
-                break;
+        int keysMoved = 0;
 
-            nodeIdx++;
+        for (Integer key : keys) {
+            if (!aff.isPrimary(srv0.cluster().localNode(), key))
+                keysMoved++;
         }
-        while (nodeIdx < 10);
 
         assertEquals(keys.size(), keysMoved);
 
@@ -826,6 +825,68 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Return list of keys that are primary for given node on given topology,
+     * but will not be primary after add one new node.
+     *
+     * @param ign Ignite.
+     * @param cacheName Cache name.
+     * @param size Number of keys.
+     * @return List of keys.
+     */
+    private List<Integer> getKeysMoved(Ignite ign, String cacheName, int size) {
+        GridCacheContext<Object, Object> cctx = ((IgniteKernal)ign).context().cache().internalCache(cacheName).context();
+
+        ArrayList<ClusterNode> nodes = new ArrayList<>(ign.cluster().nodes());
+
+        AffinityFunction func = cctx.config().getAffinity();
+
+        AffinityFunctionContext ctx = new GridAffinityFunctionContextImpl(
+            nodes,
+            null,
+            null,
+            new AffinityTopologyVersion(1, 0),
+            cctx.config().getBackups());
+
+        List<List<ClusterNode>> calcAff = func.assignPartitions(ctx);
+
+        String name = getTestIgniteInstanceName(nodes.size());
+
+        nodes.add(new FakeNode(name));
+
+        ctx = new GridAffinityFunctionContextImpl(
+            nodes,
+            null,
+            null,
+            new AffinityTopologyVersion(1, 0),
+            cctx.config().getBackups());
+
+        List<List<ClusterNode>> calcAff2 = func.assignPartitions(ctx);
+
+        Set<Integer> movedParts = new HashSet<>();
+
+        UUID localId = ign.cluster().localNode().id();
+
+        for (int i = 0; i < calcAff.size(); i++) {
+            if (calcAff.get(i).get(0).id().equals(localId) && !calcAff2.get(i).get(0).id().equals(localId))
+                movedParts.add(i);
+        }
+
+        List<Integer> keys = new ArrayList<>();
+
+        for (int i = 0; i < 10000; i++) {
+            int keyPart = func.partition(ign.affinity(cacheName).affinityKey(i));
+
+            if (movedParts.contains(keyPart))
+                keys.add(i);
+
+            if (keys.size() == size)
+                break;
+        }
+
+        return keys;
+    }
+
+    /**
      *
      */
     public static class SetValueEntryProcessor implements CacheEntryProcessor<Integer, Integer, Object> {
@@ -847,4 +908,80 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
             return null;
         }
     }
+
+    /**
+     *
+     */
+    public static class FakeNode implements ClusterNode {
+        /** */
+        private final String consistendId;
+        /** */
+        private final UUID uuid;
+
+        /** */
+        public FakeNode(String consistendId) {
+            this.consistendId = consistendId;
+            uuid = UUID.randomUUID();
+        }
+
+        /** {@inheritDoc} */
+        @Override public UUID id() {
+            return uuid;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object consistentId() {
+            return consistendId;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public <T> T attribute(String name) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClusterMetrics metrics() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<String, Object> attributes() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<String> addresses() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<String> hostNames() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long order() {
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteProductVersion version() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isLocal() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isDaemon() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isClient() {
+            return false;
+        }
+    }
 }