You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by zs...@apache.org on 2021/10/18 08:39:48 UTC

[ignite] branch master updated: IGNITE-15722 Yardstick. Make IgnitePutAllBenchmark more predictable, fix initialization - Fixes #9485.

This is an automated email from the ASF dual-hosted git repository.

zstan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new ca461f8  IGNITE-15722 Yardstick. Make IgnitePutAllBenchmark more predictable, fix initialization - Fixes #9485.
ca461f8 is described below

commit ca461f8c3ddfbec812655cccfeb83de383a996ab
Author: zstan <st...@gmail.com>
AuthorDate: Mon Oct 18 11:38:29 2021 +0300

    IGNITE-15722 Yardstick. Make IgnitePutAllBenchmark more predictable, fix initialization - Fixes #9485.
    
    Signed-off-by: zstan <st...@gmail.com>
---
 .../ignite/yardstick/IgniteBenchmarkArguments.java |  11 --
 .../yardstick/cache/IgnitePutAllBenchmark.java     | 117 +++++++++------------
 2 files changed, 50 insertions(+), 78 deletions(-)

diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
index 5f247ab..45f1612 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
@@ -148,10 +148,6 @@ public class IgniteBenchmarkArguments {
     private boolean collocated;
 
     /** */
-    @Parameter(names = {"-stripe", "--singleStripe"}, description = "Generate keys belonging to single stripe per node")
-    private boolean singleStripe;
-
-    /** */
     @Parameter(names = {"-jdbc", "--jdbcUrl"}, description = "JDBC url")
     private String jdbcUrl;
 
@@ -562,13 +558,6 @@ public class IgniteBenchmarkArguments {
     }
 
     /**
-     * @return Generate keys for single stripe per node.
-     */
-    public boolean singleStripe() {
-        return singleStripe;
-    }
-
-    /**
      * @return Delay in second which used in nodes restart algorithm.
      */
     public int restartDelay() {
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutAllBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutAllBenchmark.java
index 707e56c..1ab170d 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutAllBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutAllBenchmark.java
@@ -18,15 +18,16 @@
 package org.apache.ignite.yardstick.cache;
 
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.util.typedef.internal.U;
 import org.yardstickframework.BenchmarkConfiguration;
 
 /**
@@ -42,100 +43,82 @@ public class IgnitePutAllBenchmark extends IgniteCacheAbstractBenchmark<Integer,
     /** Affinity mapper. */
     private Affinity<Integer> aff;
 
-    /** */
-    private int srvrCnt;
+    /** Sequentially grow thread data identifier.*/
+    AtomicInteger threadIdent = new AtomicInteger();
 
-    /** */
-    private int stripesCnt;
+    /** Predefined batches.*/
+    List<Map<Integer, Map<Integer, Integer>>> batchMaps;
 
     /** {@inheritDoc} */
-    @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
-        super.setUp(cfg);
-
-        aff = ignite().affinity(cache().getName());
-
-        Collection<ClusterNode> nodes = ignite().cluster().forServers().nodes();
-
-        stripesCnt = ignite().cluster().forServers().forRandom().metrics().getTotalCpus();
+    @Override public void tearDown() throws Exception {
+        super.tearDown();
 
-        srvrCnt = nodes.size();
-
-        IgniteLogger log = ignite().log();
-
-        if (log.isInfoEnabled())
-            log.info("Servers info [srvrsCnt=" + srvrCnt + ", stripesCnt=" + stripesCnt + ']');
+        if (threadIdent.get() != batchMaps.size())
+            throw new IgniteException("Some workers are not initialized.");
     }
 
     /** {@inheritDoc} */
-    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
-        List<Map<Integer, Integer>> putMaps = (List<Map<Integer, Integer>>)ctx.get(PUT_MAPS_KEY);
-
-        if (putMaps == null) {
-            putMaps = new ArrayList<>(PUT_MAPS_CNT);
+    @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
+        super.setUp(cfg);
 
-            ctx.put(PUT_MAPS_KEY, putMaps);
-        }
+        int threadsCnt = cfg.threads();
 
-        Map<Integer, Integer> vals;
+        batchMaps = new ArrayList<>(threadsCnt);
 
-        if (putMaps.size() == PUT_MAPS_CNT)
-            vals = putMaps.get(nextRandom(PUT_MAPS_CNT));
-        else {
-            vals = new TreeMap<>();
+        for (int i = 0; i < threadsCnt; ++i)
+            batchMaps.add(null);
 
-            ClusterNode node = args.collocated() ? aff.mapKeyToNode(nextRandom(args.range())) : null;
+        for (int i = 0; i < threadsCnt; ++i) {
+            for (int m = 0; m < PUT_MAPS_CNT; ++m) {
+                TreeMap<Integer, Integer> vals = new TreeMap<>();
 
-            Map<ClusterNode, Integer> stripesMap = null;
+                ClusterNode node = args.collocated() ? aff.mapKeyToNode(nextRandom(args.range())) : null;
 
-            if (args.singleStripe())
-                stripesMap = U.newHashMap(srvrCnt);
+                for (; vals.size() < args.batch(); ) {
+                    int key = nextRandom(args.range());
+                    
+                    if (args.collocated() && !aff.isPrimary(node, key))
+                        continue;
 
-            for (; vals.size() < args.batch(); ) {
-                int key = nextRandom(args.range());
+                    vals.put(key, key);
+                }
 
-                if (args.collocated() && !aff.isPrimary(
-                    node,
-                    key))
-                    continue;
+                Map<Integer, Map<Integer, Integer>> map = batchMaps.get(i);
 
-                if (args.singleStripe()) {
-                    int part = aff.partition(key);
+                if (map == null)
+                    batchMaps.set(i, map = new HashMap<>());
 
-                    ClusterNode node0 = node != null ? node : aff.mapPartitionToNode(part);
+                map.put(m, vals);
+            }
+        }
 
-                    Integer stripe0 = stripesMap.get(node0);
-                    int stripe = part % stripesCnt;
+        aff = ignite().affinity(cache().getName());
 
-                    if (stripe0 != null) {
-                        if (stripe0 != stripe)
-                            continue;
-                    }
-                    else
-                        stripesMap.put(
-                            node0,
-                            stripe);
-                }
+        IgniteLogger log = ignite().log();
 
-                vals.put(
-                    key,
-                    key);
-            }
+        if (log.isInfoEnabled())
+            log.info("Initialization completed, batches predefined for " + threadsCnt + "threads.");
+    }
 
-            putMaps.add(vals);
+    /** {@inheritDoc} */
+    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+        if (ctx.isEmpty()) {
+            int currCeil = threadIdent.getAndIncrement();
 
-            if (putMaps.size() == PUT_MAPS_CNT) {
-                IgniteLogger log = ignite().log();
+            Map<Integer, Map<Integer, Integer>> batches = batchMaps.get(currCeil);
 
-                if (log.isInfoEnabled())
-                    log.info("Put maps set generated.");
-            }
+            ctx.put(PUT_MAPS_KEY, batches);
         }
 
+        Map<Integer, Map<Integer, Integer>> batches = (Map<Integer, Map<Integer, Integer>>)ctx.get(PUT_MAPS_KEY);
+
+        Map<Integer, Integer> vals = batches.get(nextRandom(PUT_MAPS_CNT));
+
         putData(vals);
 
         return true;
     }
-
+    
     /** Put operations.*/
     protected void putData(Map<Integer, Integer> vals) throws Exception {
         IgniteCache<Integer, Object> cache = cacheForOperation();