You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/10/17 13:47:22 UTC

ignite git commit: IGNITE-9875 Optimized GridDhtPartitionsStateValidator - Fixes #4983.

Repository: ignite
Updated Branches:
  refs/heads/master 10c2b10e6 -> 33b9611e7


IGNITE-9875 Optimized GridDhtPartitionsStateValidator - Fixes #4983.

Signed-off-by: Alexey Goncharuk <al...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/33b9611e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/33b9611e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/33b9611e

Branch: refs/heads/master
Commit: 33b9611e72f03b2f6ec8c72600e7d42558a92339
Parents: 10c2b10
Author: Evgeny Stanilovskiy <es...@gridgain.com>
Authored: Wed Oct 17 16:35:32 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Oct 17 16:45:51 2018 +0300

----------------------------------------------------------------------
 modules/benchmarks/pom.xml                      |  12 +-
 ...ridDhtPartitionsStateValidatorBenchmark.java | 168 +++++++++++++++++++
 .../CachePartitionFullCountersMap.java          |  21 ---
 .../GridDhtPartitionsSingleMessage.java         |  29 ++++
 .../GridDhtPartitionsStateValidator.java        |  84 ++++++----
 5 files changed, 257 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/33b9611e/modules/benchmarks/pom.xml
----------------------------------------------------------------------
diff --git a/modules/benchmarks/pom.xml b/modules/benchmarks/pom.xml
index 1ea984c..06e0e50 100644
--- a/modules/benchmarks/pom.xml
+++ b/modules/benchmarks/pom.xml
@@ -62,6 +62,16 @@
             <version>${jmh.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>${mockito.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
@@ -131,4 +141,4 @@
             </plugins>
         </pluginManagement>
     </build>
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/ignite/blob/33b9611e/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/misc/GridDhtPartitionsStateValidatorBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/misc/GridDhtPartitionsStateValidatorBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/misc/GridDhtPartitionsStateValidatorBenchmark.java
new file mode 100644
index 0000000..151606d
--- /dev/null
+++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/misc/GridDhtPartitionsStateValidatorBenchmark.java
@@ -0,0 +1,168 @@
+package org.apache.ignite.internal.benchmarks.jmh.misc;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.ignite.internal.benchmarks.jmh.JmhAbstractBenchmark;
+import org.apache.ignite.internal.benchmarks.jmh.runner.JmhIdeBenchmarkRunner;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsStateValidator;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.jetbrains.annotations.Nullable;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.IntStream;
+
+import static org.openjdk.jmh.annotations.Scope.Thread;
+
+/** JMH benchmark for {@link GridDhtPartitionsStateValidator} run against a mocked partition topology. */
+@State(Scope.Benchmark)
+public class GridDhtPartitionsStateValidatorBenchmark extends JmhAbstractBenchmark {
+    /** Per-thread benchmark state: mocks, validator under test and prepared single messages. */
+    @State(Thread)
+    public static class Context {
+        /** Id returned by the shared context mock as the local node id. */
+        private final UUID localNodeId = UUID.randomUUID();
+
+        /** Shared context mock passed to the validator. */
+        private GridCacheSharedContext cctxMock;
+
+        /** Topology mock passed to the validation methods. */
+        private GridDhtPartitionTopology topologyMock;
+
+        /** Validator under test. */
+        private GridDhtPartitionsStateValidator validator;
+
+        /** Single messages by sender node id; filled in {@link #setup()}. */
+        private final Map<UUID, GridDhtPartitionsSingleMessage> messages = new HashMap<>();
+
+        /** Node id passed to the validator in the set of ignored nodes. */
+        private final UUID ignoreNode = UUID.randomUUID();
+
+        /** Number of remote nodes with a single message (the ignored node is added on top). */
+        private static final int NODES = 3;
+
+        /** Number of partitions in the mocked topology. */
+        private static final int PARTS = 100;
+
+        /**
+         * @return OWNING partition mock reporting the given {@code id}, {@code updateCounter} and {@code size}.
+         */
+        private GridDhtLocalPartition partitionMock(int id, long updateCounter, long size) {
+            GridDhtLocalPartition partitionMock = Mockito.mock(GridDhtLocalPartition.class);
+            Mockito.when(partitionMock.id()).thenReturn(id);
+            Mockito.when(partitionMock.updateCounter()).thenReturn(updateCounter);
+            Mockito.when(partitionMock.fullSize()).thenReturn(size);
+            Mockito.when(partitionMock.state()).thenReturn(GridDhtPartitionState.OWNING);
+            return partitionMock;
+        }
+
+        /**
+         * @param countersMap Update counters map.
+         * @param sizesMap Sizes map.
+         * @return Message with specified {@code countersMap} and {@code sizesMap} registered for group 0.
+         */
+        private GridDhtPartitionsSingleMessage from(@Nullable Map<Integer, T2<Long, Long>> countersMap, @Nullable Map<Integer, Long> sizesMap) {
+            GridDhtPartitionsSingleMessage msg = new GridDhtPartitionsSingleMessage();
+            if (countersMap != null)
+                msg.addPartitionUpdateCounters(0, countersMap);
+            if (sizesMap != null)
+                msg.addPartitionSizes(0, sizesMap);
+            return msg;
+        }
+
+        /** Prepares mocks, fills {@link #messages} and creates the validator. */
+        @Setup
+        public void setup() {
+            // Prepare mocks.
+            cctxMock = Mockito.mock(GridCacheSharedContext.class);
+            Mockito.when(cctxMock.localNodeId()).thenReturn(localNodeId);
+
+            topologyMock = Mockito.mock(GridDhtPartitionTopology.class);
+            Mockito.when(topologyMock.partitionState(Matchers.any(), Matchers.anyInt())).thenReturn(GridDhtPartitionState.OWNING);
+            Mockito.when(topologyMock.groupId()).thenReturn(0);
+            Mockito.when(topologyMock.partitions()).thenReturn(PARTS);
+
+            List<GridDhtLocalPartition> localPartitions = Lists.newArrayList();
+            Map<Integer, T2<Long, Long>> updateCountersMap = new HashMap<>();
+            Map<Integer, Long> cacheSizesMap = new HashMap<>();
+
+            IntStream.range(0, PARTS).forEach(k -> {
+                localPartitions.add(partitionMock(k, k + 1, k + 1));
+                // Partitions 21..30 are reported by remote nodes with zero update counter and zero size.
+                long us = k > 20 && k <= 30 ? 0 : k + 2L;
+                updateCountersMap.put(k, new T2<>(k + 2L, us));
+                cacheSizesMap.put(k, us);
+            });
+
+            Mockito.when(topologyMock.localPartitions()).thenReturn(localPartitions);
+            Mockito.when(topologyMock.currentLocalPartitions()).thenReturn(localPartitions);
+
+            // Fill the messages field read by the benchmarks; a local map here would shadow it and leave it empty.
+            messages.clear();
+
+            for (int n = 0; n < NODES; ++n) {
+                UUID remoteNode = UUID.randomUUID();
+
+                messages.put(remoteNode, from(updateCountersMap, cacheSizesMap));
+            }
+
+            messages.put(ignoreNode, from(updateCountersMap, cacheSizesMap));
+
+            validator = new GridDhtPartitionsStateValidator(cctxMock);
+        }
+    }
+
+    /** Measures {@link GridDhtPartitionsStateValidator#validatePartitionsUpdateCounters} on the prepared messages. */
+    @Benchmark
+    public void testValidatePartitionsUpdateCounters(Context context) {
+        context.validator.validatePartitionsUpdateCounters(context.topologyMock,
+                context.messages, Sets.newHashSet(context.ignoreNode));
+    }
+
+    /** Measures {@link GridDhtPartitionsStateValidator#validatePartitionsSizes} on the prepared messages. */
+    @Benchmark
+    public void testValidatePartitionsSizes(Context context) {
+        context.validator.validatePartitionsSizes(context.topologyMock,
+                context.messages, Sets.newHashSet(context.ignoreNode));
+    }
+
+    /**
+     * Run benchmarks.
+     *
+     * @param args Arguments.
+     * @throws Exception If failed.
+     */
+    public static void main(String[] args) throws Exception {
+        run(1);
+    }
+
+    /**
+     * Run benchmark.
+     *
+     * @param threads Amount of threads.
+     * @throws Exception If failed.
+     */
+    private static void run(int threads) throws Exception {
+        JmhIdeBenchmarkRunner.create()
+                .forks(1)
+                .threads(threads)
+                .warmupIterations(5)
+                .measurementIterations(10)
+                .benchmarks(GridDhtPartitionsStateValidatorBenchmark.class.getSimpleName())
+                .jvmArguments("-XX:+UseG1GC", "-Xms4g", "-Xmx4g")
+                .run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/33b9611e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
index 2d5eec3..008c276 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
@@ -94,27 +94,6 @@ public class CachePartitionFullCountersMap implements Serializable {
     }
 
     /**
-     * Creates submap for provided partition IDs.
-     *
-     * @param parts Partition IDs.
-     * @return Partial counters map.
-     */
-    public CachePartitionPartialCountersMap subMap(Set<Integer> parts) {
-        CachePartitionPartialCountersMap res = new CachePartitionPartialCountersMap(parts.size());
-
-        for (int p = 0; p < updCntrs.length; p++) {
-            if (!parts.contains(p))
-                continue;
-
-            res.add(p, initialUpdCntrs[p], updCntrs[p]);
-        }
-
-        assert res.size() == parts.size();
-
-        return res;
-    }
-
-    /**
      * Clears full counters map.
      */
     public void clear() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/33b9611e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 088fb31..7dd34f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -246,6 +246,35 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     }
 
     /**
+     * Returns update counters kept for {@code grpId}; a {@link Map} value is converted in its iteration order.
+     * @param grpId Cache group ID.
+     * @param partsCnt Total cache partitions.
+     * @return Partition update counters, {@link CachePartitionPartialCountersMap#EMPTY} if none are stored.
+     */
+    @SuppressWarnings("unchecked")
+    public CachePartitionPartialCountersMap partitionUpdateCountersUnsorted(int grpId, int partsCnt) {
+        Object stored = partCntrs != null ? partCntrs.get(grpId) : null;
+
+        if (stored == null)
+            return CachePartitionPartialCountersMap.EMPTY;
+
+        if (stored instanceof CachePartitionPartialCountersMap)
+            return (CachePartitionPartialCountersMap)stored;
+
+        assert stored instanceof Map : stored;
+
+        CachePartitionPartialCountersMap cntrs = new CachePartitionPartialCountersMap(partsCnt);
+
+        for (Map.Entry<Integer, T2<Long, Long>> e : ((Map<Integer, T2<Long, Long>>)stored).entrySet()) {
+            T2<Long, Long> pair = e.getValue();
+            cntrs.add(e.getKey(), pair.get1(), pair.get2());
+        }
+        cntrs.trim();
+
+        return cntrs;
+    }
+
+    /**
      * Adds partition sizes map for specified {@code grpId} to the current message.
      *
      * @param grpId Group id.

http://git-wip-us.apache.org/repos/asf/ignite/blob/33b9611e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionsStateValidator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionsStateValidator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionsStateValidator.java
index 544d453..63fe926 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionsStateValidator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionsStateValidator.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
 
+import java.util.AbstractMap;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -33,7 +34,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Cac
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.jetbrains.annotations.Nullable;
@@ -78,14 +78,16 @@ public class GridDhtPartitionsStateValidator {
         final Set<UUID> ignoringNodes = new HashSet<>();
 
         // Ignore just joined nodes.
-        for (DiscoveryEvent evt : fut.events().events())
+        for (DiscoveryEvent evt : fut.events().events()) {
             if (evt.type() == EVT_NODE_JOINED)
                 ignoringNodes.add(evt.eventNode().id());
+        }
 
         AffinityTopologyVersion topVer = fut.context().events().topologyVersion();
 
         // Validate update counters.
         Map<Integer, Map<UUID, Long>> result = validatePartitionsUpdateCounters(top, messages, ignoringNodes);
+
         if (!result.isEmpty())
             throw new IgniteCheckedException("Partitions update counters are inconsistent for " + fold(topVer, result));
 
@@ -110,13 +112,16 @@ public class GridDhtPartitionsStateValidator {
      *
      * @param top Topology to validate.
      * @param nodeId Node which sent single message.
-     * @param singleMsg Single message.
+     * @param countersMap Counters map.
+     * @param sizesMap Sizes map.
      * @return Set of partition ids should be excluded from validation.
      */
-    @Nullable private Set<Integer> shouldIgnore(GridDhtPartitionTopology top, UUID nodeId, GridDhtPartitionsSingleMessage singleMsg) {
-        CachePartitionPartialCountersMap countersMap = singleMsg.partitionUpdateCounters(top.groupId(), top.partitions());
-        Map<Integer, Long> sizesMap = singleMsg.partitionSizes(top.groupId());
-
+    @Nullable private Set<Integer> shouldIgnore(
+        GridDhtPartitionTopology top,
+        UUID nodeId,
+        CachePartitionPartialCountersMap countersMap,
+        Map<Integer, Long> sizesMap
+    ) {
         Set<Integer> ignore = null;
 
         for (int i = 0; i < countersMap.size(); i++) {
@@ -155,14 +160,14 @@ public class GridDhtPartitionsStateValidator {
      * @return Invalid partitions map with following structure: (partId, (nodeId, updateCounter)).
      * If map is empty validation is successful.
      */
-     public Map<Integer, Map<UUID, Long>> validatePartitionsUpdateCounters(
-            GridDhtPartitionTopology top,
-            Map<UUID, GridDhtPartitionsSingleMessage> messages,
-            Set<UUID> ignoringNodes
-     ) {
+    public Map<Integer, Map<UUID, Long>> validatePartitionsUpdateCounters(
+        GridDhtPartitionTopology top,
+        Map<UUID, GridDhtPartitionsSingleMessage> messages,
+        Set<UUID> ignoringNodes
+    ) {
         Map<Integer, Map<UUID, Long>> invalidPartitions = new HashMap<>();
 
-        Map<Integer, T2<UUID, Long>> updateCountersAndNodesByPartitions = new HashMap<>();
+        Map<Integer, AbstractMap.Entry<UUID, Long>> updateCountersAndNodesByPartitions = new HashMap<>();
 
         // Populate counters statistics from local node partitions.
         for (GridDhtLocalPartition part : top.currentLocalPartitions()) {
@@ -172,7 +177,7 @@ public class GridDhtPartitionsStateValidator {
             if (part.updateCounter() == 0 && part.fullSize() == 0)
                 continue;
 
-            updateCountersAndNodesByPartitions.put(part.id(), new T2<>(cctx.localNodeId(), part.updateCounter()));
+            updateCountersAndNodesByPartitions.put(part.id(), new AbstractMap.SimpleEntry<>(cctx.localNodeId(), part.updateCounter()));
         }
 
         int partitions = top.partitions();
@@ -183,9 +188,13 @@ public class GridDhtPartitionsStateValidator {
             if (ignoringNodes.contains(nodeId))
                 continue;
 
-            CachePartitionPartialCountersMap countersMap = e.getValue().partitionUpdateCounters(top.groupId(), partitions);
+            final GridDhtPartitionsSingleMessage message = e.getValue();
 
-            Set<Integer> ignorePartitions = shouldIgnore(top, nodeId, e.getValue());
+            CachePartitionPartialCountersMap countersMap = message.partitionUpdateCountersUnsorted(top.groupId(), partitions);
+
+            Map<Integer, Long> sizesMap = message.partitionSizes(top.groupId());
+
+            Set<Integer> ignorePartitions = shouldIgnore(top, nodeId, countersMap, sizesMap);
 
             for (int i = 0; i < countersMap.size(); i++) {
                 int p = countersMap.partitionAt(i);
@@ -211,14 +220,14 @@ public class GridDhtPartitionsStateValidator {
      * @return Invalid partitions map with following structure: (partId, (nodeId, cacheSize)).
      * If map is empty validation is successful.
      */
-     public Map<Integer, Map<UUID, Long>> validatePartitionsSizes(
-            GridDhtPartitionTopology top,
-            Map<UUID, GridDhtPartitionsSingleMessage> messages,
-            Set<UUID> ignoringNodes
-     ) {
+    public Map<Integer, Map<UUID, Long>> validatePartitionsSizes(
+        GridDhtPartitionTopology top,
+        Map<UUID, GridDhtPartitionsSingleMessage> messages,
+        Set<UUID> ignoringNodes
+    ) {
         Map<Integer, Map<UUID, Long>> invalidPartitions = new HashMap<>();
 
-        Map<Integer, T2<UUID, Long>> sizesAndNodesByPartitions = new HashMap<>();
+        Map<Integer, AbstractMap.Entry<UUID, Long>> sizesAndNodesByPartitions = new HashMap<>();
 
         // Populate sizes statistics from local node partitions.
         for (GridDhtLocalPartition part : top.currentLocalPartitions()) {
@@ -228,7 +237,7 @@ public class GridDhtPartitionsStateValidator {
             if (part.updateCounter() == 0 && part.fullSize() == 0)
                 continue;
 
-            sizesAndNodesByPartitions.put(part.id(), new T2<>(cctx.localNodeId(), part.fullSize()));
+            sizesAndNodesByPartitions.put(part.id(), new AbstractMap.SimpleEntry<>(cctx.localNodeId(), part.fullSize()));
         }
 
         int partitions = top.partitions();
@@ -239,10 +248,13 @@ public class GridDhtPartitionsStateValidator {
             if (ignoringNodes.contains(nodeId))
                 continue;
 
-            CachePartitionPartialCountersMap countersMap = e.getValue().partitionUpdateCounters(top.groupId(), partitions);
-            Map<Integer, Long> sizesMap = e.getValue().partitionSizes(top.groupId());
+            final GridDhtPartitionsSingleMessage message = e.getValue();
+
+            CachePartitionPartialCountersMap countersMap = message.partitionUpdateCountersUnsorted(top.groupId(), partitions);
+
+            Map<Integer, Long> sizesMap = message.partitionSizes(top.groupId());
 
-            Set<Integer> ignorePartitions = shouldIgnore(top, nodeId, e.getValue());
+            Set<Integer> ignorePartitions = shouldIgnore(top, nodeId, countersMap, sizesMap);
 
             for (int i = 0; i < countersMap.size(); i++) {
                 int p = countersMap.partitionAt(i);
@@ -269,20 +281,22 @@ public class GridDhtPartitionsStateValidator {
      * @param node Node id.
      * @param counter Counter value reported by {@code node}.
      */
-    private void process(Map<Integer, Map<UUID, Long>> invalidPartitions,
-                         Map<Integer, T2<UUID, Long>> countersAndNodes,
-                         int part,
-                         UUID node,
-                         long counter) {
-        T2<UUID, Long> existingData = countersAndNodes.get(part);
+    private void process(
+        Map<Integer, Map<UUID, Long>> invalidPartitions,
+        Map<Integer, AbstractMap.Entry<UUID, Long>> countersAndNodes,
+        int part,
+        UUID node,
+        long counter
+    ) {
+        AbstractMap.Entry<UUID, Long> existingData = countersAndNodes.get(part);
 
         if (existingData == null)
-            countersAndNodes.put(part, new T2<>(node, counter));
+            countersAndNodes.put(part, new AbstractMap.SimpleEntry<>(node, counter));
 
-        if (existingData != null && counter != existingData.get2()) {
+        if (existingData != null && counter != existingData.getValue()) {
             if (!invalidPartitions.containsKey(part)) {
                 Map<UUID, Long> map = new HashMap<>();
-                map.put(existingData.get1(), existingData.get2());
+                map.put(existingData.getKey(), existingData.getValue());
                 invalidPartitions.put(part, map);
             }