You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/11/13 16:06:42 UTC
[16/23] ignite git commit: IGNITE-1681: loadAll threshold is not
configurable for CacheStoreBalancingWrapper
IGNITE-1681: loadAll threshold is not configurable for CacheStoreBalancingWrapper
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/967cfcbb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/967cfcbb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/967cfcbb
Branch: refs/heads/ignite-perftest
Commit: 967cfcbb5b87e172a48e619b18e3988f4ef2e428
Parents: 92881e0
Author: Michael Griggs <en...@gmail.com>
Authored: Wed Nov 11 13:45:48 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Wed Nov 11 13:45:48 2015 +0300
----------------------------------------------------------------------
.../configuration/CacheConfiguration.java | 37 ++++++++++++++
.../cache/CacheStoreBalancingWrapper.java | 5 +-
.../store/GridCacheStoreManagerAdapter.java | 3 +-
.../store/GridCacheBalancingStoreSelfTest.java | 53 +++++++++++++++++---
4 files changed, 88 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/967cfcbb/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index b7276c9..1b8d41c 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -174,9 +174,15 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** Default size for onheap SQL row cache size. */
public static final int DFLT_SQL_ONHEAP_ROW_CACHE_SIZE = 10 * 1024;
+ /** Default threshold for concurrent loading of keys from {@link CacheStore}. */
+ public static final int DFLT_CONCURRENT_LOAD_ALL_THRESHOLD = 5;
+
/** Cache name. */
private String name;
+ /** Threshold for concurrent loading of keys from {@link CacheStore}. */
+ private int storeConcurrentLoadAllThreshold = DFLT_CONCURRENT_LOAD_ALL_THRESHOLD;
+
/** Rebalance thread pool size. */
@Deprecated
private int rebalancePoolSize = DFLT_REBALANCE_THREAD_POOL_SIZE;
@@ -834,6 +840,37 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
}
/**
+ * Gets the threshold used in cases when values for multiple keys are being loaded from an underlying
+ * {@link CacheStore} in parallel. In the situation when several threads load the same or intersecting set of keys
+ * and the total number of keys to load is less or equal to this threshold then there will be no a second call to
+ * the storage in order to load a key from thread A if the same key is already being loaded by thread B.
+ *
+ * The threshold should be controlled wisely. On the one hand if it's set to a big value then the interaction with
+ * a storage during the load of missing keys will be minimal. On the other hand the big value may result in
+ * significant performance degradation because it is needed to check for every key whether it's being loaded or not.
+ *
+ * When not set, default value is {@link #DFLT_CONCURRENT_LOAD_ALL_THRESHOLD}.
+ *
+ * @return The concurrent load-all threshold.
+ */
+ public int getStoreConcurrentLoadAllThreshold() {
+ return storeConcurrentLoadAllThreshold;
+ }
+
+ /**
+ * Sets the concurrent load-all threshold used for cases when keys' values are being loaded from {@link CacheStore}
+ * in parallel.
+ *
+ * @param storeConcurrentLoadAllThreshold The concurrent load-all threshold.
+ * @return {@code this} for chaining.
+ */
+ public CacheConfiguration<K, V> setStoreConcurrentLoadAllThreshold(int storeConcurrentLoadAllThreshold) {
+ this.storeConcurrentLoadAllThreshold = storeConcurrentLoadAllThreshold;
+
+ return this;
+ }
+
+ /**
* Gets key topology resolver to provide mapping from keys to nodes.
*
* @return Key topology resolver to provide mapping from keys to nodes.
http://git-wip-us.apache.org/repos/asf/ignite/blob/967cfcbb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java
index 93075f3..8992326 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java
@@ -28,6 +28,7 @@ import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteBiInClosure;
@@ -39,7 +40,7 @@ import org.jsr166.ConcurrentHashMap8;
*/
public class CacheStoreBalancingWrapper<K, V> implements CacheStore<K, V> {
/** */
- public static final int DFLT_LOAD_ALL_THRESHOLD = 5;
+ public static final int DFLT_LOAD_ALL_THRESHOLD = CacheConfiguration.DFLT_CONCURRENT_LOAD_ALL_THRESHOLD;
/** Delegate store. */
private CacheStore<K, V> delegate;
@@ -306,4 +307,4 @@ public class CacheStoreBalancingWrapper<K, V> implements CacheStore<K, V> {
return get().get(key);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/967cfcbb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index dd54da5..6bfafd4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -112,7 +112,8 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
store = cacheStoreWrapper(ctx, cfgStore, cfg);
- singleThreadGate = store == null ? null : new CacheStoreBalancingWrapper<>(store);
+ singleThreadGate = store == null ? null : new CacheStoreBalancingWrapper<>(store,
+ cfg.getStoreConcurrentLoadAllThreshold());
ThreadLocal<SessionData> sesHolder0 = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/967cfcbb/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java
index 1e3e4b4..bfbb08c 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.cache.Cache;
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.cache.CacheStoreBalancingWrapper;
@@ -127,15 +128,35 @@ public class GridCacheBalancingStoreSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testConcurrentLoad() throws Exception {
- int threads = 5;
+ CacheConfiguration cfg = new CacheConfiguration();
- final int keys = 50;
+ assertEquals(CacheStoreBalancingWrapper.DFLT_LOAD_ALL_THRESHOLD, cfg.getStoreConcurrentLoadAllThreshold());
+ doTestConcurrentLoad(5, 50, CacheStoreBalancingWrapper.DFLT_LOAD_ALL_THRESHOLD);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testConcurrentLoadCustomThreshold() throws Exception {
+ CacheConfiguration cfg = new CacheConfiguration();
+
+ cfg.setStoreConcurrentLoadAllThreshold(15);
+
+ assertEquals(15, cfg.getStoreConcurrentLoadAllThreshold());
+
+ doTestConcurrentLoad(5, 50, cfg.getStoreConcurrentLoadAllThreshold());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void doTestConcurrentLoad(int threads, final int keys, int threshold) throws Exception {
final CyclicBarrier beforeBarrier = new CyclicBarrier(threads);
ConcurrentVerifyStore store = new ConcurrentVerifyStore(keys);
- final CacheStoreBalancingWrapper<Integer, Integer> wrapper =new CacheStoreBalancingWrapper<>(store);
+ final CacheStoreBalancingWrapper<Integer, Integer> wrapper = new CacheStoreBalancingWrapper<>(store, threshold);
GridTestUtils.runMultiThreaded(new Runnable() {
@Override public void run() {
@@ -159,17 +180,35 @@ public class GridCacheBalancingStoreSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testConcurrentLoadAll() throws Exception {
- int threads = 5;
+ CacheConfiguration cfg = new CacheConfiguration();
- final int threshold = 5;
+ assertEquals(CacheStoreBalancingWrapper.DFLT_LOAD_ALL_THRESHOLD, cfg.getStoreConcurrentLoadAllThreshold());
- final int keysCnt = 100;
+ doTestConcurrentLoadAll(5, CacheStoreBalancingWrapper.DFLT_LOAD_ALL_THRESHOLD, 150);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testConcurrentLoadAllCustomThreshold() throws Exception {
+ CacheConfiguration cfg = new CacheConfiguration();
+
+ cfg.setStoreConcurrentLoadAllThreshold(15);
+ assertEquals(15, cfg.getStoreConcurrentLoadAllThreshold());
+
+ doTestConcurrentLoadAll(5, cfg.getStoreConcurrentLoadAllThreshold(), 150);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void doTestConcurrentLoadAll(int threads, final int threshold, final int keysCnt) throws Exception {
final CyclicBarrier beforeBarrier = new CyclicBarrier(threads);
ConcurrentVerifyStore store = new ConcurrentVerifyStore(keysCnt);
- final CacheStoreBalancingWrapper<Integer, Integer> wrapper = new CacheStoreBalancingWrapper<>(store);
+ final CacheStoreBalancingWrapper<Integer, Integer> wrapper = new CacheStoreBalancingWrapper<>(store, threshold);
GridTestUtils.runMultiThreaded(new Runnable() {
@Override public void run() {