You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2016/03/21 15:21:15 UTC
[50/50] [abbrv] ignite git commit: IGNITE-2801 Coordinator floods network with partitions full map exchange messages
IGNITE-2801 Coordinator floods network with partitions full map exchange messages
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cb56a4aa
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cb56a4aa
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cb56a4aa
Branch: refs/heads/ignite-2801
Commit: cb56a4aa58f4b610aa603f8c27ea2715803b1847
Parents: fa356e3
Author: Anton Vinogradov <av...@apache.org>
Authored: Mon Mar 21 17:19:36 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Mon Mar 21 17:19:36 2016 +0300
----------------------------------------------------------------------
.../GridCachePartitionExchangeManager.java | 8 +-
...RabalancingPartitionMapExchangeSelfTest.java | 190 +++++++++++++++++++
.../GridCacheRebalancingSyncSelfTest.java | 7 +-
3 files changed, 196 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/cb56a4aa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 1681f2f..caa32d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1264,13 +1264,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
break;
}
- // If not first preloading and no more topology events present,
- // then we periodically refresh partition map.
- if (!cctx.kernalContext().clientNode() && futQ.isEmpty() && preloadFinished) {
- refreshPartitions(timeout);
-
+ if (!cctx.kernalContext().clientNode() && futQ.isEmpty() && preloadFinished)
timeout = cctx.gridConfig().getNetworkTimeout();
- }
+
// After workers line up and before preloading starts we initialize all futures.
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/cb56a4aa/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingPartitionMapExchangeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingPartitionMapExchangeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingPartitionMapExchangeSelfTest.java
new file mode 100644
index 0000000..8cf11cc
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingPartitionMapExchangeSelfTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Manual diagnostic for IGNITE-2801: on a 10-node grid with ten partitioned caches, counts and sizes
+ * every message sent through the communication SPI during a 60 second idle window.
+ */
+public class GridCacheRabalancingPartitionMapExchangeSelfTest extends GridCommonAbstractTest {
+    /** IP finder; not referenced anywhere else in this class. */
+    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Per message ID: {@code "cnt "} is a count, {@code "size"} a byte total, {@code "err "} sizing failures. */
+    private final ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<>();
+
+    /** Whether sent messages are currently being recorded. */
+    private volatile boolean record;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+        TcpCommunicationSpi commSpi = new CountingCommunicationSpi();
+
+        commSpi.setLocalPort(GridTestUtils.getNextCommPort(getClass()));
+        commSpi.setTcpNoDelay(true);
+
+        iCfg.setCommunicationSpi(commSpi);
+
+        return iCfg;
+    }
+
+    /** Communication SPI that reports each outgoing message to the enclosing test before sending. */
+    public class CountingCommunicationSpi extends TcpCommunicationSpi {
+        /** {@inheritDoc} */
+        @Override public void sendMessage(final ClusterNode node, final Message msg,
+            final IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+            // Anything that is not a GridIoMessage has no payload to unwrap and is sent uncounted.
+            if (msg instanceof GridIoMessage)
+                recordMessage(((GridIoMessage)msg).message());
+
+            super.sendMessage(node, msg, ackC);
+        }
+    }
+
+    /**
+     * Adds a sent message to the statistics while recording is enabled.
+     *
+     * @param msg Payload of the sent message.
+     */
+    private void recordMessage(Object msg) {
+        if (!record)
+            return;
+
+        String id = msg.getClass().toString();
+
+        // Full partition maps are keyed per exchange ID as well as per class.
+        if (msg instanceof GridDhtPartitionsFullMessage)
+            id += ((GridDhtPartitionsFullMessage)msg).exchangeId();
+
+        int size = 0;
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+        try (ObjectOutput out = new ObjectOutputStream(bos)) {
+            out.writeObject(msg);
+            out.flush(); // Drain the object stream's buffer before measuring.
+
+            size = bos.size();
+        }
+        catch (IOException e) {
+            // Size is unknown: surface the failure in the report instead of silently adding zero.
+            counter("err " + id).incrementAndGet();
+        }
+
+        counter("cnt " + id).incrementAndGet();
+        counter("size" + id).addAndGet(size);
+    }
+
+    /**
+     * @param key Statistics key.
+     * @return Counter stored under the key, created on first use.
+     */
+    private AtomicInteger counter(String key) {
+        AtomicInteger cntr = map.get(key);
+
+        if (cntr == null) {
+            // Another sender thread may register the counter first; keep whichever one won.
+            AtomicInteger old = map.putIfAbsent(key, cntr = new AtomicInteger());
+
+            if (old != null)
+                cntr = old;
+        }
+
+        return cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return Long.MAX_VALUE; // The test deliberately sleeps for two minutes.
+    }
+
+    /**
+     * Starts the grid and caches, records sent messages for 60 seconds and prints the statistics.
+     *
+     * @throws Exception If failed.
+     */
+    public void test() throws Exception {
+        record = false;
+
+        startGrids(10);
+
+        awaitPartitionMapExchange(true);
+
+        for (int i = 0; i < 10; i++) {
+            CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>();
+
+            cfg.setName("cache" + i);
+            cfg.setCacheMode(CacheMode.PARTITIONED);
+            cfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+            cfg.setBackups(1);
+
+            ignite(0).getOrCreateCache(cfg);
+        }
+
+        awaitPartitionMapExchange(true);
+
+        U.sleep(60_000);
+
+        System.out.println("--------------------------TESTING--------------------------");
+
+        record = true;
+
+        U.sleep(60_000);
+
+        record = false;
+
+        for (Map.Entry<String, AtomicInteger> entry : map.entrySet())
+            System.out.println(entry.getKey() + " ------ " + entry.getValue());
+
+        ((IgniteKernal)grid(0).cluster().forOldest().ignite()).dumpDebugInfo();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cb56a4aa/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index 4ee080f..ceba0e1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -172,8 +172,9 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
log.info("<" + name + "> Checked " + (i + 1) * 100 / (TEST_SIZE) + "% entries. [count=" + TEST_SIZE +
", iteration=" + iter + ", cache=" + name + "]");
- assert ignite.cache(name).get(i) != null && ignite.cache(name).get(i).equals(i + name.hashCode() + iter) :
- i + " value " + (i + name.hashCode() + iter) + " does not match (" + ignite.cache(name).get(i) + ")";
+ assertTrue(i + " value " + (i + name.hashCode() + iter) + " does not match (" + ignite.cache(name).get(i) + ")",
+ ignite.cache(name).get(i) != null && ignite.cache(name).get(i).equals(i + name.hashCode() + iter));
+
}
}
@@ -340,7 +341,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
Map map = U.field(supplier, "scMap");
synchronized (map) {
- assert map.isEmpty();
+ assertTrue(map.isEmpty());
}
}
}