You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/02 12:37:34 UTC
[04/41] incubator-ignite git commit: IGNITE-891 - Cache store
improvements
IGNITE-891 - Cache store improvements
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/16f045f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/16f045f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/16f045f8
Branch: refs/heads/ignite-218
Commit: 16f045f8d5e8ca4950bc5b0ec55a83db2b7164d1
Parents: 79258ba
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Mon May 18 17:18:07 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Mon May 18 17:18:07 2015 -0700
----------------------------------------------------------------------
.../cache/store/CacheStoreManager.java | 2 +-
.../store/GridCacheStoreManagerAdapter.java | 25 +++-
.../transactions/IgniteTxLocalAdapter.java | 59 ++++++---
...cheStoreSessionListenerAbstractSelfTest.java | 111 ++++++++++++++++
.../CacheStoreSessionJdbcListenerSelfTest.java | 39 +++++-
.../IgniteCrossCacheTxStoreSelfTest.java | 131 +++++++++++++++----
...heStoreSessionHibernateListenerSelfTest.java | 6 +-
...CacheStoreSessionSpringListenerSelfTest.java | 27 +++-
8 files changed, 337 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16f045f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java
index d9f50ac..327b879 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java
@@ -160,7 +160,7 @@ public interface CacheStoreManager<K, V> extends GridCacheManager<K, V> {
* @param commit Commit.
* @throws IgniteCheckedException If failed.
*/
- public void sessionEnd(IgniteInternalTx tx, boolean commit) throws IgniteCheckedException;
+ public void sessionEnd(IgniteInternalTx tx, boolean commit, boolean last) throws IgniteCheckedException;
/**
* End session initiated by write-behind store.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16f045f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index a9ea2c0..aeca58f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -637,8 +637,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
}
/** {@inheritDoc} */
- @Override public boolean removeAll(@Nullable IgniteInternalTx tx, Collection keys)
- throws IgniteCheckedException {
+ @Override public boolean removeAll(@Nullable IgniteInternalTx tx, Collection keys) throws IgniteCheckedException {
if (F.isEmpty(keys))
return true;
@@ -700,7 +699,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
}
/** {@inheritDoc} */
- @Override public void sessionEnd(IgniteInternalTx tx, boolean commit) throws IgniteCheckedException {
+ @Override public void sessionEnd(IgniteInternalTx tx, boolean commit, boolean last) throws IgniteCheckedException {
assert store != null;
sessionInit0(tx);
@@ -711,10 +710,11 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
lsnr.onSessionEnd(locSes, commit);
}
- store.sessionEnd(commit);
+ if (!sesHolder.get().storeEnded(store))
+ store.sessionEnd(commit);
}
finally {
- if (sesHolder != null) {
+ if (last && sesHolder != null) {
sesHolder.set(null);
tx.removeMeta(SES_ATTR);
@@ -752,7 +752,6 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
*/
private void sessionInit0(@Nullable IgniteInternalTx tx) {
assert sesHolder != null;
- assert sesHolder.get() == null;
SessionData ses;
@@ -794,7 +793,8 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
lsnr.onSessionEnd(locSes, !threwEx);
}
- store.sessionEnd(!threwEx);
+ if (!sesHolder.get().storeEnded(store))
+ store.sessionEnd(!threwEx);
}
}
catch (Exception e) {
@@ -840,6 +840,9 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
/** */
private boolean started;
+ /** */
+ private final Set<CacheStore> endedStores = new GridSetWrapper<>(new IdentityHashMap<CacheStore, Object>());
+
/**
* @param tx Current transaction.
* @param cacheName Cache name.
@@ -893,6 +896,14 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
return started;
}
+ /**
+ * @param store Cache store.
+ * @return Whether session already ended on this store instance.
+ */
+ private boolean storeEnded(CacheStore store) {
+ return !endedStores.add(store);
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SessionData.class, this, "tx", CU.txString(tx));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16f045f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index fa64e12..854448d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -531,11 +531,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
// Batch-process puts if cache ID has changed.
- if (writeStore != null && writeStore != cacheCtx.store() && putMap != null && !putMap.isEmpty()) {
- writeStore.putAll(this, putMap);
+ if (writeStore != null && writeStore != cacheCtx.store()) {
+ if (putMap != null && !putMap.isEmpty()) {
+ writeStore.putAll(this, putMap);
- // Reset.
- putMap.clear();
+ // Reset.
+ putMap.clear();
+ }
writeStore = null;
}
@@ -574,11 +576,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
writeStore = null;
}
- if (writeStore != null && writeStore != cacheCtx.store() && rmvCol != null && !rmvCol.isEmpty()) {
- writeStore.removeAll(this, rmvCol);
+ if (writeStore != null && writeStore != cacheCtx.store()) {
+ if (rmvCol != null && !rmvCol.isEmpty()) {
+ writeStore.removeAll(this, rmvCol);
- // Reset.
- rmvCol.clear();
+ // Reset.
+ rmvCol.clear();
+ }
writeStore = null;
}
@@ -623,8 +627,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
// Commit while locks are held.
- for (CacheStoreManager store : stores)
- store.sessionEnd(this, true);
+ sessionEnd(stores, true);
}
catch (IgniteCheckedException ex) {
commitError(ex);
@@ -649,6 +652,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
throw new IgniteCheckedException("Failed to commit transaction to database: " + this, ex);
}
+ finally {
+ if (isRollbackOnly())
+ sessionEnd(stores, false);
+ }
}
}
@@ -984,13 +991,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
cctx.tm().resetContext();
}
}
- else {
+ else if (!internal()) {
Collection<CacheStoreManager> stores = stores();
- if (stores != null && !stores.isEmpty() && !internal()) {
+ if (stores != null && !stores.isEmpty()) {
try {
- for (CacheStoreManager store : stores)
- store.sessionEnd(this, true);
+ sessionEnd(stores, true);
}
catch (IgniteCheckedException e) {
commitError(e);
@@ -1091,13 +1097,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
cctx.tm().rollbackTx(this);
- Collection<CacheStoreManager> stores = stores();
+ if (!internal()) {
+ Collection<CacheStoreManager> stores = stores();
- if (stores != null && !stores.isEmpty() && (near() || F.first(stores).isWriteToStoreFromDht())) {
- if (!internal()) {
- for (CacheStoreManager store : stores)
- store.sessionEnd(this, false);
- }
+ if (stores != null && !stores.isEmpty() && (near() || F.first(stores).isWriteToStoreFromDht()))
+ sessionEnd(stores, false);
}
}
catch (Error | IgniteCheckedException | RuntimeException e) {
@@ -1109,6 +1113,21 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
/**
+ * @param stores Store managers.
+ * @param commit Commit flag.
+ * @throws IgniteCheckedException In case of error.
+ */
+ private void sessionEnd(Collection<CacheStoreManager> stores, boolean commit) throws IgniteCheckedException {
+ Iterator<CacheStoreManager> it = stores.iterator();
+
+ while (it.hasNext()) {
+ CacheStoreManager store = it.next();
+
+ store.sessionEnd(this, commit, !it.hasNext());
+ }
+ }
+
+ /**
* Checks if there is a cached or swapped value for
* {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, boolean, boolean, boolean)} method.
*
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16f045f8/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java
index 5a01c2d..5df8f68 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.cache.store;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
@@ -27,7 +28,9 @@ import org.apache.ignite.testframework.junits.common.*;
import org.apache.ignite.transactions.*;
import javax.cache.configuration.*;
+import javax.cache.integration.*;
import java.io.*;
+import java.sql.*;
import java.util.concurrent.atomic.*;
/**
@@ -38,6 +41,9 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
/** */
+ protected static final String URL = "jdbc:h2:mem:example;DB_CLOSE_DELAY=-1";
+
+ /** */
protected static final AtomicInteger loadCacheCnt = new AtomicInteger();
/** */
@@ -52,6 +58,12 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm
/** */
protected static final AtomicInteger reuseCnt = new AtomicInteger();
+ /** */
+ protected static final AtomicBoolean write = new AtomicBoolean();
+
+ /** */
+ protected static final AtomicBoolean fail = new AtomicBoolean();
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -77,11 +89,22 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
+ try (Connection conn = DriverManager.getConnection(URL)) {
+ conn.createStatement().executeUpdate("DROP TABLE IF EXISTS Table1");
+ conn.createStatement().executeUpdate("DROP TABLE IF EXISTS Table2");
+
+ conn.createStatement().executeUpdate("CREATE TABLE Table1 (key INT, value INT)");
+ conn.createStatement().executeUpdate("CREATE TABLE Table2 (key INT, value INT)");
+ }
+
loadCacheCnt.set(0);
loadCnt.set(0);
writeCnt.set(0);
deleteCnt.set(0);
reuseCnt.set(0);
+
+ write.set(false);
+ fail.set(false);
}
/**
@@ -174,6 +197,94 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm
}
/**
+ * @throws Exception If failed.
+ */
+ public void testCommit() throws Exception {
+ write.set(true);
+
+ CacheConfiguration<Integer, Integer> cfg1 = cacheConfiguration("cache1", CacheAtomicityMode.TRANSACTIONAL);
+ CacheConfiguration<Integer, Integer> cfg2 = cacheConfiguration("cache2", CacheAtomicityMode.TRANSACTIONAL);
+
+ try (
+ IgniteCache<Integer, Integer> cache1 = ignite(0).createCache(cfg1);
+ IgniteCache<Integer, Integer> cache2 = ignite(0).createCache(cfg2)
+ ) {
+ try (Transaction tx = ignite(0).transactions().txStart()) {
+ cache1.put(1, 1);
+ cache2.put(2, 2);
+
+ tx.commit();
+ }
+ }
+
+ try (Connection conn = DriverManager.getConnection(URL)) {
+ checkTable(conn, 1, false);
+ checkTable(conn, 2, false);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRollback() throws Exception {
+ write.set(true);
+ fail.set(true);
+
+ CacheConfiguration<Integer, Integer> cfg1 = cacheConfiguration("cache1", CacheAtomicityMode.TRANSACTIONAL);
+ CacheConfiguration<Integer, Integer> cfg2 = cacheConfiguration("cache2", CacheAtomicityMode.TRANSACTIONAL);
+
+ try (
+ IgniteCache<Integer, Integer> cache1 = ignite(0).createCache(cfg1);
+ IgniteCache<Integer, Integer> cache2 = ignite(0).createCache(cfg2)
+ ) {
+ try (Transaction tx = ignite(0).transactions().txStart()) {
+ cache1.put(1, 1);
+ cache2.put(2, 2);
+
+ tx.commit();
+
+ assert false : "Exception was not thrown.";
+ }
+ catch (IgniteException e) {
+ CacheWriterException we = X.cause(e, CacheWriterException.class);
+
+ assertNotNull(we);
+
+ assertEquals("Expected failure.", we.getMessage());
+ }
+ }
+
+ try (Connection conn = DriverManager.getConnection(URL)) {
+ checkTable(conn, 1, true);
+ checkTable(conn, 2, true);
+ }
+ }
+
+ /**
+ * @param conn Connection.
+ * @param idx Table index.
+ * @param empty If table expected to be empty.
+ * @throws Exception In case of error.
+ */
+ private void checkTable(Connection conn, int idx, boolean empty) throws Exception {
+ ResultSet rs = conn.createStatement().executeQuery("SELECT key, value FROM Table" + idx);
+
+ int cnt = 0;
+
+ while (rs.next()) {
+ int key = rs.getInt(1);
+ int val = rs.getInt(2);
+
+ assertEquals(idx, key);
+ assertEquals(idx, val);
+
+ cnt++;
+ }
+
+ assertEquals(empty ? 0 : 1, cnt);
+ }
+
+ /**
* @param name Cache name.
* @param atomicity Atomicity mode.
* @return Cache configuration.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16f045f8/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListenerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListenerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListenerSelfTest.java
index 9020e0d..e4dac88 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListenerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListenerSelfTest.java
@@ -47,7 +47,7 @@ public class CacheStoreSessionJdbcListenerSelfTest extends CacheStoreSessionList
@Override public CacheStoreSessionListener create() {
CacheStoreSessionJdbcListener lsnr = new CacheStoreSessionJdbcListener();
- lsnr.setDataSource(JdbcConnectionPool.create("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1", "", ""));
+ lsnr.setDataSource(JdbcConnectionPool.create(URL, "", ""));
return lsnr;
}
@@ -86,6 +86,43 @@ public class CacheStoreSessionJdbcListenerSelfTest extends CacheStoreSessionList
writeCnt.incrementAndGet();
checkConnection();
+
+ if (write.get()) {
+ Connection conn = connection();
+
+ try {
+ String table;
+
+ switch (ses.cacheName()) {
+ case "cache1":
+ table = "Table1";
+
+ break;
+
+ case "cache2":
+ if (fail.get())
+ throw new CacheWriterException("Expected failure.");
+
+ table = "Table2";
+
+ break;
+
+ default:
+ throw new CacheWriterException("Wring cache: " + ses.cacheName());
+ }
+
+ PreparedStatement stmt = conn.prepareStatement(
+ "INSERT INTO " + table + " (key, value) VALUES (?, ?)");
+
+ stmt.setInt(1, entry.getKey());
+ stmt.setInt(2, entry.getValue());
+
+ stmt.executeUpdate();
+ }
+ catch (SQLException e) {
+ throw new CacheWriterException(e);
+ }
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16f045f8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
index cb32b13..f72ea47 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
@@ -101,19 +101,28 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
grid(0).cache("cacheA").removeAll();
grid(0).cache("cacheB").removeAll();
grid(0).cache("cacheC").removeAll();
+
+ for (CacheStore store : firstStores.values())
+ ((TestStore)store).clear();
+
+ for (CacheStore store : secondStores.values())
+ ((TestStore)store).clear();
}
/**
* @throws Exception If failed.
*/
- public void testWriteThrough() throws Exception {
+ public void testSameStore() throws Exception {
IgniteEx grid = grid(0);
TestStore firstStore = (TestStore)firstStores.get(grid.name());
+ TestStore secondStore = (TestStore)secondStores.get(grid.name());
assertNotNull(firstStore);
+ assertNotNull(secondStore);
- Collection<String> evts = firstStore.events();
+ Collection<String> firstStoreEvts = firstStore.events();
+ Collection<String> secondStoreEvts = secondStore.events();
try (Transaction tx = grid.transactions().txStart()) {
IgniteCache<Object, Object> cacheA = grid.cache("cacheA");
@@ -138,58 +147,122 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
}
assertEqualsCollections(F.asList(
- "writeAll cacheA 2",
- "writeAll cacheB 2",
- "deleteAll cacheA 2",
- "deleteAll cacheB 2",
- "write cacheA",
- "delete cacheA",
- "write cacheB",
- "sessionEnd true"
- ),
- evts);
+ "writeAll cacheA 2",
+ "writeAll cacheB 2",
+ "deleteAll cacheA 2",
+ "deleteAll cacheB 2",
+ "write cacheA",
+ "delete cacheA",
+ "write cacheB",
+ "sessionEnd true"
+ ),
+ firstStoreEvts);
+
+ assertEquals(0, secondStoreEvts.size());
}
/**
* @throws Exception If failed.
*/
- public void testIncompatibleCaches1() throws Exception {
+ public void testDifferentStores() throws Exception {
IgniteEx grid = grid(0);
- try (Transaction ignored = grid.transactions().txStart()) {
+ TestStore firstStore = (TestStore)firstStores.get(grid.name());
+ TestStore secondStore = (TestStore)secondStores.get(grid.name());
+
+ assertNotNull(firstStore);
+ assertNotNull(secondStore);
+
+ Collection<String> firstStoreEvts = firstStore.events();
+ Collection<String> secondStoreEvts = secondStore.events();
+
+ try (Transaction tx = grid.transactions().txStart()) {
IgniteCache<Object, Object> cacheA = grid.cache("cacheA");
IgniteCache<Object, Object> cacheC = grid.cache("cacheC");
- cacheA.put("1", "2");
+ cacheA.put("1", "1");
+ cacheA.put("2", "2");
+ cacheC.put("1", "1");
+ cacheC.put("2", "2");
+
+ cacheA.remove("3");
+ cacheA.remove("4");
+ cacheC.remove("3");
+ cacheC.remove("4");
+
+ cacheA.put("5", "5");
+ cacheA.remove("6");
- cacheC.put("1", "2");
+ cacheC.put("7", "7");
- fail("Must not allow to enlist caches with different stores to one transaction");
- }
- catch (CacheException e) {
- assertTrue(e.getMessage().contains("Failed to enlist new cache to existing transaction"));
+ tx.commit();
}
+
+ assertEqualsCollections(F.asList(
+ "writeAll cacheA 2",
+ "deleteAll cacheA 2",
+ "write cacheA",
+ "delete cacheA",
+ "sessionEnd true"
+ ),
+ firstStoreEvts);
+
+ assertEqualsCollections(F.asList(
+ "writeAll cacheC 2",
+ "deleteAll cacheC 2",
+ "write cacheC",
+ "sessionEnd true"
+ ),
+ secondStoreEvts);
}
/**
* @throws Exception If failed.
*/
- public void testIncompatibleCaches2() throws Exception {
+ public void testNonPersistentCache() throws Exception {
IgniteEx grid = grid(0);
- try (Transaction ignored = grid.transactions().txStart()) {
+ TestStore firstStore = (TestStore)firstStores.get(grid.name());
+ TestStore secondStore = (TestStore)secondStores.get(grid.name());
+
+ assertNotNull(firstStore);
+ assertNotNull(secondStore);
+
+ Collection<String> firstStoreEvts = firstStore.events();
+ Collection<String> secondStoreEvts = secondStore.events();
+
+ try (Transaction tx = grid.transactions().txStart()) {
IgniteCache<Object, Object> cacheA = grid.cache("cacheA");
- IgniteCache<Object, Object> cacheC = grid.cache("cacheD");
+ IgniteCache<Object, Object> cacheD = grid.cache("cacheD");
+
+ cacheA.put("1", "1");
+ cacheA.put("2", "2");
+ cacheD.put("1", "1");
+ cacheD.put("2", "2");
- cacheA.put("1", "2");
+ cacheA.remove("3");
+ cacheA.remove("4");
+ cacheD.remove("3");
+ cacheD.remove("4");
+
+ cacheA.put("5", "5");
+ cacheA.remove("6");
- cacheC.put("1", "2");
+ cacheD.put("7", "7");
- fail("Must not allow to enlist caches with different stores to one transaction");
- }
- catch (CacheException e) {
- assertTrue(e.getMessage().contains("Failed to enlist new cache to existing transaction"));
+ tx.commit();
}
+
+ assertEqualsCollections(F.asList(
+ "writeAll cacheA 2",
+ "deleteAll cacheA 2",
+ "write cacheA",
+ "delete cacheA",
+ "sessionEnd true"
+ ),
+ firstStoreEvts);
+
+ assertEquals(0, secondStoreEvts.size());
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16f045f8/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListenerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListenerSelfTest.java b/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListenerSelfTest.java
index 85b0b95..d631393 100644
--- a/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListenerSelfTest.java
+++ b/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListenerSelfTest.java
@@ -23,7 +23,6 @@ import org.apache.ignite.lang.*;
import org.apache.ignite.resources.*;
import org.hibernate.*;
import org.hibernate.cfg.Configuration;
-import org.hibernate.service.*;
import javax.cache.Cache;
import javax.cache.configuration.*;
@@ -50,10 +49,9 @@ public class CacheStoreSessionHibernateListenerSelfTest extends CacheStoreSessio
CacheStoreSessionHibernateListener lsnr = new CacheStoreSessionHibernateListener();
Configuration cfg = new Configuration().
- setProperty("hibernate.dialect", "org.hibernate.dialect.H2Dialect").
- setProperty("hibernate.connection.datasource", "jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");
+ setProperty("hibernate.connection.url", URL);
- lsnr.setSessionFactory(cfg.buildSessionFactory(new ServiceRegistryBuilder().buildServiceRegistry()));
+ lsnr.setSessionFactory(cfg.buildSessionFactory());
return lsnr;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16f045f8/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java b/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java
index a7ca317..79d5b5e 100644
--- a/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java
+++ b/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java
@@ -37,7 +37,7 @@ import java.util.*;
*/
public class CacheStoreSessionSpringListenerSelfTest extends CacheStoreSessionListenerAbstractSelfTest {
/** */
- private static final DataSource DATA_SRC = new DriverManagerDataSource("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");
+ private static final DataSource DATA_SRC = new DriverManagerDataSource(URL);
/** {@inheritDoc} */
@Override protected Factory<? extends CacheStore<Integer, Integer>> storeFactory() {
@@ -106,6 +106,31 @@ public class CacheStoreSessionSpringListenerSelfTest extends CacheStoreSessionLi
checkTransaction();
checkConnection();
+
+ if (write.get()) {
+ String table;
+
+ switch (ses.cacheName()) {
+ case "cache1":
+ table = "Table1";
+
+ break;
+
+ case "cache2":
+ if (fail.get())
+ throw new CacheWriterException("Expected failure.");
+
+ table = "Table2";
+
+ break;
+
+ default:
+ throw new CacheWriterException("Wring cache: " + ses.cacheName());
+ }
+
+ jdbc.update("INSERT INTO " + table + " (key, value) VALUES (?, ?)",
+ entry.getKey(), entry.getValue());
+ }
}
/** {@inheritDoc} */