You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/25 01:12:16 UTC

[40/50] ignite git commit: Added test.

Added test.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4b0c029c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4b0c029c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4b0c029c

Branch: refs/heads/ignite-257
Commit: 4b0c029cef4b351f0d389a171c30b7dcf8c1ca22
Parents: b56b15c
Author: sboikov <sb...@gridgain.com>
Authored: Thu Sep 24 12:19:28 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Sep 24 12:19:28 2015 +0300

----------------------------------------------------------------------
 .../near/NearCacheMultithreadedUpdateTest.java  | 217 +++++++++++++++++++
 1 file changed, 217 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4b0c029c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NearCacheMultithreadedUpdateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NearCacheMultithreadedUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NearCacheMultithreadedUpdateTest.java
new file mode 100644
index 0000000..9d92724
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NearCacheMultithreadedUpdateTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class NearCacheMultithreadedUpdateTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private boolean client;
+
+    /** */
+    private final int SRV_CNT = 3;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(SRV_CNT);
+
+        client = true;
+
+        startGrid(SRV_CNT);
+
+        client = false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUpdateMultithreadedTx() throws Exception {
+        updateMultithreaded(TRANSACTIONAL, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUpdateMultithreadedTxRestart() throws Exception {
+        updateMultithreaded(TRANSACTIONAL, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUpdateMultithreadedAtomic() throws Exception {
+        updateMultithreaded(ATOMIC, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUpdateMultithreadedAtomicRestart() throws Exception {
+        updateMultithreaded(ATOMIC, true);
+    }
+
+    /**
+     * @param atomicityMode Cache atomicity mode.
+     * @param restart If {@code true} restarts one node.
+     * @throws Exception If failed.
+     */
+    private void updateMultithreaded(CacheAtomicityMode atomicityMode, boolean restart) throws Exception {
+        Ignite srv = ignite(0);
+
+        srv.destroyCache(null);
+
+        IgniteCache<Integer, Integer> srvCache = srv.createCache(cacheConfiguration(atomicityMode));
+
+        Ignite client = ignite(SRV_CNT);
+
+        assertTrue(client.configuration().isClientMode());
+
+        final IgniteCache<Integer, Integer> clientCache =
+            client.createNearCache(null, new NearCacheConfiguration<Integer, Integer>());
+
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteInternalFuture<?> restartFut = null;
+
+        // Primary key for restarted node.
+        final Integer key0 = primaryKey(ignite(SRV_CNT - 1).cache(null));
+
+        if (restart) {
+            restartFut = GridTestUtils.runAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    while (!stop.get()) {
+                        Thread.sleep(300);
+
+                        log.info("Stop node.");
+
+                        stopGrid(SRV_CNT - 1);
+
+                        Thread.sleep(300);
+
+                        log.info("Start node.");
+
+                        startGrid(SRV_CNT - 1);
+                    }
+
+                    return null;
+                }
+            }, "restart-thread");
+        }
+
+        try {
+            long stopTime = System.currentTimeMillis() + 30_000;
+
+            int iter = 0;
+
+            while (System.currentTimeMillis() < stopTime) {
+                if (iter % 100 == 0)
+                    log.info("Iteration: " + iter);
+
+                final Integer key = iter++;
+
+                final AtomicInteger val = new AtomicInteger();
+
+                GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        clientCache.put(key0, val.incrementAndGet());
+
+                        for (int i = 0; i < 10; i++)
+                            clientCache.put(key, val.incrementAndGet());
+
+                        return null;
+                    }
+                }, 20, "update-thread");
+
+                if (restart) {
+                    assertEquals(srvCache.get(key), clientCache.get(key));
+                    assertEquals(srvCache.get(key0), clientCache.get(key0));
+                }
+                else {
+                    assertEquals(srvCache.get(key), clientCache.localPeek(key));
+                    assertEquals(srvCache.get(key0), clientCache.localPeek(key0));
+                }
+            }
+
+            stop.set(true);
+
+            if (restartFut != null)
+                restartFut.get();
+        }
+        finally {
+            stop.set(true);
+        }
+    }
+
+    /**
+     * @param atomicityMode Cache atomicity mode.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Integer, Integer> cacheConfiguration(CacheAtomicityMode atomicityMode) {
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setBackups(1);
+
+        return ccfg;
+    }
+}