You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2019/12/05 06:30:05 UTC

[ignite] branch ignite-9913-bench created (now 426f0de)

This is an automated email from the ASF dual-hosted git repository.

av pushed a change to branch ignite-9913-bench
in repository https://gitbox.apache.org/repos/asf/ignite.git.


      at 426f0de  IGNITE-9913 Wip

This branch includes the following new commits:

     new 426f0de  IGNITE-9913 Wip

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[ignite] 01/01: IGNITE-9913 Wip

Posted by av...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch ignite-9913-bench
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 426f0de9185ac2938dadb7bd2cbfe488233fe7d6
Author: Anton Vinogradov <av...@apache.org>
AuthorDate: Thu Dec 5 09:26:45 2019 +0300

    IGNITE-9913 Wip
    
    Signed-off-by: Anton Vinogradov <av...@apache.org>
---
 .../processors/cache/distributed/Bench.java        | 296 +++++++++++++++++++++
 1 file changed, 296 insertions(+)

diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/Bench.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/Bench.java
new file mode 100644
index 0000000..c062d7d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/Bench.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class Bench extends GridCommonAbstractTest {
+    /** Worst. */
+    private volatile long worst;
+
+    /** Mutex. */
+    private final Object mux = new Object();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
+
+        cfg.setCommunicationSpi(commSpi);
+
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+
+        ccfg.setName(DEFAULT_CACHE_NAME);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setBackups(1);
+        ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        ccfg.setAffinity(new Map4PartitionsTo4NodesAffinityFunction());
+
+        cfg.setCacheConfiguration(ccfg);
+
+        DataStorageConfiguration cfg1 = new DataStorageConfiguration();
+
+        DataRegionConfiguration drCfg = new DataRegionConfiguration();
+
+        drCfg.setPersistenceEnabled(true);
+
+        cfg.setActiveOnStart(false);
+        cfg.setAutoActivationEnabled(false);
+
+        cfg1.setDefaultDataRegionConfiguration(drCfg);
+
+        cfg.setDataStorageConfiguration(cfg1);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * @param parts Number of partitions.
+     * @return Affinity function.
+     */
+    protected AffinityFunction affinityFunction(@Nullable Integer parts) {
+        return new RendezvousAffinityFunction(false,
+            parts == null ? RendezvousAffinityFunction.DFLT_PARTITION_COUNT : parts);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void bench() throws Exception {
+        int nodes = 4;
+        int txDuration = 10_000;
+
+        startGridsMultiThreaded(nodes, true).cluster().active(true);
+
+        Ignite failed = grid(3);
+
+        int multiplicator = 64;
+
+        AtomicInteger key_from = new AtomicInteger();
+
+        AtomicBoolean finished = new AtomicBoolean();
+
+        IgniteCache<Integer, Integer> failedCache = failed.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        Random r = new Random();
+
+        IgniteInternalFuture<?> nearThenNearFut = multithreadedAsync(() -> {
+            try {
+                U.sleep(r.nextInt(txDuration * 2));
+
+                List<Integer> keys = nearKeys(failedCache, 100, key_from.addAndGet(100_000));
+
+                int idx = 0;
+                long max = 0;
+
+                Ignite primary = primaryNode(keys.get(0), DEFAULT_CACHE_NAME);
+
+                IgniteCache<Integer, Integer> primaryCache = primary.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+                while (!finished.get()) {
+                    Integer key0 = keys.get(idx++);
+                    Integer key1 = keys.get(idx++);
+
+                    long started = System.currentTimeMillis();
+
+                    try (Transaction tx = primary.transactions().txStart()) {
+                        primaryCache.put(key0, key0);
+
+                        U.sleep(txDuration);
+
+                        primaryCache.put(key1, key1);
+
+                        tx.commit();
+                    }
+
+                    long res = System.currentTimeMillis() - started;
+
+                    log.info("Transactionh finished [duration=" + res + "]");
+
+                    if (res > max)
+                        max = res;
+
+                    assertEquals(key0, primaryCache.get(key0));
+                    assertEquals(key1, primaryCache.get(key1));
+                }
+
+                synchronized (mux) {
+                    if (worst < max)
+                        worst = max;
+                }
+            }
+            catch (Exception e) {
+                fail();
+            }
+        }, multiplicator);
+
+        IgniteInternalFuture<?> nearThenBackupFut = multithreadedAsync(() -> {
+            try {
+                U.sleep(r.nextInt(txDuration * 2));
+
+                List<Integer> keys0 = nearKeys(failedCache, 100, key_from.addAndGet(100_000));
+                List<Integer> keys1 = backupKeys(failedCache, 100, key_from.addAndGet(100_000));
+
+                int idx = 0;
+                long max = 0;
+
+                Ignite primary = primaryNode(keys0.get(0), DEFAULT_CACHE_NAME);
+
+                IgniteCache<Integer, Integer> primaryCache = primary.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+                while (!finished.get()) {
+                    Integer key0 = keys0.get(idx++);
+                    Integer key1 = keys1.get(idx++);
+
+                    long started = System.currentTimeMillis();
+
+                    try (Transaction tx = primary.transactions().txStart()) {
+                        primaryCache.put(key0, key0);
+
+                        U.sleep(txDuration);
+
+                        primaryCache.put(key1, key1);
+
+                        tx.commit();
+                    }
+
+                    long res = System.currentTimeMillis() - started;
+
+                    log.info("Transactionh finished [duration=" + res + "]");
+
+                    if (res > max)
+                        max = res;
+
+                    assertEquals(key0, primaryCache.get(key0));
+                    assertEquals(key1, primaryCache.get(key1));
+                }
+
+                synchronized (mux) {
+                    if (worst < max)
+                        worst = max;
+                }
+            }
+            catch (Exception e) {
+                fail();
+            }
+        }, multiplicator);
+
+        U.sleep(txDuration * 2);
+
+        failed.close(); // Stopping node.
+
+        U.sleep(txDuration * 5);
+
+        finished.set(true);
+
+        nearThenNearFut.get();
+        nearThenBackupFut.get();
+
+        synchronized (mux) {
+            log.info("Worst case is " + worst); // master ~ 19830 ms, ignite-9913 ~ 10294 ms
+        }
+    }
+
+    /**
+     *
+     */
+    private static class Map4PartitionsTo4NodesAffinityFunction extends RendezvousAffinityFunction {
+        /**
+         * Default constructor.
+         */
+        public Map4PartitionsTo4NodesAffinityFunction() {
+            super(false, 4);
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
+            List<List<ClusterNode>> res = new ArrayList<>(4);
+
+            // Partitions by owners (node{parts}): 0{0,3}, 1{0,1}, 2{1,2}, 3{2,3}
+            if (affCtx.currentTopologySnapshot().size() == 4) {
+                List<ClusterNode> p0 = new ArrayList<>();
+
+                p0.add(affCtx.currentTopologySnapshot().get(0));
+                p0.add(affCtx.currentTopologySnapshot().get(1));
+
+                List<ClusterNode> p1 = new ArrayList<>();
+
+                p1.add(affCtx.currentTopologySnapshot().get(1));
+                p1.add(affCtx.currentTopologySnapshot().get(2));
+
+                List<ClusterNode> p2 = new ArrayList<>();
+
+                p2.add(affCtx.currentTopologySnapshot().get(2));
+                p2.add(affCtx.currentTopologySnapshot().get(3));
+
+                List<ClusterNode> p3 = new ArrayList<>();
+
+                p3.add(affCtx.currentTopologySnapshot().get(3));
+                p3.add(affCtx.currentTopologySnapshot().get(0));
+
+                res.add(p0);
+                res.add(p1);
+                res.add(p2);
+                res.add(p3);
+            }
+
+            return res;
+        }
+    }
+}