You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/04/18 13:46:36 UTC

[51/62] [abbrv] ignite git commit: Fixed CacheExchangeMessageDuplicatedStateTest (was broken after affinity function changes).

Fixed CacheExchangeMessageDuplicatedStateTest (was broken after affinity function changes).


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a6da0959
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a6da0959
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a6da0959

Branch: refs/heads/ignite-4938
Commit: a6da0959f435c4a765e441f42d2c400bb128560b
Parents: 65eeec5
Author: sboikov <sb...@gridgain.com>
Authored: Tue Apr 18 14:34:51 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Apr 18 14:34:51 2017 +0300

----------------------------------------------------------------------
 .../configuration/CacheConfiguration.java       |  7 +++-
 .../cache/CacheAffinitySharedManager.java       |  8 +++-
 .../internal/TestRecordingCommunicationSpi.java | 16 +++++++-
 ...CacheExchangeMessageDuplicatedStateTest.java | 41 +++++++++++---------
 4 files changed, 51 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a6da0959/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 2d81458..b5afba4 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -2263,7 +2263,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
         private static final long serialVersionUID = 0L;
 
         /** {@inheritDoc} */
-        @Override public boolean apply(ClusterNode clusterNode) {
+        @Override public boolean apply(ClusterNode node) {
             return true;
         }
 
@@ -2271,6 +2271,11 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
         @Override public boolean equals(Object obj) {
             return obj != null && obj.getClass().equals(this.getClass());
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "IgniteAllNodesPredicate []";
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6da0959/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index af63d54..77e832e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1738,7 +1738,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
          * @param aff Affinity.
          * @param initAff Current affinity.
          */
-        public CacheHolder2(
+        CacheHolder2(
             boolean rebalanceEnabled,
             GridCacheSharedContext cctx,
             GridAffinityAssignmentCache aff,
@@ -1828,5 +1828,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
             cacheAssignment.put(part, assignment);
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "WaitRebalanceInfo [topVer=" + topVer +
+                ", caches=" + (waitCaches != null ? waitCaches.keySet() : null) + ']';
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6da0959/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
index c4d8a79..c3d26b7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
@@ -29,6 +29,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -45,6 +46,9 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
     private Set<Class<?>> recordClasses;
 
     /** */
+    private IgniteBiPredicate<ClusterNode, Message> recordP;
+
+    /** */
     private List<Object> recordedMsgs = new ArrayList<>();
 
     /** */
@@ -65,7 +69,8 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
             Object msg0 = ioMsg.message();
 
             synchronized (this) {
-                if (recordClasses != null && recordClasses.contains(msg0.getClass()))
+                if ((recordClasses != null && recordClasses.contains(msg0.getClass())) ||
+                    (recordP != null && recordP.apply(node, msg)))
                     recordedMsgs.add(msg0);
 
                 boolean block = false;
@@ -99,6 +104,15 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
     }
 
     /**
+     * @param recordP Record predicate.
+     */
+    public void record(IgniteBiPredicate<ClusterNode, Message> recordP) {
+        synchronized (this) {
+            this.recordP = recordP;
+        }
+    }
+
+    /**
      * @param recordClasses Message classes to record.
      */
     public void record(Class<?>... recordClasses) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6da0959/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
index ef7c7a7..6c74dae 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
@@ -27,12 +27,16 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -74,7 +78,16 @@ public class CacheExchangeMessageDuplicatedStateTest extends GridCommonAbstractT
 
         TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
 
-        commSpi.record(GridDhtPartitionsSingleMessage.class, GridDhtPartitionsFullMessage.class);
+        commSpi.record(new IgniteBiPredicate<ClusterNode, Message>() {
+            @Override public boolean apply(ClusterNode node, Message msg) {
+                Message msg0 = ((GridIoMessage) msg).message();
+
+                return (msg0.getClass() == GridDhtPartitionsSingleMessage.class ||
+                    msg0.getClass() == GridDhtPartitionsFullMessage.class) &&
+                    ((GridDhtPartitionsAbstractMessage) msg0).exchangeId() != null;
+
+            }
+        });
 
         cfg.setCommunicationSpi(commSpi);
 
@@ -83,13 +96,13 @@ public class CacheExchangeMessageDuplicatedStateTest extends GridCommonAbstractT
         {
             CacheConfiguration ccfg = new CacheConfiguration();
             ccfg.setName(AFF1_CACHE1);
-            ccfg.setAffinity(new RendezvousAffinityFunction(false,512));
+            ccfg.setAffinity(new RendezvousAffinityFunction(false, 512));
             ccfgs.add(ccfg);
         }
         {
             CacheConfiguration ccfg = new CacheConfiguration();
             ccfg.setName(AFF1_CACHE2);
-            ccfg.setAffinity(new RendezvousAffinityFunction(false,512));
+            ccfg.setAffinity(new RendezvousAffinityFunction(false, 512));
             ccfgs.add(ccfg);
         }
         {
@@ -142,31 +155,23 @@ public class CacheExchangeMessageDuplicatedStateTest extends GridCommonAbstractT
     public void testExchangeMessages() throws Exception {
         ignite(0);
 
-        startGrid(1);
-
-        awaitPartitionMapExchange();
-
-        checkMessages(0, true);
+        final int SRVS = 4;
 
-        startGrid(2);
+        for (int i = 1; i < SRVS; i++) {
+            startGrid(i);
 
-        awaitPartitionMapExchange();
+            awaitPartitionMapExchange();
 
-        checkMessages(0, true);
+            checkMessages(0, true);
+        }
 
         client = true;
 
-        startGrid(3);
+        startGrid(SRVS);
 
         awaitPartitionMapExchange();
 
         checkMessages(0, false);
-
-        stopGrid(0);
-
-        awaitPartitionMapExchange();
-
-        checkMessages(1, true);
     }
 
     /**