You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/10/17 13:47:22 UTC
ignite git commit: IGNITE-9875 Optimized
GridDhtPartitionsStateValidator - Fixes #4983.
Repository: ignite
Updated Branches:
refs/heads/master 10c2b10e6 -> 33b9611e7
IGNITE-9875 Optimized GridDhtPartitionsStateValidator - Fixes #4983.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/33b9611e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/33b9611e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/33b9611e
Branch: refs/heads/master
Commit: 33b9611e72f03b2f6ec8c72600e7d42558a92339
Parents: 10c2b10
Author: Evgeny Stanilovskiy <es...@gridgain.com>
Authored: Wed Oct 17 16:35:32 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Oct 17 16:45:51 2018 +0300
----------------------------------------------------------------------
modules/benchmarks/pom.xml | 12 +-
...ridDhtPartitionsStateValidatorBenchmark.java | 168 +++++++++++++++++++
.../CachePartitionFullCountersMap.java | 21 ---
.../GridDhtPartitionsSingleMessage.java | 29 ++++
.../GridDhtPartitionsStateValidator.java | 84 ++++++----
5 files changed, 257 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/33b9611e/modules/benchmarks/pom.xml
----------------------------------------------------------------------
diff --git a/modules/benchmarks/pom.xml b/modules/benchmarks/pom.xml
index 1ea984c..06e0e50 100644
--- a/modules/benchmarks/pom.xml
+++ b/modules/benchmarks/pom.xml
@@ -62,6 +62,16 @@
<version>${jmh.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>${mockito.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
</dependencies>
<build>
@@ -131,4 +141,4 @@
</plugins>
</pluginManagement>
</build>
-</project>
\ No newline at end of file
+</project>
http://git-wip-us.apache.org/repos/asf/ignite/blob/33b9611e/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/misc/GridDhtPartitionsStateValidatorBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/misc/GridDhtPartitionsStateValidatorBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/misc/GridDhtPartitionsStateValidatorBenchmark.java
new file mode 100644
index 0000000..151606d
--- /dev/null
+++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/misc/GridDhtPartitionsStateValidatorBenchmark.java
@@ -0,0 +1,168 @@
+package org.apache.ignite.internal.benchmarks.jmh.misc;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.ignite.internal.benchmarks.jmh.JmhAbstractBenchmark;
+import org.apache.ignite.internal.benchmarks.jmh.runner.JmhIdeBenchmarkRunner;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsStateValidator;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.jetbrains.annotations.Nullable;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.IntStream;
+
+import static org.openjdk.jmh.annotations.Scope.Thread;
+
+/** */
+@State(Scope.Benchmark)
+public class GridDhtPartitionsStateValidatorBenchmark extends JmhAbstractBenchmark {
+ /** */
+ @State(Thread)
+ public static class Context {
+ /** */
+ private final UUID localNodeId = UUID.randomUUID();
+
+ /** */
+ private GridCacheSharedContext cctxMock;
+
+ /** */
+ private GridDhtPartitionTopology topologyMock;
+
+ /** */
+ private GridDhtPartitionsStateValidator validator;
+
+ /** */
+ private Map<UUID, GridDhtPartitionsSingleMessage> messages = new HashMap<>();
+
+ /** */
+ private UUID ignoreNode = UUID.randomUUID();
+
+ /** */
+ private static final int NODES = 3;
+
+ /** */
+ private static final int PARTS = 100;
+
+ /**
+ * @return Partition mock with specified {@code id}, {@code updateCounter} and {@code size}.
+ */
+ private GridDhtLocalPartition partitionMock(int id, long updateCounter, long size) {
+ GridDhtLocalPartition partitionMock = Mockito.mock(GridDhtLocalPartition.class);
+ Mockito.when(partitionMock.id()).thenReturn(id);
+ Mockito.when(partitionMock.updateCounter()).thenReturn(updateCounter);
+ Mockito.when(partitionMock.fullSize()).thenReturn(size);
+ Mockito.when(partitionMock.state()).thenReturn(GridDhtPartitionState.OWNING);
+ return partitionMock;
+ }
+
+ /**
+ * @param countersMap Update counters map.
+ * @param sizesMap Sizes map.
+ * @return Message with specified {@code countersMap} and {@code sizeMap}.
+ */
+ private GridDhtPartitionsSingleMessage from(@Nullable Map<Integer, T2<Long, Long>> countersMap, @Nullable Map<Integer, Long> sizesMap) {
+ GridDhtPartitionsSingleMessage msg = new GridDhtPartitionsSingleMessage();
+ if (countersMap != null)
+ msg.addPartitionUpdateCounters(0, countersMap);
+ if (sizesMap != null)
+ msg.addPartitionSizes(0, sizesMap);
+ return msg;
+ }
+
+ /** */
+ @Setup
+ public void setup() {
+ // Prepare mocks.
+ cctxMock = Mockito.mock(GridCacheSharedContext.class);
+ Mockito.when(cctxMock.localNodeId()).thenReturn(localNodeId);
+
+ topologyMock = Mockito.mock(GridDhtPartitionTopology.class);
+ Mockito.when(topologyMock.partitionState(Matchers.any(), Matchers.anyInt())).thenReturn(GridDhtPartitionState.OWNING);
+ Mockito.when(topologyMock.groupId()).thenReturn(0);
+
+ Mockito.when(topologyMock.partitions()).thenReturn(PARTS);
+
+ List<GridDhtLocalPartition> localPartitions = Lists.newArrayList();
+
+ Map<Integer, T2<Long, Long>> updateCountersMap = new HashMap<>();
+
+ Map<Integer, Long> cacheSizesMap = new HashMap<>();
+
+ IntStream.range(0, PARTS).forEach(k -> { localPartitions.add(partitionMock(k, k + 1, k + 1));
+ long us = k > 20 && k <= 30 ? 0 :k + 2L;
+ updateCountersMap.put(k, new T2<>(k + 2L, us));
+ cacheSizesMap.put(k, us); });
+
+ Mockito.when(topologyMock.localPartitions()).thenReturn(localPartitions);
+ Mockito.when(topologyMock.currentLocalPartitions()).thenReturn(localPartitions);
+
+ // Form single messages map.
+ Map<UUID, GridDhtPartitionsSingleMessage> messages = new HashMap<>();
+
+ for (int n = 0; n < NODES; ++n) {
+ UUID remoteNode = UUID.randomUUID();
+
+ messages.put(remoteNode, from(updateCountersMap, cacheSizesMap));
+ }
+
+ messages.put(ignoreNode, from(updateCountersMap, cacheSizesMap));
+
+ validator = new GridDhtPartitionsStateValidator(cctxMock);
+ }
+ }
+
+ /** */
+ @Benchmark
+ public void testValidatePartitionsUpdateCounters(Context context) {
+ context.validator.validatePartitionsUpdateCounters(context.topologyMock,
+ context.messages, Sets.newHashSet(context.ignoreNode));
+ }
+
+ /** */
+ @Benchmark
+ public void testValidatePartitionsSizes(Context context) {
+ context.validator.validatePartitionsSizes(context.topologyMock, context
+ .messages, Sets.newHashSet(context.ignoreNode));
+ }
+
+ /**
+ * Run benchmarks.
+ *
+ * @param args Arguments.
+ * @throws Exception If failed.
+ */
+ public static void main(String[] args) throws Exception {
+ run(1);
+ }
+
+ /**
+ * Run benchmark.
+ *
+ * @param threads Amount of threads.
+ * @throws Exception If failed.
+ */
+ private static void run(int threads) throws Exception {
+ JmhIdeBenchmarkRunner.create()
+ .forks(1)
+ .threads(threads)
+ .warmupIterations(5)
+ .measurementIterations(10)
+ .benchmarks(GridDhtPartitionsStateValidatorBenchmark.class.getSimpleName())
+ .jvmArguments("-XX:+UseG1GC", "-Xms4g", "-Xmx4g")
+ .run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/33b9611e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
index 2d5eec3..008c276 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
@@ -94,27 +94,6 @@ public class CachePartitionFullCountersMap implements Serializable {
}
/**
- * Creates submap for provided partition IDs.
- *
- * @param parts Partition IDs.
- * @return Partial counters map.
- */
- public CachePartitionPartialCountersMap subMap(Set<Integer> parts) {
- CachePartitionPartialCountersMap res = new CachePartitionPartialCountersMap(parts.size());
-
- for (int p = 0; p < updCntrs.length; p++) {
- if (!parts.contains(p))
- continue;
-
- res.add(p, initialUpdCntrs[p], updCntrs[p]);
- }
-
- assert res.size() == parts.size();
-
- return res;
- }
-
- /**
* Clears full counters map.
*/
public void clear() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/33b9611e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 088fb31..7dd34f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -246,6 +246,35 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
}
/**
+ * @param grpId Cache group ID.
+ * @param partsCnt Total cache partitions.
+ * @return Partition update counters.
+ */
+ @SuppressWarnings("unchecked")
+ public CachePartitionPartialCountersMap partitionUpdateCountersUnsorted(int grpId, int partsCnt) {
+ Object res = partCntrs == null ? null : partCntrs.get(grpId);
+
+ if (res == null)
+ return CachePartitionPartialCountersMap.EMPTY;
+
+ if (res instanceof CachePartitionPartialCountersMap)
+ return (CachePartitionPartialCountersMap)res;
+
+ assert res instanceof Map : res;
+
+ Map<Integer, T2<Long, Long>> map = (Map<Integer, T2<Long, Long>>)res;
+
+ CachePartitionPartialCountersMap partCounersMap = new CachePartitionPartialCountersMap(partsCnt);
+
+ for (Map.Entry<Integer, T2<Long, Long>> e : map.entrySet())
+ partCounersMap.add(e.getKey(), e.getValue().get1(), e.getValue().get2());
+
+ partCounersMap.trim();
+
+ return partCounersMap;
+ }
+
+ /**
* Adds partition sizes map for specified {@code grpId} to the current message.
*
* @param grpId Group id.
http://git-wip-us.apache.org/repos/asf/ignite/blob/33b9611e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionsStateValidator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionsStateValidator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionsStateValidator.java
index 544d453..63fe926 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionsStateValidator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionsStateValidator.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
+import java.util.AbstractMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -33,7 +34,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Cac
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.lang.IgniteProductVersion;
import org.jetbrains.annotations.Nullable;
@@ -78,14 +78,16 @@ public class GridDhtPartitionsStateValidator {
final Set<UUID> ignoringNodes = new HashSet<>();
// Ignore just joined nodes.
- for (DiscoveryEvent evt : fut.events().events())
+ for (DiscoveryEvent evt : fut.events().events()) {
if (evt.type() == EVT_NODE_JOINED)
ignoringNodes.add(evt.eventNode().id());
+ }
AffinityTopologyVersion topVer = fut.context().events().topologyVersion();
// Validate update counters.
Map<Integer, Map<UUID, Long>> result = validatePartitionsUpdateCounters(top, messages, ignoringNodes);
+
if (!result.isEmpty())
throw new IgniteCheckedException("Partitions update counters are inconsistent for " + fold(topVer, result));
@@ -110,13 +112,16 @@ public class GridDhtPartitionsStateValidator {
*
* @param top Topology to validate.
* @param nodeId Node which sent single message.
- * @param singleMsg Single message.
+ * @param countersMap Counters map.
+ * @param sizesMap Sizes map.
* @return Set of partition ids should be excluded from validation.
*/
- @Nullable private Set<Integer> shouldIgnore(GridDhtPartitionTopology top, UUID nodeId, GridDhtPartitionsSingleMessage singleMsg) {
- CachePartitionPartialCountersMap countersMap = singleMsg.partitionUpdateCounters(top.groupId(), top.partitions());
- Map<Integer, Long> sizesMap = singleMsg.partitionSizes(top.groupId());
-
+ @Nullable private Set<Integer> shouldIgnore(
+ GridDhtPartitionTopology top,
+ UUID nodeId,
+ CachePartitionPartialCountersMap countersMap,
+ Map<Integer, Long> sizesMap
+ ) {
Set<Integer> ignore = null;
for (int i = 0; i < countersMap.size(); i++) {
@@ -155,14 +160,14 @@ public class GridDhtPartitionsStateValidator {
* @return Invalid partitions map with following structure: (partId, (nodeId, updateCounter)).
* If map is empty validation is successful.
*/
- public Map<Integer, Map<UUID, Long>> validatePartitionsUpdateCounters(
- GridDhtPartitionTopology top,
- Map<UUID, GridDhtPartitionsSingleMessage> messages,
- Set<UUID> ignoringNodes
- ) {
+ public Map<Integer, Map<UUID, Long>> validatePartitionsUpdateCounters(
+ GridDhtPartitionTopology top,
+ Map<UUID, GridDhtPartitionsSingleMessage> messages,
+ Set<UUID> ignoringNodes
+ ) {
Map<Integer, Map<UUID, Long>> invalidPartitions = new HashMap<>();
- Map<Integer, T2<UUID, Long>> updateCountersAndNodesByPartitions = new HashMap<>();
+ Map<Integer, AbstractMap.Entry<UUID, Long>> updateCountersAndNodesByPartitions = new HashMap<>();
// Populate counters statistics from local node partitions.
for (GridDhtLocalPartition part : top.currentLocalPartitions()) {
@@ -172,7 +177,7 @@ public class GridDhtPartitionsStateValidator {
if (part.updateCounter() == 0 && part.fullSize() == 0)
continue;
- updateCountersAndNodesByPartitions.put(part.id(), new T2<>(cctx.localNodeId(), part.updateCounter()));
+ updateCountersAndNodesByPartitions.put(part.id(), new AbstractMap.SimpleEntry<>(cctx.localNodeId(), part.updateCounter()));
}
int partitions = top.partitions();
@@ -183,9 +188,13 @@ public class GridDhtPartitionsStateValidator {
if (ignoringNodes.contains(nodeId))
continue;
- CachePartitionPartialCountersMap countersMap = e.getValue().partitionUpdateCounters(top.groupId(), partitions);
+ final GridDhtPartitionsSingleMessage message = e.getValue();
- Set<Integer> ignorePartitions = shouldIgnore(top, nodeId, e.getValue());
+ CachePartitionPartialCountersMap countersMap = message.partitionUpdateCountersUnsorted(top.groupId(), partitions);
+
+ Map<Integer, Long> sizesMap = message.partitionSizes(top.groupId());
+
+ Set<Integer> ignorePartitions = shouldIgnore(top, nodeId, countersMap, sizesMap);
for (int i = 0; i < countersMap.size(); i++) {
int p = countersMap.partitionAt(i);
@@ -211,14 +220,14 @@ public class GridDhtPartitionsStateValidator {
* @return Invalid partitions map with following structure: (partId, (nodeId, cacheSize)).
* If map is empty validation is successful.
*/
- public Map<Integer, Map<UUID, Long>> validatePartitionsSizes(
- GridDhtPartitionTopology top,
- Map<UUID, GridDhtPartitionsSingleMessage> messages,
- Set<UUID> ignoringNodes
- ) {
+ public Map<Integer, Map<UUID, Long>> validatePartitionsSizes(
+ GridDhtPartitionTopology top,
+ Map<UUID, GridDhtPartitionsSingleMessage> messages,
+ Set<UUID> ignoringNodes
+ ) {
Map<Integer, Map<UUID, Long>> invalidPartitions = new HashMap<>();
- Map<Integer, T2<UUID, Long>> sizesAndNodesByPartitions = new HashMap<>();
+ Map<Integer, AbstractMap.Entry<UUID, Long>> sizesAndNodesByPartitions = new HashMap<>();
// Populate sizes statistics from local node partitions.
for (GridDhtLocalPartition part : top.currentLocalPartitions()) {
@@ -228,7 +237,7 @@ public class GridDhtPartitionsStateValidator {
if (part.updateCounter() == 0 && part.fullSize() == 0)
continue;
- sizesAndNodesByPartitions.put(part.id(), new T2<>(cctx.localNodeId(), part.fullSize()));
+ sizesAndNodesByPartitions.put(part.id(), new AbstractMap.SimpleEntry<>(cctx.localNodeId(), part.fullSize()));
}
int partitions = top.partitions();
@@ -239,10 +248,13 @@ public class GridDhtPartitionsStateValidator {
if (ignoringNodes.contains(nodeId))
continue;
- CachePartitionPartialCountersMap countersMap = e.getValue().partitionUpdateCounters(top.groupId(), partitions);
- Map<Integer, Long> sizesMap = e.getValue().partitionSizes(top.groupId());
+ final GridDhtPartitionsSingleMessage message = e.getValue();
+
+ CachePartitionPartialCountersMap countersMap = message.partitionUpdateCountersUnsorted(top.groupId(), partitions);
+
+ Map<Integer, Long> sizesMap = message.partitionSizes(top.groupId());
- Set<Integer> ignorePartitions = shouldIgnore(top, nodeId, e.getValue());
+ Set<Integer> ignorePartitions = shouldIgnore(top, nodeId, countersMap, sizesMap);
for (int i = 0; i < countersMap.size(); i++) {
int p = countersMap.partitionAt(i);
@@ -269,20 +281,22 @@ public class GridDhtPartitionsStateValidator {
* @param node Node id.
* @param counter Counter value reported by {@code node}.
*/
- private void process(Map<Integer, Map<UUID, Long>> invalidPartitions,
- Map<Integer, T2<UUID, Long>> countersAndNodes,
- int part,
- UUID node,
- long counter) {
- T2<UUID, Long> existingData = countersAndNodes.get(part);
+ private void process(
+ Map<Integer, Map<UUID, Long>> invalidPartitions,
+ Map<Integer, AbstractMap.Entry<UUID, Long>> countersAndNodes,
+ int part,
+ UUID node,
+ long counter
+ ) {
+ AbstractMap.Entry<UUID, Long> existingData = countersAndNodes.get(part);
if (existingData == null)
- countersAndNodes.put(part, new T2<>(node, counter));
+ countersAndNodes.put(part, new AbstractMap.SimpleEntry<>(node, counter));
- if (existingData != null && counter != existingData.get2()) {
+ if (existingData != null && counter != existingData.getValue()) {
if (!invalidPartitions.containsKey(part)) {
Map<UUID, Long> map = new HashMap<>();
- map.put(existingData.get1(), existingData.get2());
+ map.put(existingData.getKey(), existingData.getValue());
invalidPartitions.put(part, map);
}