You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/06/02 05:51:11 UTC
[17/34] incubator-ignite git commit: IGNITE-891 - Cache store
improvements
IGNITE-891 - Cache store improvements
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b37d0046
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b37d0046
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b37d0046
Branch: refs/heads/ignite-sprint-5
Commit: b37d00465f1a162436082660ec69a1f765492373
Parents: 80ebfe0
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Thu May 21 18:52:15 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Thu May 21 18:52:15 2015 -0700
----------------------------------------------------------------------
.../jdbc/CacheStoreSessionJdbcListener.java | 16 +++-
.../processors/cache/GridCacheProcessor.java | 10 ++-
.../cache/GridCacheSharedContext.java | 15 +++-
.../processors/cache/GridCacheUtils.java | 33 +++++++
.../store/GridCacheStoreManagerAdapter.java | 44 +++-------
.../cache/transactions/IgniteTxAdapter.java | 13 ++-
.../transactions/IgniteTxLocalAdapter.java | 27 ++----
.../loadtests/hashmap/GridCacheTestContext.java | 3 +-
.../CacheStoreSessionHibernateListener.java | 82 +++++++++++++++++-
modules/spring/pom.xml | 12 +--
.../spring/CacheStoreSessionSpringListener.java | 90 ++++++++++++++++++--
...CacheStoreSessionSpringListenerSelfTest.java | 2 +-
12 files changed, 271 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java
index c683abe..e4cd617 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java
@@ -17,9 +17,10 @@
package org.apache.ignite.cache.store.jdbc;
+import org.apache.ignite.*;
import org.apache.ignite.cache.store.*;
import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.resources.*;
+import org.apache.ignite.lifecycle.*;
import javax.cache.integration.*;
import javax.sql.*;
@@ -29,7 +30,7 @@ import java.util.*;
/**
* Cache store session listener based on JDBC connection.
*/
-public class CacheStoreSessionJdbcListener implements CacheStoreSessionListener {
+public class CacheStoreSessionJdbcListener implements CacheStoreSessionListener, LifecycleAware {
/** Session key for JDBC connection. */
public static final String JDBC_CONN_KEY = "__jdbc_conn_";
@@ -57,6 +58,17 @@ public class CacheStoreSessionJdbcListener implements CacheStoreSessionListener
}
/** {@inheritDoc} */
+ @Override public void start() throws IgniteException {
+ if (dataSrc == null)
+ throw new IgniteException("Data source is required by " + getClass().getSimpleName() + '.');
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws IgniteException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public void onSessionStart(CacheStoreSession ses) {
Map<String, Connection> props = ses.properties();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 0e1a9c2..5b57817 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -567,7 +567,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration();
- sharedCtx = createSharedContext(ctx);
+ sharedCtx = createSharedContext(ctx, CU.createStoreSessionListeners(ctx,
+ ctx.config().getCacheStoreSessionListenerFactories()));
ctx.performance().add("Disable serializable transactions (set 'txSerializableEnabled' to false)",
!ctx.config().getTransactionConfiguration().isTxSerializableEnabled());
@@ -1562,10 +1563,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* Creates shared context.
*
* @param kernalCtx Kernal context.
+ * @param storeSesLsnrs Store session listeners.
* @return Shared context.
*/
@SuppressWarnings("unchecked")
- private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx) {
+ private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx,
+ Collection<CacheStoreSessionListener> storeSesLsnrs) {
IgniteTxManager tm = new IgniteTxManager();
GridCacheMvccManager mvccMgr = new GridCacheMvccManager();
GridCacheVersionManager verMgr = new GridCacheVersionManager();
@@ -1580,7 +1583,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
mvccMgr,
depMgr,
exchMgr,
- ioMgr
+ ioMgr,
+ storeSesLsnrs
);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index b16885e..45634b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
@@ -77,6 +78,9 @@ public class GridCacheSharedContext<K, V> {
/** Preloaders start future. */
private IgniteInternalFuture<Object> preloadersStartFut;
+ /** Store session listeners. */
+ private Collection<CacheStoreSessionListener> storeSesLsnrs;
+
/**
* @param txMgr Transaction manager.
* @param verMgr Version manager.
@@ -89,7 +93,8 @@ public class GridCacheSharedContext<K, V> {
GridCacheMvccManager mvccMgr,
GridCacheDeploymentManager<K, V> depMgr,
GridCachePartitionExchangeManager<K, V> exchMgr,
- GridCacheIoManager ioMgr
+ GridCacheIoManager ioMgr,
+ Collection<CacheStoreSessionListener> storeSesLsnrs
) {
this.kernalCtx = kernalCtx;
this.mvccMgr = add(mvccMgr);
@@ -98,6 +103,7 @@ public class GridCacheSharedContext<K, V> {
this.depMgr = add(depMgr);
this.exchMgr = add(exchMgr);
this.ioMgr = add(ioMgr);
+ this.storeSesLsnrs = storeSesLsnrs;
txMetrics = new TransactionMetricsAdapter();
@@ -524,6 +530,13 @@ public class GridCacheSharedContext<K, V> {
}
/**
+ * @return Store session listeners.
+ */
+ @Nullable public Collection<CacheStoreSessionListener> storeSessionListeners() {
+ return storeSesLsnrs;
+ }
+
+ /**
* @param mgr Manager to add.
* @return Added manager.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 549f42f..6968fcb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.store.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
@@ -34,12 +35,14 @@ import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
+import org.apache.ignite.lifecycle.*;
import org.apache.ignite.plugin.*;
import org.apache.ignite.transactions.*;
import org.jetbrains.annotations.*;
import org.jsr166.*;
import javax.cache.*;
+import javax.cache.configuration.*;
import javax.cache.expiry.*;
import javax.cache.integration.*;
import java.io.*;
@@ -1790,4 +1793,34 @@ public class GridCacheUtils {
return res;
}
+
+ /**
+ * Creates store session listeners.
+ *
+ * @param ctx Kernal context.
+ * @param factories Factories.
+ * @return Listeners.
+ */
+ public static Collection<CacheStoreSessionListener> createStoreSessionListeners(GridKernalContext ctx,
+ Factory<CacheStoreSessionListener>[] factories) throws IgniteCheckedException {
+ if (factories == null)
+ return null;
+
+ Collection<CacheStoreSessionListener> lsnrs = new ArrayList<>(factories.length);
+
+ for (Factory<CacheStoreSessionListener> factory : factories) {
+ CacheStoreSessionListener lsnr = factory.create();
+
+ if (lsnr != null) {
+ ctx.resource().injectGeneric(lsnr);
+
+ if (lsnr instanceof LifecycleAware)
+ ((LifecycleAware)lsnr).start();
+
+ lsnrs.add(lsnr);
+ }
+ }
+
+ return lsnrs;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index 79ac86d..8096291 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -35,7 +35,6 @@ import org.apache.ignite.transactions.*;
import org.jetbrains.annotations.*;
import javax.cache.*;
-import javax.cache.configuration.*;
import javax.cache.integration.*;
import java.util.*;
@@ -167,39 +166,10 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
"Persistence store is configured, but both read-through and write-through are disabled.");
}
- sesLsnrs = createSessionListeners(cfg.getCacheStoreSessionListenerFactories());
+ sesLsnrs = CU.createStoreSessionListeners(cctx.kernalContext(), cfg.getCacheStoreSessionListenerFactories());
if (sesLsnrs == null)
- sesLsnrs = createSessionListeners(cctx.kernalContext().config().getCacheStoreSessionListenerFactories());
- }
-
- /**
- * Creates session listeners.
- *
- * @param factories Factories.
- * @return Listeners.
- */
- private Collection<CacheStoreSessionListener> createSessionListeners(Factory<CacheStoreSessionListener>[] factories)
- throws IgniteCheckedException {
- if (factories == null)
- return null;
-
- Collection<CacheStoreSessionListener> lsnrs = new ArrayList<>(factories.length);
-
- for (Factory<CacheStoreSessionListener> factory : factories) {
- CacheStoreSessionListener lsnr = factory.create();
-
- if (lsnr != null) {
- cctx.kernalContext().resource().injectGeneric(lsnr);
-
- if (lsnr instanceof LifecycleAware)
- ((LifecycleAware)lsnr).start();
-
- lsnrs.add(lsnr);
- }
- }
-
- return lsnrs;
+ sesLsnrs = cctx.shared().storeSessionListeners();
}
/** {@inheritDoc} */
@@ -754,6 +724,11 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
if (!sesHolder.get().storeEnded(store))
store.sessionEnd(commit);
}
+ catch (Throwable e) {
+ last = true;
+
+ throw e;
+ }
finally {
if (last && sesHolder != null) {
sesHolder.set(null);
@@ -834,8 +809,9 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
lsnr.onSessionEnd(locSes, !threwEx);
}
- if (!sesHolder.get().storeEnded(store))
- store.sessionEnd(!threwEx);
+ assert !sesHolder.get().storeEnded(store);
+
+ store.sessionEnd(!threwEx);
}
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index f6d5d90..adc1c86 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -408,9 +408,18 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
if (!storeEnabled())
return false;
- Collection<CacheStoreManager> stores = stores();
+ Collection<Integer> cacheIds = activeCacheIds();
+
+ if (!cacheIds.isEmpty()) {
+ for (int cacheId : cacheIds) {
+ CacheStoreManager store = cctx.cacheContext(cacheId).store();
- return stores != null && !stores.isEmpty();
+ if (store.configured())
+ return true;
+ }
+ }
+
+ return false;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 1bed2da..fdaef47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -18,10 +18,10 @@
package org.apache.ignite.internal.processors.cache.transactions;
import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
import org.apache.ignite.internal.processors.cache.dr.*;
@@ -1010,24 +1010,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
cctx.tm().resetContext();
}
}
- else if (!internal()) {
- Collection<CacheStoreManager> stores = stores();
-
- if (stores != null && !stores.isEmpty()) {
- try {
- sessionEnd(stores, true);
- }
- catch (IgniteCheckedException e) {
- commitError(e);
-
- setRollbackOnly();
-
- cctx.tm().removeCommittedTx(this);
-
- throw e;
- }
- }
- }
// Do not unlock transaction entries if one-phase commit.
if (!onePhaseCommit()) {
@@ -1119,7 +1101,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
if (!internal()) {
Collection<CacheStoreManager> stores = stores();
- if (stores != null && !stores.isEmpty() && (near() || F.first(stores).isWriteToStoreFromDht()))
+ assert isWriteToStoreFromDhtValid(stores) :
+ "isWriteToStoreFromDht can't be different within one transaction";
+
+ boolean isWriteToStoreFromDht = F.first(stores).isWriteToStoreFromDht();
+
+ if (stores != null && !stores.isEmpty() && (near() || isWriteToStoreFromDht))
sessionEnd(stores, false);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index 0b0f099..1c85ed3 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -54,7 +54,8 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
new GridCacheMvccManager(),
new GridCacheDeploymentManager<K, V>(),
new GridCachePartitionExchangeManager<K, V>(),
- new GridCacheIoManager()
+ new GridCacheIoManager(),
+ null
),
defaultCacheConfiguration(),
CacheType.USER,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java
----------------------------------------------------------------------
diff --git a/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java b/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java
index fc9eb91..fe0960e 100644
--- a/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java
+++ b/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java
@@ -17,23 +17,40 @@
package org.apache.ignite.cache.store.hibernate;
+import org.apache.ignite.*;
import org.apache.ignite.cache.store.*;
+import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lifecycle.*;
+import org.apache.ignite.resources.*;
import org.hibernate.*;
+import org.hibernate.cfg.*;
import javax.cache.integration.*;
+import java.io.*;
+import java.net.*;
import java.util.*;
/**
* Cache store session listener based on Hibernate session.
*/
-public class CacheStoreSessionHibernateListener implements CacheStoreSessionListener {
+public class CacheStoreSessionHibernateListener implements CacheStoreSessionListener, LifecycleAware {
/** Session key for JDBC connection. */
public static final String HIBERNATE_SES_KEY = "__hibernate_ses_";
/** Hibernate session factory. */
private SessionFactory sesFactory;
+ /** Hibernate configuration file path. */
+ private String hibernateCfgPath;
+
+ /** Logger. */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** Whether to close session on stop. */
+ private boolean closeSesOnStop;
+
/**
* Sets Hibernate session factory.
*
@@ -54,6 +71,69 @@ public class CacheStoreSessionHibernateListener implements CacheStoreSessionList
return sesFactory;
}
+ /**
+ * Sets hibernate configuration path.
+ *
+ * @param hibernateCfgPath Hibernate configuration path.
+ */
+ public void setHibernateConfigurationPath(String hibernateCfgPath) {
+ this.hibernateCfgPath = hibernateCfgPath;
+ }
+
+ /**
+ * Gets hibernate configuration path.
+ *
+ * @return Hibernate configuration path.
+ */
+ public String getHibernateConfigurationPath() {
+ return hibernateCfgPath;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("deprecation")
+ @Override public void start() throws IgniteException {
+ if (sesFactory == null && F.isEmpty(hibernateCfgPath))
+ throw new IgniteException("Either session factory or Hibernate configuration file is required by " +
+ getClass().getSimpleName() + '.');
+
+ if (!F.isEmpty(hibernateCfgPath)) {
+ if (sesFactory == null) {
+ try {
+ URL url = new URL(hibernateCfgPath);
+
+ sesFactory = new Configuration().configure(url).buildSessionFactory();
+ }
+ catch (MalformedURLException ignored) {
+ // No-op.
+ }
+
+ if (sesFactory == null) {
+ File cfgFile = new File(hibernateCfgPath);
+
+ if (cfgFile.exists())
+ sesFactory = new Configuration().configure(cfgFile).buildSessionFactory();
+ }
+
+ if (sesFactory == null)
+ sesFactory = new Configuration().configure(hibernateCfgPath).buildSessionFactory();
+
+ if (sesFactory == null)
+ throw new IgniteException("Failed to resolve Hibernate configuration file: " + hibernateCfgPath);
+
+ closeSesOnStop = true;
+ }
+ else
+ U.warn(log, "Hibernate configuration file configured in " + getClass().getSimpleName() +
+ " will be ignored (session factory is already set).");
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws IgniteException {
+ if (closeSesOnStop && sesFactory != null && !sesFactory.isClosed())
+ sesFactory.close();
+ }
+
/** {@inheritDoc} */
@Override public void onSessionStart(CacheStoreSession ses) {
Map<String, Session> props = ses.properties();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/spring/pom.xml
----------------------------------------------------------------------
diff --git a/modules/spring/pom.xml b/modules/spring/pom.xml
index 2633c83..f49a23d 100644
--- a/modules/spring/pom.xml
+++ b/modules/spring/pom.xml
@@ -77,6 +77,12 @@
</dependency>
<dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-jdbc</artifactId>
+ <version>${spring.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1.1</version>
@@ -104,12 +110,6 @@
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-jdbc</artifactId>
- <version>${spring.version}</version>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>com.h2database</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java b/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java
index e0caad5..2fab4f0 100644
--- a/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java
+++ b/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java
@@ -17,32 +17,45 @@
package org.apache.ignite.cache.store.spring;
+import org.apache.ignite.*;
import org.apache.ignite.cache.store.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lifecycle.*;
+import org.apache.ignite.resources.*;
import org.apache.ignite.transactions.*;
+import org.springframework.jdbc.datasource.*;
import org.springframework.transaction.*;
import org.springframework.transaction.support.*;
import javax.cache.integration.*;
+import javax.sql.*;
/**
* Cache store session listener based on Spring cache manager.
*/
-public class CacheStoreSessionSpringListener implements CacheStoreSessionListener {
+public class CacheStoreSessionSpringListener implements CacheStoreSessionListener, LifecycleAware {
/** Session key for transaction status. */
public static final String TX_STATUS_KEY = "__spring_tx_status_";
/** Transaction manager. */
private PlatformTransactionManager txMgr;
+ /** Data source. */
+ private DataSource dataSrc;
+
+ /** Propagation behavior. */
+ private int propagation = TransactionDefinition.PROPAGATION_REQUIRED;
+
+ /** Logger. */
+ @LoggerResource
+ private IgniteLogger log;
+
/**
* Sets transaction manager.
*
* @param txMgr Transaction manager.
*/
public void setTransactionManager(PlatformTransactionManager txMgr) {
- A.notNull(txMgr, "txMgr");
-
this.txMgr = txMgr;
}
@@ -55,11 +68,71 @@ public class CacheStoreSessionSpringListener implements CacheStoreSessionListene
return txMgr;
}
+ /**
+ * Sets data source.
+ *
+ * @param dataSrc Data source.
+ */
+ public void setDataSource(DataSource dataSrc) {
+ this.dataSrc = dataSrc;
+ }
+
+ /**
+ * Gets data source.
+ *
+ * @return Data source.
+ */
+ public DataSource getDataSource() {
+ return dataSrc;
+ }
+
+ /**
+ * Sets propagation behavior.
+ *
+ * @param propagation Propagation behavior.
+ */
+ public void setPropagationBehavior(int propagation) {
+ this.propagation = propagation;
+ }
+
+ /**
+ * Gets propagation behavior.
+ *
+ * @return Propagation behavior.
+ */
+ public int getPropagationBehavior() {
+ return propagation;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteException {
+ if (txMgr == null && dataSrc == null)
+ throw new IgniteException("Either transaction manager or data source is required by " +
+ getClass().getSimpleName() + '.');
+
+ if (dataSrc != null) {
+ if (txMgr == null)
+ txMgr = new DataSourceTransactionManager(dataSrc);
+ else
+ U.warn(log, "Data source configured in " + getClass().getSimpleName() +
+ " will be ignored (transaction manager is already set).");
+ }
+
+ assert txMgr != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws IgniteException {
+ // No-op.
+ }
+
/** {@inheritDoc} */
@Override public void onSessionStart(CacheStoreSession ses) {
if (ses.isWithinTransaction()) {
try {
- ses.properties().put(TX_STATUS_KEY, txMgr.getTransaction(definition(ses.transaction())));
+ TransactionDefinition def = definition(ses.transaction(), ses.cacheName());
+
+ ses.properties().put(TX_STATUS_KEY, txMgr.getTransaction(def));
}
catch (TransactionException e) {
throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e);
@@ -91,12 +164,19 @@ public class CacheStoreSessionSpringListener implements CacheStoreSessionListene
*
* @return DB transaction isolation.
*/
- private TransactionDefinition definition(Transaction tx) {
+ private TransactionDefinition definition(Transaction tx, String cacheName) {
assert tx != null;
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
+ def.setName("Ignite Tx [cache=" + (cacheName != null ? cacheName : "<default>") + ", id=" + tx.xid() + ']');
def.setIsolationLevel(isolationLevel(tx.isolation()));
+ def.setPropagationBehavior(propagation);
+
+ long timeoutSec = (tx.timeout() + 500) / 1000;
+
+ if (timeoutSec > 0 && timeoutSec < Integer.MAX_VALUE)
+ def.setTimeout((int)timeoutSec);
return def;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java b/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java
index 79d5b5e..83ed249 100644
--- a/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java
+++ b/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java
@@ -54,7 +54,7 @@ public class CacheStoreSessionSpringListenerSelfTest extends CacheStoreSessionLi
@Override public CacheStoreSessionListener create() {
CacheStoreSessionSpringListener lsnr = new CacheStoreSessionSpringListener();
- lsnr.setTransactionManager(new DataSourceTransactionManager(DATA_SRC));
+ lsnr.setDataSource(DATA_SRC);
return lsnr;
}