You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2014/12/23 00:59:39 UTC

[05/50] [abbrv] incubator-ignite git commit: GG-9141 - Fixes for write-behind store.

GG-9141 - Fixes for write-behind store.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/79118aff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/79118aff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/79118aff

Branch: refs/heads/master
Commit: 79118aff1d63408629fe9d3e9e8b38b265022d2e
Parents: c7a96e1
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Fri Dec 19 16:46:38 2014 -0800
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Fri Dec 19 16:46:38 2014 -0800

----------------------------------------------------------------------
 .../processors/cache/GridCacheProcessor.java    |  33 ++---
 .../processors/cache/GridCacheTxHandler.java    |   4 +-
 .../cache/GridCacheTxLocalAdapter.java          | 142 +++++++++----------
 .../cache/distributed/dht/GridDhtTxLocal.java   |  12 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |   4 +
 .../near/GridNearTxFinishFuture.java            |   1 +
 .../near/GridNearTxFinishRequest.java           |  26 ++++
 ...BehindStorePartitionedMultiNodeSelfTest.java |   2 +-
 .../GridCacheWriteBehindTestSuite.java          |   4 +-
 9 files changed, 118 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/79118aff/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
index 9f11834..c38d853 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
@@ -422,7 +422,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (ctx.config().getCacheMode() == LOCAL || !isNearEnabled(ctx))
             return Collections.emptyList();
         else
-            return F.asList((GridCacheManager)ctx.queries(), ctx.continuousQueries());
+            return F.asList((GridCacheManager)ctx.queries(), ctx.continuousQueries(), ctx.store());
     }
 
     /**
@@ -568,8 +568,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             GridCacheTtlManager ttlMgr = new GridCacheTtlManager();
             GridCacheDrManager drMgr = ctx.createComponent(GridCacheDrManager.class);
 
-            GridCacheStore nearStore = cacheStore(ctx.gridName(), cfg, isNearEnabled(cfg));
-            GridCacheStoreManager storeMgr = new GridCacheStoreManager(nearStore);
+            GridCacheStore store = cacheStore(ctx.gridName(), cfg);
+
+            GridCacheStoreManager storeMgr = new GridCacheStoreManager(store);
 
             GridCacheContext<?, ?> cacheCtx = new GridCacheContext(
                 ctx,
@@ -708,10 +709,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 evtMgr = new GridCacheEventManager();
                 drMgr = ctx.createComponent(GridCacheDrManager.class);
 
-                GridCacheStore dhtStore = cacheStore(ctx.gridName(), cfg, false);
-
-                storeMgr = new GridCacheStoreManager(dhtStore);
-
                 cacheCtx = new GridCacheContext(
                     ctx,
                     sharedCtx,
@@ -1740,29 +1737,23 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      *
      * @param gridName Grid name.
      * @param cfg Cache configuration.
-     * @param near Whether or not store retrieved for near cache.
      * @return Instance of {@link GridCacheWriteBehindStore} if write-behind store is configured,
      *         or user-defined cache store.
      */
     @SuppressWarnings({"unchecked"})
-    private GridCacheStore cacheStore(String gridName, GridCacheConfiguration cfg, boolean near) {
+    private GridCacheStore cacheStore(String gridName, GridCacheConfiguration cfg) {
         if (cfg.getStore() == null || !cfg.isWriteBehindEnabled())
             return cfg.getStore();
 
-        // Write-behind store is used in DHT cache only.
-        if (!near) {
-            GridCacheWriteBehindStore store = new GridCacheWriteBehindStore(gridName, cfg.getName(), log,
-                cfg.getStore());
+        GridCacheWriteBehindStore store = new GridCacheWriteBehindStore(gridName, cfg.getName(), log,
+            cfg.getStore());
 
-            store.setFlushSize(cfg.getWriteBehindFlushSize());
-            store.setFlushThreadCount(cfg.getWriteBehindFlushThreadCount());
-            store.setFlushFrequency(cfg.getWriteBehindFlushFrequency());
-            store.setBatchSize(cfg.getWriteBehindBatchSize());
+        store.setFlushSize(cfg.getWriteBehindFlushSize());
+        store.setFlushThreadCount(cfg.getWriteBehindFlushThreadCount());
+        store.setFlushFrequency(cfg.getWriteBehindFlushFrequency());
+        store.setBatchSize(cfg.getWriteBehindBatchSize());
 
-            return store;
-        }
-        else
-            return cfg.getStore();
+        return store;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/79118aff/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java
index abd7e78..c3ff36c 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java
@@ -509,7 +509,7 @@ public class GridCacheTxHandler<K, V> {
                             READ_COMMITTED,
                             /*timeout */0,
                             req.isInvalidate(),
-                            req.explicitLock(),
+                            req.storeEnabled(),
                             req.txSize(),
                             req.groupLockKey(),
                             false,
@@ -523,6 +523,8 @@ public class GridCacheTxHandler<K, V> {
                     tx.topologyVersion(req.topologyVersion());
                 }
 
+                tx.storeEnabled(req.storeEnabled());
+
                 if (!tx.markFinalizing(USER_FINISH)) {
                     if (log.isDebugEnabled())
                         log.debug("Will not finish transaction (it is handled by another thread): " + tx);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/79118aff/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
index 4af716a..b1872be 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
@@ -437,98 +437,97 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
     protected void batchStoreCommit(Iterable<GridCacheTxEntry<K, V>> writeEntries) throws IgniteCheckedException {
         GridCacheStoreManager<K, V> store = store();
 
-        if (store != null && storeEnabled() && (!internal() || groupLock())) {
+        if (store != null && storeEnabled() && (!internal() || groupLock()) && (near() || store.writeToStoreFromDht())) {
             try {
-                // Implicit transactions are always updated at the end.
-                if (isBatchUpdate()) {
-                    if (writeEntries != null) {
-                        Map<K, IgniteBiTuple<V, GridCacheVersion>> putMap = null;
-                        List<K> rmvCol = null;
+                if (writeEntries != null) {
+                    Map<K, IgniteBiTuple<V, GridCacheVersion>> putMap = null;
+                    List<K> rmvCol = null;
 
-                        /*
-                         * Batch database processing.
-                         */
-                        for (GridCacheTxEntry<K, V> e : writeEntries) {
-                            boolean intercept = e.context().config().getInterceptor() != null;
+                    boolean skipNear = near() && store.writeToStoreFromDht();
 
-                            if (intercept || !F.isEmpty(e.transformClosures()))
-                                e.cached().unswap(true, false);
+                    for (GridCacheTxEntry<K, V> e : writeEntries) {
+                        if (skipNear && e.cached().isNear())
+                            continue;
 
-                            GridTuple3<GridCacheOperation, V, byte[]> res = applyTransformClosures(e, false);
+                        boolean intercept = e.context().config().getInterceptor() != null;
 
-                            GridCacheContext<K, V> cacheCtx = e.context();
+                        if (intercept || !F.isEmpty(e.transformClosures()))
+                            e.cached().unswap(true, false);
 
-                            GridCacheOperation op = res.get1();
-                            K key = e.key();
-                            V val = res.get2();
-                            GridCacheVersion ver = writeVersion();
+                        GridTuple3<GridCacheOperation, V, byte[]> res = applyTransformClosures(e, false);
 
-                            if (op == CREATE || op == UPDATE) {
-                                // Batch-process all removes if needed.
-                                if (rmvCol != null && !rmvCol.isEmpty()) {
-                                    store.removeAllFromStore(this, rmvCol);
+                        GridCacheContext<K, V> cacheCtx = e.context();
 
-                                    // Reset.
-                                    rmvCol.clear();
-                                }
+                        GridCacheOperation op = res.get1();
+                        K key = e.key();
+                        V val = res.get2();
+                        GridCacheVersion ver = writeVersion();
 
-                                if (intercept) {
-                                    V old = e.cached().rawGetOrUnmarshal(true);
+                        if (op == CREATE || op == UPDATE) {
+                            // Batch-process all removes if needed.
+                            if (rmvCol != null && !rmvCol.isEmpty()) {
+                                store.removeAllFromStore(this, rmvCol);
 
-                                    val = (V)cacheCtx.config().getInterceptor().onBeforePut(key, old, val);
+                                // Reset.
+                                rmvCol.clear();
+                            }
 
-                                    if (val == null)
-                                        continue;
+                            if (intercept) {
+                                V old = e.cached().rawGetOrUnmarshal(true);
 
-                                    val = cacheCtx.unwrapTemporary(val);
-                                }
+                                val = (V)cacheCtx.config().getInterceptor().onBeforePut(key, old, val);
 
-                                if (putMap == null)
-                                    putMap = new LinkedHashMap<>(writeMap().size(), 1.0f);
+                                if (val == null)
+                                    continue;
 
-                                putMap.put(key, F.t(val, ver));
+                                val = cacheCtx.unwrapTemporary(val);
                             }
-                            else if (op == DELETE) {
-                                // Batch-process all puts if needed.
-                                if (putMap != null && !putMap.isEmpty()) {
-                                    store.putAllToStore(this, putMap);
 
-                                    // Reset.
-                                    putMap.clear();
-                                }
+                            if (putMap == null)
+                                putMap = new LinkedHashMap<>(writeMap().size(), 1.0f);
 
-                                if (intercept) {
-                                    V old = e.cached().rawGetOrUnmarshal(true);
+                            putMap.put(key, F.t(val, ver));
+                        }
+                        else if (op == DELETE) {
+                            // Batch-process all puts if needed.
+                            if (putMap != null && !putMap.isEmpty()) {
+                                store.putAllToStore(this, putMap);
 
-                                    IgniteBiTuple<Boolean, V> t = cacheCtx.config().<K, V>getInterceptor()
-                                        .onBeforeRemove(key, old);
+                                // Reset.
+                                putMap.clear();
+                            }
 
-                                    if (cacheCtx.cancelRemove(t))
-                                        continue;
-                                }
+                            if (intercept) {
+                                V old = e.cached().rawGetOrUnmarshal(true);
 
-                                if (rmvCol == null)
-                                    rmvCol = new LinkedList<>();
+                                IgniteBiTuple<Boolean, V> t = cacheCtx.config().<K, V>getInterceptor()
+                                    .onBeforeRemove(key, old);
 
-                                rmvCol.add(key);
+                                if (cacheCtx.cancelRemove(t))
+                                    continue;
                             }
-                            else if (log.isDebugEnabled())
-                                log.debug("Ignoring NOOP entry for batch store commit: " + e);
-                        }
 
-                        if (putMap != null && !putMap.isEmpty()) {
-                            assert rmvCol == null || rmvCol.isEmpty();
+                            if (rmvCol == null)
+                                rmvCol = new LinkedList<>();
 
-                            // Batch put at the end of transaction.
-                            store.putAllToStore(this, putMap);
+                            rmvCol.add(key);
                         }
+                        else if (log.isDebugEnabled())
+                            log.debug("Ignoring NOOP entry for batch store commit: " + e);
+                    }
 
-                        if (rmvCol != null && !rmvCol.isEmpty()) {
-                            assert putMap == null || putMap.isEmpty();
+                    if (putMap != null && !putMap.isEmpty()) {
+                        assert rmvCol == null || rmvCol.isEmpty();
 
-                            // Batch remove at the end of transaction.
-                            store.removeAllFromStore(this, rmvCol);
-                        }
+                        // Batch put at the end of transaction.
+                        store.putAllToStore(this, putMap);
+                    }
+
+                    if (rmvCol != null && !rmvCol.isEmpty()) {
+                        assert putMap == null || putMap.isEmpty();
+
+                        // Batch remove at the end of transaction.
+                        store.removeAllFromStore(this, rmvCol);
                     }
                 }
 
@@ -954,7 +953,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
 
                 GridCacheStoreManager<K, V> store = store();
 
-                if (store != null && isBatchUpdate()) {
+                if (store != null && (near() || store.writeToStoreFromDht())) {
                     if (!internal() || groupLock())
                         store.txEnd(this, false);
                 }
@@ -2609,7 +2608,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
      * @throws IgniteCheckedException If caches already enlisted in this transaction are not compatible with given
      *      cache (e.g. they have different stores).
      */
-    private void addActiveCache(GridCacheContext<K, V> cacheCtx) throws IgniteCheckedException {
+    protected void addActiveCache(GridCacheContext<K, V> cacheCtx) throws IgniteCheckedException {
         int cacheId = cacheCtx.cacheId();
 
         // Check if we can enlist new cache to transaction.
@@ -2800,15 +2799,6 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
         }
     }
 
-    /**
-     * @return {@code True} if updates should be batched up.
-     */
-    protected boolean isBatchUpdate() {
-        GridCacheStoreManager<K, V> store = store();
-
-        return store != null;
-    }
-
     /** {@inheritDoc} */
     @Override public String toString() {
         return GridToStringBuilder.toString(GridCacheTxLocalAdapter.class, this, "super", super.toString(),

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/79118aff/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java
index c21fc19..8147878 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -81,7 +81,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements
      * @param concurrency Concurrency.
      * @param isolation Isolation.
      * @param timeout Timeout.
-     * @param explicitLock Explicit lock flag.
+     * @param storeEnabled Store enabled flag.
      * @param txSize Expected transaction size.
      * @param grpLockKey Group lock key if this is a group-lock transaction.
      * @param partLock {@code True} if this is a group-lock transaction and whole partition should be locked.
@@ -101,7 +101,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements
         GridCacheTxIsolation isolation,
         long timeout,
         boolean invalidate,
-        boolean explicitLock,
+        boolean storeEnabled,
         int txSize,
         @Nullable GridCacheTxKey grpLockKey,
         boolean partLock,
@@ -119,7 +119,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements
             isolation,
             timeout,
             invalidate,
-            explicitLock,
+            storeEnabled,
             txSize,
             grpLockKey,
             partLock,
@@ -203,12 +203,6 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements
     }
 
     /** {@inheritDoc} */
-    @Override protected boolean isBatchUpdate() {
-        // Cache store updates may happen from DHT local transactions if write behind is enabled.
-        return super.isBatchUpdate() && (store().writeToStoreFromDht() || onePhaseCommit());
-    }
-
-    /** {@inheritDoc} */
     @Override protected boolean updateNearCache(GridCacheContext<K, V> cacheCtx, K key, long topVer) {
         return cacheCtx.isDht() && isNearEnabled(cacheCtx) && !cctx.localNodeId().equals(nearNodeId());
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/79118aff/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index e18fe82..0d84353 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -410,6 +410,8 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends GridCacheTxLocalAdapte
             else {
                 entry = e;
 
+                addActiveCache(dhtCache.context());
+
                 while (true) {
                     GridDhtCacheEntry<K, V> cached = dhtCache.entryExx(entry.key(), topologyVersion());
 
@@ -510,6 +512,8 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends GridCacheTxLocalAdapte
                 if (txEntry == null) {
                     GridDhtCacheEntry<K, V> cached = dhtCache.entryExx(key, topVer);
 
+                    addActiveCache(dhtCache.context());
+
                     cached.unswap(!read, read);
 
                     GridCacheTxEntry<K, V> w = writeEntries == null ? null : writeEntries.get(idx++);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/79118aff/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 4a438bf..f1000f8 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -342,6 +342,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
             tx.syncCommit(),
             tx.syncRollback(),
             m.explicitLock(),
+            tx.storeEnabled(),
             tx.topologyVersion(),
             null,
             null,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/79118aff/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishRequest.java
index 76976e8..097730e 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishRequest.java
@@ -34,6 +34,9 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
     /** Explicit lock flag. */
     private boolean explicitLock;
 
+    /** Store enabled flag. */
+    private boolean storeEnabled;
+
     /** Topology version. */
     private long topVer;
 
@@ -60,6 +63,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
      * @param invalidate Invalidate flag.
      * @param sys System flag.
      * @param explicitLock Explicit lock flag.
+     * @param storeEnabled Store enabled flag.
      * @param topVer Topology version.
      * @param baseVer Base version.
      * @param committedVers Committed versions.
@@ -78,6 +82,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
         boolean syncCommit,
         boolean syncRollback,
         boolean explicitLock,
+        boolean storeEnabled,
         long topVer,
         GridCacheVersion baseVer,
         Collection<GridCacheVersion> committedVers,
@@ -91,6 +96,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
             rolledbackVers, txSize, writeEntries, recoverEntries, null);
 
         this.explicitLock = explicitLock;
+        this.storeEnabled = storeEnabled;
         this.topVer = topVer;
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
@@ -104,6 +110,13 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
     }
 
     /**
+     * @return Store enabled flag.
+     */
+    public boolean storeEnabled() {
+        return storeEnabled;
+    }
+
+    /**
      * @return Mini future ID.
      */
     public IgniteUuid miniId() {
@@ -156,6 +169,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
 
         _clone.miniId = miniId;
         _clone.explicitLock = explicitLock;
+        _clone.storeEnabled = storeEnabled;
         _clone.topVer = topVer;
         _clone.subjId = subjId;
         _clone.taskNameHash = taskNameHash;
@@ -207,6 +221,11 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
 
                 commState.idx++;
 
+            case 26:
+                if (!commState.putBoolean(storeEnabled))
+                    return false;
+
+                commState.idx++;
         }
 
         return true;
@@ -265,6 +284,13 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
 
                 commState.idx++;
 
+            case 26:
+                if (buf.remaining() < 1)
+                    return false;
+
+                storeEnabled = commState.getBoolean();
+
+                commState.idx++;
         }
 
         return true;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/79118aff/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java
index 274a8ff..309794b 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java
@@ -173,7 +173,7 @@ public class GridCacheWriteBehindStorePartitionedMultiNodeSelfTest extends GridC
         for (int i = 0; i < GRID_CNT; i++) {
             Map<Integer,String> map = stores[i].getMap();
 
-            assertFalse(map.isEmpty());
+            assertFalse("Missing writes for node: " + i, map.isEmpty());
 
             allKeys.addAll(map.keySet());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/79118aff/modules/core/src/test/java/org/gridgain/testsuites/GridCacheWriteBehindTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/testsuites/GridCacheWriteBehindTestSuite.java b/modules/core/src/test/java/org/gridgain/testsuites/GridCacheWriteBehindTestSuite.java
index 6e0b70f..819f4b2 100644
--- a/modules/core/src/test/java/org/gridgain/testsuites/GridCacheWriteBehindTestSuite.java
+++ b/modules/core/src/test/java/org/gridgain/testsuites/GridCacheWriteBehindTestSuite.java
@@ -25,10 +25,10 @@ public class GridCacheWriteBehindTestSuite extends TestSuite {
         // Write-behind tests.
         suite.addTest(new TestSuite(GridCacheWriteBehindStoreSelfTest.class));
         suite.addTest(new TestSuite(GridCacheWriteBehindStoreMultithreadedSelfTest.class));
-//        suite.addTest(new TestSuite(GridCacheWriteBehindStoreLocalTest.class)); // TODO GG-9141
+        suite.addTest(new TestSuite(GridCacheWriteBehindStoreLocalTest.class));
         suite.addTest(new TestSuite(GridCacheWriteBehindStoreReplicatedTest.class));
         suite.addTest(new TestSuite(GridCacheWriteBehindStorePartitionedTest.class));
-//        suite.addTest(new TestSuite(GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.class));
+        suite.addTest(new TestSuite(GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.class));
         suite.addTest(new TestSuite(GridCachePartitionedWritesTest.class));
 
         return suite;