You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/02 11:01:39 UTC

[17/38] incubator-ignite git commit: IGNITE-891 - Cache store improvements

IGNITE-891 - Cache store improvements


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b37d0046
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b37d0046
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b37d0046

Branch: refs/heads/ignite-883_1
Commit: b37d00465f1a162436082660ec69a1f765492373
Parents: 80ebfe0
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Thu May 21 18:52:15 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Thu May 21 18:52:15 2015 -0700

----------------------------------------------------------------------
 .../jdbc/CacheStoreSessionJdbcListener.java     | 16 +++-
 .../processors/cache/GridCacheProcessor.java    | 10 ++-
 .../cache/GridCacheSharedContext.java           | 15 +++-
 .../processors/cache/GridCacheUtils.java        | 33 +++++++
 .../store/GridCacheStoreManagerAdapter.java     | 44 +++-------
 .../cache/transactions/IgniteTxAdapter.java     | 13 ++-
 .../transactions/IgniteTxLocalAdapter.java      | 27 ++----
 .../loadtests/hashmap/GridCacheTestContext.java |  3 +-
 .../CacheStoreSessionHibernateListener.java     | 82 +++++++++++++++++-
 modules/spring/pom.xml                          | 12 +--
 .../spring/CacheStoreSessionSpringListener.java | 90 ++++++++++++++++++--
 ...CacheStoreSessionSpringListenerSelfTest.java |  2 +-
 12 files changed, 271 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java
index c683abe..e4cd617 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java
@@ -17,9 +17,10 @@
 
 package org.apache.ignite.cache.store.jdbc;
 
+import org.apache.ignite.*;
 import org.apache.ignite.cache.store.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.resources.*;
+import org.apache.ignite.lifecycle.*;
 
 import javax.cache.integration.*;
 import javax.sql.*;
@@ -29,7 +30,7 @@ import java.util.*;
 /**
  * Cache store session listener based on JDBC connection.
  */
-public class CacheStoreSessionJdbcListener implements CacheStoreSessionListener {
+public class CacheStoreSessionJdbcListener implements CacheStoreSessionListener, LifecycleAware {
     /** Session key for JDBC connection. */
     public static final String JDBC_CONN_KEY = "__jdbc_conn_";
 
@@ -57,6 +58,17 @@ public class CacheStoreSessionJdbcListener implements CacheStoreSessionListener
     }
 
     /** {@inheritDoc} */
+    @Override public void start() throws IgniteException {
+        if (dataSrc == null)
+            throw new IgniteException("Data source is required by " + getClass().getSimpleName() + '.');
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() throws IgniteException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Override public void onSessionStart(CacheStoreSession ses) {
         Map<String, Connection> props = ses.properties();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 0e1a9c2..5b57817 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -567,7 +567,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration();
 
-        sharedCtx = createSharedContext(ctx);
+        sharedCtx = createSharedContext(ctx, CU.createStoreSessionListeners(ctx,
+            ctx.config().getCacheStoreSessionListenerFactories()));
 
         ctx.performance().add("Disable serializable transactions (set 'txSerializableEnabled' to false)",
             !ctx.config().getTransactionConfiguration().isTxSerializableEnabled());
@@ -1562,10 +1563,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * Creates shared context.
      *
      * @param kernalCtx Kernal context.
+     * @param storeSesLsnrs Store session listeners.
      * @return Shared context.
      */
     @SuppressWarnings("unchecked")
-    private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx) {
+    private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx,
+        Collection<CacheStoreSessionListener> storeSesLsnrs) {
         IgniteTxManager tm = new IgniteTxManager();
         GridCacheMvccManager mvccMgr = new GridCacheMvccManager();
         GridCacheVersionManager verMgr = new GridCacheVersionManager();
@@ -1580,7 +1583,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             mvccMgr,
             depMgr,
             exchMgr,
-            ioMgr
+            ioMgr,
+            storeSesLsnrs
         );
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index b16885e..45634b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
@@ -77,6 +78,9 @@ public class GridCacheSharedContext<K, V> {
     /** Preloaders start future. */
     private IgniteInternalFuture<Object> preloadersStartFut;
 
+    /** Store session listeners. */
+    private Collection<CacheStoreSessionListener> storeSesLsnrs;
+
     /**
      * @param txMgr Transaction manager.
      * @param verMgr Version manager.
@@ -89,7 +93,8 @@ public class GridCacheSharedContext<K, V> {
         GridCacheMvccManager mvccMgr,
         GridCacheDeploymentManager<K, V> depMgr,
         GridCachePartitionExchangeManager<K, V> exchMgr,
-        GridCacheIoManager ioMgr
+        GridCacheIoManager ioMgr,
+        Collection<CacheStoreSessionListener> storeSesLsnrs
     ) {
         this.kernalCtx = kernalCtx;
         this.mvccMgr = add(mvccMgr);
@@ -98,6 +103,7 @@ public class GridCacheSharedContext<K, V> {
         this.depMgr = add(depMgr);
         this.exchMgr = add(exchMgr);
         this.ioMgr = add(ioMgr);
+        this.storeSesLsnrs = storeSesLsnrs;
 
         txMetrics = new TransactionMetricsAdapter();
 
@@ -524,6 +530,13 @@ public class GridCacheSharedContext<K, V> {
     }
 
     /**
+     * @return Store session listeners.
+     */
+    @Nullable public Collection<CacheStoreSessionListener> storeSessionListeners() {
+        return storeSesLsnrs;
+    }
+
+    /**
      * @param mgr Manager to add.
      * @return Added manager.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 549f42f..6968fcb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.store.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
@@ -34,12 +35,14 @@ import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
+import org.apache.ignite.lifecycle.*;
 import org.apache.ignite.plugin.*;
 import org.apache.ignite.transactions.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
 import javax.cache.*;
+import javax.cache.configuration.*;
 import javax.cache.expiry.*;
 import javax.cache.integration.*;
 import java.io.*;
@@ -1790,4 +1793,34 @@ public class GridCacheUtils {
 
         return res;
     }
+
+    /**
+     * Creates a listener per factory, injects resources into it and starts it if it is {@link LifecycleAware}.
+     * @param ctx Kernal context used for resource injection.
+     * @param factories Listener factories, may be {@code null}.
+     * @return Created listeners, or {@code null} if {@code factories} is {@code null}.
+     * @throws IgniteCheckedException Presumably if resource injection fails (declared by callee; confirm).
+     */
+    public static Collection<CacheStoreSessionListener> createStoreSessionListeners(GridKernalContext ctx,
+        Factory<CacheStoreSessionListener>[] factories) throws IgniteCheckedException {
+        if (factories == null)
+            return null; // Store manager checks for null to fall back to shared-context listeners.
+
+        Collection<CacheStoreSessionListener> lsnrs = new ArrayList<>(factories.length);
+
+        for (Factory<CacheStoreSessionListener> factory : factories) {
+            CacheStoreSessionListener lsnr = factory.create();
+
+            if (lsnr != null) { // Factories that return null are silently skipped.
+                ctx.resource().injectGeneric(lsnr);
+
+                if (lsnr instanceof LifecycleAware)
+                    ((LifecycleAware)lsnr).start();
+
+                lsnrs.add(lsnr);
+            }
+        }
+
+        return lsnrs;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index 79ac86d..8096291 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -35,7 +35,6 @@ import org.apache.ignite.transactions.*;
 import org.jetbrains.annotations.*;
 
 import javax.cache.*;
-import javax.cache.configuration.*;
 import javax.cache.integration.*;
 import java.util.*;
 
@@ -167,39 +166,10 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
                 "Persistence store is configured, but both read-through and write-through are disabled.");
         }
 
-        sesLsnrs = createSessionListeners(cfg.getCacheStoreSessionListenerFactories());
+        sesLsnrs = CU.createStoreSessionListeners(cctx.kernalContext(), cfg.getCacheStoreSessionListenerFactories());
 
         if (sesLsnrs == null)
-            sesLsnrs = createSessionListeners(cctx.kernalContext().config().getCacheStoreSessionListenerFactories());
-    }
-
-    /**
-     * Creates session listeners.
-     *
-     * @param factories Factories.
-     * @return Listeners.
-     */
-    private Collection<CacheStoreSessionListener> createSessionListeners(Factory<CacheStoreSessionListener>[] factories)
-        throws IgniteCheckedException {
-        if (factories == null)
-            return null;
-
-        Collection<CacheStoreSessionListener> lsnrs = new ArrayList<>(factories.length);
-
-        for (Factory<CacheStoreSessionListener> factory : factories) {
-            CacheStoreSessionListener lsnr = factory.create();
-
-            if (lsnr != null) {
-                cctx.kernalContext().resource().injectGeneric(lsnr);
-
-                if (lsnr instanceof LifecycleAware)
-                    ((LifecycleAware)lsnr).start();
-
-                lsnrs.add(lsnr);
-            }
-        }
-
-        return lsnrs;
+            sesLsnrs = cctx.shared().storeSessionListeners();
     }
 
     /** {@inheritDoc} */
@@ -754,6 +724,11 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
             if (!sesHolder.get().storeEnded(store))
                 store.sessionEnd(commit);
         }
+        catch (Throwable e) {
+            last = true;
+
+            throw e;
+        }
         finally {
             if (last && sesHolder != null) {
                 sesHolder.set(null);
@@ -834,8 +809,9 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
                         lsnr.onSessionEnd(locSes, !threwEx);
                 }
 
-                if (!sesHolder.get().storeEnded(store))
-                    store.sessionEnd(!threwEx);
+                assert !sesHolder.get().storeEnded(store);
+
+                store.sessionEnd(!threwEx);
             }
         }
         catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index f6d5d90..adc1c86 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -408,9 +408,18 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
         if (!storeEnabled())
             return false;
 
-        Collection<CacheStoreManager> stores = stores();
+        Collection<Integer> cacheIds = activeCacheIds();
+
+        if (!cacheIds.isEmpty()) {
+            for (int cacheId : cacheIds) {
+                CacheStoreManager store = cctx.cacheContext(cacheId).store();
 
-        return stores != null && !stores.isEmpty();
+                if (store.configured())
+                    return true;
+            }
+        }
+
+        return false;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 1bed2da..fdaef47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -18,10 +18,10 @@
 package org.apache.ignite.internal.processors.cache.transactions;
 
 import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
 import org.apache.ignite.internal.processors.cache.dr.*;
@@ -1010,24 +1010,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 cctx.tm().resetContext();
             }
         }
-        else if (!internal()) {
-            Collection<CacheStoreManager> stores = stores();
-
-            if (stores != null && !stores.isEmpty()) {
-                try {
-                    sessionEnd(stores, true);
-                }
-                catch (IgniteCheckedException e) {
-                    commitError(e);
-
-                    setRollbackOnly();
-
-                    cctx.tm().removeCommittedTx(this);
-
-                    throw e;
-                }
-            }
-        }
 
         // Do not unlock transaction entries if one-phase commit.
         if (!onePhaseCommit()) {
@@ -1119,7 +1101,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 if (!internal()) {
                     Collection<CacheStoreManager> stores = stores();
 
-                    if (stores != null && !stores.isEmpty() && (near() || F.first(stores).isWriteToStoreFromDht()))
+                    assert isWriteToStoreFromDhtValid(stores) :
+                        "isWriteToStoreFromDht can't be different within one transaction";
+
+                    // Check null/empty first: F.first(stores) has no element to dereference otherwise.
+                    if (stores != null && !stores.isEmpty() &&
+                        (near() || F.first(stores).isWriteToStoreFromDht()))
                         sessionEnd(stores, false);
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index 0b0f099..1c85ed3 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -54,7 +54,8 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
                 new GridCacheMvccManager(),
                 new GridCacheDeploymentManager<K, V>(),
                 new GridCachePartitionExchangeManager<K, V>(),
-                new GridCacheIoManager()
+                new GridCacheIoManager(),
+                null
             ),
             defaultCacheConfiguration(),
             CacheType.USER,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java
----------------------------------------------------------------------
diff --git a/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java b/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java
index fc9eb91..fe0960e 100644
--- a/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java
+++ b/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java
@@ -17,23 +17,40 @@
 
 package org.apache.ignite.cache.store.hibernate;
 
+import org.apache.ignite.*;
 import org.apache.ignite.cache.store.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lifecycle.*;
+import org.apache.ignite.resources.*;
 import org.hibernate.*;
+import org.hibernate.cfg.*;
 
 import javax.cache.integration.*;
+import java.io.*;
+import java.net.*;
 import java.util.*;
 
 /**
  * Cache store session listener based on Hibernate session.
  */
-public class CacheStoreSessionHibernateListener implements CacheStoreSessionListener {
+public class CacheStoreSessionHibernateListener implements CacheStoreSessionListener, LifecycleAware {
     /** Session key for JDBC connection. */
     public static final String HIBERNATE_SES_KEY = "__hibernate_ses_";
 
     /** Hibernate session factory. */
     private SessionFactory sesFactory;
 
+    /** Hibernate configuration file path. */
+    private String hibernateCfgPath;
+
+    /** Logger. */
+    @LoggerResource
+    private IgniteLogger log;
+
+    /** Whether to close session on stop. */
+    private boolean closeSesOnStop;
+
     /**
      * Sets Hibernate session factory.
      *
@@ -54,6 +71,69 @@ public class CacheStoreSessionHibernateListener implements CacheStoreSessionList
         return sesFactory;
     }
 
+    /**
+     * Sets hibernate configuration path.
+     *
+     * @param hibernateCfgPath Hibernate configuration path.
+     */
+    public void setHibernateConfigurationPath(String hibernateCfgPath) {
+        this.hibernateCfgPath = hibernateCfgPath;
+    }
+
+    /**
+     * Gets hibernate configuration path.
+     *
+     * @return Hibernate configuration path.
+     */
+    public String getHibernateConfigurationPath() {
+        return hibernateCfgPath;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("deprecation")
+    @Override public void start() throws IgniteException {
+        if (sesFactory == null && F.isEmpty(hibernateCfgPath))
+            throw new IgniteException("Either session factory or Hibernate configuration file is required by " +
+                getClass().getSimpleName() + '.');
+
+        if (!F.isEmpty(hibernateCfgPath)) {
+            if (sesFactory == null) {
+                try {
+                    URL url = new URL(hibernateCfgPath);
+
+                    sesFactory = new Configuration().configure(url).buildSessionFactory();
+                }
+                catch (MalformedURLException ignored) {
+                    // Not a valid URL: fall through and try the path as a file, then via configure(String).
+                }
+
+                if (sesFactory == null) {
+                    File cfgFile = new File(hibernateCfgPath);
+
+                    if (cfgFile.exists())
+                        sesFactory = new Configuration().configure(cfgFile).buildSessionFactory();
+                }
+
+                if (sesFactory == null)
+                    sesFactory = new Configuration().configure(hibernateCfgPath).buildSessionFactory();
+
+                if (sesFactory == null) // NOTE(review): possibly unreachable if the call above throws on failure - verify.
+                    throw new IgniteException("Failed to resolve Hibernate configuration file: " + hibernateCfgPath);
+
+                closeSesOnStop = true; // Factory was built here, so stop() is responsible for closing it.
+            }
+            else
+                U.warn(log, "Hibernate configuration file configured in " + getClass().getSimpleName() +
+                    " will be ignored (session factory is already set).");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() throws IgniteException {
+        if (closeSesOnStop && sesFactory != null && !sesFactory.isClosed())
+            sesFactory.close();
+    }
+
     /** {@inheritDoc} */
     @Override public void onSessionStart(CacheStoreSession ses) {
         Map<String, Session> props = ses.properties();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/spring/pom.xml
----------------------------------------------------------------------
diff --git a/modules/spring/pom.xml b/modules/spring/pom.xml
index 2633c83..f49a23d 100644
--- a/modules/spring/pom.xml
+++ b/modules/spring/pom.xml
@@ -77,6 +77,12 @@
         </dependency>
 
         <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-jdbc</artifactId>
+            <version>${spring.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>commons-logging</groupId>
             <artifactId>commons-logging</artifactId>
             <version>1.1.1</version>
@@ -104,12 +110,6 @@
             <scope>test</scope>
         </dependency>
 
-        <dependency>
-            <groupId>org.springframework</groupId>
-            <artifactId>spring-jdbc</artifactId>
-            <version>${spring.version}</version>
-            <scope>test</scope>
-        </dependency>
 
         <dependency>
             <groupId>com.h2database</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java b/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java
index e0caad5..2fab4f0 100644
--- a/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java
+++ b/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java
@@ -17,32 +17,45 @@
 
 package org.apache.ignite.cache.store.spring;
 
+import org.apache.ignite.*;
 import org.apache.ignite.cache.store.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lifecycle.*;
+import org.apache.ignite.resources.*;
 import org.apache.ignite.transactions.*;
+import org.springframework.jdbc.datasource.*;
 import org.springframework.transaction.*;
 import org.springframework.transaction.support.*;
 
 import javax.cache.integration.*;
+import javax.sql.*;
 
 /**
  * Cache store session listener based on Spring cache manager.
  */
-public class CacheStoreSessionSpringListener implements CacheStoreSessionListener {
+public class CacheStoreSessionSpringListener implements CacheStoreSessionListener, LifecycleAware {
     /** Session key for transaction status. */
     public static final String TX_STATUS_KEY = "__spring_tx_status_";
 
     /** Transaction manager. */
     private PlatformTransactionManager txMgr;
 
+    /** Data source. */
+    private DataSource dataSrc;
+
+    /** Propagation behavior. */
+    private int propagation = TransactionDefinition.PROPAGATION_REQUIRED;
+
+    /** Logger. */
+    @LoggerResource
+    private IgniteLogger log;
+
     /**
      * Sets transaction manager.
      *
      * @param txMgr Transaction manager.
      */
     public void setTransactionManager(PlatformTransactionManager txMgr) {
-        A.notNull(txMgr, "txMgr");
-
         this.txMgr = txMgr;
     }
 
@@ -55,11 +68,71 @@ public class CacheStoreSessionSpringListener implements CacheStoreSessionListene
         return txMgr;
     }
 
+    /**
+     * Sets data source.
+     *
+     * @param dataSrc Data source.
+     */
+    public void setDataSource(DataSource dataSrc) {
+        this.dataSrc = dataSrc;
+    }
+
+    /**
+     * Gets data source.
+     *
+     * @return Data source.
+     */
+    public DataSource getDataSource() {
+        return dataSrc;
+    }
+
+    /**
+     * Sets propagation behavior.
+     *
+     * @param propagation Propagation behavior.
+     */
+    public void setPropagationBehavior(int propagation) {
+        this.propagation = propagation;
+    }
+
+    /**
+     * Gets propagation behavior.
+     *
+     * @return Propagation behavior.
+     */
+    public int getPropagationBehavior() {
+        return propagation;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteException {
+        if (txMgr == null && dataSrc == null)
+            throw new IgniteException("Either transaction manager or data source is required by " +
+                getClass().getSimpleName() + '.');
+        // An explicitly set transaction manager wins; the data source is only used to build one when none is set.
+        if (dataSrc != null) {
+            if (txMgr == null)
+                txMgr = new DataSourceTransactionManager(dataSrc);
+            else
+                U.warn(log, "Data source configured in " + getClass().getSimpleName() +
+                    " will be ignored (transaction manager is already set).");
+        }
+
+        assert txMgr != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() throws IgniteException {
+        // No-op.
+    }
+
     /** {@inheritDoc} */
     @Override public void onSessionStart(CacheStoreSession ses) {
         if (ses.isWithinTransaction()) {
             try {
-                ses.properties().put(TX_STATUS_KEY, txMgr.getTransaction(definition(ses.transaction())));
+                TransactionDefinition def = definition(ses.transaction(), ses.cacheName());
+
+                ses.properties().put(TX_STATUS_KEY, txMgr.getTransaction(def));
             }
             catch (TransactionException e) {
                 throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e);
@@ -91,12 +164,19 @@ public class CacheStoreSessionSpringListener implements CacheStoreSessionListene
      *
      * @return DB transaction isolation.
      */
-    private TransactionDefinition definition(Transaction tx) {
+    private TransactionDefinition definition(Transaction tx, String cacheName) {
         assert tx != null;
 
         DefaultTransactionDefinition def = new DefaultTransactionDefinition();
 
+        def.setName("Ignite Tx [cache=" + (cacheName != null ? cacheName : "<default>") + ", id=" + tx.xid() + ']');
         def.setIsolationLevel(isolationLevel(tx.isolation()));
+        def.setPropagationBehavior(propagation);
+        // Presumably tx.timeout() is in milliseconds and is rounded to the nearest whole second - TODO confirm units.
+        long timeoutSec = (tx.timeout() + 500) / 1000;
+        // A rounded value of 0, or one that does not fit an int, leaves the timeout unset on the definition.
+        if (timeoutSec > 0 && timeoutSec < Integer.MAX_VALUE)
+            def.setTimeout((int)timeoutSec);
 
         return def;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java b/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java
index 79d5b5e..83ed249 100644
--- a/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java
+++ b/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java
@@ -54,7 +54,7 @@ public class CacheStoreSessionSpringListenerSelfTest extends CacheStoreSessionLi
             @Override public CacheStoreSessionListener create() {
                 CacheStoreSessionSpringListener lsnr = new CacheStoreSessionSpringListener();
 
-                lsnr.setTransactionManager(new DataSourceTransactionManager(DATA_SRC));
+                lsnr.setDataSource(DATA_SRC);
 
                 return lsnr;
             }