You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2015/09/30 18:22:58 UTC

ignite git commit: debugging rebalance

Repository: ignite
Updated Branches:
  refs/heads/ignite-slow-rebal 79bc2eb07 -> 663a2d993


debugging rebalance


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/663a2d99
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/663a2d99
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/663a2d99

Branch: refs/heads/ignite-slow-rebal
Commit: 663a2d993b9750b34eb8eef89cebf5cea4db8c6d
Parents: 79bc2eb
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Wed Sep 30 19:21:58 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Wed Sep 30 19:21:58 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheIoManager.java    |  19 +-
 .../GridDhtPartitionDemandMessage.java          |   3 +-
 .../GridDhtPartitionSupplyMessage.java          |   3 +-
 .../GridDhtPartitionsExchangeFuture.java        |  10 +-
 .../preloader/GridDhtPartitionsFullMessage.java |  11 +-
 .../communication/tcp/TcpCommunicationSpi.java  |  22 ++-
 .../dht/GridCacheDhtPreloadPerformanceTest.java | 176 +++++++++++++++++++
 7 files changed, 222 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/663a2d99/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 421ec82..300a0e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -923,9 +924,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                     ", old=" + clsHandlers.get(key) + ", new=" + c + ']';
         }
 
-        if (log != null && log.isDebugEnabled())
-            log.debug("Registered cache communication handler [cacheId=" + cacheId + ", type=" + type +
-                ", msgIdx=" + msgIdx + ", handler=" + c + ']');
+        IgniteLogger log0 = log;
+
+        if (log0 != null && log0.isTraceEnabled())
+            log0.trace(
+                "Registered cache communication handler [cacheId=" + cacheId + ", type=" + type +
+                    ", msgIdx=" + msgIdx + ", handler=" + c + ']');
     }
 
     /**
@@ -978,15 +982,16 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      */
     @SuppressWarnings({"unchecked"})
     public void addOrderedHandler(Object topic, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) {
+        IgniteLogger log0 = log;
         if (orderedHandlers.putIfAbsent(topic, c) == null) {
             cctx.gridIO().addMessageListener(topic, new OrderedMessageListener(
                 (IgniteBiInClosure<UUID, GridCacheMessage>)c));
 
-            if (log != null && log.isDebugEnabled())
-                log.debug("Registered ordered cache communication handler [topic=" + topic + ", handler=" + c + ']');
+            if (log0 != null && log0.isTraceEnabled())
+                log0.trace("Registered ordered cache communication handler [topic=" + topic + ", handler=" + c + ']');
         }
-        else if (log != null)
-            U.warn(log, "Failed to register ordered cache communication handler because it is already " +
+        else if (log0 != null)
+            U.warn(log0, "Failed to register ordered cache communication handler because it is already " +
                 "registered for this topic [topic=" + topic + ", handler=" + c + ']');
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/663a2d99/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
index b588372..863ec8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
@@ -45,7 +45,6 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
     private long updateSeq;
 
     /** Partition. */
-    @GridToStringInclude
     @GridDirectCollection(int.class)
     private Collection<Integer> parts;
 
@@ -330,4 +329,4 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
         return S.toString(GridDhtPartitionDemandMessage.class, this, "partCnt", parts.size(), "super",
             super.toString());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/663a2d99/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index 3ccc5ae..cf10a13 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@ -404,7 +404,6 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
     @Override public String toString() {
         return S.toString(GridDhtPartitionSupplyMessage.class, this,
             "size", size(),
-            "parts", infos.keySet(),
             "super", super.toString());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/663a2d99/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a1b03c1..3cf0eb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -475,6 +475,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         busyLock.readLock().unlock();
     }
 
+    // TODO remove
+    long inited;
+
     /**
      * Starts activity.
      *
@@ -488,6 +491,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             if (isDone())
                 return;
 
+            inited = U.currentTimeMillis();
+
             try {
                 // Wait for event to occur to make sure that discovery
                 // will return corresponding nodes.
@@ -800,7 +805,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
                             for (Map.Entry<IgniteTxKey, Collection<GridCacheMvccCandidate>> e : locks.entrySet())
                                 U.warn(log, "Awaited locked entry [key=" + e.getKey() + ", mvcc=" + e.getValue() + ']');
-                            
+
                             dumpedObjects++;
                         }
                     }
@@ -1059,7 +1064,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
         if (super.onDone(res, err) && !dummy && !forcePreload) {
             if (log.isDebugEnabled())
-                log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this + ']');
+                log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this +
+                    "duration=" + duration() + ", durationFromInit=" + (U.currentTimeMillis() - inited) + ']');
 
             initFut.onDone(err == null);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/663a2d99/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 50e2e41..5429538 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -43,7 +43,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     /** */
     @GridToStringInclude
     @GridDirectTransient
-    private Map<Integer, GridDhtPartitionFullMap> parts = new HashMap<>();
+    private Map<Integer, GridDhtPartitionFullMap> parts;
 
     /** */
     private byte[] partsBytes;
@@ -68,7 +68,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
         @NotNull AffinityTopologyVersion topVer) {
         super(id, lastVer);
 
-        assert parts != null;
         assert id == null || topVer.equals(id.topologyVersion());
 
         this.topVer = topVer;
@@ -86,6 +85,9 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
      * @param fullMap Full partitions map.
      */
     public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap) {
+        if (parts == null)
+            parts = new HashMap<>();
+
         if (!parts.containsKey(cacheId))
             parts.put(cacheId, fullMap);
     }
@@ -95,7 +97,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        if (partsBytes == null && parts != null)
+        if (parts != null && partsBytes == null)
             partsBytes = ctx.marshaller().marshal(parts);
     }
 
@@ -117,7 +119,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        if (partsBytes != null)
+        if (partsBytes != null && parts == null)
             parts = ctx.marshaller().unmarshal(partsBytes, ldr);
     }
 
@@ -201,3 +203,4 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
             "super", super.toString());
     }
 }
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/663a2d99/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index c93d5af..5ea2c02 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2050,11 +2050,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         if (locNode == null)
             throw new IgniteCheckedException("Failed to create NIO client (local node is stopping)");
 
+        if (log.isDebugEnabled())
+            log.debug("Creating NIO client to node: " + node);
+
         // If remote node has shared memory server enabled and has the same set of MACs
         // then we are likely to run on the same host and shared memory communication could be tried.
         if (shmemPort != null && U.sameMacs(locNode, node)) {
             try {
-                return createShmemClient(node, shmemPort);
+                GridCommunicationClient client = createShmemClient(
+                    node,
+                    shmemPort);
+
+                if (log.isDebugEnabled())
+                    log.debug("Shmem client created: " + client);
+
+                return client;
             }
             catch (IgniteCheckedException e) {
                 if (e.hasCause(IpcOutOfSystemResourcesException.class))
@@ -2071,7 +2081,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         connectGate.enter();
 
         try {
-            return createTcpClient(node);
+            GridCommunicationClient client = createTcpClient(node);
+
+            if (log.isDebugEnabled())
+                log.debug("TCP client created: " + client);
+
+            return client;
         }
         finally {
             connectGate.leave();
@@ -2453,9 +2468,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             throw errs;
         }
 
-        if (log.isDebugEnabled())
-            log.debug("Created client: " + client);
-
         return client;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/663a2d99/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadPerformanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadPerformanceTest.java
new file mode 100644
index 0000000..c00f316
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadPerformanceTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Test cases for partitioned cache {@link GridDhtPreloader preloader}.
+ */
+public class GridCacheDhtPreloadPerformanceTest extends GridCommonAbstractTest {
+    /** */
+    private static final int THREAD_CNT = 2;
+
+    /** IP finder. */
+    private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        CacheConfiguration cc = defaultCacheConfiguration();
+
+        cc.setCacheMode(
+            CacheMode.PARTITIONED);
+        cc.setWriteSynchronizationMode(
+            CacheWriteSynchronizationMode.FULL_SYNC);
+        cc.setRebalanceMode(
+            CacheRebalanceMode.SYNC);
+        cc.setAffinity(new RendezvousAffinityFunction(false, 1300));
+        cc.setBackups(2);
+
+        CacheConfiguration cc1 = defaultCacheConfiguration();
+
+        cc1.setName("cc1");
+        cc1.setCacheMode(
+            CacheMode.PARTITIONED);
+        cc1.setWriteSynchronizationMode(
+            CacheWriteSynchronizationMode.FULL_SYNC);
+        cc1.setRebalanceMode(
+            CacheRebalanceMode.SYNC);
+        cc1.setAffinity(
+            new RendezvousAffinityFunction(
+                false,
+                1300));
+        cc1.setBackups(2);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        c.setDiscoverySpi(disco);
+
+        c.setIgfsThreadPoolSize(1);
+        c.setSystemThreadPoolSize(2);
+        c.setPublicThreadPoolSize(2);
+        c.setManagementThreadPoolSize(1);
+        c.setUtilityCachePoolSize(2);
+        c.setPeerClassLoadingThreadPoolSize(1);
+
+        c.setCacheConfiguration(cc, cc1);
+
+        TcpCommunicationSpi comm = new TcpCommunicationSpi();
+
+        comm.setSharedMemoryPort(-1);
+
+        c.setCommunicationSpi(comm);
+
+        return c;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentStartPerformance() throws Exception {
+//        for (int i = 0; i < 10; i++) {
+//            try {
+//                startGrid(1);
+//                startGrid(2);
+//                startGrid(3);
+//            }
+//            finally {
+//                G.stopAll(true);
+//            }
+//        }
+
+        multithreaded(
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    long start = U.currentTimeMillis();
+
+                    Ignite grid = startGrid(Thread.currentThread().getName());
+
+                    System.out.println(">>> Time to start: " + (U.currentTimeMillis() - start) + ", topSize=" + grid.cluster().nodes().size());
+
+                    return null;
+                }
+            },
+            THREAD_CNT);
+    }
+
+    /**
+     * Communication SPI that will count single partition update messages.
+     */
+    private static class TestCommunicationSpi extends TcpCommunicationSpi {
+        /** Recorded messages. */
+        private Collection<GridDhtPartitionsSingleMessage> sentMsgs = new ConcurrentLinkedQueue<>();
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+            throws IgniteSpiException {
+            recordMessage((GridIoMessage)msg);
+
+            super.sendMessage(node, msg, ackClosure);
+        }
+
+        /**
+         * @return Collection of sent messages.
+         */
+        public Collection<GridDhtPartitionsSingleMessage> sentMessages() {
+            return sentMsgs;
+        }
+
+        /**
+         * Adds message to a list if message is of correct type.
+         *
+         * @param msg Message.
+         */
+        private void recordMessage(GridIoMessage msg) {
+            if (msg.message() instanceof GridDhtPartitionsSingleMessage) {
+                GridDhtPartitionsSingleMessage partSingleMsg = (GridDhtPartitionsSingleMessage)msg.message();
+
+                sentMsgs.add(partSingleMsg);
+            }
+        }
+    }
+}