You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2016/03/21 15:21:15 UTC

[50/50] [abbrv] ignite git commit: IGNITE-2801 Coordinator floods network with partitions full map exchange messages

IGNITE-2801 Coordinator floods network with partitions full map exchange messages


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cb56a4aa
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cb56a4aa
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cb56a4aa

Branch: refs/heads/ignite-2801
Commit: cb56a4aa58f4b610aa603f8c27ea2715803b1847
Parents: fa356e3
Author: Anton Vinogradov <av...@apache.org>
Authored: Mon Mar 21 17:19:36 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Mon Mar 21 17:19:36 2016 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      |   8 +-
 ...RabalancingPartitionMapExchangeSelfTest.java | 190 +++++++++++++++++++
 .../GridCacheRebalancingSyncSelfTest.java       |   7 +-
 3 files changed, 196 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/cb56a4aa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 1681f2f..caa32d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1264,13 +1264,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             break;
                     }
 
-                    // If not first preloading and no more topology events present,
-                    // then we periodically refresh partition map.
-                    if (!cctx.kernalContext().clientNode() && futQ.isEmpty() && preloadFinished) {
-                        refreshPartitions(timeout);
-
+                    if (!cctx.kernalContext().clientNode() && futQ.isEmpty() && preloadFinished)
                         timeout = cctx.gridConfig().getNetworkTimeout();
-                    }
+
 
                     // After workers line up and before preloading starts we initialize all futures.
                     if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/cb56a4aa/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingPartitionMapExchangeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingPartitionMapExchangeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingPartitionMapExchangeSelfTest.java
new file mode 100644
index 0000000..8cf11cc
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingPartitionMapExchangeSelfTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class GridCacheRabalancingPartitionMapExchangeSelfTest extends GridCommonAbstractTest {
+    /** */
+    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private final ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<>();
+
+    /** */
+    private volatile boolean record = false;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+        TcpCommunicationSpi commSpi = new CountingCommunicationSpi();
+
+        commSpi.setLocalPort(GridTestUtils.getNextCommPort(getClass()));
+        commSpi.setTcpNoDelay(true);
+
+        iCfg.setCommunicationSpi(commSpi);
+
+        return iCfg;
+    }
+
+    /**
+     *
+     */
+    public class CountingCommunicationSpi extends TcpCommunicationSpi {
+        /** {@inheritDoc} */
+        @Override public void sendMessage(final ClusterNode node, final Message msg,
+            final IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+            final Object msg0 = ((GridIoMessage)msg).message();
+
+            recordMessage(msg0);
+
+            super.sendMessage(node, msg, ackC);
+        }
+    }
+
+    /**
+     * @param msg
+     */
+    private void recordMessage(Object msg) {
+        if (record) {
+            String id = msg.getClass().toString();
+
+            if (msg instanceof GridDhtPartitionsFullMessage)
+                id += ((GridDhtPartitionsFullMessage)msg).exchangeId();
+
+            int size = 0;
+
+            try {
+                ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                ObjectOutput out = new ObjectOutputStream(bos);
+                out.writeObject(msg);
+
+                size = bos.toByteArray().length;
+            }
+            catch (IOException e) {
+            }
+
+            AtomicInteger ai = map.get("cnt " + id);
+
+            if (ai == null) {
+                ai = new AtomicInteger();
+
+                AtomicInteger oldAi = map.putIfAbsent("cnt " + id, ai);
+
+                (oldAi != null ? oldAi : ai).addAndGet(size);
+            }
+            else
+                ai.addAndGet(size);
+
+            ai = map.get("size" + id);
+
+            if (ai == null) {
+                ai = new AtomicInteger();
+
+                AtomicInteger oldAi = map.putIfAbsent("size" + id, ai);
+
+                (oldAi != null ? oldAi : ai).incrementAndGet();
+            }
+            else
+                ai.incrementAndGet();
+        }
+    }
+
+    @Override protected long getTestTimeout() {
+        return Long.MAX_VALUE;
+    }
+
+    /**
+     * @throws Exception e.
+     */
+    public void test() throws Exception {
+        record = false;
+
+        startGrids(10);
+
+        awaitPartitionMapExchange(true);
+
+        for (int i = 0; i < 10; i++) {
+            CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>();
+
+            cfg.setName("cache" + i);
+            cfg.setCacheMode(CacheMode.PARTITIONED);
+            cfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+            cfg.setBackups(1);
+
+            ignite(0).getOrCreateCache(cfg);
+        }
+
+        awaitPartitionMapExchange(true);
+
+        U.sleep(60_000);
+
+        System.out.println("--------------------------TESTING--------------------------");
+
+        record = true;
+
+        U.sleep(60_000);
+
+        record = false;
+
+        for (Map.Entry entry : map.entrySet()) {
+            System.out.println(entry.getKey().toString() + " ------ " + entry.getValue().toString());
+        }
+
+        IgniteKernal ignite = ((IgniteKernal)grid(0).cluster().forOldest().ignite());
+
+        ignite.dumpDebugInfo();
+
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cb56a4aa/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index 4ee080f..ceba0e1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -172,8 +172,9 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
                 log.info("<" + name + "> Checked " + (i + 1) * 100 / (TEST_SIZE) + "% entries. [count=" + TEST_SIZE +
                     ", iteration=" + iter + ", cache=" + name + "]");
 
-            assert ignite.cache(name).get(i) != null && ignite.cache(name).get(i).equals(i + name.hashCode() + iter) :
-                i + " value " + (i + name.hashCode() + iter) + " does not match (" + ignite.cache(name).get(i) + ")";
+            assertTrue(i + " value " + (i + name.hashCode() + iter) + " does not match (" + ignite.cache(name).get(i) + ")",
+                ignite.cache(name).get(i) != null && ignite.cache(name).get(i).equals(i + name.hashCode() + iter));
+
         }
     }
 
@@ -340,7 +341,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
                 Map map = U.field(supplier, "scMap");
 
                 synchronized (map) {
-                    assert map.isEmpty();
+                    assertTrue(map.isEmpty());
                 }
             }
         }