You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2019/12/05 06:30:06 UTC

[ignite] 01/01: IGNITE-9913 Wip

This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch ignite-9913-bench
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 426f0de9185ac2938dadb7bd2cbfe488233fe7d6
Author: Anton Vinogradov <av...@apache.org>
AuthorDate: Thu Dec 5 09:26:45 2019 +0300

    IGNITE-9913 Wip
    
    Signed-off-by: Anton Vinogradov <av...@apache.org>
---
 .../processors/cache/distributed/Bench.java        | 296 +++++++++++++++++++++
 1 file changed, 296 insertions(+)

diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/Bench.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/Bench.java
new file mode 100644
index 0000000..c062d7d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/Bench.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
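+/**
+ * Benchmark that logs the worst-case duration of transactions running while one of four nodes is stopped.
+ */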
+public class Bench extends GridCommonAbstractTest {
+    /** Longest transaction duration, in milliseconds, reported by any worker thread so far. */
+    private volatile long worst;
+
+    /** Guards the compare-and-update of {@link #worst} by the workers and its final read in the test. */
+    private final Object mux = new Object();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(igniteInstanceName);
+
+        // Transactional, fully synchronous cache with one backup and the fixed 4-partition affinity mapping.
+        CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();
+
+        cacheCfg.setName(DEFAULT_CACHE_NAME);
+        cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+        cacheCfg.setBackups(1);
+        cacheCfg.setAffinity(new Map4PartitionsTo4NodesAffinityFunction());
+
+        // Default data region with persistence switched on.
+        DataStorageConfiguration storageCfg = new DataStorageConfiguration()
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true));
+
+        igniteCfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+        igniteCfg.setCacheConfiguration(cacheCfg);
+        igniteCfg.setDataStorageConfiguration(storageCfg);
+
+        // The cluster is activated explicitly by the test.
+        igniteCfg.setActiveOnStart(false);
+        igniteCfg.setAutoActivationEnabled(false);
+
+        return igniteCfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        // Nodes are configured with persistence enabled; presumably this wipes files left by earlier runs.
+        cleanPersistenceDir();
+    }
+
+    /**
+     * Creates a rendezvous affinity function with the requested number of partitions.
+     *
+     * @param parts Number of partitions, or {@code null} to use
+     *      {@link RendezvousAffinityFunction#DFLT_PARTITION_COUNT}.
+     * @return Affinity function.
+     */
+    protected AffinityFunction affinityFunction(@Nullable Integer parts) {
+        int partCnt = RendezvousAffinityFunction.DFLT_PARTITION_COUNT;
+
+        if (parts != null)
+            partCnt = parts;
+
+        return new RendezvousAffinityFunction(false, partCnt);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        // Nodes run with persistence enabled, so clean the persistence directory once they are stopped
+        // instead of leaving it for the next run.
+        cleanPersistenceDir();
+
+        super.afterTest();
+    }
+
+    /**
+     * Measures the worst-case transaction duration while a node is stopped under load.
+     * <p>
+     * Starts four nodes, activates the cluster and launches two groups of worker threads running two-key
+     * transactions that each hold the transaction open for {@code txDuration} milliseconds. Node 3 is stopped
+     * while the workers are running, and the longest transaction duration reported by any worker is logged
+     * at the end.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void bench() throws Exception {
+        int nodes = 4;
+        int txDuration = 10_000;
+
+        startGridsMultiThreaded(nodes, true).cluster().active(true);
+
+        Ignite failed = grid(3);
+
+        int multiplicator = 64;
+
+        AtomicInteger keyFrom = new AtomicInteger();
+
+        AtomicBoolean finished = new AtomicBoolean();
+
+        IgniteCache<Integer, Integer> failedCache = failed.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        Random r = new Random();
+
+        // Workers of this group take both transaction keys from the nearKeys() list of the node to be stopped.
+        IgniteInternalFuture<?> nearThenNearFut = multithreadedAsync(() -> {
+            try {
+                // Random delay so that the workers do not all start their transactions at the same moment.
+                U.sleep(r.nextInt(txDuration * 2));
+
+                List<Integer> keys = nearKeys(failedCache, 100, keyFrom.addAndGet(100_000));
+
+                runTransactions(keys, keys, finished, txDuration);
+            }
+            catch (Exception e) {
+                // Keep the original failure as the cause so that it is visible in the test report.
+                throw new AssertionError("Near-then-near transaction worker failed: " + e, e);
+            }
+        }, multiplicator);
+
+        // Workers of this group take the first key from the nearKeys() list and the second from backupKeys().
+        IgniteInternalFuture<?> nearThenBackupFut = multithreadedAsync(() -> {
+            try {
+                U.sleep(r.nextInt(txDuration * 2));
+
+                List<Integer> keys0 = nearKeys(failedCache, 100, keyFrom.addAndGet(100_000));
+                List<Integer> keys1 = backupKeys(failedCache, 100, keyFrom.addAndGet(100_000));
+
+                runTransactions(keys0, keys1, finished, txDuration);
+            }
+            catch (Exception e) {
+                throw new AssertionError("Near-then-backup transaction worker failed: " + e, e);
+            }
+        }, multiplicator);
+
+        U.sleep(txDuration * 2);
+
+        failed.close(); // Stopping node.
+
+        U.sleep(txDuration * 5);
+
+        finished.set(true);
+
+        nearThenNearFut.get();
+        nearThenBackupFut.get();
+
+        synchronized (mux) {
+            log.info("Worst case is " + worst); // master ~ 19830 ms, ignite-9913 ~ 10294 ms
+        }
+    }
+
+    /**
+     * Runs two-key transactions on the primary node of the first key until {@code finished} is set, then
+     * merges the longest observed transaction duration into {@link #worst}.
+     *
+     * @param firstKeys Keys to take the first key of every transaction from; must not be empty.
+     * @param secondKeys Keys to take the second key of every transaction from; must not be empty.
+     * @param finished Flag telling the worker to stop after the current transaction.
+     * @param txDuration Pause between the two puts of a transaction, in milliseconds.
+     * @throws Exception If failed.
+     */
+    private void runTransactions(
+        List<Integer> firstKeys,
+        List<Integer> secondKeys,
+        AtomicBoolean finished,
+        int txDuration
+    ) throws Exception {
+        int idx = 0;
+        long max = 0;
+
+        Ignite primary = primaryNode(firstKeys.get(0), DEFAULT_CACHE_NAME);
+
+        IgniteCache<Integer, Integer> primaryCache = primary.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        while (!finished.get()) {
+            // One shared index advances through both lists; it wraps around because the loop is bounded
+            // by time, not by the number of prepared keys.
+            Integer key0 = firstKeys.get(idx++ % firstKeys.size());
+            Integer key1 = secondKeys.get(idx++ % secondKeys.size());
+
+            long started = System.currentTimeMillis();
+
+            try (Transaction tx = primary.transactions().txStart()) {
+                primaryCache.put(key0, key0);
+
+                // Keep the transaction open so that the node stop happens while it is in progress.
+                U.sleep(txDuration);
+
+                primaryCache.put(key1, key1);
+
+                tx.commit();
+            }
+
+            long res = System.currentTimeMillis() - started;
+
+            log.info("Transaction finished [duration=" + res + "]");
+
+            max = Math.max(max, res);
+
+            assertEquals(key0, primaryCache.get(key0));
+            assertEquals(key1, primaryCache.get(key1));
+        }
+
+        synchronized (mux) {
+            if (worst < max)
+                worst = max;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class Map4PartitionsTo4NodesAffinityFunction extends RendezvousAffinityFunction {
+        /**
+         * Default constructor.
+         */
+        public Map4PartitionsTo4NodesAffinityFunction() {
+            super(false, 4);
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
+            List<List<ClusterNode>> res = new ArrayList<>(4);
+
+            // Partitions by owners (node{parts}): 0{0,3}, 1{0,1}, 2{1,2}, 3{2,3}
+            if (affCtx.currentTopologySnapshot().size() == 4) {
+                List<ClusterNode> p0 = new ArrayList<>();
+
+                p0.add(affCtx.currentTopologySnapshot().get(0));
+                p0.add(affCtx.currentTopologySnapshot().get(1));
+
+                List<ClusterNode> p1 = new ArrayList<>();
+
+                p1.add(affCtx.currentTopologySnapshot().get(1));
+                p1.add(affCtx.currentTopologySnapshot().get(2));
+
+                List<ClusterNode> p2 = new ArrayList<>();
+
+                p2.add(affCtx.currentTopologySnapshot().get(2));
+                p2.add(affCtx.currentTopologySnapshot().get(3));
+
+                List<ClusterNode> p3 = new ArrayList<>();
+
+                p3.add(affCtx.currentTopologySnapshot().get(3));
+                p3.add(affCtx.currentTopologySnapshot().get(0));
+
+                res.add(p0);
+                res.add(p1);
+                res.add(p2);
+                res.add(p3);
+            }
+
+            return res;
+        }
+    }
+}