You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/18 15:09:46 UTC
[6/8] ignite git commit: IGNITE-4927 Write behind - add an option to
skip write coalescing
IGNITE-4927 Write behind - add an option to skip write coalescing
Signed-off-by: nikolay_tikhonov <nt...@gridgain.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/22580e19
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/22580e19
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/22580e19
Branch: refs/heads/ignite-1561-1
Commit: 22580e19b7ae5d11b8c299e2b3d92f5c8b9f0e8c
Parents: c4d8180
Author: Alexander Belyak <al...@xored.com>
Authored: Tue Apr 18 14:56:50 2017 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Tue Apr 18 15:57:45 2017 +0300
----------------------------------------------------------------------
.../configuration/CacheConfiguration.java | 33 +
.../cache/store/GridCacheWriteBehindStore.java | 614 ++++++++++++++++---
...idCacheWriteBehindStoreAbstractSelfTest.java | 24 +-
.../GridCacheWriteBehindStoreAbstractTest.java | 4 +
...heWriteBehindStoreMultithreadedSelfTest.java | 88 ++-
.../GridCacheWriteBehindStoreSelfTest.java | 159 ++++-
...ClientWriteBehindStoreNonCoalescingTest.java | 175 ++++++
.../IgniteCacheWriteBehindTestSuite.java | 2 +
8 files changed, 978 insertions(+), 121 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/22580e19/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index b5afba4..2308a10 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -176,6 +176,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** Default batch size for write-behind cache store. */
public static final int DFLT_WRITE_BEHIND_BATCH_SIZE = 512;
+ /** Default write coalescing for write-behind cache store. */
+ public static final boolean DFLT_WRITE_BEHIND_COALESCING = true;
+
/** Default maximum number of query iterators that can be stored. */
public static final int DFLT_MAX_QUERY_ITERATOR_CNT = 1024;
@@ -310,6 +313,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** Maximum batch size for write-behind cache store. */
private int writeBehindBatchSize = DFLT_WRITE_BEHIND_BATCH_SIZE;
+ /** Write coalescing flag for write-behind cache store. */
+ private boolean writeBehindCoalescing = DFLT_WRITE_BEHIND_COALESCING;
+
/** Maximum number of query iterators that can be stored. */
private int maxQryIterCnt = DFLT_MAX_QUERY_ITERATOR_CNT;
@@ -454,6 +460,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
tmLookupClsName = cc.getTransactionManagerLookupClassName();
topValidator = cc.getTopologyValidator();
writeBehindBatchSize = cc.getWriteBehindBatchSize();
+ writeBehindCoalescing = cc.getWriteBehindCoalescing();
writeBehindEnabled = cc.isWriteBehindEnabled();
writeBehindFlushFreq = cc.getWriteBehindFlushFrequency();
writeBehindFlushSize = cc.getWriteBehindFlushSize();
@@ -1287,6 +1294,32 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
}
/**
+ * Write coalescing flag for write-behind cache store operations. Store operations (put or remove)
+ * with the same key are combined, or coalesced, into a single resulting operation
+ * to reduce pressure on the underlying cache store.
+ * <p/>
+ * If not provided, default value is {@link #DFLT_WRITE_BEHIND_COALESCING}.
+ *
+ * @return Write coalescing flag.
+ */
+ public boolean getWriteBehindCoalescing() {
+ return writeBehindCoalescing;
+ }
+
+ /**
+ * Sets write coalescing flag for write-behind cache.
+ *
+ * @param writeBehindCoalescing Write coalescing flag.
+ * @see #getWriteBehindCoalescing()
+ * @return {@code this} for chaining.
+ */
+ public CacheConfiguration<K, V> setWriteBehindCoalescing(boolean writeBehindCoalescing) {
+ this.writeBehindCoalescing = writeBehindCoalescing;
+
+ return this;
+ }
+
+ /**
* Use {@link IgniteConfiguration#getRebalanceThreadPoolSize()} instead.
*
* @return Size of rebalancing thread pool.
http://git-wip-us.apache.org/repos/asf/ignite/blob/22580e19/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
index 91008ce..64238ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
@@ -17,17 +17,19 @@
package org.apache.ignite.internal.processors.cache.store;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.cache.integration.CacheWriterException;
@@ -43,9 +45,11 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentLinkedDeque8;
import org.jsr166.ConcurrentLinkedHashMap;
import static javax.cache.Cache.Entry;
@@ -65,6 +69,8 @@ import static javax.cache.Cache.Entry;
* <p/>
* Since write operations to the cache store are deferred, transaction support is lost; no
* transaction objects are passed to the underlying store.
+ * <p/>
+ * {@link GridCacheWriteBehindStore} doesn't support concurrent modifications of the same key.
*/
public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, LifecycleAware {
/** Default write cache initial capacity. */
@@ -91,6 +97,9 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
/** Count of worker threads performing underlying store updates. */
private int flushThreadCnt = CacheConfiguration.DFLT_WRITE_FROM_BEHIND_FLUSH_THREAD_CNT;
+ /** Flag indicating whether the flush thread count is a power of two. */
+ private boolean flushThreadCntIsPowerOfTwo;
+
/** Cache flush frequency. All pending operations will be performed in not less then this value ms. */
private long cacheFlushFreq = CacheConfiguration.DFLT_WRITE_BEHIND_FLUSH_FREQUENCY;
@@ -98,29 +107,26 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
private int batchSize = CacheConfiguration.DFLT_WRITE_BEHIND_BATCH_SIZE;
/** Ignite instance name. */
- private String igniteInstanceName;
+ private final String igniteInstanceName;
/** Cache name. */
- private String cacheName;
+ private final String cacheName;
/** Underlying store. */
- private CacheStore<K, V> store;
+ private final CacheStore<K, V> store;
/** Write cache. */
private ConcurrentLinkedHashMap<K, StatefulValue<K, V>> writeCache;
/** Flusher threads. */
- private GridWorker[] flushThreads;
+ private Flusher[] flushThreads;
+
+ /** Write coalescing. */
+ private boolean writeCoalescing = CacheConfiguration.DFLT_WRITE_BEHIND_COALESCING;
/** Atomic flag indicating store shutdown. */
private AtomicBoolean stopping = new AtomicBoolean(true);
- /** Flush lock. */
- private Lock flushLock = new ReentrantLock();
-
- /** Condition to determine records available for flush. */
- private Condition canFlush = flushLock.newCondition();
-
/** Variable for counting total cache overflows. */
private AtomicInteger cacheTotalOverflowCntr = new AtomicInteger();
@@ -131,10 +137,16 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
private AtomicInteger retryEntriesCnt = new AtomicInteger();
/** Log. */
- private IgniteLogger log;
+ private final IgniteLogger log;
/** Store manager. */
- private CacheStoreManager storeMgr;
+ private final CacheStoreManager storeMgr;
+
+ /** Flush lock. */
+ private final Lock flushLock = new ReentrantLock();
+
+ /** Condition to determine records available for flush. */
+ private Condition canFlush = flushLock.newCondition();
/**
* Creates a write-behind cache store for the given store.
@@ -193,7 +205,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
* <p/>
* If this value is {@code 0}, then flush is performed only on time-elapsing basis. However,
* when this value is {@code 0}, the cache critical size is set to
- * {@link CacheConfiguration#DFLT_WRITE_BEHIND_CRITICAL_SIZE}
+ * {@link CacheConfiguration#DFLT_WRITE_BEHIND_CRITICAL_SIZE}.
*
* @return Buffer size that triggers flush procedure.
*/
@@ -208,6 +220,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
*/
public void setFlushThreadCount(int flushThreadCnt) {
this.flushThreadCnt = flushThreadCnt;
+ this.flushThreadCntIsPowerOfTwo = (flushThreadCnt & (flushThreadCnt - 1)) == 0;
}
/**
@@ -220,6 +233,24 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
}
/**
+ * Sets the write coalescing flag.
+ *
+ * @param writeCoalescing Write coalescing flag.
+ */
+ public void setWriteCoalescing(boolean writeCoalescing) {
+ this.writeCoalescing = writeCoalescing;
+ }
+
+ /**
+ * Gets the write coalescing flag.
+ *
+ * @return Write coalescing flag.
+ */
+ public boolean getWriteCoalescing() {
+ return writeCoalescing;
+ }
+
+ /**
* Sets the cache flush frequency. All pending operations on the underlying store will be performed
* within time interval not less then this value.
*
@@ -266,7 +297,16 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
* @return Total count of entries in cache store internal buffer.
*/
public int getWriteBehindBufferSize() {
- return writeCache.sizex();
+ if (writeCoalescing)
+ return writeCache.sizex();
+ else {
+ int size = 0;
+
+ for (Flusher f : flushThreads)
+ size += f.size();
+
+ return size;
+ }
}
/**
@@ -292,14 +332,15 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
if (cacheCriticalSize == 0)
cacheCriticalSize = CacheConfiguration.DFLT_WRITE_BEHIND_CRITICAL_SIZE;
- flushThreads = new GridWorker[flushThreadCnt];
+ flushThreads = new GridCacheWriteBehindStore.Flusher[flushThreadCnt];
- writeCache = new ConcurrentLinkedHashMap<>(initCap, 0.75f, concurLvl);
+ if (writeCoalescing)
+ writeCache = new ConcurrentLinkedHashMap<>(initCap, 0.75f, concurLvl);
for (int i = 0; i < flushThreads.length; i++) {
flushThreads[i] = new Flusher(igniteInstanceName, "flusher-" + i, log);
- new IgniteThread(flushThreads[i]).start();
+ flushThreads[i].start();
}
}
}
@@ -344,7 +385,10 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
if (log.isDebugEnabled())
log.debug("Stopping write-behind store for cache '" + cacheName + '\'');
- wakeUp();
+ for (Flusher f : flushThreads) {
+ if (!f.isEmpty())
+ f.wakeUp();
+ }
boolean graceful = true;
@@ -352,7 +396,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
graceful &= U.join(worker, log);
if (!graceful)
- log.warning("Shutdown was aborted");
+ log.warning("Write behind store shutdown was aborted.");
}
}
@@ -361,7 +405,10 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
* @throws IgniteCheckedException If failed.
*/
public void forceFlush() throws IgniteCheckedException {
- wakeUp();
+ for (Flusher f : flushThreads) {
+ if (!f.isEmpty())
+ f.wakeUp();
+ }
}
/** {@inheritDoc} */
@@ -376,10 +423,15 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
Map<K, V> loaded = new HashMap<>();
- Collection<K> remaining = new LinkedList<>();
+ Collection<K> remaining = null;
for (K key : keys) {
- StatefulValue<K, V> val = writeCache.get(key);
+ StatefulValue<K, V> val;
+
+ if (writeCoalescing)
+ val = writeCache.get(key);
+ else
+ val = flusher(key).flusherWriteMap.get(key);
if (val != null) {
val.readLock().lock();
@@ -394,12 +446,16 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
val.readLock().unlock();
}
}
- else
+ else {
+ if (remaining == null)
+ remaining = new ArrayList<>();
+
remaining.add(key);
+ }
}
// For items that were not found in queue.
- if (!remaining.isEmpty()) {
+ if (remaining != null && !remaining.isEmpty()) {
Map<K, V> loaded0 = store.loadAll(remaining);
if (loaded0 != null)
@@ -414,7 +470,12 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
if (log.isDebugEnabled())
log.debug("Store load [key=" + key + ']');
- StatefulValue<K, V> val = writeCache.get(key);
+ StatefulValue<K, V> val;
+
+ if (writeCoalescing)
+ val = writeCache.get(key);
+ else
+ val = flusher(key).flusherWriteMap.get(key);
if (val != null) {
val.readLock().lock();
@@ -493,7 +554,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
*
* @param key Key for which update is performed.
* @param val New value, may be null for remove operation.
- * @param operation Updated value status
+ * @param operation Updated value status.
* @throws IgniteInterruptedCheckedException If interrupted while waiting for value to be flushed.
*/
private void updateCache(K key,
@@ -502,8 +563,27 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
throws IgniteInterruptedCheckedException {
StatefulValue<K, V> newVal = new StatefulValue<>(val, operation);
+ if (writeCoalescing)
+ putToWriteCache(key, newVal);
+ else
+ flusher(key).putToFlusherWriteCache(key, newVal);
+ }
+
+ /**
+ * Performs flush-consistent writeCache update for the given key.
+ *
+ * @param key Key for which update is performed.
+ * @param newVal Stateful value to put.
+ * @throws IgniteInterruptedCheckedException If interrupted while waiting for value to be flushed.
+ */
+ private void putToWriteCache(
+ K key,
+ StatefulValue<K, V> newVal)
+ throws IgniteInterruptedCheckedException {
StatefulValue<K, V> prev;
+ assert writeCoalescing : "Unexpected write coalescing.";
+
while ((prev = writeCache.putIfAbsent(key, newVal)) != null) {
prev.writeLock().lock();
@@ -523,7 +603,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
assert prev.status() == ValueStatus.NEW || prev.status() == ValueStatus.RETRY;
- prev.update(val, operation, ValueStatus.NEW);
+ prev.update(newVal.val, newVal.operation(), ValueStatus.NEW);
break;
}
@@ -533,14 +613,33 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
}
// Now check the map size
- if (writeCache.sizex() > cacheCriticalSize)
+ int cacheSize = getWriteBehindBufferSize();
+
+ if (cacheSize > cacheCriticalSize)
// Perform single store update in the same thread.
flushSingleValue();
- else if (cacheMaxSize > 0 && writeCache.sizex() > cacheMaxSize)
+ else if (cacheMaxSize > 0 && cacheSize > cacheMaxSize)
wakeUp();
}
/**
+ * Returns the flusher responsible for the given key.
+ *
+ * @param key Key for search.
+ * @return Flusher picked by the spread key hash; the computed index is never negative.
+ */
+ private Flusher flusher(K key) {
+ int h = key.hashCode();
+
+ // Spread high bits down, then clear the sign bit: '%' on a negative hash yields a negative index.
+ int spread = (h ^ (h >>> 16)) & Integer.MAX_VALUE;
+
+ int idx = flushThreadCntIsPowerOfTwo ? spread & (flushThreadCnt - 1) : spread % flushThreadCnt;
+
+ return flushThreads[idx];
+ }
+
+ /**
* Flushes one upcoming value to the underlying store. Called from
* {@link #updateCache(Object, Entry, StoreOperation)} method in case when current map size exceeds
* critical size.
@@ -549,7 +648,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
cacheOverflowCntr.incrementAndGet();
try {
- Map<K, StatefulValue<K, V>> batch = null;
+ Map<K, StatefulValue<K, V>> batch;
for (Map.Entry<K, StatefulValue<K, V>> e : writeCache.entrySet()) {
StatefulValue<K, V> val = e.getValue();
@@ -577,7 +676,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
}
if (!batch.isEmpty()) {
- applyBatch(batch, false);
+ applyBatch(batch, false, null);
cacheTotalOverflowCntr.incrementAndGet();
@@ -595,9 +694,12 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
*
* @param valMap Batch map.
* @param initSes {@code True} if need to initialize session.
+ * @param flusher Flusher associated with all keys in the batch (only meaningful when write coalescing is disabled).
+ * @return {@code True} if batch was successfully applied, {@code False} otherwise.
*/
- private void applyBatch(Map<K, StatefulValue<K, V>> valMap, boolean initSes) {
+ private boolean applyBatch(Map<K, StatefulValue<K, V>> valMap, boolean initSes, Flusher flusher) {
assert valMap.size() <= batchSize;
+ assert !valMap.isEmpty();
StoreOperation operation = null;
@@ -615,7 +717,9 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
batch.put(e.getKey(), e.getValue().entry());
}
- if (updateStore(operation, batch, initSes)) {
+ boolean result = updateStore(operation, batch, initSes, flusher);
+
+ if (result) {
for (Map.Entry<K, StatefulValue<K, V>> e : valMap.entrySet()) {
StatefulValue<K, V> val = e.getValue();
@@ -624,12 +728,22 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
try {
val.status(ValueStatus.FLUSHED);
- StatefulValue<K, V> prev = writeCache.remove(e.getKey());
+ if (writeCoalescing) {
+ StatefulValue<K, V> prev = writeCache.remove(e.getKey());
- // Additional check to ensure consistency.
- assert prev == val : "Map value for key " + e.getKey() + " was updated during flush";
+ // Additional check to ensure consistency.
+ assert prev == val : "Map value for key " + e.getKey() + " was updated during flush";
- val.signalFlushed();
+ val.signalFlushed();
+ }
+ else {
+ Flusher f = flusher(e.getKey());
+
+ // Can remove using equal because if map contains another similar value it has different state.
+ f.flusherWriteMap.remove(e.getKey(), e.getValue());
+
+ val.signalFlushed();
+ }
}
finally {
val.writeLock().unlock();
@@ -653,6 +767,8 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
}
}
}
+
+ return result;
}
/**
@@ -666,13 +782,16 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
* @param operation Status indicating operation that should be performed.
* @param vals Key-Value map.
* @param initSes {@code True} if need to initialize session.
+ * @param flusher Flusher associated with the keys of {@code vals} (only used when write coalescing is disabled).
* @return {@code true} if value may be deleted from the write cache,
* {@code false} otherwise
*/
- private boolean updateStore(StoreOperation operation,
+ private boolean updateStore(
+ StoreOperation operation,
Map<K, Entry<? extends K, ? extends V>> vals,
- boolean initSes) {
-
+ boolean initSes,
+ Flusher flusher
+ ) {
try {
if (initSes && storeMgr != null)
storeMgr.writeBehindSessionInit();
@@ -707,7 +826,14 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
catch (Exception e) {
LT.error(log, e, "Unable to update underlying store: " + store);
- if (writeCache.sizex() > cacheCriticalSize || stopping.get()) {
+ boolean overflow;
+
+ if (writeCoalescing)
+ overflow = writeCache.sizex() > cacheCriticalSize || stopping.get();
+ else
+ overflow = flusher.isOverflowed() || stopping.get();
+
+ if (overflow) {
for (Map.Entry<K, Entry<? extends K, ? extends V>> entry : vals.entrySet()) {
Object val = entry.getValue() != null ? entry.getValue().getValue() : null;
@@ -738,29 +864,163 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
}
/**
- * Thread that performs time-based flushing of written values to the underlying storage.
+ * Thread that performs time/size-based flushing of written values to the underlying storage.
*/
private class Flusher extends GridWorker {
+ /** Queue to flush. */
+ private final ConcurrentLinkedDeque8<IgniteBiTuple<K, StatefulValue<K,V>>> queue;
+
+ /** Flusher write map. */
+ private final ConcurrentHashMap<K, StatefulValue<K,V>> flusherWriteMap;
+
+ /** Critical size of flusher local queue. */
+ private final int flusherCacheCriticalSize;
+
+ /** Flusher parked flag. */
+ private volatile boolean parked;
+
+ /** Flusher thread. */
+ protected Thread thread;
+
+ /** Cache flushing frequency in nanos ({@code cacheFlushFreq} itself is in milliseconds). */
+ protected long cacheFlushFreqNanos = TimeUnit.MILLISECONDS.toNanos(cacheFlushFreq);
+
+ /** Writer lock. */
+ private final Lock flusherWriterLock = new ReentrantLock();
+
+ /** Condition signalled when space becomes available in the flusher queue. */
+ private Condition flusherWriterCanWrite = flusherWriterLock.newCondition();
+
/** {@inheritDoc */
- protected Flusher(String igniteInstanceName, String name, IgniteLogger log) {
+ protected Flusher(String igniteInstanceName,
+ String name,
+ IgniteLogger log) {
super(igniteInstanceName, name, log);
+
+ flusherCacheCriticalSize = cacheCriticalSize/flushThreadCnt;
+
+ assert flusherCacheCriticalSize > batchSize;
+
+ if (writeCoalescing) {
+ queue = null;
+ flusherWriteMap = null;
+ }
+ else {
+ queue = new ConcurrentLinkedDeque8<>();
+ flusherWriteMap = new ConcurrentHashMap<>(initCap, 0.75f, concurLvl);
+ }
+ }
+
+ /** Starts the flusher thread. */
+ protected void start() {
+ thread = new IgniteThread(this);
+ thread.start();
+ }
+
+ /**
+ * Enqueues the value for this flusher, blocking the caller while the flusher queue is above its critical size.
+ *
+ * @param key Key for which update is performed.
+ * @param newVal Stateful value to put.
+ * @throws IgniteInterruptedCheckedException If interrupted while waiting for free space in the flusher queue.
+ */
+ private void putToFlusherWriteCache(
+ K key,
+ StatefulValue<K, V> newVal)
+ throws IgniteInterruptedCheckedException {
+ assert !writeCoalescing : "Unexpected write coalescing.";
+
+ if (queue.sizex() > flusherCacheCriticalSize) {
+ while (queue.sizex() > flusherCacheCriticalSize) {
+ wakeUp();
+ flusherWriterLock.lock();
+
+ try {
+ // Wait for free space in flusher queue
+ while (queue.sizex() >= flusherCacheCriticalSize && !stopping.get()) {
+ if (cacheFlushFreq > 0)
+ flusherWriterCanWrite.await(cacheFlushFreq, TimeUnit.MILLISECONDS);
+ else
+ flusherWriterCanWrite.await();
+ }
+
+ cacheTotalOverflowCntr.incrementAndGet();
+ }
+ catch (InterruptedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Caught interrupted exception: " + e);
+
+ // Restore the flag and propagate: retrying would spin, since await() rethrows at once when interrupted.
+ Thread.currentThread().interrupt();
+ throw new IgniteInterruptedCheckedException(e);
+ }
+ finally {
+ flusherWriterLock.unlock();
+ }
+ }
+
+ cacheTotalOverflowCntr.incrementAndGet();
+ }
+
+ queue.add(F.t(key, newVal));
+
+ flusherWriteMap.put(key, newVal);
+ }
+
+ /**
+ * Get overflowed flag.
+ *
+ * @return {@code True} if write behind flusher is overflowed,
+ * {@code False} otherwise.
+ */
+ public boolean isOverflowed() {
+ if (writeCoalescing)
+ return writeCache.sizex() > cacheCriticalSize;
+ else
+ return queue.sizex() > flusherCacheCriticalSize;
+ }
+
+ /**
+ * Get write behind flusher size.
+ *
+ * @return Flusher write behind size.
+ */
+ public int size() {
+ return writeCoalescing ? writeCache.sizex() : queue.sizex();
+ }
+
+ /**
+ * Test if write behind flusher is empty
+ *
+ * @return {@code True} if write behind flusher is empty, {@code False} otherwise
+ */
+ public boolean isEmpty() {
+ return writeCoalescing ? writeCache.isEmpty() : queue.isEmpty();
}
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
- while (!stopping.get() || writeCache.sizex() > 0) {
- awaitOperationsAvailable();
+ if (writeCoalescing) {
+ while (!stopping.get() || writeCache.sizex() > 0) {
+ awaitOperationsAvailableCoalescing();
- flushCache(writeCache.entrySet().iterator());
+ flushCacheCoalescing();
+ }
+ }
+ else {
+ while (!stopping.get() || queue.sizex() > 0) {
+ awaitOperationsAvailableNonCoalescing();
+
+ flushCacheNonCoalescing();
+ }
}
}
/**
- * This method awaits until enough elements in map are available or given timeout is over.
+ * This method awaits until enough elements in the write cache are available or given timeout is over.
*
* @throws InterruptedException If awaiting was interrupted.
*/
- private void awaitOperationsAvailable() throws InterruptedException {
+ private void awaitOperationsAvailableCoalescing() throws InterruptedException {
flushLock.lock();
try {
@@ -780,74 +1040,215 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
}
/**
+ * This method awaits until enough elements in flusher queue are available or given timeout is over.
+ *
+ * @throws InterruptedException If awaiting was interrupted.
+ */
+ private void awaitOperationsAvailableNonCoalescing() throws InterruptedException {
+ if (queue.sizex() >= batchSize)
+ return;
+
+ parked = true;
+
+ try {
+ for (;;) {
+ if (queue.sizex() >= batchSize)
+ return;
+
+ if (cacheFlushFreq > 0)
+ LockSupport.parkNanos(cacheFlushFreqNanos);
+ else
+ LockSupport.park();
+
+ if (queue.sizex() > 0)
+ return;
+
+ if (Thread.interrupted())
+ throw new InterruptedException();
+
+ if (stopping.get())
+ return;
+ }
+ }
+ finally {
+ parked = false;
+ }
+ }
+
+ /**
+ * Wake up flusher thread.
+ */
+ public void wakeUp() {
+ if (parked)
+ LockSupport.unpark(thread);
+ }
+
+ /**
* Removes values from the write cache and performs corresponding operation
* on the underlying store.
- *
- * @param it Iterator for write cache.
*/
- private void flushCache(Iterator<Map.Entry<K,StatefulValue<K, V>>> it) {
- StoreOperation operation = null;
+ private void flushCacheCoalescing() {
+ StoreOperation prevOperation = null;
- Map<K, StatefulValue<K, V>> batch = null;
- Map<K, StatefulValue<K, V>> pending = U.newLinkedHashMap(batchSize);
+ Map<K, StatefulValue<K, V>> pending = U.newLinkedHashMap(batchSize);
+ Iterator<Map.Entry<K, StatefulValue<K, V>>> it = writeCache.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<K, StatefulValue<K, V>> e = it.next();
-
StatefulValue<K, V> val = e.getValue();
- val.writeLock().lock();
+ if (!val.writeLock().tryLock()) // TODO: stripe write maps to avoid lock contention.
+ continue;
try {
- ValueStatus status = val.status();
+ BatchingResult addRes = tryAddStatefulValue(pending, prevOperation, e.getKey(), val);
- if (acquired(status))
- // Another thread is helping us, continue to the next entry.
- continue;
-
- if (status == ValueStatus.RETRY)
- retryEntriesCnt.decrementAndGet();
+ switch (addRes) {
+ case NEW_BATCH:
+ applyBatch(pending, true, null);
- assert retryEntriesCnt.get() >= 0;
+ pending = U.newLinkedHashMap(batchSize);
- val.status(ValueStatus.PENDING);
+ // No need to test first value in batch
+ val.status(ValueStatus.PENDING);
+ pending.put(e.getKey(), val);
+ prevOperation = val.operation();
- // We scan for the next operation and apply batch on operation change. Null means new batch.
- if (operation == null)
- operation = val.operation();
+ break;
- if (operation != val.operation()) {
- // Operation is changed, so we need to perform a batch.
- batch = pending;
- pending = U.newLinkedHashMap(batchSize);
+ case ADDED:
+ prevOperation = val.operation();
- operation = val.operation();
+ break;
- pending.put(e.getKey(), val);
+ default:
+ assert addRes == BatchingResult.SKIPPED : "Unexpected result: " + addRes;
}
- else
- pending.put(e.getKey(), val);
+ }
+ finally {
+ val.writeLock().unlock();
+ }
+ }
+
+ // Process the remainder.
+ if (!pending.isEmpty())
+ applyBatch(pending, true, null);
+ }
+
+ /**
+ * Removes values from the flusher write queue and performs corresponding operation
+ * on the underlying store.
+ */
+ private void flushCacheNonCoalescing() {
+ StoreOperation prevOperation;
+ Map<K, StatefulValue<K, V>> pending;
+ IgniteBiTuple<K, StatefulValue<K, V>> tuple;
+ boolean applied;
+
+ while(!queue.isEmpty()) {
+ pending = U.newLinkedHashMap(batchSize);
+ prevOperation = null;
+ boolean needNewBatch = false;
+
+ // Collect batch
+ while (!needNewBatch && (tuple = queue.peek()) != null) {
+ BatchingResult addRes = tryAddStatefulValue(pending, prevOperation, tuple.getKey(),
+ tuple.getValue());
+
+ switch (addRes) {
+ case ADDED:
+ prevOperation = tuple.getValue().operation();
+ queue.poll();
+
+ break;
+
+ case SKIPPED:
+ assert false : "Unexpected result: " + addRes;
+
+ break;
- if (pending.size() == batchSize) {
- batch = pending;
- pending = U.newLinkedHashMap(batchSize);
+ case NEW_BATCH:
+ needNewBatch = true;
+ prevOperation = null;
- operation = null;
+ break;
+
+ default:
+ assert false : "Unexpected result: " + addRes;
}
}
- finally {
- val.writeLock().unlock();
+
+ // Process collected batch
+ applied = applyBatch(pending, true, this);
+
+ if (applied) {
+ // Wake up awaiting writers
+ flusherWriterLock.lock();
+
+ try {
+ flusherWriterCanWrite.signalAll();
+ }
+ finally {
+ flusherWriterLock.unlock();
+ }
}
+ else {
+ // Return values to queue
+ ArrayList<Map.Entry<K, StatefulValue<K,V>>> pendingList = new ArrayList(pending.entrySet());
- if (batch != null && !batch.isEmpty()) {
- applyBatch(batch, true);
- batch = null;
+ for (int i = pendingList.size() - 1; i >= 0; i--)
+ queue.addFirst(F.t(pendingList.get(i).getKey(), pendingList.get(i).getValue()));
}
}
+ }
- // Process the remainder.
- if (!pending.isEmpty())
- applyBatch(pending, true);
+ /**
+ * Tries to add a key and stateful value pair into the pending map.
+ *
+ * @param pending Map to populate.
+ * @param prevOperation Operation of the previously batched value, {@code null} if the batch is empty.
+ * @param key Key to add.
+ * @param val Stateful value to add.
+ * @return {@code BatchingResult.ADDED} if pair was successfully added, {@code BatchingResult.SKIPPED} if pair cannot
+ * be processed by this thread, {@code BatchingResult.NEW_BATCH} if pair requires a new batch (pending map) to be added.
+ */
+ public BatchingResult tryAddStatefulValue(
+ Map<K, StatefulValue<K, V>> pending,
+ StoreOperation prevOperation,
+ K key,
+ StatefulValue<K, V> val
+ ) {
+ ValueStatus status = val.status();
+
+ assert !(pending.isEmpty() && prevOperation != null) : "prev operation cannot be " + prevOperation
+ + " if prev map is empty!";
+
+ if (acquired(status))
+ // Another thread is helping us, continue to the next entry.
+ return BatchingResult.SKIPPED;
+
+ if (!writeCoalescing && pending.containsKey(key))
+ return BatchingResult.NEW_BATCH;
+
+ if (status == ValueStatus.RETRY)
+ retryEntriesCnt.decrementAndGet();
+
+ assert retryEntriesCnt.get() >= 0;
+
+ if (pending.size() == batchSize)
+ return BatchingResult.NEW_BATCH;
+
+ // We scan for the next operation and apply batch on operation change. Null means new batch.
+ if (prevOperation != val.operation() && prevOperation != null)
+ // Operation is changed, so we need to perform a batch.
+ return BatchingResult.NEW_BATCH;
+ else {
+ val.status(ValueStatus.PENDING);
+
+ pending.put(key, val);
+
+ return BatchingResult.ADDED;
+ }
}
}
@@ -861,6 +1262,20 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
}
/**
+ * For test purposes only.
+ *
+ * @return Flusher maps for the underlying store operations.
+ */
+ Map<K, StatefulValue<K,V>>[] flusherMaps() {
+ Map<K, StatefulValue<K,V>>[] result = new Map[flushThreadCnt];
+
+ for (int i=0; i < flushThreadCnt; i++)
+ result[i] = flushThreads[i].flusherWriteMap;
+
+ return result;
+ }
+
+ /**
* Enumeration that represents possible operations on the underlying store.
*/
private enum StoreOperation {
@@ -889,6 +1304,20 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
}
/**
+ * Enumeration that represents possible result of "add to batch" operation.
+ */
+ private enum BatchingResult {
+ /** Added to batch. */
+ ADDED,
+
+ /** Skipped. */
+ SKIPPED,
+
+ /** Need new batch. */
+ NEW_BATCH
+ }
+
+ /**
* Checks if given status indicates pending or complete flush operation.
*
* @param status Status to check.
@@ -901,6 +1330,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
/**
* A state-value-operation trio.
*
+ * @param <K> Key type.
* @param <V> Value type.
*/
private static class StatefulValue<K, V> extends ReentrantReadWriteLock {
@@ -949,7 +1379,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
}
/**
- * @return Value status
+ * @return Value status.
*/
private ValueStatus status() {
return valStatus;
@@ -980,7 +1410,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
}
/**
- * Awaits a signal on flush condition
+ * Awaits a signal on flush condition.
*
* @throws IgniteInterruptedCheckedException If thread was interrupted.
*/
@@ -1023,4 +1453,4 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
return S.toString(StatefulValue.class, this);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/22580e19/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractSelfTest.java
index 323278f..3bac906 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractSelfTest.java
@@ -20,12 +20,14 @@ package org.apache.ignite.internal.processors.cache.store;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
import org.apache.ignite.internal.processors.cache.GridCacheTestStore;
@@ -59,16 +61,29 @@ public abstract class GridCacheWriteBehindStoreAbstractSelfTest extends GridComm
/**
* Initializes store.
*
- * @param flushThreadCnt Count of flush threads
+ * @param flushThreadCnt Count of flush threads.
* @throws Exception If failed.
*/
protected void initStore(int flushThreadCnt) throws Exception {
+ initStore(flushThreadCnt, CacheConfiguration.DFLT_WRITE_BEHIND_COALESCING);
+ }
+
+ /**
+ * Initializes store.
+ *
+ * @param flushThreadCnt Count of flush threads.
+ * @param writeCoalescing Write coalescing flag.
+ * @throws Exception If failed.
+ */
+ protected void initStore(int flushThreadCnt, boolean writeCoalescing) throws Exception {
store = new GridCacheWriteBehindStore<>(null, "", "", log, delegate);
store.setFlushFrequency(FLUSH_FREQUENCY);
store.setFlushSize(CACHE_SIZE);
+ store.setWriteCoalescing(writeCoalescing);
+
store.setFlushThreadCount(flushThreadCnt);
delegate.reset();
@@ -83,8 +98,11 @@ public abstract class GridCacheWriteBehindStoreAbstractSelfTest extends GridComm
*/
protected void shutdownStore() throws Exception {
store.stop();
-
- assertTrue("Store cache must be empty after shutdown", store.writeCache().isEmpty());
+ if (store.getWriteCoalescing())
+ assertTrue("Store cache must be empty after shutdown", store.writeCache().isEmpty());
+ else
+ for (Map<?,?> fMap : store.flusherMaps())
+ assertTrue("Store flusher cache must be empty after shutdown", fMap.isEmpty());
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/22580e19/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java
index ffdad5c..56ee760 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java
@@ -37,6 +37,10 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.jetbrains.annotations.Nullable;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.junit.runners.Parameterized.Parameter;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
http://git-wip-us.apache.org/repos/asf/ignite/blob/22580e19/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreMultithreadedSelfTest.java
index bc6b7bd..15c58d9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreMultithreadedSelfTest.java
@@ -21,6 +21,7 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.typedef.internal.U;
/**
@@ -28,12 +29,30 @@ import org.apache.ignite.internal.util.typedef.internal.U;
*/
public class GridCacheWriteBehindStoreMultithreadedSelfTest extends GridCacheWriteBehindStoreAbstractSelfTest {
/**
+ * This test performs complex set of operations on store with coalescing from multiple threads.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPutGetRemoveWithCoalescing() throws Exception {
+ testPutGetRemove(true);
+ }
+
+ /**
+ * This test performs complex set of operations on store without coalescing from multiple threads.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPutGetRemoveWithoutCoalescing() throws Exception {
+ testPutGetRemove(false);
+ }
+
+ /**
* This test performs complex set of operations on store from multiple threads.
*
* @throws Exception If failed.
*/
- public void testPutGetRemove() throws Exception {
- initStore(2);
+ private void testPutGetRemove(boolean writeCoalescing) throws Exception {
+ initStore(2, writeCoalescing);
Set<Integer> exp;
@@ -63,26 +82,54 @@ public class GridCacheWriteBehindStoreMultithreadedSelfTest extends GridCacheWri
}
/**
+ * Tests that cache with write coalescing would keep values if underlying store fails.
+ *
+ * @throws Exception if failed.
+ */
+ public void testStoreFailureWithCoalescing() throws Exception {
+ testStoreFailure(true);
+ }
+
+ /**
+ * Tests that cache without write coalescing would keep values if underlying store fails.
+ *
+ * @throws Exception if failed.
+ */
+ public void testStoreFailureWithoutCoalescing() throws Exception {
+ testStoreFailure(false);
+ }
+
+ /**
* Tests that cache would keep values if underlying store fails.
*
* @throws Exception If failed.
*/
- public void testStoreFailure() throws Exception {
+ private void testStoreFailure(boolean writeCoalescing) throws Exception {
delegate.setShouldFail(true);
- initStore(2);
+ initStore(2, writeCoalescing);
Set<Integer> exp;
try {
+ Thread timer = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ U.sleep(FLUSH_FREQUENCY+50);
+ } catch (IgniteInterruptedCheckedException e) {
+ assertTrue("Timer was interrupted", false);
+ }
+ delegate.setShouldFail(false);
+ }
+ });
+ timer.start();
exp = runPutGetRemoveMultithreaded(10, 10);
- U.sleep(FLUSH_FREQUENCY);
+ timer.join();
info(">>> There are " + store.getWriteBehindErrorRetryCount() + " entries in RETRY state");
- delegate.setShouldFail(false);
-
// Despite that we set shouldFail flag to false, flush thread may just have caught an exception.
// If we move store to the stopping state right away, this value will be lost. That's why this sleep
// is inserted here to let all exception handlers in write-behind store exit.
@@ -111,16 +158,37 @@ public class GridCacheWriteBehindStoreMultithreadedSelfTest extends GridCacheWri
}
/**
+ * Tests store (with write coalescing) consistency in case of high put rate,
+ * when flush is performed from the same thread as put or remove operation.
+ *
+ * @throws Exception If failed.
+ */
+ public void testFlushFromTheSameThreadWithCoalescing() throws Exception {
+ testFlushFromTheSameThread(true);
+ }
+
+ /**
+ * Tests store (without write coalescing) consistency in case of high put rate,
+ * when flush is performed from the same thread as put or remove operation.
+ *
+ * @throws Exception If failed.
+ */
+ public void testFlushFromTheSameThreadWithoutCoalescing() throws Exception {
+ testFlushFromTheSameThread(false);
+ }
+
+ /**
* Tests store consistency in case of high put rate, when flush is performed from the same thread
* as put or remove operation.
*
+ * @param writeCoalescing Write coalescing flag.
* @throws Exception If failed.
*/
- public void testFlushFromTheSameThread() throws Exception {
+ private void testFlushFromTheSameThread(boolean writeCoalescing) throws Exception {
// 50 milliseconds should be enough.
delegate.setOperationDelay(50);
- initStore(2);
+ initStore(2, writeCoalescing);
Set<Integer> exp;
@@ -162,4 +230,4 @@ public class GridCacheWriteBehindStoreMultithreadedSelfTest extends GridCacheWri
for (Integer key : exp)
assertEquals("Invalid value for key " + key, "val" + key, map.get(key));
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/22580e19/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java
index 67e26ab..9a487a4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java
@@ -35,11 +35,30 @@ import org.jsr166.ConcurrentLinkedHashMap;
*/
public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStoreAbstractSelfTest {
/**
- * Tests correct store shutdown when underlying store fails,
+ * Tests correct store (with write coalescing) shutdown when underlying store fails.
*
* @throws Exception If failed.
*/
- public void testShutdownWithFailure() throws Exception {
+ public void testShutdownWithFailureWithCoalescing() throws Exception {
+ testShutdownWithFailure(true);
+ }
+
+ /**
+ * Tests correct store (without write coalescing) shutdown when underlying store fails.
+ *
+ * @throws Exception If failed.
+ */
+ public void testShutdownWithFailureWithoutCoalescing() throws Exception {
+ testShutdownWithFailure(false);
+ }
+
+ /**
+ * Tests correct store shutdown when underlying store fails.
+ *
+ * @param writeCoalescing Write coalescing flag.
+ * @throws Exception If failed.
+ */
+ private void testShutdownWithFailure(final boolean writeCoalescing) throws Exception {
final AtomicReference<Exception> err = new AtomicReference<>();
multithreadedAsync(new Runnable() {
@@ -47,7 +66,7 @@ public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStore
try {
delegate.setShouldFail(true);
- initStore(2);
+ initStore(2, writeCoalescing);
try {
store.write(new CacheEntryImpl<>(1, "val1"));
@@ -70,10 +89,31 @@ public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStore
}
/**
+ * Simple store (with write coalescing) test.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSimpleStoreWithCoalescing() throws Exception {
+ testSimpleStore(true);
+ }
+
+ /**
+ * Simple store (without write coalescing) test.
+ *
* @throws Exception If failed.
*/
- public void testSimpleStore() throws Exception {
- initStore(2);
+ public void testSimpleStoreWithoutCoalescing() throws Exception {
+ testSimpleStore(false);
+ }
+
+ /**
+ * Simple store test.
+ *
+ * @param writeCoalescing Write coalescing flag.
+ * @throws Exception If failed.
+ */
+ private void testSimpleStore(boolean writeCoalescing) throws Exception {
+ initStore(2, writeCoalescing);
try {
store.write(new CacheEntryImpl<>(1, "v1"));
@@ -95,14 +135,35 @@ public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStore
}
/**
+ * Check that all values written to the store with coalescing will be in underlying store after timeout
+ * or due to size limits.
+ *
+ * @throws Exception If failed.
+ */
+ public void testValuePropagationWithCoalescing() throws Exception {
+ testValuePropagation(true);
+ }
+
+ /**
+ * Check that all values written to the store without coalescing will be in underlying store after timeout
+ * or due to size limits.
+ *
+ * @throws Exception If failed.
+ */
+ public void testValuePropagationWithoutCoalescing() throws Exception {
+ testValuePropagation(false);
+ }
+
+ /**
* Check that all values written to the store will be in underlying store after timeout or due to size limits.
*
+ * @param writeCoalescing Write coalescing flag
* @throws Exception If failed.
*/
@SuppressWarnings({"NullableProblems"})
- public void testValuePropagation() throws Exception {
+ private void testValuePropagation(boolean writeCoalescing) throws Exception {
// Need to test size-based write.
- initStore(1);
+ initStore(1, writeCoalescing);
try {
for (int i = 0; i < CACHE_SIZE * 2; i++)
@@ -132,12 +193,31 @@ public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStore
}
/**
+ * Tests store with write coalescing behaviour under continuous put of the same key with different values.
+ *
+ * @throws Exception If failed.
+ */
+ public void testContinuousPutWithCoalescing() throws Exception {
+ testContinuousPut(true);
+ }
+
+ /**
+ * Tests store without write coalescing behaviour under continuous put of the same key with different values.
+ *
+ * @throws Exception If failed.
+ */
+ public void testContinuousPutWithoutCoalescing() throws Exception {
+ testContinuousPut(false);
+ }
+
+ /**
* Tests store behaviour under continuous put of the same key with different values.
*
- * @throws Exception If failed
+ * @param writeCoalescing Write coalescing flag for cache.
+ * @throws Exception If failed.
*/
- public void testContinuousPut() throws Exception {
- initStore(2);
+ private void testContinuousPut(boolean writeCoalescing) throws Exception {
+ initStore(2, writeCoalescing);
try {
final AtomicBoolean running = new AtomicBoolean(true);
@@ -169,17 +249,22 @@ public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStore
}, 1, "put");
U.sleep(FLUSH_FREQUENCY * 2 + 500);
+ running.set(false);
+ U.sleep(FLUSH_FREQUENCY * 2 + 500);
int delegatePutCnt = delegate.getPutAllCount();
- running.set(false);
fut.get();
log().info(">>> [putCnt = " + actualPutCnt.get() + ", delegatePutCnt=" + delegatePutCnt + "]");
assertTrue("No puts were made to the underlying store", delegatePutCnt > 0);
- assertTrue("Too many puts were made to the underlying store", delegatePutCnt < actualPutCnt.get() / 10);
+ if (store.getWriteCoalescing()) {
+ assertTrue("Too many puts were made to the underlying store", delegatePutCnt < actualPutCnt.get() / 10);
+ } else {
+ assertTrue("Too few puts cnt=" + actualPutCnt.get() + " << storePutCnt=" + delegatePutCnt, delegatePutCnt > actualPutCnt.get() / 2);
+ }
}
finally {
shutdownStore();
@@ -193,13 +278,34 @@ public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStore
}
/**
+ * Tests that all values that were put into the store with write coalescing will be written to the underlying store
+ * after shutdown is called.
+ *
+ * @throws Exception If failed.
+ */
+ public void testShutdownWithCoalescing() throws Exception {
+ testShutdown(true);
+ }
+
+ /**
+ * Tests that all values that were put into the store without write coalescing will be written to the underlying store
+ * after shutdown is called.
+ *
+ * @throws Exception If failed.
+ */
+ public void testShutdownWithoutCoalescing() throws Exception {
+ testShutdown(false);
+ }
+
+ /**
* Tests that all values were put into the store will be written to the underlying store
* after shutdown is called.
*
+ * @param writeCoalescing Write coalescing flag.
* @throws Exception If failed.
*/
- public void testShutdown() throws Exception {
- initStore(2);
+ private void testShutdown(boolean writeCoalescing) throws Exception {
+ initStore(2, writeCoalescing);
try {
final AtomicBoolean running = new AtomicBoolean(true);
@@ -243,14 +349,35 @@ public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStore
/**
* Tests that all values will be written to the underlying store
+ * right in the same order as they were put into the store with coalescing.
+ *
+ * @throws Exception If failed.
+ */
+ public void testBatchApplyWithCoalescing() throws Exception {
+ testBatchApply(true);
+ }
+
+ /**
+ * Tests that all values will be written to the underlying store
+ * right in the same order as they were put into the store without coalescing.
+ *
+ * @throws Exception If failed.
+ */
+ public void testBatchApplyWithoutCoalescing() throws Exception {
+ testBatchApply(false);
+ }
+
+ /**
+ * Tests that all values will be written to the underlying store
* right in the same order as they were put into the store.
*
+ * @param writeCoalescing Write coalescing flag.
* @throws Exception If failed.
*/
- public void testBatchApply() throws Exception {
+ private void testBatchApply(boolean writeCoalescing) throws Exception {
delegate = new GridCacheTestStore(new ConcurrentLinkedHashMap<Integer, String>());
- initStore(1);
+ initStore(1, writeCoalescing);
List<Integer> intList = new ArrayList<>(CACHE_SIZE);
http://git-wip-us.apache.org/repos/asf/ignite/blob/22580e19/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/IgnteCacheClientWriteBehindStoreNonCoalescingTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/IgnteCacheClientWriteBehindStoreNonCoalescingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/IgnteCacheClientWriteBehindStoreNonCoalescingTest.java
new file mode 100644
index 0000000..8ea109d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/IgnteCacheClientWriteBehindStoreNonCoalescingTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.store;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractTest;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteFuture;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+
+/**
+ * This class provides non write coalescing tests for {@link org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore}.
+ */
+public class IgnteCacheClientWriteBehindStoreNonCoalescingTest extends IgniteCacheAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return ATOMIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicWriteOrderMode atomicWriteOrderMode() {
+ return CLOCK;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected NearCacheConfiguration nearConfiguration() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Factory<CacheStore> cacheStoreFactory() {
+ return new TestIncrementStoreFactory();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNonCoalescingIncrementing() throws Exception {
+ Ignite ignite = grid(0);
+
+ IgniteCache<Integer, Integer> cache = ignite.cache(null);
+
+ assertEquals(cache.getConfiguration(CacheConfiguration.class).getCacheStoreFactory().getClass(),
+ TestIncrementStoreFactory.class);
+
+ Set<Integer> keys = new HashSet<>();
+
+ for (int i = 0; i < 1000; i++) {
+ keys.add(i);
+
+ cache.put(i, i);
+ }
+
+ Collection<IgniteFuture<?>> futs = new ArrayList<>();
+
+ for (int i = 0; i < 100; i++)
+ futs.add(updateKeys(cache, keys));
+
+ for (IgniteFuture<?> fut : futs)
+ fut.get();
+ }
+
+ /**
+ * Update specified keys in async mode.
+ *
+ * @param cache Cache to use.
+ * @param keys Keys to update.
+ * @return IgniteFuture.
+ */
+ private IgniteFuture<?> updateKeys(IgniteCache<Integer, Integer> cache, Set<Integer> keys) {
+ IgniteCache asyncCache = cache.withAsync();
+
+ // Using EntryProcessor.invokeAll to increment every value in place.
+ asyncCache.invokeAll(keys, new EntryProcessor<Integer, Integer, Object>() {
+ @Override public Object process(MutableEntry<Integer, Integer> entry, Object... arguments)
+ throws EntryProcessorException {
+ entry.setValue(entry.getValue() + 1);
+
+ return null;
+ }
+ });
+
+ return asyncCache.future();
+ }
+
+ /**
+ * Test increment store factory.
+ */
+ public static class TestIncrementStoreFactory implements Factory<CacheStore> {
+ /** {@inheritDoc} */
+ @Override public CacheStore create() {
+ return new TestIncrementStore();
+ }
+ }
+
+ /**
+ * Test cache store that asserts each written Integer value equals the previously stored Integer value plus one.
+ */
+ public static class TestIncrementStore extends CacheStoreAdapter<Object, Object> {
+ /** {@inheritDoc} */
+ @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, Object... args) {
+ for (Map.Entry<Object, Object> e : storeMap.entrySet())
+ clo.apply(e.getKey(), e.getValue());
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object load(Object key) {
+ return storeMap.get(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Cache.Entry<? extends Object, ? extends Object> entry) {
+ Object oldValue = storeMap.put(entry.getKey(), entry.getValue());
+
+ if (oldValue instanceof Integer && entry.getValue() instanceof Integer) {
+ Integer oldInt = (Integer)oldValue;
+ Integer newInt = (Integer)entry.getValue();
+
+ assertTrue(
+ "newValue(" + newInt + ") != oldValue(" + oldInt + ")+1 !",
+ newInt == oldInt + 1);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(Object key) {
+ storeMap.remove(key);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/22580e19/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java
index b4cdfa8..dff93ff 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindSto
import org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStoreReplicatedTest;
import org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStoreSelfTest;
import org.apache.ignite.internal.processors.cache.store.IgnteCacheClientWriteBehindStoreAtomicTest;
+import org.apache.ignite.internal.processors.cache.store.IgnteCacheClientWriteBehindStoreNonCoalescingTest;
import org.apache.ignite.internal.processors.cache.store.IgnteCacheClientWriteBehindStoreTxTest;
/**
@@ -49,6 +50,7 @@ public class IgniteCacheWriteBehindTestSuite extends TestSuite {
suite.addTest(new TestSuite(GridCachePartitionedWritesTest.class));
suite.addTest(new TestSuite(IgnteCacheClientWriteBehindStoreAtomicTest.class));
suite.addTest(new TestSuite(IgnteCacheClientWriteBehindStoreTxTest.class));
+ suite.addTest(new TestSuite(IgnteCacheClientWriteBehindStoreNonCoalescingTest.class));
return suite;
}