You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/02 11:01:47 UTC

[25/38] incubator-ignite git commit: IGNITE-891 - Cache store improvements

IGNITE-891 - Cache store improvements


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ada1b2a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ada1b2a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ada1b2a7

Branch: refs/heads/ignite-883_1
Commit: ada1b2a7c4d82722cc5721bad50042af7216bfdc
Parents: 990bf9e
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Sun May 24 20:42:53 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Sun May 24 20:42:53 2015 -0700

----------------------------------------------------------------------
 .../hibernate/CacheHibernatePersonStore.java    |  27 +-
 .../store/jdbc/CacheJdbcPersonStore.java        |  69 ++--
 .../store/jdbc/CacheJdbcStoreExample.java       |   3 +-
 .../store/spring/CacheSpringPersonStore.java    | 128 ++++++
 .../store/spring/CacheSpringStoreExample.java   | 143 +++++++
 .../datagrid/store/spring/package-info.java     |  22 ++
 .../processors/cache/GridCacheProcessor.java    |   4 +-
 .../processors/cache/GridCacheUtils.java        |  25 +-
 .../store/GridCacheStoreManagerAdapter.java     |  61 ++-
 ...heStoreSessionListenerLifeCycleSelfTest.java | 395 +++++++++++++++++++
 .../IgniteCrossCacheTxStoreSelfTest.java        |  24 --
 .../junits/common/GridCommonAbstractTest.java   |  24 ++
 .../spring/CacheSpringStoreSessionListener.java |   2 +-
 13 files changed, 810 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/examples/src/main/java/org/apache/ignite/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java
index 557ec6f..80a9f22 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java
@@ -54,13 +54,7 @@ public class CacheHibernatePersonStore extends CacheStoreAdapter<Long, Person> {
         Long key = entry.getKey();
         Person val = entry.getValue();
 
-        System.out.println(">>> Store put [key=" + key + ", val=" + val + ']');
-
-        if (val == null) {
-            delete(key);
-
-            return;
-        }
+        System.out.println(">>> Store write [key=" + key + ", val=" + val + ']');
 
         Session hibSes = ses.attachment();
 
@@ -75,13 +69,14 @@ public class CacheHibernatePersonStore extends CacheStoreAdapter<Long, Person> {
     /** {@inheritDoc} */
     @SuppressWarnings({"JpaQueryApiInspection"})
     @Override public void delete(Object key) {
-        System.out.println(">>> Store remove [key=" + key + ']');
+        System.out.println(">>> Store delete [key=" + key + ']');
 
         Session hibSes = ses.attachment();
 
         try {
-            hibSes.createQuery("delete " + Person.class.getSimpleName() + " where key = :key")
-                .setParameter("key", key).setFlushMode(FlushMode.ALWAYS).executeUpdate();
+            hibSes.createQuery("delete " + Person.class.getSimpleName() + " where key = :key").
+                setParameter("key", key).
+                executeUpdate();
         }
         catch (HibernateException e) {
             throw new CacheWriterException("Failed to remove value from cache store [key=" + key + ']', e);
@@ -100,13 +95,13 @@ public class CacheHibernatePersonStore extends CacheStoreAdapter<Long, Person> {
         try {
             int cnt = 0;
 
-            List res = hibSes.createCriteria(Person.class).list();
-
-            if (res != null) {
-                Iterator iter = res.iterator();
+            List list = hibSes.createCriteria(Person.class).
+                setMaxResults(entryCnt).
+                list();
 
-                while (cnt < entryCnt && iter.hasNext()) {
-                    Person person = (Person)iter.next();
+            if (list != null) {
+                for (Object obj : list) {
+                    Person person = (Person)obj;
 
                     clo.apply(person.getId(), person);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
index 6eb0386..ed14a99 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
@@ -22,9 +22,11 @@ import org.apache.ignite.cache.store.*;
 import org.apache.ignite.examples.datagrid.store.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.resources.*;
+import org.h2.jdbcx.*;
 
 import javax.cache.*;
 import javax.cache.integration.*;
+import javax.sql.*;
 import java.sql.*;
 
 /**
@@ -32,6 +34,10 @@ import java.sql.*;
  * transaction with cache transactions and maps {@link Long} to {@link Person}.
  */
 public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
+    /** Data source. */
+    public static final DataSource DATA_SRC =
+        JdbcConnectionPool.create("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1", "", "");
+
     /** Store session. */
     @CacheStoreSessionResource
     private CacheStoreSession ses;
@@ -52,12 +58,10 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
      * @throws IgniteException If failed.
      */
     private void prepareDb() throws IgniteException {
-        try (
-            Connection conn = DriverManager.getConnection("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");
-            Statement st = conn.createStatement()
-        ) {
-            st.execute("create table if not exists PERSONS (id number unique, firstName varchar(255), " +
-                "lastName varchar(255))");
+        try (Connection conn = DATA_SRC.getConnection()) {
+            conn.createStatement().execute(
+                "create table if not exists PERSONS (" +
+                "id number unique, firstName varchar(255), lastName varchar(255))");
         }
         catch (SQLException e) {
             throw new IgniteException("Failed to create database table.", e);
@@ -66,34 +70,28 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
 
     /** {@inheritDoc} */
     @Override public Person load(Long key) {
-        System.out.println(">>> Loading key: " + key);
+        System.out.println(">>> Store load [key=" + key + ']');
 
-        try {
-            Connection conn = ses.attachment();
+        Connection conn = ses.attachment();
 
-            try (PreparedStatement st = conn.prepareStatement("select * from PERSONS where id=?")) {
-                st.setString(1, key.toString());
+        try (PreparedStatement st = conn.prepareStatement("select * from PERSONS where id = ?")) {
+            st.setString(1, key.toString());
 
-                ResultSet rs = st.executeQuery();
+            ResultSet rs = st.executeQuery();
 
-                if (rs.next())
-                    return new Person(rs.getLong(1), rs.getString(2), rs.getString(3));
-            }
+            return rs.next() ? new Person(rs.getLong(1), rs.getString(2), rs.getString(3)) : null;
         }
         catch (SQLException e) {
-            throw new CacheLoaderException("Failed to load object: " + key, e);
+            throw new CacheLoaderException("Failed to load object [key=" + key + ']', e);
         }
-
-        return null;
     }
 
     /** {@inheritDoc} */
     @Override public void write(Cache.Entry<? extends Long, ? extends Person> entry) {
         Long key = entry.getKey();
-
         Person val = entry.getValue();
 
-        System.out.println(">>> Putting [key=" + key + ", val=" + val +  ']');
+        System.out.println(">>> Store write [key=" + key + ", val=" + val + ']');
 
         try {
             Connection conn = ses.attachment();
@@ -103,7 +101,7 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
             // Try update first. If it does not work, then try insert.
             // Some databases would allow these to be done in one 'upsert' operation.
             try (PreparedStatement st = conn.prepareStatement(
-                "update PERSONS set firstName=?, lastName=? where id=?")) {
+                "update PERSONS set firstName = ?, lastName = ? where id = ?")) {
                 st.setString(1, val.getFirstName());
                 st.setString(2, val.getLastName());
                 st.setLong(3, val.getId());
@@ -114,7 +112,7 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
             // If update failed, try to insert.
             if (updated == 0) {
                 try (PreparedStatement st = conn.prepareStatement(
-                    "insert into PERSONS (id, firstName, lastName) values(?, ?, ?)")) {
+                    "insert into PERSONS (id, firstName, lastName) values (?, ?, ?)")) {
                     st.setLong(1, val.getId());
                     st.setString(2, val.getFirstName());
                     st.setString(3, val.getLastName());
@@ -124,25 +122,23 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
             }
         }
         catch (SQLException e) {
-            throw new CacheLoaderException("Failed to put object [key=" + key + ", val=" + val + ']', e);
+            throw new CacheWriterException("Failed to write object [key=" + key + ", val=" + val + ']', e);
         }
     }
 
     /** {@inheritDoc} */
     @Override public void delete(Object key) {
-        System.out.println(">>> Removing key: " + key);
+        System.out.println(">>> Store delete [key=" + key + ']');
 
-        try {
-            Connection conn = ses.attachment();
+        Connection conn = ses.attachment();
 
-            try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) {
-                st.setLong(1, (Long)key);
+        try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) {
+            st.setLong(1, (Long)key);
 
-                st.executeUpdate();
-            }
+            st.executeUpdate();
         }
         catch (SQLException e) {
-            throw new CacheWriterException("Failed to remove object: " + key, e);
+            throw new CacheWriterException("Failed to delete object [key=" + key + ']', e);
         }
     }
 
@@ -155,13 +151,14 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
 
         Connection conn = ses.attachment();
 
-        try (
-            PreparedStatement st = conn.prepareStatement("select * from PERSONS");
-            ResultSet rs = st.executeQuery()
-        ) {
+        try (PreparedStatement stmt = conn.prepareStatement("select * from PERSONS limit ?")) {
+            stmt.setInt(1, entryCnt);
+
+            ResultSet rs = stmt.executeQuery();
+
             int cnt = 0;
 
-            while (cnt < entryCnt && rs.next()) {
+            while (rs.next()) {
                 Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3));
 
                 clo.apply(person.getId(), person);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java
index 74e262c..637d6dc 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java
@@ -24,7 +24,6 @@ import org.apache.ignite.configuration.*;
 import org.apache.ignite.examples.*;
 import org.apache.ignite.examples.datagrid.store.*;
 import org.apache.ignite.transactions.*;
-import org.h2.jdbcx.*;
 
 import javax.cache.configuration.*;
 import java.util.*;
@@ -79,7 +78,7 @@ public class CacheJdbcStoreExample {
                 @Override public CacheStoreSessionListener create() {
                     CacheJdbcStoreSessionListener lsnr = new CacheJdbcStoreSessionListener();
 
-                    lsnr.setDataSource(JdbcConnectionPool.create("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1", "", ""));
+                    lsnr.setDataSource(CacheJdbcPersonStore.DATA_SRC);
 
                     return lsnr;
                 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/CacheSpringPersonStore.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/CacheSpringPersonStore.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/CacheSpringPersonStore.java
new file mode 100644
index 0000000..50149ba
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/CacheSpringPersonStore.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.datagrid.store.spring;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.examples.datagrid.store.*;
+import org.apache.ignite.lang.*;
+import org.springframework.dao.*;
+import org.springframework.jdbc.core.*;
+import org.springframework.jdbc.datasource.*;
+
+import javax.cache.*;
+import javax.cache.integration.*;
+import javax.sql.*;
+import java.sql.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Example of {@link CacheStore} implementation that uses JDBC
+ * transaction with cache transactions and maps {@link Long} to {@link Person}.
+ */
+public class CacheSpringPersonStore extends CacheStoreAdapter<Long, Person> {
+    /** Data source. */
+    public static final DataSource DATA_SRC = new DriverManagerDataSource("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");
+
+    /** Spring JDBC template. */
+    private JdbcTemplate jdbcTemplate;
+
+    /**
+     * Constructor.
+     *
+     * @throws IgniteException If failed.
+     */
+    public CacheSpringPersonStore() throws IgniteException {
+        jdbcTemplate = new JdbcTemplate(DATA_SRC);
+
+        prepareDb();
+    }
+
+    /**
+     * Prepares database for example execution. This method will create a
+     * table called "PERSONS" so it can be used by store implementation.
+     *
+     * @throws IgniteException If failed.
+     */
+    private void prepareDb() throws IgniteException {
+        jdbcTemplate.update(
+            "create table if not exists PERSONS (" +
+            "id number unique, firstName varchar(255), lastName varchar(255))");
+    }
+
+    /** {@inheritDoc} */
+    @Override public Person load(Long key) {
+        System.out.println(">>> Store load [key=" + key + ']');
+
+        try {
+            return jdbcTemplate.queryForObject("select * from PERSONS where id = ?", new RowMapper<Person>() {
+                @Override public Person mapRow(ResultSet rs, int rowNum) throws SQLException {
+                    return new Person(rs.getLong(1), rs.getString(2), rs.getString(3));
+                }
+            }, key);
+        }
+        catch (EmptyResultDataAccessException ignored) {
+            return null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(Cache.Entry<? extends Long, ? extends Person> entry) {
+        Long key = entry.getKey();
+        Person val = entry.getValue();
+
+        System.out.println(">>> Store write [key=" + key + ", val=" + val + ']');
+
+        int updated = jdbcTemplate.update("update PERSONS set firstName = ?, lastName = ? where id = ?",
+            val.getFirstName(), val.getLastName(), val.getId());
+
+        if (updated == 0) {
+            jdbcTemplate.update("insert into PERSONS (id, firstName, lastName) values (?, ?, ?)",
+                val.getId(), val.getFirstName(), val.getLastName());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void delete(Object key) {
+        System.out.println(">>> Store delete [key=" + key + ']');
+
+        jdbcTemplate.update("delete from PERSONS where id = ?", key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void loadCache(final IgniteBiInClosure<Long, Person> clo, Object... args) {
+        if (args == null || args.length == 0 || args[0] == null)
+            throw new CacheLoaderException("Expected entry count parameter is not provided.");
+
+        int entryCnt = (Integer)args[0];
+
+        final AtomicInteger cnt = new AtomicInteger();
+
+        jdbcTemplate.query("select * from PERSONS limit ?", new RowCallbackHandler() {
+            @Override public void processRow(ResultSet rs) throws SQLException {
+                Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3));
+
+                clo.apply(person.getId(), person);
+
+                cnt.incrementAndGet();
+            }
+        }, entryCnt);
+
+        System.out.println(">>> Loaded " + cnt + " values into cache.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/CacheSpringStoreExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/CacheSpringStoreExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/CacheSpringStoreExample.java
new file mode 100644
index 0000000..9be6672
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/CacheSpringStoreExample.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.datagrid.store.spring;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cache.store.jdbc.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.datagrid.store.*;
+import org.apache.ignite.transactions.*;
+
+import javax.cache.configuration.*;
+import java.util.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+
+/**
+ * Demonstrates usage of cache with underlying persistent store configured.
+ * <p>
+ * This example uses {@link CacheSpringPersonStore} as a persistent store.
+ * <p>
+ * Remote nodes can be started with {@link ExampleNodeStartup} in another JVM which will
+ * start node with {@code examples/config/example-ignite.xml} configuration.
+ */
+public class CacheSpringStoreExample {
+    /** Cache name. */
+    private static final String CACHE_NAME = CacheSpringStoreExample.class.getSimpleName();
+
+    /** Heap size required to run this example. */
+    public static final int MIN_MEMORY = 1024 * 1024 * 1024;
+
+    /** Number of entries to load. */
+    private static final int ENTRY_COUNT = 100_000;
+
+    /** Global person ID to use across entire example. */
+    private static final Long id = Math.abs(UUID.randomUUID().getLeastSignificantBits());
+
+    /**
+     * Executes example.
+     *
+     * @param args Command line arguments, none required.
+     * @throws IgniteException If example execution failed.
+     */
+    public static void main(String[] args) throws IgniteException {
+        ExamplesUtils.checkMinMemory(MIN_MEMORY);
+
+        // To start ignite with desired configuration uncomment the appropriate line.
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            System.out.println();
+            System.out.println(">>> Cache store example started.");
+
+            CacheConfiguration<Long, Person> cacheCfg = new CacheConfiguration<>(CACHE_NAME);
+
+            // Set atomicity as transaction, since we are showing transactions in example.
+            cacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+            // Configure JDBC store.
+            cacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(CacheSpringPersonStore.class));
+
+            // Configure JDBC session listener.
+            cacheCfg.setCacheStoreSessionListenerFactories(new Factory<CacheStoreSessionListener>() {
+                @Override public CacheStoreSessionListener create() {
+                    CacheJdbcStoreSessionListener lsnr = new CacheJdbcStoreSessionListener();
+
+                    lsnr.setDataSource(CacheSpringPersonStore.DATA_SRC);
+
+                    return lsnr;
+                }
+            });
+
+            cacheCfg.setReadThrough(true);
+            cacheCfg.setWriteThrough(true);
+
+            try (IgniteCache<Long, Person> cache = ignite.getOrCreateCache(cacheCfg)) {
+                // Make initial cache loading from persistent store. This is a
+                // distributed operation and will call CacheStore.loadCache(...)
+                // method on all nodes in topology.
+                loadCache(cache);
+
+                // Start transaction and execute several cache operations with
+                // read/write-through to persistent store.
+                executeTransaction(cache);
+            }
+        }
+    }
+
+    /**
+     * Makes initial cache loading.
+     *
+     * @param cache Cache to load.
+     */
+    private static void loadCache(IgniteCache<Long, Person> cache) {
+        long start = System.currentTimeMillis();
+
+        // Start loading cache from persistent store on all caching nodes.
+        cache.loadCache(null, ENTRY_COUNT);
+
+        long end = System.currentTimeMillis();
+
+        System.out.println(">>> Loaded " + cache.size() + " keys with backups in " + (end - start) + "ms.");
+    }
+
+    /**
+     * Executes transaction with read/write-through to persistent store.
+     *
+     * @param cache Cache to execute transaction on.
+     */
+    private static void executeTransaction(IgniteCache<Long, Person> cache) {
+        try (Transaction tx = Ignition.ignite().transactions().txStart()) {
+            Person val = cache.get(id);
+
+            System.out.println("Read value: " + val);
+
+            val = cache.getAndPut(id, new Person(id, "Isaac", "Newton"));
+
+            System.out.println("Overwrote old value: " + val);
+
+            val = cache.get(id);
+
+            System.out.println("Read value: " + val);
+
+            tx.commit();
+        }
+
+        System.out.println("Read value after commit: " + cache.get(id));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/package-info.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/package-info.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/package-info.java
new file mode 100644
index 0000000..211239f
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains Spring-based cache store implementation.
+ */
+package org.apache.ignite.examples.datagrid.store.spring;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 5b57817..4457f98 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -567,7 +567,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration();
 
-        sharedCtx = createSharedContext(ctx, CU.createStoreSessionListeners(ctx,
+        sharedCtx = createSharedContext(ctx, CU.startStoreSessionListeners(ctx,
             ctx.config().getCacheStoreSessionListenerFactories()));
 
         ctx.performance().add("Disable serializable transactions (set 'txSerializableEnabled' to false)",
@@ -813,6 +813,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             mgr.stop(cancel);
         }
 
+        CU.stopStoreSessionListeners(ctx, sharedCtx.storeSessionListeners());
+
         sharedCtx.cleanup();
 
         if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 6968fcb..7096da5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1795,13 +1795,14 @@ public class GridCacheUtils {
     }
 
     /**
-     * Creates store session listeners.
+     * Creates and starts store session listeners.
      *
      * @param ctx Kernal context.
      * @param factories Factories.
      * @return Listeners.
+     * @throws IgniteCheckedException In case of error.
      */
-    public static Collection<CacheStoreSessionListener> createStoreSessionListeners(GridKernalContext ctx,
+    public static Collection<CacheStoreSessionListener> startStoreSessionListeners(GridKernalContext ctx,
         Factory<CacheStoreSessionListener>[] factories) throws IgniteCheckedException {
         if (factories == null)
             return null;
@@ -1823,4 +1824,24 @@ public class GridCacheUtils {
 
         return lsnrs;
     }
+
+    /**
+     * Stops store session listeners.
+     *
+     * @param ctx Kernal context.
+     * @param sesLsnrs Session listeners.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public static void stopStoreSessionListeners(GridKernalContext ctx, Collection<CacheStoreSessionListener> sesLsnrs)
+        throws IgniteCheckedException {
+        if (sesLsnrs == null)
+            return;
+
+        for (CacheStoreSessionListener lsnr : sesLsnrs) {
+            if (lsnr instanceof LifecycleAware)
+                ((LifecycleAware)lsnr).stop();
+
+            ctx.resource().cleanupGeneric(lsnr);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index 11d711c..bc5a0a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -70,6 +70,9 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
     /** */
     private Collection<CacheStoreSessionListener> sesLsnrs;
 
+    /** */
+    private boolean globalSesLsnrs;
+
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public void initialize(@Nullable CacheStore cfgStore, Map sesHolders) throws IgniteCheckedException {
@@ -166,10 +169,13 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
                 "Persistence store is configured, but both read-through and write-through are disabled.");
         }
 
-        sesLsnrs = CU.createStoreSessionListeners(cctx.kernalContext(), cfg.getCacheStoreSessionListenerFactories());
+        sesLsnrs = CU.startStoreSessionListeners(cctx.kernalContext(), cfg.getCacheStoreSessionListenerFactories());
 
-        if (sesLsnrs == null)
+        if (sesLsnrs == null) {
             sesLsnrs = cctx.shared().storeSessionListeners();
+
+            globalSesLsnrs = true;
+        }
     }
 
     /** {@inheritDoc} */
@@ -187,18 +193,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
             }
         }
 
-        if (sesLsnrs != null) {
-            for (CacheStoreSessionListener lsnr : sesLsnrs) {
-                if (lsnr instanceof LifecycleAware)
-                    ((LifecycleAware)lsnr).stop();
-
-                try {
-                    cctx.kernalContext().resource().cleanupGeneric(lsnr);
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to remove injected resources from store session listener (ignoring): " +
-                        lsnr, e);
-                }
+        if (!globalSesLsnrs) {
+            try {
+                CU.stopStoreSessionListeners(cctx.kernalContext(), sesLsnrs);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to stop store session listeners for cache: " + cctx.name(), e);
             }
         }
     }
@@ -721,7 +721,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
                     lsnr.onSessionEnd(locSes, commit);
             }
 
-            if (!sesHolder.get().storeEnded(store))
+            if (!sesHolder.get().ended(store))
                 store.sessionEnd(commit);
         }
         catch (Throwable e) {
@@ -788,13 +788,9 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
 
         sesHolder.set(ses);
 
-        if (!ses.started()) {
-            if (sesLsnrs != null) {
-                for (CacheStoreSessionListener lsnr : sesLsnrs)
-                    lsnr.onSessionStart(locSes);
-            }
-
-            ses.onStarted();
+        if (sesLsnrs != null && !ses.started(this)) {
+            for (CacheStoreSessionListener lsnr : sesLsnrs)
+                lsnr.onSessionStart(locSes);
         }
     }
 
@@ -809,7 +805,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
                         lsnr.onSessionEnd(locSes, !threwEx);
                 }
 
-                assert !sesHolder.get().storeEnded(store);
+                assert !sesHolder.get().ended(store);
 
                 store.sessionEnd(!threwEx);
             }
@@ -858,10 +854,11 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
         private Object attachment;
 
         /** */
-        private boolean started;
+        private final Set<CacheStoreManager> started =
+            new GridSetWrapper<>(new IdentityHashMap<CacheStoreManager, Object>());
 
         /** */
-        private final Set<CacheStore> endedStores = new GridSetWrapper<>(new IdentityHashMap<CacheStore, Object>());
+        private final Set<CacheStore> ended = new GridSetWrapper<>(new IdentityHashMap<CacheStore, Object>());
 
         /**
          * @param tx Current transaction.
@@ -918,24 +915,18 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
         }
 
         /**
-         */
-        private void onStarted() {
-            started = true;
-        }
-
-        /**
          * @return If session is started.
          */
-        private boolean started() {
-            return started;
+        private boolean started(CacheStoreManager mgr) {
+            return !started.add(mgr);
         }
 
         /**
          * @param store Cache store.
          * @return Whether session already ended on this store instance.
          */
-        private boolean storeEnded(CacheStore store) {
-            return !endedStores.add(store);
+        private boolean ended(CacheStore store) {
+            return !ended.add(store);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerLifeCycleSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerLifeCycleSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerLifeCycleSelfTest.java
new file mode 100644
index 0000000..814c8a5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerLifeCycleSelfTest.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lifecycle.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.transactions.*;
+
+import javax.cache.*;
+import javax.cache.configuration.*;
+import javax.cache.integration.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+
+/**
+ * Store session listeners test.
+ */
+public class CacheStoreSessionListenerLifecycleSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final Queue<String> evts = new ConcurrentLinkedDeque<>();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setCacheStoreSessionListenerFactories(
+            new SessionListenerFactory("Shared 1"),
+            new SessionListenerFactory("Shared 2")
+        );
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(disco);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        evts.clear();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNoCaches() throws Exception {
+        try {
+            startGrid();
+        }
+        finally {
+            stopGrid();
+        }
+
+        assertEqualsCollections(Arrays.asList("Shared 1 START", "Shared 2 START", "Shared 1 STOP", "Shared 2 STOP"),
+            evts);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNoOverride() throws Exception {
+        try {
+            Ignite ignite = startGrid();
+
+            for (int i = 0; i < 2; i++) {
+                CacheConfiguration<Integer, Integer> cacheCfg = cacheConfiguration("cache-" + i);
+
+                cacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+                ignite.createCache(cacheCfg);
+            }
+
+            ignite.cache("cache-0").put(1, 1);
+            ignite.cache("cache-1").put(1, 1);
+
+            try (Transaction tx = ignite.transactions().txStart()) {
+                ignite.cache("cache-0").put(2, 2);
+                ignite.cache("cache-0").put(3, 3);
+                ignite.cache("cache-1").put(2, 2);
+                ignite.cache("cache-1").put(3, 3);
+
+                tx.commit();
+            }
+        }
+        finally {
+            stopGrid();
+        }
+
+        assertEqualsCollections(Arrays.asList(
+            "Shared 1 START",
+            "Shared 2 START",
+
+            // Put to cache-0.
+            "Shared 1 SESSION START cache-0",
+            "Shared 2 SESSION START cache-0",
+            "Shared 1 SESSION END cache-0",
+            "Shared 2 SESSION END cache-0",
+
+            // Put to cache-1.
+            "Shared 1 SESSION START cache-1",
+            "Shared 2 SESSION START cache-1",
+            "Shared 1 SESSION END cache-1",
+            "Shared 2 SESSION END cache-1",
+
+            // Transaction.
+            "Shared 1 SESSION START cache-0",
+            "Shared 2 SESSION START cache-0",
+            "Shared 1 SESSION START cache-1",
+            "Shared 2 SESSION START cache-1",
+            "Shared 1 SESSION END cache-0",
+            "Shared 2 SESSION END cache-0",
+            "Shared 1 SESSION END cache-1",
+            "Shared 2 SESSION END cache-1",
+
+            "Shared 1 STOP",
+            "Shared 2 STOP"
+        ), evts);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartialOverride() throws Exception {
+        try {
+            Ignite ignite = startGrid();
+
+            for (int i = 0; i < 2; i++) {
+                String name = "cache-" + i;
+
+                CacheConfiguration cacheCfg = cacheConfiguration(name);
+
+                cacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+                if (i == 0) {
+                    cacheCfg.setCacheStoreSessionListenerFactories(
+                        new SessionListenerFactory(name + " 1"),
+                        new SessionListenerFactory(name + " 2")
+                    );
+                }
+
+                ignite.createCache(cacheCfg);
+            }
+
+            ignite.cache("cache-0").put(1, 1);
+            ignite.cache("cache-1").put(1, 1);
+
+            try (Transaction tx = ignite.transactions().txStart()) {
+                ignite.cache("cache-0").put(2, 2);
+                ignite.cache("cache-0").put(3, 3);
+                ignite.cache("cache-1").put(2, 2);
+                ignite.cache("cache-1").put(3, 3);
+
+                tx.commit();
+            }
+        }
+        finally {
+            stopGrid();
+        }
+
+        assertEqualsCollections(Arrays.asList(
+            "Shared 1 START",
+            "Shared 2 START",
+            "cache-0 1 START",
+            "cache-0 2 START",
+
+            // Put to cache-0.
+            "cache-0 1 SESSION START cache-0",
+            "cache-0 2 SESSION START cache-0",
+            "cache-0 1 SESSION END cache-0",
+            "cache-0 2 SESSION END cache-0",
+
+            // Put to cache-1.
+            "Shared 1 SESSION START cache-1",
+            "Shared 2 SESSION START cache-1",
+            "Shared 1 SESSION END cache-1",
+            "Shared 2 SESSION END cache-1",
+
+            // Transaction.
+            "cache-0 1 SESSION START cache-0",
+            "cache-0 2 SESSION START cache-0",
+            "Shared 1 SESSION START cache-1",
+            "Shared 2 SESSION START cache-1",
+            "cache-0 1 SESSION END cache-0",
+            "cache-0 2 SESSION END cache-0",
+            "Shared 1 SESSION END cache-1",
+            "Shared 2 SESSION END cache-1",
+
+            "cache-0 1 STOP",
+            "cache-0 2 STOP",
+            "Shared 1 STOP",
+            "Shared 2 STOP"
+        ), evts);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOverride() throws Exception {
+        try {
+            Ignite ignite = startGrid();
+
+            for (int i = 0; i < 2; i++) {
+                String name = "cache-" + i;
+
+                CacheConfiguration cacheCfg = cacheConfiguration(name);
+
+                cacheCfg.setCacheStoreSessionListenerFactories(new SessionListenerFactory(name + " 1"), new SessionListenerFactory(name + " 2"));
+
+                ignite.createCache(cacheCfg);
+            }
+
+            ignite.cache("cache-0").put(1, 1);
+            ignite.cache("cache-1").put(1, 1);
+
+            try (Transaction tx = ignite.transactions().txStart()) {
+                ignite.cache("cache-0").put(2, 2);
+                ignite.cache("cache-0").put(3, 3);
+                ignite.cache("cache-1").put(2, 2);
+                ignite.cache("cache-1").put(3, 3);
+
+                tx.commit();
+            }
+        }
+        finally {
+            stopGrid();
+        }
+
+        assertEqualsCollections(Arrays.asList(
+            "Shared 1 START",
+            "Shared 2 START",
+            "cache-0 1 START",
+            "cache-0 2 START",
+            "cache-1 1 START",
+            "cache-1 2 START",
+
+            // Put to cache-0.
+            "cache-0 1 SESSION START cache-0",
+            "cache-0 2 SESSION START cache-0",
+            "cache-0 1 SESSION END cache-0",
+            "cache-0 2 SESSION END cache-0",
+
+            // Put to cache-1.
+            "cache-1 1 SESSION START cache-1",
+            "cache-1 2 SESSION START cache-1",
+            "cache-1 1 SESSION END cache-1",
+            "cache-1 2 SESSION END cache-1",
+
+            // Transaction.
+            "cache-0 1 SESSION START cache-0",
+            "cache-0 2 SESSION START cache-0",
+            "cache-1 1 SESSION START cache-1",
+            "cache-1 2 SESSION START cache-1",
+            "cache-0 1 SESSION END cache-0",
+            "cache-0 2 SESSION END cache-0",
+            "cache-1 1 SESSION END cache-1",
+            "cache-1 2 SESSION END cache-1",
+
+            "cache-0 1 STOP",
+            "cache-0 2 STOP",
+            "cache-1 1 STOP",
+            "cache-1 2 STOP",
+            "Shared 1 STOP",
+            "Shared 2 STOP"
+        ), evts);
+    }
+
+    /**
+     * @param name Cache name.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Integer, Integer> cacheConfiguration(String name) {
+        CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>(name);
+
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+        cacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(Store.class));
+        cacheCfg.setWriteThrough(true);
+
+        return cacheCfg;
+    }
+
+    /**
+     */
+    private static class SessionListener implements CacheStoreSessionListener, LifecycleAware {
+        /** */
+        private final String name;
+
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /**
+         * @param name Name.
+         */
+        private SessionListener(String name) {
+            this.name = name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void start() throws IgniteException {
+            assertNotNull(ignite);
+
+            evts.add(name + " START");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void stop() throws IgniteException {
+            assertNotNull(ignite);
+
+            evts.add(name + " STOP");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onSessionStart(CacheStoreSession ses) {
+            assertNotNull(ignite);
+
+            evts.add(name + " SESSION START " + ses.cacheName());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
+            assertNotNull(ignite);
+
+            evts.add(name + " SESSION END " + ses.cacheName());
+        }
+    }
+
+    /**
+     */
+    private static class SessionListenerFactory implements Factory<CacheStoreSessionListener> {
+        /** */
+        private String name;
+
+        /**
+         * @param name Name.
+         */
+        private SessionListenerFactory(String name) {
+            this.name = name;
+        }
+
+        @Override public CacheStoreSessionListener create() {
+            return new SessionListener(name);
+        }
+    }
+
+    /**
+     */
+    public static class Store extends CacheStoreAdapter<Integer, Integer> {
+        public Store() {
+        }
+
+        /** {@inheritDoc} */
+        @Override public Integer load(Integer key) throws CacheLoaderException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry)
+            throws CacheWriterException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) throws CacheWriterException {
+            // No-op.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
index f72ea47..f2de8ce 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
@@ -266,30 +266,6 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @param col1 Collection 1.
-     * @param col2 Collection 2.
-     */
-    private static void assertEqualsCollections(Collection<?> col1, Collection<?> col2) {
-        if (col1.size() != col2.size())
-            fail("Collections are not equal:\nExpected:\t" + col1 + "\nActual:\t" + col2);
-
-        Iterator<?> it1 = col1.iterator();
-        Iterator<?> it2 = col2.iterator();
-
-        int idx = 0;
-
-        while (it1.hasNext()) {
-            Object item1 = it1.next();
-            Object item2 = it2.next();
-
-            if (!F.eq(item1, item2))
-                fail("Collections are not equal (position " + idx + "):\nExpected: " + col1 + "\nActual:   " + col2);
-
-            idx++;
-        }
-    }
-
-    /**
      *
      */
     private static class TestStore implements CacheStore<Object, Object> {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 5533897..a19ea23 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -858,4 +858,28 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
             ccfg.getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.CLOCK)
             U.sleep(50);
     }
+
+    /**
+     * @param exp Expected.
+     * @param act Actual.
+     */
+    protected void assertEqualsCollections(Collection<?> exp, Collection<?> act) {
+        if (exp.size() != act.size())
+            fail("Collections are not equal:\nExpected:\t" + exp + "\nActual:\t" + act);
+
+        Iterator<?> it1 = exp.iterator();
+        Iterator<?> it2 = act.iterator();
+
+        int idx = 0;
+
+        while (it1.hasNext()) {
+            Object item1 = it1.next();
+            Object item2 = it2.next();
+
+            if (!F.eq(item1, item2))
+                fail("Collections are not equal (position " + idx + "):\nExpected: " + exp + "\nActual:   " + act);
+
+            idx++;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListener.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListener.java b/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListener.java
index 81736cd..90431d7 100644
--- a/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListener.java
+++ b/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListener.java
@@ -154,7 +154,7 @@ public class CacheSpringStoreSessionListener implements CacheStoreSessionListene
 
     /** {@inheritDoc} */
     @Override public void onSessionStart(CacheStoreSession ses) {
-        if (ses.isWithinTransaction()) {
+        if (ses.isWithinTransaction() && ses.attachment() == null) {
             try {
                 TransactionDefinition def = definition(ses.transaction(), ses.cacheName());