You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/11/09 15:11:30 UTC

ignite git commit: ignite-6669 Do not call CacheStoreSessionListener if store operation is not executed

Repository: ignite
Updated Branches:
  refs/heads/master c939bdba3 -> b8672d7d6


ignite-6669 Do not call CacheStoreSessionListener if store operation is not executed


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b8672d7d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b8672d7d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b8672d7d

Branch: refs/heads/master
Commit: b8672d7d691981be3c10f74e97ae2caa5ddd1593
Parents: c939bdb
Author: Slava Koptilin <sl...@gmail.com>
Authored: Thu Nov 9 18:10:31 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Nov 9 18:10:31 2017 +0300

----------------------------------------------------------------------
 .../store/GridCacheStoreManagerAdapter.java     |  94 +++++-
 ...oreListenerRWThroughDisabledAtomicCache.java |  33 ++
 ...enerRWThroughDisabledTransactionalCache.java | 138 +++++++++
 ...SessionListenerReadWriteThroughDisabled.java | 291 ++++++++++++++++++
 ...eStoreSessionListenerWriteBehindEnabled.java | 304 +++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite4.java       |   8 +-
 .../Cache/Store/CacheStoreSessionTest.cs        |  13 +-
 7 files changed, 856 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b8672d7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index 9fe1f0c..22c2381 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -106,6 +106,9 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
     private boolean writeThrough;
 
     /** */
+    private boolean readThrough;
+
+    /** */
     private Collection<CacheStoreSessionListener> sesLsnrs;
 
     /** */
@@ -122,6 +125,8 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
 
         writeThrough = cfg.isWriteThrough();
 
+        readThrough = cfg.isReadThrough();
+
         this.cfgStore = cfgStore;
 
         store = cacheStoreWrapper(ctx, cfgStore, cfg);
@@ -306,7 +311,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
                 log.debug(S.toString("Loading value from store for key",
                     "key", storeKey, true));
 
-            sessionInit0(tx);
+            sessionInit0(tx, StoreOperation.READ, false);
 
             boolean threwEx = true;
 
@@ -442,7 +447,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
             if (log.isDebugEnabled())
                 log.debug("Loading values from store for keys: " + keys0);
 
-            sessionInit0(tx);
+            sessionInit0(tx, StoreOperation.READ, false);
 
             boolean threwEx = true;
 
@@ -501,7 +506,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
             if (log.isDebugEnabled())
                 log.debug("Loading all values from store.");
 
-            sessionInit0(null);
+            sessionInit0(null, StoreOperation.READ, false);
 
             boolean threwEx = true;
 
@@ -567,7 +572,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
                     "val", val0, true));
             }
 
-            sessionInit0(tx);
+            sessionInit0(tx, StoreOperation.WRITE, false);
 
             boolean threwEx = true;
 
@@ -622,7 +627,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
                 if (log.isDebugEnabled())
                     log.debug("Storing values in cache store [entries=" + entries + ']');
 
-                sessionInit0(tx);
+                sessionInit0(tx, StoreOperation.WRITE, false);
 
                 boolean threwEx = true;
 
@@ -675,7 +680,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
             if (log.isDebugEnabled())
                 log.debug(S.toString("Removing value from cache store", "key", key0, true));
 
-            sessionInit0(tx);
+            sessionInit0(tx, StoreOperation.WRITE, false);
 
             boolean threwEx = true;
 
@@ -727,7 +732,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
                 log.debug(S.toString("Removing values from cache store",
                     "keys", keys0, true));
 
-            sessionInit0(tx);
+            sessionInit0(tx, StoreOperation.WRITE, false);
 
             boolean threwEx = true;
 
@@ -778,10 +783,10 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
         boolean storeSessionEnded) throws IgniteCheckedException {
         assert store != null;
 
-        sessionInit0(tx);
+        sessionInit0(tx, commit? StoreOperation.COMMIT: StoreOperation.ROLLBACK, false);
 
         try {
-            if (sesLsnrs != null) {
+            if (sesLsnrs != null && sesHolder.get().contains(store)) {
                 for (CacheStoreSessionListener lsnr : sesLsnrs)
                     lsnr.onSessionEnd(locSes, commit);
             }
@@ -820,7 +825,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
 
     /** {@inheritDoc} */
     @Override public void writeBehindSessionInit() throws IgniteCheckedException {
-        sessionInit0(null);
+        sessionInit0(null, null, true);
     }
 
     /** {@inheritDoc} */
@@ -830,9 +835,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
 
     /**
      * @param tx Current transaction.
+     * @param op Store operation.
+     * @param writeBehindStoreInitiator {@code true} if method call is initiated by {@link GridCacheWriteBehindStore}.
      * @throws IgniteCheckedException If failed.
      */
-    private void sessionInit0(@Nullable IgniteInternalTx tx) throws IgniteCheckedException {
+    private void sessionInit0(@Nullable IgniteInternalTx tx, @Nullable StoreOperation op,
+        boolean writeBehindStoreInitiator) throws IgniteCheckedException {
         assert sesHolder != null;
 
         SessionData ses;
@@ -854,8 +862,45 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
 
         sesHolder.set(ses);
 
+        notifyCacheStoreSessionListeners(ses, op, writeBehindStoreInitiator);
+    }
+
+    /**
+     * @param ses Current session.
+     * @param op Store operation.
+     * @param writeBehindStoreInitiator {@code True} if method call is initiated by {@link GridCacheWriteBehindStore}.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void notifyCacheStoreSessionListeners(SessionData ses, @Nullable StoreOperation op,
+        boolean writeBehindStoreInitiator) throws IgniteCheckedException {
         try {
-            if (!ses.started(store) && sesLsnrs != null) {
+            boolean notifyLsnrs = false;
+
+            if (writeBehindStoreInitiator)
+                notifyLsnrs = !ses.started(store) && sesLsnrs != null;
+            else {
+                assert op != null;
+
+                switch (op) {
+                    case READ:
+                        notifyLsnrs = readThrough && !ses.started(store) && sesLsnrs != null;
+                        break;
+
+                    case WRITE:
+                        notifyLsnrs = !cacheConfiguration().isWriteBehindEnabled() && writeThrough
+                            && !ses.started(store) && sesLsnrs != null;
+                        break;
+
+                    case COMMIT:
+                    case ROLLBACK:
+                        // No needs to start the session (if not started yet) and notify listeners.
+                        break;
+
+                    default:
+                        assert false : "Unexpected operation: " + op.toString();
+                }
+            }
+            if (notifyLsnrs) {
                 for (CacheStoreSessionListener lsnr : sesLsnrs)
                     lsnr.onSessionStart(locSes);
             }
@@ -871,7 +916,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
     private void sessionEnd0(@Nullable IgniteInternalTx tx, boolean threwEx) throws IgniteCheckedException {
         try {
             if (tx == null) {
-                if (sesLsnrs != null) {
+                if (sesLsnrs != null && sesHolder.get().contains(store)) {
                     for (CacheStoreSessionListener lsnr : sesLsnrs)
                         lsnr.onSessionEnd(locSes, !threwEx);
                 }
@@ -995,6 +1040,14 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
             return !started.remove(store);
         }
 
+        /**
+         * @param store Cache store.
+         * @return {@code True} if session started.
+         */
+        private boolean contains(CacheStore store) {
+            return started.contains(store);
+        }
+
         /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(SessionData.class, this, "tx", CU.txString(tx != null ? tx.tx : null));
@@ -1429,4 +1482,19 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
             throw new UnsupportedOperationException();
         }
     }
+
+    /** Enumeration that represents possible operations on the underlying store. */
+    private enum StoreOperation {
+        /** Read key-value pair from the underlying store. */
+        READ,
+
+        /** Update or remove key from the underlying store. */
+        WRITE,
+
+        /** Commit changes to the underlying store. */
+        COMMIT,
+
+        /** Rollback changes to the underlying store. */
+        ROLLBACK
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b8672d7d/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreListenerRWThroughDisabledAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreListenerRWThroughDisabledAtomicCache.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreListenerRWThroughDisabledAtomicCache.java
new file mode 100644
index 0000000..9b59940
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreListenerRWThroughDisabledAtomicCache.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+
+/**
+ * This class tests that redundant calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)}
+ * and {@link CacheStoreSessionListener#onSessionEnd(CacheStoreSession, boolean)} are not executed.
+ */
+public class CacheStoreListenerRWThroughDisabledAtomicCache extends CacheStoreSessionListenerReadWriteThroughDisabled {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b8672d7d/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreListenerRWThroughDisabledTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreListenerRWThroughDisabledTransactionalCache.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreListenerRWThroughDisabledTransactionalCache.java
new file mode 100644
index 0000000..6502c97
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreListenerRWThroughDisabledTransactionalCache.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import java.util.Random;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ * This class tests that redundant calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)}
+ * and {@link CacheStoreSessionListener#onSessionEnd(CacheStoreSession, boolean)} are not executed.
+ */
+public class CacheStoreListenerRWThroughDisabledTransactionalCache extends CacheStoreSessionListenerReadWriteThroughDisabled {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL;
+    }
+
+    /**
+     * Tests {@link IgniteCache#get(Object)} with disabled read-through and write-through modes.
+     */
+    public void testTransactionalLookup() {
+        testTransactionalLookup(OPTIMISTIC, READ_COMMITTED);
+        testTransactionalLookup(OPTIMISTIC, REPEATABLE_READ);
+        testTransactionalLookup(OPTIMISTIC, SERIALIZABLE);
+
+        testTransactionalLookup(PESSIMISTIC, READ_COMMITTED);
+        testTransactionalLookup(PESSIMISTIC, REPEATABLE_READ);
+        testTransactionalLookup(PESSIMISTIC, SERIALIZABLE);
+    }
+
+    /**
+     * @param concurrency Transaction concurrency level.
+     * @param isolation Transaction isolation level.
+     */
+    private void testTransactionalLookup(TransactionConcurrency concurrency, TransactionIsolation isolation) {
+        IgniteCache cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        Random r = new Random();
+
+        try (Transaction tx = grid(0).transactions().txStart(concurrency, isolation)) {
+            for (int i = 0; i < CNT; ++i)
+                cache.get(r.nextInt());
+
+            tx.commit();
+        }
+    }
+
+    /**
+     * Tests {@link IgniteCache#put(Object, Object)} with disabled read-through and write-through modes.
+     */
+    public void testTransactionalUpdate() {
+        testTransactionalUpdate(OPTIMISTIC, READ_COMMITTED);
+        testTransactionalUpdate(OPTIMISTIC, REPEATABLE_READ);
+        testTransactionalUpdate(OPTIMISTIC, SERIALIZABLE);
+
+        testTransactionalUpdate(PESSIMISTIC, READ_COMMITTED);
+        testTransactionalUpdate(PESSIMISTIC, REPEATABLE_READ);
+        testTransactionalUpdate(PESSIMISTIC, SERIALIZABLE);
+    }
+
+    /**
+     * @param concurrency Transaction concurrency level.
+     * @param isolation Transaction isolation level.
+     */
+    private void testTransactionalUpdate(TransactionConcurrency concurrency, TransactionIsolation isolation) {
+        IgniteCache cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        Random r = new Random();
+
+        try (Transaction tx = grid(0).transactions().txStart(concurrency, isolation)) {
+            for (int i = 0; i < CNT; ++i)
+                cache.put(r.nextInt(), "test-value");
+
+            tx.commit();
+        }
+    }
+
+    /**
+     * Tests {@link IgniteCache#remove(Object)} with disabled read-through and write-through modes.
+     */
+    public void testTransactionalRemove() {
+        testTransactionalRemove(OPTIMISTIC, READ_COMMITTED);
+        testTransactionalRemove(OPTIMISTIC, REPEATABLE_READ);
+        testTransactionalRemove(OPTIMISTIC, SERIALIZABLE);
+
+        testTransactionalRemove(PESSIMISTIC, READ_COMMITTED);
+        testTransactionalRemove(PESSIMISTIC, REPEATABLE_READ);
+        testTransactionalRemove(PESSIMISTIC, SERIALIZABLE);
+    }
+
+    /**
+     * @param concurrency Transaction concurrency level.
+     * @param isolation Transaction isolation level.
+     */
+    private void testTransactionalRemove(TransactionConcurrency concurrency, TransactionIsolation isolation) {
+        IgniteCache cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        Random r = new Random();
+
+        try (Transaction tx = grid(0).transactions().txStart(concurrency, isolation)) {
+            for (int i = 0; i < CNT; ++i) {
+                int key = r.nextInt();
+
+                cache.put(key, "test-value");
+
+                cache.remove(key, "test-value");
+            }
+
+            tx.commit();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b8672d7d/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerReadWriteThroughDisabled.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerReadWriteThroughDisabled.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerReadWriteThroughDisabled.java
new file mode 100644
index 0000000..1f6e97d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerReadWriteThroughDisabled.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.logging.Logger;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import javax.sql.DataSource;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListener;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
+
+/**
+ * This class tests that redundant calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)}
+ * and {@link CacheStoreSessionListener#onSessionEnd(CacheStoreSession, boolean)} are not executed.
+ */
+public abstract class CacheStoreSessionListenerReadWriteThroughDisabled extends GridCacheAbstractSelfTest {
+    /** {@inheritDoc} */
+    protected int gridCount() {
+        return 2;
+    }
+
+    /** */
+    protected final int CNT = 100;
+
+    /** {@inheritDoc} */
+    protected CacheConfiguration cacheConfiguration(String igniteInstanceName) throws Exception {
+        CacheConfiguration cacheCfg = super.cacheConfiguration(igniteInstanceName);
+
+        cacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(EmptyCacheStore.class));
+
+        cacheCfg.setCacheStoreSessionListenerFactories(new CacheStoreSessionFactory());
+
+        cacheCfg.setReadThrough(false);
+        cacheCfg.setWriteThrough(false);
+
+        cacheCfg.setBackups(0);
+
+        return cacheCfg;
+    }
+
+    /** {@inheritDoc} */
+    protected NearCacheConfiguration nearConfiguration() {
+        return null;
+    }
+
+    /**
+     * Tests that there are no calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)} and
+     * {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)}
+     * while {@link IgniteCache#get(Object)} performed.
+     *
+     * @throws Exception If failed.
+     */
+    public void testLookup() throws Exception {
+        IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        Random r = new Random();
+
+        for (int i = 0; i < CNT; ++i)
+            cache.get(r.nextInt());
+    }
+
+    /**
+     * Tests that there are no calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)} and
+     * {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)}
+     * while {@link IgniteCache#getAll(Set)} performed.
+     *
+     * @throws Exception If failed.
+     */
+    public void testBatchLookup() throws Exception {
+        IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        Random r = new Random();
+
+        Set<Object> values = new HashSet<>();
+
+        for (int i = 0; i < CNT; ++i)
+            values.add(r.nextInt());
+
+        cache.getAll(values);
+    }
+
+    /**
+     * Tests that there are no calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)} and
+     * {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)}
+     * while {@link IgniteCache#put(Object, Object)} performed.
+     *
+     * @throws Exception If failed.
+     */
+    public void testUpdate() throws Exception {
+        IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        Random r = new Random();
+
+        for (int i = 0; i < CNT; ++i)
+            cache.put(r.nextInt(), "test-value");
+    }
+
+    /**
+     * Tests that there are no calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)} and
+     * {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)}
+     * while {@link IgniteCache#putAll(Map)} performed.
+     *
+     * @throws Exception If failed.
+     */
+    public void testBatchUpdate() throws Exception {
+        IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        Random r = new Random();
+
+        Map<Object, Object> values = new TreeMap<>();
+
+        for (int i = 0; i < CNT; ++i)
+            values.put(r.nextInt(), "test-value");
+
+        cache.putAll(values);
+    }
+
+    /**
+     * Tests that there are no calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)} and
+     * {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)}
+     * while {@link IgniteCache#remove(Object)} performed.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRemove() throws Exception {
+        IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        Random r = new Random();
+
+        for (int i = 0; i < CNT; ++i) {
+            int key = r.nextInt();
+
+            cache.put(key, "test-value");
+
+            cache.remove(key);
+        }
+    }
+
+    /**
+     * Tests that there are no calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)} and
+     * {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)}
+     * while {@link IgniteCache#removeAll(Set)} performed.
+     *
+     * @throws Exception If failed.
+     */
+    public void testBatchRemove() throws Exception {
+        IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        Random r = new Random();
+
+        Set<Object> values = new HashSet<>();
+
+        for (int i = 0; i < CNT; ++i) {
+            int key = r.nextInt();
+
+            cache.put(key, "test-value");
+
+            values.add(key);
+        }
+
+        cache.removeAll(values);
+    }
+
+    /**
+     * Cache store session factory.
+     */
+    public static class CacheStoreSessionFactory implements Factory<TestCacheStoreSessionListener> {
+        /** {@inheritDoc} */
+        @Override public TestCacheStoreSessionListener create() {
+            TestCacheStoreSessionListener lsnr = new TestCacheStoreSessionListener();
+            lsnr.setDataSource(new DataSourceStub());
+            return lsnr;
+        }
+    }
+
+    /**
+     * Test cache store session listener.
+     */
+    public static class TestCacheStoreSessionListener extends CacheJdbcStoreSessionListener {
+        /** {@inheritDoc} */
+        @Override public void onSessionStart(CacheStoreSession ses) {
+            fail("TestCacheStoreSessionListener.onSessionStart(CacheStoreSession) should not be called.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
+            fail("TestCacheStoreSessionListener.onSessionEnd(CacheStoreSession, boolean) should not be called.");
+        }
+    }
+
+    /** Empty cache store implementation. All overridden methods should not be called while the test is running. */
+    public static class EmptyCacheStore extends CacheStoreAdapter {
+        /** {@inheritDoc} */
+        @Override public Object load(Object key) throws CacheLoaderException {
+            fail("EmptyCacheStore.load(Object) should not be called.");
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry entry) throws CacheWriterException {
+            fail("EmptyCacheStore.write(Cache.Entry) should not be called.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) throws CacheWriterException {
+            fail("EmptyCacheStore.delete(Object) should not be called.");
+        }
+    }
+
+    /**
+     * Data source stub which should not be called.
+     */
+    public static class DataSourceStub implements DataSource, Serializable {
+        /** {@inheritDoc} */
+        @Override public Connection getConnection() throws SQLException {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Connection getConnection(String username, String password) throws SQLException {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public <T> T unwrap(Class<T> iface) throws SQLException {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isWrapperFor(Class<?> iface) throws SQLException {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public PrintWriter getLogWriter() throws SQLException {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setLogWriter(PrintWriter out) throws SQLException {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setLoginTimeout(int seconds) throws SQLException {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public int getLoginTimeout() throws SQLException {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Logger getParentLogger() throws SQLFeatureNotSupportedException {
+            throw new UnsupportedOperationException();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b8672d7d/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerWriteBehindEnabled.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerWriteBehindEnabled.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerWriteBehindEnabled.java
new file mode 100644
index 0000000..fbb881e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerWriteBehindEnabled.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Logger;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import javax.sql.DataSource;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListener;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
+import org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore;
+import org.apache.ignite.resources.IgniteInstanceResource;
+
+/**
+ * This class tests that calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)}
+ * and {@link CacheStoreSessionListener#onSessionEnd(CacheStoreSession, boolean)} are executed from
+ * {@link GridCacheWriteBehindStore} only.
+ */
+public class CacheStoreSessionListenerWriteBehindEnabled extends GridCacheAbstractSelfTest {
+    /** */
+    protected final static int CNT = 100;
+
+    /** */
+    private final static int WRITE_BEHIND_FLUSH_FREQUENCY = 1000;
+
+    /** */
+    private static final List<OperationType> operations = Collections.synchronizedList(new ArrayList<OperationType>());
+
+    /** */
+    private static final AtomicInteger entryCnt = new AtomicInteger();
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration cacheConfiguration(String igniteInstanceName) throws Exception {
+        CacheConfiguration cacheCfg = super.cacheConfiguration(igniteInstanceName);
+
+        cacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(EmptyCacheStore.class));
+
+        cacheCfg.setCacheStoreSessionListenerFactories(new CacheStoreSessionFactory());
+
+        cacheCfg.setReadThrough(true);
+        cacheCfg.setWriteThrough(true);
+
+        cacheCfg.setWriteBehindEnabled(true);
+        cacheCfg.setWriteBehindBatchSize(CNT * 2);
+        cacheCfg.setWriteBehindFlushFrequency(WRITE_BEHIND_FLUSH_FREQUENCY);
+
+        cacheCfg.setBackups(0);
+
+        return cacheCfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        operations.clear();
+
+        entryCnt.set(0);
+    }
+
+    /**
+     * Tests that there are no redundant calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)}
+     * while {@link IgniteCache#get(Object)} performed.
+     */
+    public void testLookup() {
+        IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        for (int i = 0; i < CNT; ++i)
+            cache.get(i);
+
+        checkSessionCounters(CNT);
+    }
+
+    /**
+     * Tests that there are no redundant calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)}
+     * while {@link IgniteCache#put(Object, Object)} performed.
+     */
+    public void testUpdate() {
+        IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        for (int i = 0; i < CNT; ++i)
+            cache.put(i, i);
+
+        checkSessionCounters(1);
+    }
+
+    /**
+     * Tests that there are no redundant calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)}
+     * while {@link IgniteCache#remove(Object)} performed.
+     */
+    public void testRemove() {
+        IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        for (int i = 0; i < CNT; ++i) {
+            cache.remove(i);
+        }
+
+        checkSessionCounters(1);
+    }
+
+    /**
+     * @param startedSessions Number of expected sessions.
+     */
+    private void checkSessionCounters(int startedSessions) {
+        try {
+            // Wait for GridCacheWriteBehindStore
+            Thread.sleep(WRITE_BEHIND_FLUSH_FREQUENCY * 4);
+
+            assertEquals(CNT, entryCnt.get());
+
+            checkOpCount(operations, OperationType.SESSION_START, startedSessions);
+
+            checkOpCount(operations, OperationType.SESSION_END, startedSessions);
+        }
+        catch (InterruptedException e) {
+            throw new IgniteException("Failed to wait for the GridCacheWriteBehindStore due to interruption.", e);
+        }
+    }
+
+    /**
+     * @param operations List of {@link OperationType}.
+     * @param op Operation.
+     * @param expected Expected number of operations for the given {@code op}.
+     */
+    private void checkOpCount(List<OperationType> operations, OperationType op, int expected) {
+        int n = 0;
+
+        for (OperationType o : operations) {
+            if (op.equals(o))
+                ++n;
+        }
+
+        assertEquals("Operation=" + op.name(), expected, n);
+    }
+
+    /**
+     * Operation type.
+     */
+    public enum OperationType {
+        /**
+         * Cache store session started.
+         */
+        SESSION_START,
+
+        /**
+         * Cache store session ended.
+         */
+        SESSION_END,
+    }
+
+    /**
+     * Cache store session factory.
+     */
+    public static class CacheStoreSessionFactory implements Factory<TestCacheStoreSessionListener> {
+        /** {@inheritDoc} */
+        @Override public TestCacheStoreSessionListener create() {
+            TestCacheStoreSessionListener lsnr = new TestCacheStoreSessionListener();
+            lsnr.setDataSource(new DataSourceStub());
+            return lsnr;
+        }
+    }
+
+    /**
+     * Test cache store session listener.
+     */
+    public static class TestCacheStoreSessionListener extends CacheJdbcStoreSessionListener {
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @Override public void onSessionStart(CacheStoreSession ses) {
+            operations.add(OperationType.SESSION_START);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
+            operations.add(OperationType.SESSION_END);
+        }
+    }
+
+    /**
+     * Test cache store.
+     *
+     * {@link EmptyCacheStore#writeAll(Collection)} and {@link EmptyCacheStore#deleteAll(Collection)} should be called
+     * by {@link GridCacheWriteBehindStore}.
+     */
+    public static class EmptyCacheStore extends CacheStoreAdapter<Object, Object> {
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @Override public Object load(Object key) throws CacheLoaderException {
+            entryCnt.getAndIncrement();
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) {
+            entryCnt.addAndGet(entries.size());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry entry) throws CacheWriterException {
+        }
+
+        /** {@inheritDoc} */
+        @Override public void deleteAll(Collection<?> keys) {
+            entryCnt.addAndGet(keys.size());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) throws CacheWriterException {
+        }
+    }
+
+    /**
+     * Data source stub which should not be called.
+     */
+    public static class DataSourceStub implements DataSource, Serializable {
+        /** {@inheritDoc} */
+        @Override public Connection getConnection() throws SQLException {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Connection getConnection(String username, String password) throws SQLException {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public <T> T unwrap(Class<T> iface) throws SQLException {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isWrapperFor(Class<?> iface) throws SQLException {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public PrintWriter getLogWriter() throws SQLException {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setLogWriter(PrintWriter out) throws SQLException {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setLoginTimeout(int seconds) throws SQLException {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public int getLoginTimeout() throws SQLException {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Logger getParentLogger() throws SQLFeatureNotSupportedException {
+            throw new UnsupportedOperationException();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b8672d7d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index d931ea9..e4930e0 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -18,6 +18,9 @@
 package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
+import org.apache.ignite.cache.store.CacheStoreListenerRWThroughDisabledAtomicCache;
+import org.apache.ignite.cache.store.CacheStoreListenerRWThroughDisabledTransactionalCache;
+import org.apache.ignite.cache.store.CacheStoreSessionListenerWriteBehindEnabled;
 import org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListenerSelfTest;
 import org.apache.ignite.internal.processors.GridCacheTxLoadFromStoreOnLockSelfTest;
 import org.apache.ignite.internal.processors.cache.CacheClientStoreSelfTest;
@@ -276,6 +279,9 @@ public class IgniteCacheTestSuite4 extends TestSuite {
         suite.addTestSuite(CacheOffheapMapEntrySelfTest.class);
 
         suite.addTestSuite(CacheJdbcStoreSessionListenerSelfTest.class);
+        suite.addTestSuite(CacheStoreListenerRWThroughDisabledAtomicCache.class);
+        suite.addTestSuite(CacheStoreListenerRWThroughDisabledTransactionalCache.class);
+        suite.addTestSuite(CacheStoreSessionListenerWriteBehindEnabled.class);
 
         suite.addTestSuite(CacheClientStoreSelfTest.class);
         suite.addTestSuite(CacheStoreUsageMultinodeStaticStartAtomicTest.class);
@@ -341,4 +347,4 @@ public class IgniteCacheTestSuite4 extends TestSuite {
 
         return suite;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b8672d7d/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs
index 818948c..6c9def3 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs
@@ -106,17 +106,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
                 tx.Rollback();
             }
 
-            // SessionEnd is called once per store instance.
-            Assert.AreEqual(StoreCount, _dumps.Count);
-
-            foreach (var ops in _dumps)
-            {
-                var op = ops.Single();
-                Assert.AreEqual(OperationType.SesEnd, op.Type);
-                Assert.IsFalse(op.Commit);
-            }
-
-            _dumps = new ConcurrentBag<ICollection<Operation>>();
+            // SessionEnd should not be called.
+            Assert.AreEqual(0, _dumps.Count);
 
             // 2. Test puts.
             using (var tx = ignite.GetTransactions().TxStart())