You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/25 01:12:16 UTC
[40/50] ignite git commit: Added test.
Added test.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4b0c029c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4b0c029c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4b0c029c
Branch: refs/heads/ignite-257
Commit: 4b0c029cef4b351f0d389a171c30b7dcf8c1ca22
Parents: b56b15c
Author: sboikov <sb...@gridgain.com>
Authored: Thu Sep 24 12:19:28 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Sep 24 12:19:28 2015 +0300
----------------------------------------------------------------------
.../near/NearCacheMultithreadedUpdateTest.java | 217 +++++++++++++++++++
1 file changed, 217 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4b0c029c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NearCacheMultithreadedUpdateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NearCacheMultithreadedUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NearCacheMultithreadedUpdateTest.java
new file mode 100644
index 0000000..9d92724
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NearCacheMultithreadedUpdateTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Checks that a client near cache stays consistent with the server cache while many threads
+ * update the same keys concurrently, optionally with one server node being restarted.
+ */
+public class NearCacheMultithreadedUpdateTest extends GridCommonAbstractTest {
+    /** IP finder set on the discovery SPI of every node configuration built by this test. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Number of server nodes; the client node is started with index {@code SRV_CNT}. */
+    private static final int SRV_CNT = 3;
+
+    /** How long each test keeps updating the cache, in milliseconds. */
+    private static final long TEST_DURATION = 30_000;
+
+    /** Client mode flag copied into each configuration returned by {@link #getConfiguration(String)}. */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(SRV_CNT);
+
+        client = true;
+
+        try {
+            startGrid(SRV_CNT);
+        }
+        finally {
+            // Reset even if the client fails to start: getConfiguration() reads this flag
+            // for every node started later, including restarted servers.
+            client = false;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUpdateMultithreadedTx() throws Exception {
+        updateMultithreaded(TRANSACTIONAL, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUpdateMultithreadedTxRestart() throws Exception {
+        updateMultithreaded(TRANSACTIONAL, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUpdateMultithreadedAtomic() throws Exception {
+        updateMultithreaded(ATOMIC, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUpdateMultithreadedAtomicRestart() throws Exception {
+        updateMultithreaded(ATOMIC, true);
+    }
+
+    /**
+     * Recreates the default cache and, for {@link #TEST_DURATION} milliseconds, repeatedly
+     * updates two keys concurrently through the client near cache, comparing the values seen
+     * by the server cache and the client cache after each round.
+     *
+     * @param atomicityMode Cache atomicity mode.
+     * @param restart If {@code true} restarts one node.
+     * @throws Exception If failed.
+     */
+    private void updateMultithreaded(CacheAtomicityMode atomicityMode, final boolean restart) throws Exception {
+        Ignite srv = ignite(0);
+
+        srv.destroyCache(null);
+
+        IgniteCache<Integer, Integer> srvCache = srv.createCache(cacheConfiguration(atomicityMode));
+
+        Ignite client = ignite(SRV_CNT);
+
+        assertTrue(client.configuration().isClientMode());
+
+        final IgniteCache<Integer, Integer> clientCache =
+            client.createNearCache(null, new NearCacheConfiguration<Integer, Integer>());
+
+        // Primary key for restarted node. Resolved before the restart thread is started.
+        final Integer key0 = primaryKey(ignite(SRV_CNT - 1).cache(null));
+
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteInternalFuture<?> restartFut = restart ? startRestartThread(stop) : null;
+
+        // Set only when the update loop finishes without an exception or assertion failure.
+        boolean ok = false;
+
+        try {
+            long stopTime = System.currentTimeMillis() + TEST_DURATION;
+
+            int iter = 0;
+
+            while (System.currentTimeMillis() < stopTime) {
+                if (iter % 100 == 0)
+                    log.info("Iteration: " + iter);
+
+                final Integer key = iter++;
+
+                final AtomicInteger val = new AtomicInteger();
+
+                GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        clientCache.put(key0, val.incrementAndGet());
+
+                        for (int i = 0; i < 10; i++)
+                            clientCache.put(key, val.incrementAndGet());
+
+                        return null;
+                    }
+                }, 20, "update-thread");
+
+                if (restart) {
+                    assertEquals(srvCache.get(key), clientCache.get(key));
+                    assertEquals(srvCache.get(key0), clientCache.get(key0));
+                }
+                else {
+                    assertEquals(srvCache.get(key), clientCache.localPeek(key));
+                    assertEquals(srvCache.get(key0), clientCache.localPeek(key0));
+                }
+            }
+
+            ok = true;
+        }
+        finally {
+            stop.set(true);
+
+            // Wait for the restart thread on failure as well, otherwise it could keep
+            // stopping and starting the node while the next test is already running.
+            if (restartFut != null) {
+                try {
+                    restartFut.get();
+                }
+                catch (Exception e) {
+                    if (ok)
+                        throw e;
+
+                    // The update loop already failed; do not mask that failure.
+                    log.error("Restart thread failed.", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Starts a thread that stops and then starts node {@code SRV_CNT - 1} in a loop,
+     * sleeping 300 ms before each action, until the given flag is set.
+     *
+     * @param stop Flag that ends the restart loop once set to {@code true}.
+     * @return Future of the restart thread.
+     * @throws Exception If the thread could not be started.
+     */
+    private IgniteInternalFuture<?> startRestartThread(final AtomicBoolean stop) throws Exception {
+        return GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                while (!stop.get()) {
+                    Thread.sleep(300);
+
+                    log.info("Stop node.");
+
+                    stopGrid(SRV_CNT - 1);
+
+                    Thread.sleep(300);
+
+                    log.info("Start node.");
+
+                    startGrid(SRV_CNT - 1);
+                }
+
+                return null;
+            }
+        }, "restart-thread");
+    }
+
+    /**
+     * @param atomicityMode Cache atomicity mode.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Integer, Integer> cacheConfiguration(CacheAtomicityMode atomicityMode) {
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setBackups(1);
+
+        return ccfg;
+    }
+}