You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/10 14:16:46 UTC
[07/10] ignite git commit: IGNITE-1681: Dogpile effect tests for
CacheStoreBalancingWrapper
IGNITE-1681: Dogpile effect tests for CacheStoreBalancingWrapper
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d70f7eda
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d70f7eda
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d70f7eda
Branch: refs/heads/master
Commit: d70f7eda0492857ffd4879c311c814867552070e
Parents: 7ba2efb
Author: Andrey Gura <ag...@gridgain.com>
Authored: Tue Nov 10 13:59:38 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Tue Nov 10 13:59:38 2015 +0300
----------------------------------------------------------------------
.../store/GridCacheBalancingStoreSelfTest.java | 181 ++++++++++++++++++-
1 file changed, 180 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d70f7eda/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java
index d41a441..1e3e4b4 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java
@@ -17,10 +17,14 @@
package org.apache.ignite.cache.store;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -29,8 +33,10 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.cache.Cache;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.cache.CacheStoreBalancingWrapper;
import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.testframework.GridTestUtils;
@@ -118,6 +124,81 @@ public class GridCacheBalancingStoreSelfTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testConcurrentLoad() throws Exception {
+ int threads = 5;
+
+ final int keys = 50;
+
+ final CyclicBarrier beforeBarrier = new CyclicBarrier(threads);
+
+ ConcurrentVerifyStore store = new ConcurrentVerifyStore(keys);
+
+ final CacheStoreBalancingWrapper<Integer, Integer> wrapper =new CacheStoreBalancingWrapper<>(store);
+
+ GridTestUtils.runMultiThreaded(new Runnable() {
+ @Override public void run() {
+ for (int i = 0; i < keys; i++) {
+ try {
+ beforeBarrier.await();
+ }
+ catch (InterruptedException | BrokenBarrierException e) {
+ throw new RuntimeException(e);
+ }
+
+ info("Load key: " + i);
+
+ wrapper.load(i);
+ }
+ }
+ }, threads, "load-thread");
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testConcurrentLoadAll() throws Exception {
+ int threads = 5;
+
+ final int threshold = 5;
+
+ final int keysCnt = 100;
+
+ final CyclicBarrier beforeBarrier = new CyclicBarrier(threads);
+
+ ConcurrentVerifyStore store = new ConcurrentVerifyStore(keysCnt);
+
+ final CacheStoreBalancingWrapper<Integer, Integer> wrapper = new CacheStoreBalancingWrapper<>(store);
+
+ GridTestUtils.runMultiThreaded(new Runnable() {
+ @Override public void run() {
+ for (int i = 0; i < keysCnt; i += threshold) {
+ try {
+ beforeBarrier.await();
+ }
+ catch (InterruptedException | BrokenBarrierException e) {
+ throw new RuntimeException(e);
+ }
+
+ List<Integer> keys = new ArrayList<>(threshold);
+
+ for (int j = i; j < i + threshold; j++)
+ keys.add(j);
+
+ info("Load keys: " + keys);
+
+ wrapper.loadAll(keys, new IgniteBiInClosure<Integer, Integer>() {
+ @Override public void apply(Integer integer, Integer integer2) {
+ // No-op.
+ }
+ });
+ }
+ }
+ }, threads, "load-thread");
+ }
+
+ /**
*
*/
private static class VerifyStore implements CacheStore<Integer, Integer> {
@@ -204,4 +285,102 @@ public class GridCacheBalancingStoreSelfTest extends GridCommonAbstractTest {
// No-op.
}
}
-}
\ No newline at end of file
+
+ /**
+ *
+ */
+ private static class ConcurrentVerifyStore implements CacheStore<Integer, Integer> {
+
+ /** Cnts. */
+ private final AtomicInteger[] cnts;
+
+ /**
+ */
+ private ConcurrentVerifyStore(int keys) {
+ this.cnts = new AtomicInteger[keys];
+
+ for (int i = 0; i < keys; i++)
+ cnts[i] = new AtomicInteger();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override public Integer load(Integer key) {
+ try {
+ U.sleep(500);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ throw new RuntimeException(e);
+ }
+
+ assertEquals("Redundant load call.", 1, cnts[key].incrementAndGet());
+
+ return key;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override public void loadCache(IgniteBiInClosure<Integer, Integer> clo, @Nullable Object... args) {
+ // No-op.
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override public Map<Integer, Integer> loadAll(Iterable<? extends Integer> keys) {
+ try {
+ U.sleep(500);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ e.printStackTrace();
+ }
+
+ Map<Integer, Integer> loaded = new HashMap<>();
+
+ for (Integer key : keys) {
+ assertEquals("Redundant loadAll call.", 1, cnts[key].incrementAndGet());
+
+ loaded.put(key, key);
+ }
+
+ return loaded;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) {
+ // No-op.
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override public void writeAll(Collection<Cache.Entry<? extends Integer, ? extends Integer>> entries) {
+ // No-op.
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override public void delete(Object key) {
+ // No-op.
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override public void deleteAll(Collection<?> keys) {
+ // No-op.
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override public void sessionEnd(boolean commit) {
+ // No-op.
+ }
+ }
+}