You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/03 12:55:35 UTC
[05/50] [abbrv] incubator-ignite git commit: IGNITE-891 - Cache store improvements
IGNITE-891 - Cache store improvements
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ada1b2a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ada1b2a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ada1b2a7
Branch: refs/heads/ignite-gg-10299
Commit: ada1b2a7c4d82722cc5721bad50042af7216bfdc
Parents: 990bf9e
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Sun May 24 20:42:53 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Sun May 24 20:42:53 2015 -0700
----------------------------------------------------------------------
.../hibernate/CacheHibernatePersonStore.java | 27 +-
.../store/jdbc/CacheJdbcPersonStore.java | 69 ++--
.../store/jdbc/CacheJdbcStoreExample.java | 3 +-
.../store/spring/CacheSpringPersonStore.java | 128 ++++++
.../store/spring/CacheSpringStoreExample.java | 143 +++++++
.../datagrid/store/spring/package-info.java | 22 ++
.../processors/cache/GridCacheProcessor.java | 4 +-
.../processors/cache/GridCacheUtils.java | 25 +-
.../store/GridCacheStoreManagerAdapter.java | 61 ++-
...heStoreSessionListenerLifeCycleSelfTest.java | 395 +++++++++++++++++++
.../IgniteCrossCacheTxStoreSelfTest.java | 24 --
.../junits/common/GridCommonAbstractTest.java | 24 ++
.../spring/CacheSpringStoreSessionListener.java | 2 +-
13 files changed, 810 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/examples/src/main/java/org/apache/ignite/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java
index 557ec6f..80a9f22 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java
@@ -54,13 +54,7 @@ public class CacheHibernatePersonStore extends CacheStoreAdapter<Long, Person> {
Long key = entry.getKey();
Person val = entry.getValue();
- System.out.println(">>> Store put [key=" + key + ", val=" + val + ']');
-
- if (val == null) {
- delete(key);
-
- return;
- }
+ System.out.println(">>> Store write [key=" + key + ", val=" + val + ']');
Session hibSes = ses.attachment();
@@ -75,13 +69,14 @@ public class CacheHibernatePersonStore extends CacheStoreAdapter<Long, Person> {
/** {@inheritDoc} */
@SuppressWarnings({"JpaQueryApiInspection"})
@Override public void delete(Object key) {
- System.out.println(">>> Store remove [key=" + key + ']');
+ System.out.println(">>> Store delete [key=" + key + ']');
Session hibSes = ses.attachment();
try {
- hibSes.createQuery("delete " + Person.class.getSimpleName() + " where key = :key")
- .setParameter("key", key).setFlushMode(FlushMode.ALWAYS).executeUpdate();
+ hibSes.createQuery("delete " + Person.class.getSimpleName() + " where key = :key").
+ setParameter("key", key).
+ executeUpdate();
}
catch (HibernateException e) {
throw new CacheWriterException("Failed to remove value from cache store [key=" + key + ']', e);
@@ -100,13 +95,13 @@ public class CacheHibernatePersonStore extends CacheStoreAdapter<Long, Person> {
try {
int cnt = 0;
- List res = hibSes.createCriteria(Person.class).list();
-
- if (res != null) {
- Iterator iter = res.iterator();
+ List list = hibSes.createCriteria(Person.class).
+ setMaxResults(entryCnt).
+ list();
- while (cnt < entryCnt && iter.hasNext()) {
- Person person = (Person)iter.next();
+ if (list != null) {
+ for (Object obj : list) {
+ Person person = (Person)obj;
clo.apply(person.getId(), person);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
index 6eb0386..ed14a99 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
@@ -22,9 +22,11 @@ import org.apache.ignite.cache.store.*;
import org.apache.ignite.examples.datagrid.store.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.resources.*;
+import org.h2.jdbcx.*;
import javax.cache.*;
import javax.cache.integration.*;
+import javax.sql.*;
import java.sql.*;
/**
@@ -32,6 +34,10 @@ import java.sql.*;
* transaction with cache transactions and maps {@link Long} to {@link Person}.
*/
public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
+ /** Data source. */
+ public static final DataSource DATA_SRC =
+ JdbcConnectionPool.create("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1", "", "");
+
/** Store session. */
@CacheStoreSessionResource
private CacheStoreSession ses;
@@ -52,12 +58,10 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
* @throws IgniteException If failed.
*/
private void prepareDb() throws IgniteException {
- try (
- Connection conn = DriverManager.getConnection("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");
- Statement st = conn.createStatement()
- ) {
- st.execute("create table if not exists PERSONS (id number unique, firstName varchar(255), " +
- "lastName varchar(255))");
+ try (Connection conn = DATA_SRC.getConnection()) {
+ conn.createStatement().execute(
+ "create table if not exists PERSONS (" +
+ "id number unique, firstName varchar(255), lastName varchar(255))");
}
catch (SQLException e) {
throw new IgniteException("Failed to create database table.", e);
@@ -66,34 +70,28 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
/** {@inheritDoc} */
@Override public Person load(Long key) {
- System.out.println(">>> Loading key: " + key);
+ System.out.println(">>> Store load [key=" + key + ']');
- try {
- Connection conn = ses.attachment();
+ Connection conn = ses.attachment();
- try (PreparedStatement st = conn.prepareStatement("select * from PERSONS where id=?")) {
- st.setString(1, key.toString());
+ try (PreparedStatement st = conn.prepareStatement("select * from PERSONS where id = ?")) {
+ st.setString(1, key.toString());
- ResultSet rs = st.executeQuery();
+ ResultSet rs = st.executeQuery();
- if (rs.next())
- return new Person(rs.getLong(1), rs.getString(2), rs.getString(3));
- }
+ return rs.next() ? new Person(rs.getLong(1), rs.getString(2), rs.getString(3)) : null;
}
catch (SQLException e) {
- throw new CacheLoaderException("Failed to load object: " + key, e);
+ throw new CacheLoaderException("Failed to load object [key=" + key + ']', e);
}
-
- return null;
}
/** {@inheritDoc} */
@Override public void write(Cache.Entry<? extends Long, ? extends Person> entry) {
Long key = entry.getKey();
-
Person val = entry.getValue();
- System.out.println(">>> Putting [key=" + key + ", val=" + val + ']');
+ System.out.println(">>> Store write [key=" + key + ", val=" + val + ']');
try {
Connection conn = ses.attachment();
@@ -103,7 +101,7 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
// Try update first. If it does not work, then try insert.
// Some databases would allow these to be done in one 'upsert' operation.
try (PreparedStatement st = conn.prepareStatement(
- "update PERSONS set firstName=?, lastName=? where id=?")) {
+ "update PERSONS set firstName = ?, lastName = ? where id = ?")) {
st.setString(1, val.getFirstName());
st.setString(2, val.getLastName());
st.setLong(3, val.getId());
@@ -114,7 +112,7 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
// If update failed, try to insert.
if (updated == 0) {
try (PreparedStatement st = conn.prepareStatement(
- "insert into PERSONS (id, firstName, lastName) values(?, ?, ?)")) {
+ "insert into PERSONS (id, firstName, lastName) values (?, ?, ?)")) {
st.setLong(1, val.getId());
st.setString(2, val.getFirstName());
st.setString(3, val.getLastName());
@@ -124,25 +122,23 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
}
}
catch (SQLException e) {
- throw new CacheLoaderException("Failed to put object [key=" + key + ", val=" + val + ']', e);
+ throw new CacheWriterException("Failed to write object [key=" + key + ", val=" + val + ']', e);
}
}
/** {@inheritDoc} */
@Override public void delete(Object key) {
- System.out.println(">>> Removing key: " + key);
+ System.out.println(">>> Store delete [key=" + key + ']');
- try {
- Connection conn = ses.attachment();
+ Connection conn = ses.attachment();
- try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) {
- st.setLong(1, (Long)key);
+ try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) {
+ st.setLong(1, (Long)key);
- st.executeUpdate();
- }
+ st.executeUpdate();
}
catch (SQLException e) {
- throw new CacheWriterException("Failed to remove object: " + key, e);
+ throw new CacheWriterException("Failed to delete object [key=" + key + ']', e);
}
}
@@ -155,13 +151,14 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
Connection conn = ses.attachment();
- try (
- PreparedStatement st = conn.prepareStatement("select * from PERSONS");
- ResultSet rs = st.executeQuery()
- ) {
+ try (PreparedStatement stmt = conn.prepareStatement("select * from PERSONS limit ?")) {
+ stmt.setInt(1, entryCnt);
+
+ ResultSet rs = stmt.executeQuery();
+
int cnt = 0;
- while (cnt < entryCnt && rs.next()) {
+ while (rs.next()) {
Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3));
clo.apply(person.getId(), person);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java
index 74e262c..637d6dc 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java
@@ -24,7 +24,6 @@ import org.apache.ignite.configuration.*;
import org.apache.ignite.examples.*;
import org.apache.ignite.examples.datagrid.store.*;
import org.apache.ignite.transactions.*;
-import org.h2.jdbcx.*;
import javax.cache.configuration.*;
import java.util.*;
@@ -79,7 +78,7 @@ public class CacheJdbcStoreExample {
@Override public CacheStoreSessionListener create() {
CacheJdbcStoreSessionListener lsnr = new CacheJdbcStoreSessionListener();
- lsnr.setDataSource(JdbcConnectionPool.create("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1", "", ""));
+ lsnr.setDataSource(CacheJdbcPersonStore.DATA_SRC);
return lsnr;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/CacheSpringPersonStore.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/CacheSpringPersonStore.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/CacheSpringPersonStore.java
new file mode 100644
index 0000000..50149ba
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/CacheSpringPersonStore.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.datagrid.store.spring;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.examples.datagrid.store.*;
+import org.apache.ignite.lang.*;
+import org.springframework.dao.*;
+import org.springframework.jdbc.core.*;
+import org.springframework.jdbc.datasource.*;
+
+import javax.cache.*;
+import javax.cache.integration.*;
+import javax.sql.*;
+import java.sql.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Example of {@link CacheStore} implementation that uses JDBC
+ * transaction with cache transactions and maps {@link Long} to {@link Person}.
+ */
+public class CacheSpringPersonStore extends CacheStoreAdapter<Long, Person> {
+ /** Data source. */
+ public static final DataSource DATA_SRC = new DriverManagerDataSource("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");
+
+ /** Spring JDBC template. */
+ private JdbcTemplate jdbcTemplate;
+
+ /**
+ * Constructor.
+ *
+ * @throws IgniteException If failed.
+ */
+ public CacheSpringPersonStore() throws IgniteException {
+ jdbcTemplate = new JdbcTemplate(DATA_SRC);
+
+ prepareDb();
+ }
+
+ /**
+ * Prepares database for example execution. This method will create a
+ * table called "PERSONS" so it can be used by store implementation.
+ *
+ * @throws IgniteException If failed.
+ */
+ private void prepareDb() throws IgniteException {
+ jdbcTemplate.update(
+ "create table if not exists PERSONS (" +
+ "id number unique, firstName varchar(255), lastName varchar(255))");
+ }
+
+ /** {@inheritDoc} */
+ @Override public Person load(Long key) {
+ System.out.println(">>> Store load [key=" + key + ']');
+
+ try {
+ return jdbcTemplate.queryForObject("select * from PERSONS where id = ?", new RowMapper<Person>() {
+ @Override public Person mapRow(ResultSet rs, int rowNum) throws SQLException {
+ return new Person(rs.getLong(1), rs.getString(2), rs.getString(3));
+ }
+ }, key);
+ }
+ catch (EmptyResultDataAccessException ignored) {
+ return null;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Cache.Entry<? extends Long, ? extends Person> entry) {
+ Long key = entry.getKey();
+ Person val = entry.getValue();
+
+ System.out.println(">>> Store write [key=" + key + ", val=" + val + ']');
+
+ int updated = jdbcTemplate.update("update PERSONS set firstName = ?, lastName = ? where id = ?",
+ val.getFirstName(), val.getLastName(), val.getId());
+
+ if (updated == 0) {
+ jdbcTemplate.update("insert into PERSONS (id, firstName, lastName) values (?, ?, ?)",
+ val.getId(), val.getFirstName(), val.getLastName());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(Object key) {
+ System.out.println(">>> Store delete [key=" + key + ']');
+
+ jdbcTemplate.update("delete from PERSONS where id = ?", key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void loadCache(final IgniteBiInClosure<Long, Person> clo, Object... args) {
+ if (args == null || args.length == 0 || args[0] == null)
+ throw new CacheLoaderException("Expected entry count parameter is not provided.");
+
+ int entryCnt = (Integer)args[0];
+
+ final AtomicInteger cnt = new AtomicInteger();
+
+ jdbcTemplate.query("select * from PERSONS limit ?", new RowCallbackHandler() {
+ @Override public void processRow(ResultSet rs) throws SQLException {
+ Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3));
+
+ clo.apply(person.getId(), person);
+
+ cnt.incrementAndGet();
+ }
+ }, entryCnt);
+
+ System.out.println(">>> Loaded " + cnt + " values into cache.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/CacheSpringStoreExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/CacheSpringStoreExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/CacheSpringStoreExample.java
new file mode 100644
index 0000000..9be6672
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/CacheSpringStoreExample.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.datagrid.store.spring;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cache.store.jdbc.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.datagrid.store.*;
+import org.apache.ignite.transactions.*;
+
+import javax.cache.configuration.*;
+import java.util.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+
+/**
+ * Demonstrates usage of cache with underlying persistent store configured.
+ * <p>
+ * This example uses {@link CacheSpringPersonStore} as a persistent store.
+ * <p>
+ * Remote nodes can be started with {@link ExampleNodeStartup} in another JVM which will
+ * start node with {@code examples/config/example-ignite.xml} configuration.
+ */
+public class CacheSpringStoreExample {
+ /** Cache name. */
+ private static final String CACHE_NAME = CacheSpringStoreExample.class.getSimpleName();
+
+ /** Heap size required to run this example. */
+ public static final int MIN_MEMORY = 1024 * 1024 * 1024;
+
+ /** Number of entries to load. */
+ private static final int ENTRY_COUNT = 100_000;
+
+ /** Global person ID to use across entire example. */
+ private static final Long id = Math.abs(UUID.randomUUID().getLeastSignificantBits());
+
+ /**
+ * Executes example.
+ *
+ * @param args Command line arguments, none required.
+ * @throws IgniteException If example execution failed.
+ */
+ public static void main(String[] args) throws IgniteException {
+ ExamplesUtils.checkMinMemory(MIN_MEMORY);
+
+ // To start ignite with desired configuration uncomment the appropriate line.
+ try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+ System.out.println();
+ System.out.println(">>> Cache store example started.");
+
+ CacheConfiguration<Long, Person> cacheCfg = new CacheConfiguration<>(CACHE_NAME);
+
+ // Set atomicity as transaction, since we are showing transactions in example.
+ cacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+ // Configure JDBC store.
+ cacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(CacheSpringPersonStore.class));
+
+ // Configure JDBC session listener.
+ cacheCfg.setCacheStoreSessionListenerFactories(new Factory<CacheStoreSessionListener>() {
+ @Override public CacheStoreSessionListener create() {
+ CacheJdbcStoreSessionListener lsnr = new CacheJdbcStoreSessionListener();
+
+ lsnr.setDataSource(CacheSpringPersonStore.DATA_SRC);
+
+ return lsnr;
+ }
+ });
+
+ cacheCfg.setReadThrough(true);
+ cacheCfg.setWriteThrough(true);
+
+ try (IgniteCache<Long, Person> cache = ignite.getOrCreateCache(cacheCfg)) {
+ // Make initial cache loading from persistent store. This is a
+ // distributed operation and will call CacheStore.loadCache(...)
+ // method on all nodes in topology.
+ loadCache(cache);
+
+ // Start transaction and execute several cache operations with
+ // read/write-through to persistent store.
+ executeTransaction(cache);
+ }
+ }
+ }
+
+ /**
+ * Makes initial cache loading.
+ *
+ * @param cache Cache to load.
+ */
+ private static void loadCache(IgniteCache<Long, Person> cache) {
+ long start = System.currentTimeMillis();
+
+ // Start loading cache from persistent store on all caching nodes.
+ cache.loadCache(null, ENTRY_COUNT);
+
+ long end = System.currentTimeMillis();
+
+ System.out.println(">>> Loaded " + cache.size() + " keys with backups in " + (end - start) + "ms.");
+ }
+
+ /**
+ * Executes transaction with read/write-through to persistent store.
+ *
+ * @param cache Cache to execute transaction on.
+ */
+ private static void executeTransaction(IgniteCache<Long, Person> cache) {
+ try (Transaction tx = Ignition.ignite().transactions().txStart()) {
+ Person val = cache.get(id);
+
+ System.out.println("Read value: " + val);
+
+ val = cache.getAndPut(id, new Person(id, "Isaac", "Newton"));
+
+ System.out.println("Overwrote old value: " + val);
+
+ val = cache.get(id);
+
+ System.out.println("Read value: " + val);
+
+ tx.commit();
+ }
+
+ System.out.println("Read value after commit: " + cache.get(id));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/package-info.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/package-info.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/package-info.java
new file mode 100644
index 0000000..211239f
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains Spring-based cache store implementation.
+ */
+package org.apache.ignite.examples.datagrid.store.spring;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 5b57817..4457f98 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -567,7 +567,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration();
- sharedCtx = createSharedContext(ctx, CU.createStoreSessionListeners(ctx,
+ sharedCtx = createSharedContext(ctx, CU.startStoreSessionListeners(ctx,
ctx.config().getCacheStoreSessionListenerFactories()));
ctx.performance().add("Disable serializable transactions (set 'txSerializableEnabled' to false)",
@@ -813,6 +813,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
mgr.stop(cancel);
}
+ CU.stopStoreSessionListeners(ctx, sharedCtx.storeSessionListeners());
+
sharedCtx.cleanup();
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 6968fcb..7096da5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1795,13 +1795,14 @@ public class GridCacheUtils {
}
/**
- * Creates store session listeners.
+ * Creates and starts store session listeners.
*
* @param ctx Kernal context.
* @param factories Factories.
* @return Listeners.
+ * @throws IgniteCheckedException In case of error.
*/
- public static Collection<CacheStoreSessionListener> createStoreSessionListeners(GridKernalContext ctx,
+ public static Collection<CacheStoreSessionListener> startStoreSessionListeners(GridKernalContext ctx,
Factory<CacheStoreSessionListener>[] factories) throws IgniteCheckedException {
if (factories == null)
return null;
@@ -1823,4 +1824,24 @@ public class GridCacheUtils {
return lsnrs;
}
+
+ /**
+ * Stops store session listeners.
+ *
+ * @param ctx Kernal context.
+ * @param sesLsnrs Session listeners.
+ * @throws IgniteCheckedException In case of error.
+ */
+ public static void stopStoreSessionListeners(GridKernalContext ctx, Collection<CacheStoreSessionListener> sesLsnrs)
+ throws IgniteCheckedException {
+ if (sesLsnrs == null)
+ return;
+
+ for (CacheStoreSessionListener lsnr : sesLsnrs) {
+ if (lsnr instanceof LifecycleAware)
+ ((LifecycleAware)lsnr).stop();
+
+ ctx.resource().cleanupGeneric(lsnr);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index 11d711c..bc5a0a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -70,6 +70,9 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
/** */
private Collection<CacheStoreSessionListener> sesLsnrs;
+ /** */
+ private boolean globalSesLsnrs;
+
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void initialize(@Nullable CacheStore cfgStore, Map sesHolders) throws IgniteCheckedException {
@@ -166,10 +169,13 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
"Persistence store is configured, but both read-through and write-through are disabled.");
}
- sesLsnrs = CU.createStoreSessionListeners(cctx.kernalContext(), cfg.getCacheStoreSessionListenerFactories());
+ sesLsnrs = CU.startStoreSessionListeners(cctx.kernalContext(), cfg.getCacheStoreSessionListenerFactories());
- if (sesLsnrs == null)
+ if (sesLsnrs == null) {
sesLsnrs = cctx.shared().storeSessionListeners();
+
+ globalSesLsnrs = true;
+ }
}
/** {@inheritDoc} */
@@ -187,18 +193,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
}
}
- if (sesLsnrs != null) {
- for (CacheStoreSessionListener lsnr : sesLsnrs) {
- if (lsnr instanceof LifecycleAware)
- ((LifecycleAware)lsnr).stop();
-
- try {
- cctx.kernalContext().resource().cleanupGeneric(lsnr);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to remove injected resources from store session listener (ignoring): " +
- lsnr, e);
- }
+ if (!globalSesLsnrs) {
+ try {
+ CU.stopStoreSessionListeners(cctx.kernalContext(), sesLsnrs);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to stop store session listeners for cache: " + cctx.name(), e);
}
}
}
@@ -721,7 +721,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
lsnr.onSessionEnd(locSes, commit);
}
- if (!sesHolder.get().storeEnded(store))
+ if (!sesHolder.get().ended(store))
store.sessionEnd(commit);
}
catch (Throwable e) {
@@ -788,13 +788,9 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
sesHolder.set(ses);
- if (!ses.started()) {
- if (sesLsnrs != null) {
- for (CacheStoreSessionListener lsnr : sesLsnrs)
- lsnr.onSessionStart(locSes);
- }
-
- ses.onStarted();
+ if (sesLsnrs != null && !ses.started(this)) {
+ for (CacheStoreSessionListener lsnr : sesLsnrs)
+ lsnr.onSessionStart(locSes);
}
}
@@ -809,7 +805,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
lsnr.onSessionEnd(locSes, !threwEx);
}
- assert !sesHolder.get().storeEnded(store);
+ assert !sesHolder.get().ended(store);
store.sessionEnd(!threwEx);
}
@@ -858,10 +854,11 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
private Object attachment;
/** */
- private boolean started;
+ private final Set<CacheStoreManager> started =
+ new GridSetWrapper<>(new IdentityHashMap<CacheStoreManager, Object>());
/** */
- private final Set<CacheStore> endedStores = new GridSetWrapper<>(new IdentityHashMap<CacheStore, Object>());
+ private final Set<CacheStore> ended = new GridSetWrapper<>(new IdentityHashMap<CacheStore, Object>());
/**
* @param tx Current transaction.
@@ -918,24 +915,18 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
}
/**
- */
- private void onStarted() {
- started = true;
- }
-
- /**
* @return If session is started.
*/
- private boolean started() {
- return started;
+ private boolean started(CacheStoreManager mgr) {
+ return !started.add(mgr);
}
/**
* @param store Cache store.
* @return Whether session already ended on this store instance.
*/
- private boolean storeEnded(CacheStore store) {
- return !endedStores.add(store);
+ private boolean ended(CacheStore store) {
+ return !ended.add(store);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerLifeCycleSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerLifeCycleSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerLifeCycleSelfTest.java
new file mode 100644
index 0000000..814c8a5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerLifeCycleSelfTest.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lifecycle.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.transactions.*;
+
+import javax.cache.*;
+import javax.cache.configuration.*;
+import javax.cache.integration.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+
+/**
+ * Checks the start/stop lifecycle and session callback order of shared and per-cache store session listeners.
+ */
+public class CacheStoreSessionListenerLifeCycleSelfTest extends GridCommonAbstractTest {
+    /** IP finder shared by the nodes started in this test. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Events recorded by the listeners, in the order they were fired. */
+    private static final Queue<String> evts = new ConcurrentLinkedDeque<>();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setCacheStoreSessionListenerFactories(
+            new SessionListenerFactory("Shared 1"),
+            new SessionListenerFactory("Shared 2")
+        );
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(disco);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        evts.clear();
+    }
+
+    /**
+     * Expects only the shared listeners' start/stop events when no caches are created.
+     * @throws Exception If failed.
+     */
+    public void testNoCaches() throws Exception {
+        try {
+            startGrid();
+        }
+        finally {
+            stopGrid();
+        }
+
+        assertEqualsCollections(Arrays.asList("Shared 1 START", "Shared 2 START", "Shared 1 STOP", "Shared 2 STOP"),
+            evts);
+    }
+
+    /**
+     * Expects the shared listeners to receive session events of both caches when neither overrides them.
+     * @throws Exception If failed.
+     */
+    public void testNoOverride() throws Exception {
+        try {
+            Ignite ignite = startGrid();
+
+            for (int i = 0; i < 2; i++)
+                ignite.createCache(cacheConfiguration("cache-" + i));
+
+            ignite.cache("cache-0").put(1, 1);
+            ignite.cache("cache-1").put(1, 1);
+
+            try (Transaction tx = ignite.transactions().txStart()) {
+                ignite.cache("cache-0").put(2, 2);
+                ignite.cache("cache-0").put(3, 3);
+                ignite.cache("cache-1").put(2, 2);
+                ignite.cache("cache-1").put(3, 3);
+
+                tx.commit();
+            }
+        }
+        finally {
+            stopGrid();
+        }
+
+        assertEqualsCollections(Arrays.asList(
+            "Shared 1 START",
+            "Shared 2 START",
+
+            // Put to cache-0.
+            "Shared 1 SESSION START cache-0",
+            "Shared 2 SESSION START cache-0",
+            "Shared 1 SESSION END cache-0",
+            "Shared 2 SESSION END cache-0",
+
+            // Put to cache-1.
+            "Shared 1 SESSION START cache-1",
+            "Shared 2 SESSION START cache-1",
+            "Shared 1 SESSION END cache-1",
+            "Shared 2 SESSION END cache-1",
+
+            // Transaction.
+            "Shared 1 SESSION START cache-0",
+            "Shared 2 SESSION START cache-0",
+            "Shared 1 SESSION START cache-1",
+            "Shared 2 SESSION START cache-1",
+            "Shared 1 SESSION END cache-0",
+            "Shared 2 SESSION END cache-0",
+            "Shared 1 SESSION END cache-1",
+            "Shared 2 SESSION END cache-1",
+
+            "Shared 1 STOP",
+            "Shared 2 STOP"
+        ), evts);
+    }
+
+    /**
+     * Expects cache-0 to use its own listeners while cache-1 falls back to the shared ones.
+     * @throws Exception If failed.
+     */
+    public void testPartialOverride() throws Exception {
+        try {
+            Ignite ignite = startGrid();
+
+            for (int i = 0; i < 2; i++) {
+                String name = "cache-" + i;
+
+                CacheConfiguration<Integer, Integer> cacheCfg = cacheConfiguration(name);
+
+                if (i == 0) {
+                    cacheCfg.setCacheStoreSessionListenerFactories(
+                        new SessionListenerFactory(name + " 1"),
+                        new SessionListenerFactory(name + " 2")
+                    );
+                }
+
+                ignite.createCache(cacheCfg);
+            }
+
+            ignite.cache("cache-0").put(1, 1);
+            ignite.cache("cache-1").put(1, 1);
+
+            try (Transaction tx = ignite.transactions().txStart()) {
+                ignite.cache("cache-0").put(2, 2);
+                ignite.cache("cache-0").put(3, 3);
+                ignite.cache("cache-1").put(2, 2);
+                ignite.cache("cache-1").put(3, 3);
+
+                tx.commit();
+            }
+        }
+        finally {
+            stopGrid();
+        }
+
+        assertEqualsCollections(Arrays.asList(
+            "Shared 1 START",
+            "Shared 2 START",
+            "cache-0 1 START",
+            "cache-0 2 START",
+
+            // Put to cache-0.
+            "cache-0 1 SESSION START cache-0",
+            "cache-0 2 SESSION START cache-0",
+            "cache-0 1 SESSION END cache-0",
+            "cache-0 2 SESSION END cache-0",
+
+            // Put to cache-1.
+            "Shared 1 SESSION START cache-1",
+            "Shared 2 SESSION START cache-1",
+            "Shared 1 SESSION END cache-1",
+            "Shared 2 SESSION END cache-1",
+
+            // Transaction.
+            "cache-0 1 SESSION START cache-0",
+            "cache-0 2 SESSION START cache-0",
+            "Shared 1 SESSION START cache-1",
+            "Shared 2 SESSION START cache-1",
+            "cache-0 1 SESSION END cache-0",
+            "cache-0 2 SESSION END cache-0",
+            "Shared 1 SESSION END cache-1",
+            "Shared 2 SESSION END cache-1",
+
+            "cache-0 1 STOP",
+            "cache-0 2 STOP",
+            "Shared 1 STOP",
+            "Shared 2 STOP"
+        ), evts);
+    }
+
+    /**
+     * Expects each cache to use only its own listeners; the shared ones are merely started and stopped.
+     * @throws Exception If failed.
+     */
+    public void testOverride() throws Exception {
+        try {
+            Ignite ignite = startGrid();
+
+            for (int i = 0; i < 2; i++) {
+                String name = "cache-" + i;
+
+                CacheConfiguration<Integer, Integer> cacheCfg = cacheConfiguration(name);
+
+                cacheCfg.setCacheStoreSessionListenerFactories(
+                    new SessionListenerFactory(name + " 1"),
+                    new SessionListenerFactory(name + " 2")
+                );
+
+                ignite.createCache(cacheCfg);
+            }
+
+            ignite.cache("cache-0").put(1, 1);
+            ignite.cache("cache-1").put(1, 1);
+
+            try (Transaction tx = ignite.transactions().txStart()) {
+                ignite.cache("cache-0").put(2, 2);
+                ignite.cache("cache-0").put(3, 3);
+                ignite.cache("cache-1").put(2, 2);
+                ignite.cache("cache-1").put(3, 3);
+
+                tx.commit();
+            }
+        }
+        finally {
+            stopGrid();
+        }
+
+        assertEqualsCollections(Arrays.asList(
+            "Shared 1 START",
+            "Shared 2 START",
+            "cache-0 1 START",
+            "cache-0 2 START",
+            "cache-1 1 START",
+            "cache-1 2 START",
+
+            // Put to cache-0.
+            "cache-0 1 SESSION START cache-0",
+            "cache-0 2 SESSION START cache-0",
+            "cache-0 1 SESSION END cache-0",
+            "cache-0 2 SESSION END cache-0",
+
+            // Put to cache-1.
+            "cache-1 1 SESSION START cache-1",
+            "cache-1 2 SESSION START cache-1",
+            "cache-1 1 SESSION END cache-1",
+            "cache-1 2 SESSION END cache-1",
+
+            // Transaction.
+            "cache-0 1 SESSION START cache-0",
+            "cache-0 2 SESSION START cache-0",
+            "cache-1 1 SESSION START cache-1",
+            "cache-1 2 SESSION START cache-1",
+            "cache-0 1 SESSION END cache-0",
+            "cache-0 2 SESSION END cache-0",
+            "cache-1 1 SESSION END cache-1",
+            "cache-1 2 SESSION END cache-1",
+
+            "cache-0 1 STOP",
+            "cache-0 2 STOP",
+            "cache-1 1 STOP",
+            "cache-1 2 STOP",
+            "Shared 1 STOP",
+            "Shared 2 STOP"
+        ), evts);
+    }
+
+    /**
+     * Creates a transactional, write-through cache configuration backed by {@link Store}.
+     * @param name Cache name.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Integer, Integer> cacheConfiguration(String name) {
+        CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>(name);
+
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+        cacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(Store.class));
+        cacheCfg.setWriteThrough(true);
+
+        return cacheCfg;
+    }
+
+    /** Listener that appends every lifecycle and session callback it receives to {@code evts}. */
+    private static class SessionListener implements CacheStoreSessionListener, LifecycleAware {
+        /** Listener name, used as the prefix of every recorded event. */
+        private final String name;
+
+        /** Injected Ignite instance; every callback asserts it is set. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /**
+         * @param name Name.
+         */
+        private SessionListener(String name) {
+            this.name = name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void start() throws IgniteException {
+            assertNotNull(ignite);
+
+            evts.add(name + " START");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void stop() throws IgniteException {
+            assertNotNull(ignite);
+
+            evts.add(name + " STOP");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onSessionStart(CacheStoreSession ses) {
+            assertNotNull(ignite);
+
+            evts.add(name + " SESSION START " + ses.cacheName());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
+            assertNotNull(ignite);
+
+            evts.add(name + " SESSION END " + ses.cacheName());
+        }
+    }
+
+    /** Factory producing a {@link SessionListener} with a fixed name. */
+    private static class SessionListenerFactory implements Factory<CacheStoreSessionListener> {
+        /** Name passed to every created listener. */
+        private final String name;
+
+        /**
+         * @param name Name.
+         */
+        private SessionListenerFactory(String name) {
+            this.name = name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public CacheStoreSessionListener create() {
+            return new SessionListener(name);
+        }
+    }
+
+    /** Store that loads nothing and ignores writes and deletes. */
+    public static class Store extends CacheStoreAdapter<Integer, Integer> {
+        /** Public no-arg constructor; the store is created via {@code FactoryBuilder.factoryOf(Store.class)}. */
+        public Store() {
+        }
+
+        /** {@inheritDoc} */
+        @Override public Integer load(Integer key) throws CacheLoaderException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry)
+            throws CacheWriterException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) throws CacheWriterException {
+            // No-op.
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
index f72ea47..f2de8ce 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
@@ -266,30 +266,6 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
}
/**
- * @param col1 Collection 1.
- * @param col2 Collection 2.
- */
- private static void assertEqualsCollections(Collection<?> col1, Collection<?> col2) {
- if (col1.size() != col2.size())
- fail("Collections are not equal:\nExpected:\t" + col1 + "\nActual:\t" + col2);
-
- Iterator<?> it1 = col1.iterator();
- Iterator<?> it2 = col2.iterator();
-
- int idx = 0;
-
- while (it1.hasNext()) {
- Object item1 = it1.next();
- Object item2 = it2.next();
-
- if (!F.eq(item1, item2))
- fail("Collections are not equal (position " + idx + "):\nExpected: " + col1 + "\nActual: " + col2);
-
- idx++;
- }
- }
-
- /**
*
*/
private static class TestStore implements CacheStore<Object, Object> {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 5533897..a19ea23 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -858,4 +858,28 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
ccfg.getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.CLOCK)
U.sleep(50);
}
+
+    /**
+     * Fails unless both collections have the same size and pairwise equal elements in iteration order.
+     *
+     * @param exp Expected.
+     * @param act Actual.
+     */
+    protected void assertEqualsCollections(Collection<?> exp, Collection<?> act) {
+        if (exp.size() != act.size())
+            fail("Collections are not equal:\nExpected:\t" + exp + "\nActual:\t" + act);
+
+        Iterator<?> expIt = exp.iterator();
+        Iterator<?> actIt = act.iterator();
+
+        for (int pos = 0; expIt.hasNext(); pos++) {
+            Object expItem = expIt.next();
+            Object actItem = actIt.next();
+
+            boolean same = F.eq(expItem, actItem);
+
+            if (!same)
+                fail("Collections are not equal (position " + pos + "):\nExpected: " + exp + "\nActual: " + act);
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListener.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListener.java b/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListener.java
index 81736cd..90431d7 100644
--- a/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListener.java
+++ b/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListener.java
@@ -154,7 +154,7 @@ public class CacheSpringStoreSessionListener implements CacheStoreSessionListene
/** {@inheritDoc} */
@Override public void onSessionStart(CacheStoreSession ses) {
- if (ses.isWithinTransaction()) {
+ if (ses.isWithinTransaction() && ses.attachment() == null) {
try {
TransactionDefinition def = definition(ses.transaction(), ses.cacheName());