You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2015/09/30 18:22:58 UTC
ignite git commit: debugging rebalance
Repository: ignite
Updated Branches:
refs/heads/ignite-slow-rebal 79bc2eb07 -> 663a2d993
debugging rebalance
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/663a2d99
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/663a2d99
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/663a2d99
Branch: refs/heads/ignite-slow-rebal
Commit: 663a2d993b9750b34eb8eef89cebf5cea4db8c6d
Parents: 79bc2eb
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Wed Sep 30 19:21:58 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Wed Sep 30 19:21:58 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheIoManager.java | 19 +-
.../GridDhtPartitionDemandMessage.java | 3 +-
.../GridDhtPartitionSupplyMessage.java | 3 +-
.../GridDhtPartitionsExchangeFuture.java | 10 +-
.../preloader/GridDhtPartitionsFullMessage.java | 11 +-
.../communication/tcp/TcpCommunicationSpi.java | 22 ++-
.../dht/GridCacheDhtPreloadPerformanceTest.java | 176 +++++++++++++++++++
7 files changed, 222 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/663a2d99/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 421ec82..300a0e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -923,9 +924,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
", old=" + clsHandlers.get(key) + ", new=" + c + ']';
}
- if (log != null && log.isDebugEnabled())
- log.debug("Registered cache communication handler [cacheId=" + cacheId + ", type=" + type +
- ", msgIdx=" + msgIdx + ", handler=" + c + ']');
+ IgniteLogger log0 = log;
+
+ if (log0 != null && log0.isTraceEnabled())
+ log0.trace(
+ "Registered cache communication handler [cacheId=" + cacheId + ", type=" + type +
+ ", msgIdx=" + msgIdx + ", handler=" + c + ']');
}
/**
@@ -978,15 +982,16 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
*/
@SuppressWarnings({"unchecked"})
public void addOrderedHandler(Object topic, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) {
+ IgniteLogger log0 = log;
if (orderedHandlers.putIfAbsent(topic, c) == null) {
cctx.gridIO().addMessageListener(topic, new OrderedMessageListener(
(IgniteBiInClosure<UUID, GridCacheMessage>)c));
- if (log != null && log.isDebugEnabled())
- log.debug("Registered ordered cache communication handler [topic=" + topic + ", handler=" + c + ']');
+ if (log0 != null && log0.isTraceEnabled())
+ log0.trace("Registered ordered cache communication handler [topic=" + topic + ", handler=" + c + ']');
}
- else if (log != null)
- U.warn(log, "Failed to register ordered cache communication handler because it is already " +
+ else if (log0 != null)
+ U.warn(log0, "Failed to register ordered cache communication handler because it is already " +
"registered for this topic [topic=" + topic + ", handler=" + c + ']');
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/663a2d99/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
index b588372..863ec8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
@@ -45,7 +45,6 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
private long updateSeq;
/** Partition. */
- @GridToStringInclude
@GridDirectCollection(int.class)
private Collection<Integer> parts;
@@ -330,4 +329,4 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
return S.toString(GridDhtPartitionDemandMessage.class, this, "partCnt", parts.size(), "super",
super.toString());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/663a2d99/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index 3ccc5ae..cf10a13 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@ -404,7 +404,6 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
@Override public String toString() {
return S.toString(GridDhtPartitionSupplyMessage.class, this,
"size", size(),
- "parts", infos.keySet(),
"super", super.toString());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/663a2d99/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a1b03c1..3cf0eb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -475,6 +475,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
busyLock.readLock().unlock();
}
+ // TODO remove
+ long inited;
+
/**
* Starts activity.
*
@@ -488,6 +491,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (isDone())
return;
+ inited = U.currentTimeMillis();
+
try {
// Wait for event to occur to make sure that discovery
// will return corresponding nodes.
@@ -800,7 +805,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
for (Map.Entry<IgniteTxKey, Collection<GridCacheMvccCandidate>> e : locks.entrySet())
U.warn(log, "Awaited locked entry [key=" + e.getKey() + ", mvcc=" + e.getValue() + ']');
-
+
dumpedObjects++;
}
}
@@ -1059,7 +1064,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (super.onDone(res, err) && !dummy && !forcePreload) {
if (log.isDebugEnabled())
- log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this + ']');
+ log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this +
+ ", duration=" + duration() + ", durationFromInit=" + (U.currentTimeMillis() - inited) + ']');
initFut.onDone(err == null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/663a2d99/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 50e2e41..5429538 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -43,7 +43,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
/** */
@GridToStringInclude
@GridDirectTransient
- private Map<Integer, GridDhtPartitionFullMap> parts = new HashMap<>();
+ private Map<Integer, GridDhtPartitionFullMap> parts;
/** */
private byte[] partsBytes;
@@ -68,7 +68,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
@NotNull AffinityTopologyVersion topVer) {
super(id, lastVer);
- assert parts != null;
assert id == null || topVer.equals(id.topologyVersion());
this.topVer = topVer;
@@ -86,6 +85,9 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
* @param fullMap Full partitions map.
*/
public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap) {
+ if (parts == null)
+ parts = new HashMap<>();
+
if (!parts.containsKey(cacheId))
parts.put(cacheId, fullMap);
}
@@ -95,7 +97,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- if (partsBytes == null && parts != null)
+ if (parts != null && partsBytes == null)
partsBytes = ctx.marshaller().marshal(parts);
}
@@ -117,7 +119,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (partsBytes != null)
+ if (partsBytes != null && parts == null)
parts = ctx.marshaller().unmarshal(partsBytes, ldr);
}
@@ -201,3 +203,4 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
"super", super.toString());
}
}
+
http://git-wip-us.apache.org/repos/asf/ignite/blob/663a2d99/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index c93d5af..5ea2c02 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2050,11 +2050,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (locNode == null)
throw new IgniteCheckedException("Failed to create NIO client (local node is stopping)");
+ if (log.isDebugEnabled())
+ log.debug("Creating NIO client to node: " + node);
+
// If remote node has shared memory server enabled and has the same set of MACs
// then we are likely to run on the same host and shared memory communication could be tried.
if (shmemPort != null && U.sameMacs(locNode, node)) {
try {
- return createShmemClient(node, shmemPort);
+ GridCommunicationClient client = createShmemClient(
+ node,
+ shmemPort);
+
+ if (log.isDebugEnabled())
+ log.debug("Shmem client created: " + client);
+
+ return client;
}
catch (IgniteCheckedException e) {
if (e.hasCause(IpcOutOfSystemResourcesException.class))
@@ -2071,7 +2081,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
connectGate.enter();
try {
- return createTcpClient(node);
+ GridCommunicationClient client = createTcpClient(node);
+
+ if (log.isDebugEnabled())
+ log.debug("TCP client created: " + client);
+
+ return client;
}
finally {
connectGate.leave();
@@ -2453,9 +2468,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
throw errs;
}
- if (log.isDebugEnabled())
- log.debug("Created client: " + client);
-
return client;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/663a2d99/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadPerformanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadPerformanceTest.java
new file mode 100644
index 0000000..c00f316
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadPerformanceTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Prints concurrent node start times to help profile the partitioned cache {@link GridDhtPreloader preloader}; asserts nothing.
+ */
+public class GridCacheDhtPreloadPerformanceTest extends GridCommonAbstractTest {
+ /** Number of threads passed to {@code multithreaded()}; each thread starts one grid node. */
+ private static final int THREAD_CNT = 2;
+
+ /** IP finder set on the discovery SPI of every node configuration built by this test. */
+ private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration c = super.getConfiguration(gridName);
+
+ CacheConfiguration cc = defaultCacheConfiguration(); // first cache: name is left as returned by the helper
+
+ cc.setCacheMode(
+ CacheMode.PARTITIONED);
+ cc.setWriteSynchronizationMode(
+ CacheWriteSynchronizationMode.FULL_SYNC);
+ cc.setRebalanceMode(
+ CacheRebalanceMode.SYNC);
+ cc.setAffinity(new RendezvousAffinityFunction(false, 1300)); // presumably (excludeNeighbors, partitions) - confirm against the ctor
+ cc.setBackups(2);
+
+ CacheConfiguration cc1 = defaultCacheConfiguration(); // second cache: same settings as cc, but named "cc1"
+
+ cc1.setName("cc1");
+ cc1.setCacheMode(
+ CacheMode.PARTITIONED);
+ cc1.setWriteSynchronizationMode(
+ CacheWriteSynchronizationMode.FULL_SYNC);
+ cc1.setRebalanceMode(
+ CacheRebalanceMode.SYNC);
+ cc1.setAffinity(
+ new RendezvousAffinityFunction(
+ false,
+ 1300));
+ cc1.setBackups(2);
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(ipFinder);
+
+ c.setDiscoverySpi(disco);
+
+ c.setIgfsThreadPoolSize(1); // every thread pool below is sized to 1 or 2 threads
+ c.setSystemThreadPoolSize(2);
+ c.setPublicThreadPoolSize(2);
+ c.setManagementThreadPoolSize(1);
+ c.setUtilityCachePoolSize(2);
+ c.setPeerClassLoadingThreadPoolSize(1);
+
+ c.setCacheConfiguration(cc, cc1);
+
+ TcpCommunicationSpi comm = new TcpCommunicationSpi(); // plain SPI: TestCommunicationSpi below is not installed
+
+ comm.setSharedMemoryPort(-1); // NOTE(review): presumably -1 turns shared-memory communication off - confirm
+
+ c.setCommunicationSpi(comm);
+
+ return c;
+ }
+
+ /** Starts nodes from {@link #THREAD_CNT} threads at once and prints each start duration and topology size.
+ * @throws Exception If failed.
+ */
+ public void testConcurrentStartPerformance() throws Exception {
+// for (int i = 0; i < 10; i++) {
+// try {
+// startGrid(1);
+// startGrid(2);
+// startGrid(3);
+// }
+// finally {
+// G.stopAll(true);
+// }
+// }
+
+ multithreaded(
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ long start = U.currentTimeMillis();
+
+ Ignite grid = startGrid(Thread.currentThread().getName()); // current thread's name is the startGrid argument
+
+ System.out.println(">>> Time to start: " + (U.currentTimeMillis() - start) + ", topSize=" + grid.cluster().nodes().size());
+
+ return null;
+ }
+ },
+ THREAD_CNT);
+ }
+
+ /**
+ * Communication SPI that records every sent {@link GridDhtPartitionsSingleMessage}; not referenced elsewhere in this class.
+ */
+ private static class TestCommunicationSpi extends TcpCommunicationSpi {
+ /** Recorded messages, in the order they were added. */
+ private Collection<GridDhtPartitionsSingleMessage> sentMsgs = new ConcurrentLinkedQueue<>();
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+ throws IgniteSpiException {
+ recordMessage((GridIoMessage)msg); // unconditional cast: assumes every outgoing message is a GridIoMessage
+
+ super.sendMessage(node, msg, ackClosure);
+ }
+
+ /**
+ * @return The internal collection of recorded messages itself (not a copy).
+ */
+ public Collection<GridDhtPartitionsSingleMessage> sentMessages() {
+ return sentMsgs;
+ }
+
+ /**
+ * Adds the wrapped payload to {@link #sentMsgs} if it is a {@link GridDhtPartitionsSingleMessage}; otherwise does nothing.
+ *
+ * @param msg I/O message whose {@code message()} payload is inspected.
+ */
+ private void recordMessage(GridIoMessage msg) {
+ if (msg.message() instanceof GridDhtPartitionsSingleMessage) {
+ GridDhtPartitionsSingleMessage partSingleMsg = (GridDhtPartitionsSingleMessage)msg.message();
+
+ sentMsgs.add(partSingleMsg);
+ }
+ }
+ }
+}