You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/10/22 11:12:44 UTC
[21/51] [abbrv] ignite git commit: ignite-1635,
ignite-1616 Added unit-tests for the bugs.
ignite-1635, ignite-1616 Added unit-tests for the bugs.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/077af17f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/077af17f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/077af17f
Branch: refs/heads/ignite-1282
Commit: 077af17f7e62ed1c4d0f699c9fd39b9d8161ae1f
Parents: 3a29b97
Author: ashutak <as...@gridgain.com>
Authored: Thu Oct 15 16:58:23 2015 +0300
Committer: ashutak <as...@gridgain.com>
Committed: Thu Oct 15 16:58:23 2015 +0300
----------------------------------------------------------------------
.../CacheAbstractRestartSelfTest.java | 247 +++++++++++++++++++
...NearDisabledAtomicInvokeRestartSelfTest.java | 179 ++++++++++++++
...abledTransactionalInvokeRestartSelfTest.java | 173 +++++++++++++
...edTransactionalWriteReadRestartSelfTest.java | 124 ++++++++++
.../IgniteCacheLoadConsistencyTestSuite.java | 42 ++++
5 files changed, 765 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/077af17f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAbstractRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAbstractRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAbstractRestartSelfTest.java
new file mode 100644
index 0000000..7537af1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAbstractRestartSelfTest.java
@@ -0,0 +1,247 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.ArrayList;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractTest;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ * Abstract restart test.
+ */
+public abstract class CacheAbstractRestartSelfTest extends IgniteCacheAbstractTest {
+ /** */
+ private volatile CountDownLatch cacheCheckedLatch = new CountDownLatch(1);
+
+ /** */
+ private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ if (gridName.equals(getTestGridName(gridCount() - 1)))
+ cfg.setClientMode(true);
+
+ cfg.setPeerClassLoadingEnabled(false);
+
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 8 * 60_000;
+ }
+
+ /**
+ * @return Number of updater threads.
+ */
+ protected int updatersNumber() {
+ return 64;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRestart() throws Exception {
+ final int clientGrid = gridCount() - 1;
+ // The grid with the last index is configured as a client (see getConfiguration).
+ assertTrue(ignite(clientGrid).configuration().isClientMode());
+
+ final IgniteEx grid = grid(clientGrid);
+
+ final IgniteCache cache = jcache(clientGrid);
+
+ updateCache(grid, cache);
+
+ final AtomicBoolean stop = new AtomicBoolean();
+
+ ArrayList<IgniteInternalFuture> updaterFuts = new ArrayList<>();
+ // Updaters wait for the first cache check, then update under the read lock until stopped.
+ for (int i = 0; i < updatersNumber(); i++) {
+ final int threadIdx = i;
+
+ IgniteInternalFuture<?> updateFut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ Thread.currentThread().setName("update-thread-" + threadIdx);
+
+ assertTrue(cacheCheckedLatch.await(30_000, TimeUnit.MILLISECONDS));
+
+ int iter = 0;
+
+ while (!stop.get()) {
+ log.info("Start update: " + iter);
+
+ rwl.readLock().lock();
+
+ try {
+ updateCache(grid, cache);
+ }
+ finally {
+ rwl.readLock().unlock();
+ }
+
+ log.info("End update: " + iter++);
+ }
+
+ log.info("Update iterations: " + iter);
+
+ return null;
+ }
+ });
+
+ updaterFuts.add(updateFut);
+ }
+ // Restarter waits for a cache check to finish, then stops and starts a random non-client node.
+ IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ Thread.currentThread().setName("restart-thread");
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ while (!stop.get()) {
+ assertTrue(cacheCheckedLatch.await(30_000, TimeUnit.MILLISECONDS));
+
+ int node = rnd.nextInt(0, gridCount() - 1);
+
+ log.info("Stop node: " + node);
+
+ stopGrid(node);
+
+ U.sleep(restartSleep());
+
+ log.info("Start node: " + node);
+
+ startGrid(node);
+ // A fresh latch makes the next restart wait for another completed cache check.
+ cacheCheckedLatch = new CountDownLatch(1);
+
+ U.sleep(restartDelay());
+
+ awaitPartitionMapExchange();
+ }
+
+ return null;
+ }
+ });
+
+ long endTime = System.currentTimeMillis() + getTestDuration();
+
+ try {
+ int iter = 0;
+ // Main thread validates the cache under the write lock, which excludes all updaters.
+ while (System.currentTimeMillis() < endTime && !isAnyDone(updaterFuts) && !restartFut.isDone()) {
+ try {
+ log.info("Start of cache checking: " + iter);
+
+ rwl.writeLock().lock();
+
+ try {
+ checkCache(grid, cache);
+ }
+ finally {
+ rwl.writeLock().unlock();
+ }
+
+ log.info("End of cache checking: " + iter++);
+ }
+ finally {
+ cacheCheckedLatch.countDown();
+ }
+ }
+
+ log.info("Checking iteration: " + iter);
+ }
+ finally {
+ // Set the stop flag before the final countDown so the restart thread never waits on a latch nobody releases.
+ stop.set(true);
+ cacheCheckedLatch.countDown();
+ }
+
+ for (IgniteInternalFuture fut : updaterFuts)
+ fut.get();
+
+ restartFut.get();
+
+ checkCache(grid, cache);
+ }
+
+ /**
+ * @return Test duration.
+ * @see #getTestTimeout()
+ */
+ protected int getTestDuration() {
+ return 60_000;
+ }
+
+ /**
+ * @return Restart sleep in milliseconds.
+ */
+ private int restartSleep() {
+ return 100;
+ }
+
+ /**
+ * @return Restart delay in milliseconds.
+ */
+ private int restartDelay() {
+ return 100;
+ }
+
+ /**
+ * Checks the cache from a single thread; {@link #testRestart()} calls it while no updates are running.
+ * @param grid Grid.
+ * @param cache Cache.
+ */
+ protected abstract void checkCache(IgniteEx grid, IgniteCache cache) throws Exception ;
+
+ /**
+ * Performs a cache update; {@link #testRestart()} calls it concurrently from many updater threads.
+ * @param grid Grid.
+ * @param cache Cache.
+ * @throws Exception If failed.
+ */
+ protected abstract void updateCache(IgniteEx grid, IgniteCache cache) throws Exception ;
+
+ /**
+ * @param futs Futures.
+ * @return {@code True} if at least one of the futures is done.
+ */
+ private static boolean isAnyDone(ArrayList<IgniteInternalFuture> futs) {
+ for (IgniteInternalFuture fut : futs) {
+ if (fut.isDone())
+ return true;
+ }
+
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/077af17f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNearDisabledAtomicInvokeRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNearDisabledAtomicInvokeRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNearDisabledAtomicInvokeRestartSelfTest.java
new file mode 100644
index 0000000..90427f5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNearDisabledAtomicInvokeRestartSelfTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ * Invoke retry consistency test.
+ */
+public class CacheNearDisabledAtomicInvokeRestartSelfTest extends CacheAbstractRestartSelfTest {
+ /** */
+ public static final int RANGE = 50;
+
+ /** */
+ private static final long FIRST_VAL = 1;
+
+ /** */
+ private final ConcurrentMap<String, AtomicLong> nextValMap = new ConcurrentHashMap<>();
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 4;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return ATOMIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicWriteOrderMode atomicWriteOrderMode() {
+ return CacheAtomicWriteOrderMode.PRIMARY;
+ }
+
+ /** */
+ @Override protected NearCacheConfiguration nearConfiguration() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ protected void checkCache(IgniteEx ignite, IgniteCache cache) throws Exception {
+ log.info("Start cache validation.");
+
+ long startTime = U.currentTimeMillis();
+
+ Map<String, Set> badCacheEntries = new HashMap<>();
+
+ for (Map.Entry<String, AtomicLong> e : nextValMap.entrySet()) {
+ String key = e.getKey();
+
+ Set set = (Set)cache.get(key);
+
+ if (set == null || e.getValue() == null || !Objects.equals(e.getValue().get(), (long)set.size()))
+ badCacheEntries.put(key, set);
+ }
+
+ if (!badCacheEntries.isEmpty()) {
+ // Print all usefull information and finish.
+ for (Map.Entry<String, Set> e : badCacheEntries.entrySet()) {
+ String key = e.getKey();
+
+ U.error(log, "Got unexpected set size [key='" + key + "', expSize=" + nextValMap.get(key)
+ + ", cacheVal=" + e.getValue() + "]");
+ }
+
+ log.info("Next values map contant:");
+ for (Map.Entry<String, AtomicLong> e : nextValMap.entrySet())
+ log.info("Map Entry [key=" + e.getKey() + ", val=" + e.getValue() + "]");
+
+ log.info("Cache content:");
+
+ for (int k2 = 0; k2 < RANGE; k2++) {
+ String key2 = "key-" + k2;
+
+ Object val = cache.get(key2);
+
+ if (val != null)
+ log.info("Cache Entry [key=" + key2 + ", val=" + val + "]");
+
+ }
+
+ fail("Cache and local map are in inconsistent state [badKeys=" + badCacheEntries.keySet() + ']');
+ }
+
+ log.info("Clearing all data.");
+
+ cache.removeAll();
+ nextValMap.clear();
+
+ log.info("Cache validation successfully finished in "
+ + (U.currentTimeMillis() - startTime) / 1000 + " sec.");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void updateCache(IgniteEx ignite, IgniteCache cache) {
+ final int k = ThreadLocalRandom.current().nextInt(RANGE);
+
+ String key = "key-" + k;
+
+ AtomicLong nextAtomicVal = nextValMap.putIfAbsent(key, new AtomicLong(FIRST_VAL));
+
+ Long nextVal = FIRST_VAL;
+
+ if (nextAtomicVal != null)
+ nextVal = nextAtomicVal.incrementAndGet();
+
+ cache.invoke(key, new AddInSetEntryProcessor(), nextVal);
+ }
+
+ /** Entry processor that adds the first invoke argument to the {@link Set} stored in the entry,
+ * creating the set when the entry does not exist; always returns {@code null}. */
+ private static class AddInSetEntryProcessor implements CacheEntryProcessor<String, Set, Object> {
+ /** */
+ private static final long serialVersionUID = 0;
+
+ /** {@inheritDoc} */
+ @Override public Object process(MutableEntry<String, Set> entry,
+ Object... arguments) throws EntryProcessorException {
+ assert !F.isEmpty(arguments);
+
+ Object val = arguments[0];
+ // Start from an empty set when the entry does not exist, otherwise extend the stored one.
+ Set set;
+
+ if (!entry.exists())
+ set = new HashSet<>();
+ else
+ set = entry.getValue();
+
+ set.add(val);
+
+ entry.setValue(set);
+
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/077af17f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNearDisabledTransactionalInvokeRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNearDisabledTransactionalInvokeRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNearDisabledTransactionalInvokeRestartSelfTest.java
new file mode 100644
index 0000000..f4eea6c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNearDisabledTransactionalInvokeRestartSelfTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ * Invoke retry consistency test.
+ */
+public class CacheNearDisabledTransactionalInvokeRestartSelfTest extends CacheAbstractRestartSelfTest {
+ /** */
+ public static final int RANGE = 100;
+
+ /** */
+ private static final int KEYS_CNT = 5;
+
+ /** */
+ protected final ConcurrentMap<String, AtomicLong> map = new ConcurrentHashMap<>();
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 4;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return TRANSACTIONAL;
+ }
+
+ /** */
+ @Override protected NearCacheConfiguration nearConfiguration() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ protected void checkCache(IgniteEx ignite, IgniteCache cache) {
+ log.info("Start cache validation.");
+
+ long startTime = U.currentTimeMillis();
+
+ Map<String, Long> notEqualsCacheVals = new HashMap<>();
+ Map<String, Long> notEqualsLocMapVals = new HashMap<>();
+
+ for (int k = 0; k < RANGE; k++) {
+ if (k % 10_000 == 0)
+ log.info("Start validation for keys like 'key-" + k + "-*'");
+
+ for (int i = 0; i < KEYS_CNT; i++) {
+ String key = "key-" + k + "-" + i;
+
+ Long cacheVal = (Long)cache.get(key);
+
+ AtomicLong aVal = map.get(key);
+ Long mapVal = aVal != null ? aVal.get() : null;
+
+ if (!Objects.equals(cacheVal, mapVal)) {
+ notEqualsCacheVals.put(key, cacheVal);
+ notEqualsLocMapVals.put(key, mapVal);
+ }
+ }
+ }
+
+ assert notEqualsCacheVals.size() == notEqualsLocMapVals.size() : "Invalid state " +
+ "[cacheMapVals=" + notEqualsCacheVals + ", mapVals=" + notEqualsLocMapVals + "]";
+
+ if (!notEqualsCacheVals.isEmpty()) {
+ // Print all usefull information and finish.
+ for (Map.Entry<String, Long> eLocMap : notEqualsLocMapVals.entrySet()) {
+ String key = eLocMap.getKey();
+ Long mapVal = eLocMap.getValue();
+ Long cacheVal = notEqualsCacheVals.get(key);
+
+ U.error(log, "Got different values [key='" + key
+ + "', cacheVal=" + cacheVal + ", localMapVal=" + mapVal + "]");
+ }
+
+ log.info("Local driver map contant:\n " + map);
+
+ log.info("Cache content:");
+
+ for (int k2 = 0; k2 < RANGE; k2++) {
+ for (int i2 = 0; i2 < KEYS_CNT; i2++) {
+ String key2 = "key-" + k2 + "-" + i2;
+
+ Long val = (Long)cache.get(key2);
+
+ if (val != null)
+ log.info("Entry [key=" + key2 + ", val=" + val + "]");
+ }
+ }
+
+ throw new IllegalStateException("Cache and local map are in inconsistent state [badKeys="
+ + notEqualsCacheVals.keySet() + ']');
+ }
+
+ log.info("Cache validation successfully finished in "
+ + (U.currentTimeMillis() - startTime) / 1000 + " sec.");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void updateCache(IgniteEx ignite, IgniteCache cache) {
+ final int k = ThreadLocalRandom.current().nextInt(RANGE);
+
+ final String[] keys = new String[KEYS_CNT];
+
+ for (int i = 0; i < keys.length; i++)
+ keys[i] = "key-" + k + "-" + i;
+
+ for (String key : keys) {
+ cache.invoke(key, new IncrementCacheEntryProcessor());
+
+ AtomicLong prevVal = map.putIfAbsent(key, new AtomicLong(0));
+
+ if (prevVal != null)
+ prevVal.incrementAndGet();
+ }
+ }
+
+ /** Entry processor that stores {@code 0} when the entry has no value and otherwise increments
+ * the stored value by one; returns the newly stored value. */
+ private static class IncrementCacheEntryProcessor implements CacheEntryProcessor<String, Long, Long> {
+ /** */
+ private static final long serialVersionUID = 0;
+
+ /** {@inheritDoc} */
+ @Override public Long process(MutableEntry<String, Long> entry,
+ Object... arguments) throws EntryProcessorException {
+ long newVal = entry.getValue() == null ? 0 : entry.getValue() + 1;
+
+ entry.setValue(newVal);
+
+ return newVal;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/077af17f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNearDisabledTransactionalWriteReadRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNearDisabledTransactionalWriteReadRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNearDisabledTransactionalWriteReadRestartSelfTest.java
new file mode 100644
index 0000000..875aef3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNearDisabledTransactionalWriteReadRestartSelfTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ * Transactional write read consistency test.
+ */
+public class CacheNearDisabledTransactionalWriteReadRestartSelfTest extends CacheAbstractRestartSelfTest{
+ /** */
+ public static final int RANGE = 100;
+
+ /** */
+ private static final int KEYS_CNT = 5;
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 4;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return TRANSACTIONAL;
+ }
+
+ /** */
+ @Override protected NearCacheConfiguration nearConfiguration() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void checkCache(IgniteEx ignite, IgniteCache cache) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void updateCache(IgniteEx ignite, IgniteCache cache) throws Exception {
+ final int k = ThreadLocalRandom.current().nextInt(RANGE);
+ // Build the group of keys "key-<k>-0" .. "key-<k>-<KEYS_CNT - 1>" that is read and written together.
+ final String[] keys = new String[KEYS_CNT];
+
+ for (int i = 0; i < keys.length; i++)
+ keys[i] = "key-" + k + "-" + i;
+
+ doInTransaction(ignite, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ Map<String, Long> map = new HashMap<>();
+
+ for (String key : keys) {
+ Long val = (Long)cache.get(key);
+
+ map.put(key, val);
+ }
+ // Each call writes one value to all keys of the group (see below), so the values read here must coincide.
+ Set<Long> values = new HashSet<>(map.values());
+
+ if (values.size() != 1) {
+ // Print all useful information and finish.
+ U.error(log, "Got different values for keys [map=" + map + "]");
+
+ log.info("Cache content:");
+
+ for (int k = 0; k < RANGE; k++) {
+ for (int i = 0; i < KEYS_CNT; i++) {
+ String key = "key-" + k + "-" + i;
+
+ Long val = (Long)cache.get(key);
+
+ if (val != null)
+ log.info("Entry [key=" + key + ", val=" + val + "]");
+ }
+ }
+
+ throw new IllegalStateException("Found different values for keys (see above information) [map="
+ + map + ']');
+ }
+
+ final Long oldVal = map.get(keys[0]);
+ // Missing entries start at 0; otherwise the shared value is incremented by one.
+ final Long newVal = oldVal == null ? 0 : oldVal + 1;
+
+ for (String key : keys)
+ cache.put(key, newVal);
+
+ return null;
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/077af17f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheLoadConsistencyTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheLoadConsistencyTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheLoadConsistencyTestSuite.java
new file mode 100644
index 0000000..cd0be9c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheLoadConsistencyTestSuite.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.distributed.CacheNearDisabledAtomicInvokeRestartSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.CacheNearDisabledTransactionalInvokeRestartSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.CacheNearDisabledTransactionalWriteReadRestartSelfTest;
+
+/**
+ * Test suite that groups the cache load consistency (restart) tests.
+ */
+public class IgniteCacheLoadConsistencyTestSuite extends TestSuite {
+ /**
+ * @return Cache load consistency test suite.
+ * @throws Exception Thrown in case of the failure.
+ */
+ public static TestSuite suite() throws Exception {
+ TestSuite suite = new TestSuite("Cache Load Consistency Test Suite");
+
+ suite.addTestSuite(CacheNearDisabledAtomicInvokeRestartSelfTest.class);
+ suite.addTestSuite(CacheNearDisabledTransactionalInvokeRestartSelfTest.class);
+ suite.addTestSuite(CacheNearDisabledTransactionalWriteReadRestartSelfTest.class);
+
+ return suite;
+ }
+}