You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2015/11/11 17:17:39 UTC

[20/23] ignite git commit: IGNITE-1681: loadAll threshold is not configurable for CacheStoreBalancingWrapper

IGNITE-1681: loadAll threshold is not configurable for CacheStoreBalancingWrapper


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/967cfcbb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/967cfcbb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/967cfcbb

Branch: refs/heads/ignite-1.5-atomic-opt
Commit: 967cfcbb5b87e172a48e619b18e3988f4ef2e428
Parents: 92881e0
Author: Michael Griggs <en...@gmail.com>
Authored: Wed Nov 11 13:45:48 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Wed Nov 11 13:45:48 2015 +0300

----------------------------------------------------------------------
 .../configuration/CacheConfiguration.java       | 37 ++++++++++++++
 .../cache/CacheStoreBalancingWrapper.java       |  5 +-
 .../store/GridCacheStoreManagerAdapter.java     |  3 +-
 .../store/GridCacheBalancingStoreSelfTest.java  | 53 +++++++++++++++++---
 4 files changed, 88 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/967cfcbb/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index b7276c9..1b8d41c 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -174,9 +174,15 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     /** Default size for onheap SQL row cache size. */
     public static final int DFLT_SQL_ONHEAP_ROW_CACHE_SIZE = 10 * 1024;
 
+    /** Default threshold for concurrent loading of keys from {@link CacheStore}. */
+    public static final int DFLT_CONCURRENT_LOAD_ALL_THRESHOLD = 5;
+
     /** Cache name. */
     private String name;
 
+    /** Threshold for concurrent loading of keys from {@link CacheStore}. */
+    private int storeConcurrentLoadAllThreshold = DFLT_CONCURRENT_LOAD_ALL_THRESHOLD;
+
     /** Rebalance thread pool size. */
     @Deprecated
     private int rebalancePoolSize = DFLT_REBALANCE_THREAD_POOL_SIZE;
@@ -834,6 +840,37 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     }
 
     /**
+     * Gets the threshold used in cases when values for multiple keys are being loaded from an underlying
+     * {@link CacheStore} in parallel. In the situation when several threads load the same or intersecting set of keys
+     * and the total number of keys to load is less or equal to this threshold then there will be no a second call to
+     * the storage in order to load a key from thread A if the same key is already being loaded by thread B.
+     *
+     * The threshold should be controlled wisely. On the one hand if it's set to a big value then the interaction with
+     * a storage during the load of missing keys will be minimal. On the other hand the big value may result in
+     * significant performance degradation because it is needed to check for every key whether it's being loaded or not.
+     *
+     * When not set, default value is {@link #DFLT_CONCURRENT_LOAD_ALL_THRESHOLD}.
+     *
+     * @return The concurrent load-all threshold.
+     */
+    public int getStoreConcurrentLoadAllThreshold() {
+        return storeConcurrentLoadAllThreshold;
+    }
+
+    /**
+     * Sets the concurrent load-all threshold used for cases when keys' values are being loaded from {@link CacheStore}
+     * in parallel.
+     *
+     * @param storeConcurrentLoadAllThreshold The concurrent load-all threshold.
+     * @return {@code this} for chaining.
+     */
+    public CacheConfiguration<K, V> setStoreConcurrentLoadAllThreshold(int storeConcurrentLoadAllThreshold) {
+        this.storeConcurrentLoadAllThreshold = storeConcurrentLoadAllThreshold;
+
+        return this;
+    }
+
+    /**
      * Gets key topology resolver to provide mapping from keys to nodes.
      *
      * @return Key topology resolver to provide mapping from keys to nodes.

http://git-wip-us.apache.org/repos/asf/ignite/blob/967cfcbb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java
index 93075f3..8992326 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java
@@ -28,6 +28,7 @@ import javax.cache.integration.CacheLoaderException;
 import javax.cache.integration.CacheWriterException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgniteBiInClosure;
@@ -39,7 +40,7 @@ import org.jsr166.ConcurrentHashMap8;
  */
 public class CacheStoreBalancingWrapper<K, V> implements CacheStore<K, V> {
     /** */
-    public static final int DFLT_LOAD_ALL_THRESHOLD = 5;
+    public static final int DFLT_LOAD_ALL_THRESHOLD = CacheConfiguration.DFLT_CONCURRENT_LOAD_ALL_THRESHOLD;
 
     /** Delegate store. */
     private CacheStore<K, V> delegate;
@@ -306,4 +307,4 @@ public class CacheStoreBalancingWrapper<K, V> implements CacheStore<K, V> {
             return get().get(key);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/967cfcbb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index dd54da5..6bfafd4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -112,7 +112,8 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
 
         store = cacheStoreWrapper(ctx, cfgStore, cfg);
 
-        singleThreadGate = store == null ? null : new CacheStoreBalancingWrapper<>(store);
+        singleThreadGate = store == null ? null : new CacheStoreBalancingWrapper<>(store,
+            cfg.getStoreConcurrentLoadAllThreshold());
 
         ThreadLocal<SessionData> sesHolder0 = null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/967cfcbb/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java
index 1e3e4b4..bfbb08c 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import javax.cache.Cache;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.cache.CacheStoreBalancingWrapper;
@@ -127,15 +128,35 @@ public class GridCacheBalancingStoreSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testConcurrentLoad() throws Exception {
-        int threads = 5;
+        CacheConfiguration cfg = new CacheConfiguration();
 
-        final int keys = 50;
+        assertEquals(CacheStoreBalancingWrapper.DFLT_LOAD_ALL_THRESHOLD, cfg.getStoreConcurrentLoadAllThreshold());
 
+        doTestConcurrentLoad(5, 50, CacheStoreBalancingWrapper.DFLT_LOAD_ALL_THRESHOLD);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentLoadCustomThreshold() throws Exception {
+        CacheConfiguration cfg = new CacheConfiguration();
+
+        cfg.setStoreConcurrentLoadAllThreshold(15);
+
+        assertEquals(15, cfg.getStoreConcurrentLoadAllThreshold());
+
+        doTestConcurrentLoad(5, 50, cfg.getStoreConcurrentLoadAllThreshold());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void doTestConcurrentLoad(int threads, final int keys, int threshold) throws Exception {
         final CyclicBarrier beforeBarrier = new CyclicBarrier(threads);
 
         ConcurrentVerifyStore store = new ConcurrentVerifyStore(keys);
 
-        final CacheStoreBalancingWrapper<Integer, Integer> wrapper =new CacheStoreBalancingWrapper<>(store);
+        final CacheStoreBalancingWrapper<Integer, Integer> wrapper = new CacheStoreBalancingWrapper<>(store, threshold);
 
         GridTestUtils.runMultiThreaded(new Runnable() {
             @Override public void run() {
@@ -159,17 +180,35 @@ public class GridCacheBalancingStoreSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testConcurrentLoadAll() throws Exception {
-        int threads = 5;
+        CacheConfiguration cfg = new CacheConfiguration();
 
-        final int threshold = 5;
+        assertEquals(CacheStoreBalancingWrapper.DFLT_LOAD_ALL_THRESHOLD, cfg.getStoreConcurrentLoadAllThreshold());
 
-        final int keysCnt = 100;
+        doTestConcurrentLoadAll(5, CacheStoreBalancingWrapper.DFLT_LOAD_ALL_THRESHOLD, 150);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentLoadAllCustomThreshold() throws Exception {
+        CacheConfiguration cfg = new CacheConfiguration();
+
+        cfg.setStoreConcurrentLoadAllThreshold(15);
 
+        assertEquals(15, cfg.getStoreConcurrentLoadAllThreshold());
+
+        doTestConcurrentLoadAll(5, cfg.getStoreConcurrentLoadAllThreshold(), 150);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void doTestConcurrentLoadAll(int threads, final int threshold, final int keysCnt) throws Exception {
         final CyclicBarrier beforeBarrier = new CyclicBarrier(threads);
 
         ConcurrentVerifyStore store = new ConcurrentVerifyStore(keysCnt);
 
-        final CacheStoreBalancingWrapper<Integer, Integer> wrapper = new CacheStoreBalancingWrapper<>(store);
+        final CacheStoreBalancingWrapper<Integer, Integer> wrapper = new CacheStoreBalancingWrapper<>(store, threshold);
 
         GridTestUtils.runMultiThreaded(new Runnable() {
             @Override public void run() {