You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/02 12:37:54 UTC
[24/41] incubator-ignite git commit: IGNITE-891 - Cache store
improvements
IGNITE-891 - Cache store improvements
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/990bf9e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/990bf9e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/990bf9e3
Branch: refs/heads/ignite-218
Commit: 990bf9e38b405e97d0be1891be7680423208b279
Parents: b5b4523
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Fri May 22 18:49:04 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Fri May 22 18:49:04 2015 -0700
----------------------------------------------------------------------
.../hibernate/CacheHibernatePersonStore.java | 183 +--------------
.../hibernate/CacheHibernateStoreExample.java | 16 ++
.../store/jdbc/CacheJdbcPersonStore.java | 18 +-
.../store/jdbc/CacheJdbcStoreExample.java | 2 +-
.../ignite/cache/store/CacheStoreSession.java | 5 +
.../cache/store/CacheStoreSessionListener.java | 10 +-
.../jdbc/CacheJdbcStoreSessionListener.java | 143 +++++++++++
.../jdbc/CacheStoreSessionJdbcListener.java | 148 ------------
.../store/GridCacheStoreManagerAdapter.java | 32 +++
.../CacheJdbcStoreSessionListenerSelfTest.java | 175 ++++++++++++++
.../CacheStoreSessionJdbcListenerSelfTest.java | 182 --------------
.../junits/cache/TestCacheSession.java | 14 ++
.../cache/TestThreadLocalCacheSession.java | 16 ++
.../testsuites/IgniteCacheTestSuite4.java | 2 +-
.../CacheHibernateStoreSessionListener.java | 218 +++++++++++++++++
.../CacheStoreSessionHibernateListener.java | 223 ------------------
...heHibernateStoreSessionListenerSelfTest.java | 228 ++++++++++++++++++
...heStoreSessionHibernateListenerSelfTest.java | 235 -------------------
.../testsuites/IgniteHibernateTestSuite.java | 2 +-
.../spring/CacheSpringStoreSessionListener.java | 233 ++++++++++++++++++
.../spring/CacheStoreSessionSpringListener.java | 235 -------------------
...CacheSpringStoreSessionListenerSelfTest.java | 197 ++++++++++++++++
...CacheStoreSessionSpringListenerSelfTest.java | 204 ----------------
.../testsuites/IgniteSpringTestSuite.java | 2 +-
24 files changed, 1303 insertions(+), 1420 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/examples/src/main/java/org/apache/ignite/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java
index 577301c..557ec6f 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java
@@ -21,10 +21,7 @@ import org.apache.ignite.cache.store.*;
import org.apache.ignite.examples.datagrid.store.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.resources.*;
-import org.apache.ignite.transactions.Transaction;
import org.hibernate.*;
-import org.hibernate.cfg.*;
-import org.jetbrains.annotations.*;
import javax.cache.integration.*;
import java.util.*;
@@ -34,57 +31,30 @@ import java.util.*;
* and deals with maps {@link UUID} to {@link Person}.
*/
public class CacheHibernatePersonStore extends CacheStoreAdapter<Long, Person> {
- /** Default hibernate configuration resource path. */
- private static final String DFLT_HIBERNATE_CFG = "/org/apache/ignite/examples/datagrid/store/hibernate" +
- "/hibernate.cfg.xml";
-
- /** Session attribute name. */
- private static final String ATTR_SES = "HIBERNATE_STORE_SESSION";
-
- /** Session factory. */
- private SessionFactory sesFactory;
-
/** Auto-injected store session. */
@CacheStoreSessionResource
private CacheStoreSession ses;
- /**
- * Default constructor.
- */
- public CacheHibernatePersonStore() {
- sesFactory = new Configuration().configure(DFLT_HIBERNATE_CFG).buildSessionFactory();
- }
-
/** {@inheritDoc} */
@Override public Person load(Long key) {
- Transaction tx = transaction();
-
- System.out.println(">>> Store load [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']');
+ System.out.println(">>> Store load [key=" + key + ']');
- Session ses = session(tx);
+ Session hibSes = ses.attachment();
try {
- return (Person) ses.get(Person.class, key);
+ return (Person)hibSes.get(Person.class, key);
}
catch (HibernateException e) {
- rollback(ses, tx);
-
- throw new CacheLoaderException("Failed to load value from cache store with key: " + key, e);
- }
- finally {
- end(ses, tx);
+ throw new CacheLoaderException("Failed to load value from cache store [key=" + key + ']', e);
}
}
/** {@inheritDoc} */
@Override public void write(javax.cache.Cache.Entry<? extends Long, ? extends Person> entry) {
- Transaction tx = transaction();
-
Long key = entry.getKey();
-
Person val = entry.getValue();
- System.out.println(">>> Store put [key=" + key + ", val=" + val + ", xid=" + (tx == null ? null : tx.xid()) + ']');
+ System.out.println(">>> Store put [key=" + key + ", val=" + val + ']');
if (val == null) {
delete(key);
@@ -92,41 +62,29 @@ public class CacheHibernatePersonStore extends CacheStoreAdapter<Long, Person> {
return;
}
- Session ses = session(tx);
+ Session hibSes = ses.attachment();
try {
- ses.saveOrUpdate(val);
+ hibSes.saveOrUpdate(val);
}
catch (HibernateException e) {
- rollback(ses, tx);
-
throw new CacheWriterException("Failed to put value to cache store [key=" + key + ", val" + val + "]", e);
}
- finally {
- end(ses, tx);
- }
}
/** {@inheritDoc} */
@SuppressWarnings({"JpaQueryApiInspection"})
@Override public void delete(Object key) {
- Transaction tx = transaction();
-
- System.out.println(">>> Store remove [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']');
+ System.out.println(">>> Store remove [key=" + key + ']');
- Session ses = session(tx);
+ Session hibSes = ses.attachment();
try {
- ses.createQuery("delete " + Person.class.getSimpleName() + " where key = :key")
+ hibSes.createQuery("delete " + Person.class.getSimpleName() + " where key = :key")
.setParameter("key", key).setFlushMode(FlushMode.ALWAYS).executeUpdate();
}
catch (HibernateException e) {
- rollback(ses, tx);
-
- throw new CacheWriterException("Failed to remove value from cache store with key: " + key, e);
- }
- finally {
- end(ses, tx);
+ throw new CacheWriterException("Failed to remove value from cache store [key=" + key + ']', e);
}
}
@@ -137,12 +95,12 @@ public class CacheHibernatePersonStore extends CacheStoreAdapter<Long, Person> {
final int entryCnt = (Integer)args[0];
- Session ses = session(null);
+ Session hibSes = ses.attachment();
try {
int cnt = 0;
- List res = ses.createCriteria(Person.class).list();
+ List res = hibSes.createCriteria(Person.class).list();
if (res != null) {
Iterator iter = res.iterator();
@@ -161,120 +119,5 @@ public class CacheHibernatePersonStore extends CacheStoreAdapter<Long, Person> {
catch (HibernateException e) {
throw new CacheLoaderException("Failed to load values from cache store.", e);
}
- finally {
- end(ses, null);
- }
- }
-
- /**
- * Rolls back hibernate session.
- *
- * @param ses Hibernate session.
- * @param tx Cache ongoing transaction.
- */
- private void rollback(Session ses, Transaction tx) {
- // Rollback only if there is no cache transaction,
- // otherwise sessionEnd() will do all required work.
- if (tx == null) {
- org.hibernate.Transaction hTx = ses.getTransaction();
-
- if (hTx != null && hTx.isActive())
- hTx.rollback();
- }
- }
-
- /**
- * Ends hibernate session.
- *
- * @param ses Hibernate session.
- * @param tx Cache ongoing transaction.
- */
- private void end(Session ses, @Nullable Transaction tx) {
- // Commit only if there is no cache transaction,
- // otherwise sessionEnd() will do all required work.
- if (tx == null) {
- org.hibernate.Transaction hTx = ses.getTransaction();
-
- if (hTx != null && hTx.isActive())
- hTx.commit();
-
- ses.close();
- }
- }
-
- /** {@inheritDoc} */
- @Override public void sessionEnd(boolean commit) {
- Transaction tx = ses.transaction();
-
- Map<String, Session> props = ses.properties();
-
- Session ses = props.remove(ATTR_SES);
-
- if (ses != null) {
- org.hibernate.Transaction hTx = ses.getTransaction();
-
- if (hTx != null) {
- try {
- if (commit) {
- ses.flush();
-
- hTx.commit();
- }
- else
- hTx.rollback();
-
- System.out.println("Transaction ended [xid=" + tx.xid() + ", commit=" + commit + ']');
- }
- catch (HibernateException e) {
- throw new CacheWriterException("Failed to end transaction [xid=" + tx.xid() +
- ", commit=" + commit + ']', e);
- }
- finally {
- ses.close();
- }
- }
- }
- }
-
- /**
- * Gets Hibernate session.
- *
- * @param tx Cache transaction.
- * @return Session.
- */
- private Session session(@Nullable Transaction tx) {
- Session hbSes;
-
- if (tx != null) {
- Map<String, Session> props = ses.properties();
-
- hbSes = props.get(ATTR_SES);
-
- if (hbSes == null) {
- hbSes = sesFactory.openSession();
-
- hbSes.beginTransaction();
-
- // Store session in session properties, so it can be accessed
- // for other operations on the same transaction.
- props.put(ATTR_SES, hbSes);
-
- System.out.println("Hibernate session open [ses=" + hbSes + ", tx=" + tx.xid() + "]");
- }
- }
- else {
- hbSes = sesFactory.openSession();
-
- hbSes.beginTransaction();
- }
-
- return hbSes;
- }
-
- /**
- * @return Current transaction.
- */
- @Nullable private Transaction transaction() {
- return ses != null ? ses.transaction() : null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/examples/src/main/java/org/apache/ignite/examples/datagrid/store/hibernate/CacheHibernateStoreExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/hibernate/CacheHibernateStoreExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/hibernate/CacheHibernateStoreExample.java
index 5a0cd0a..ac19072 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/hibernate/CacheHibernateStoreExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/hibernate/CacheHibernateStoreExample.java
@@ -18,6 +18,8 @@
package org.apache.ignite.examples.datagrid.store.hibernate;
import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cache.store.hibernate.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.examples.*;
import org.apache.ignite.examples.datagrid.store.*;
@@ -37,6 +39,10 @@ import static org.apache.ignite.cache.CacheAtomicityMode.*;
* start node with {@code examples/config/example-ignite.xml} configuration.
*/
public class CacheHibernateStoreExample {
+ /** Hibernate configuration resource path. */
+ private static final String HIBERNATE_CFG =
+ "/org/apache/ignite/examples/datagrid/store/hibernate/hibernate.cfg.xml";
+
/** Cache name. */
private static final String CACHE_NAME = CacheHibernateStoreExample.class.getSimpleName();
@@ -71,6 +77,16 @@ public class CacheHibernateStoreExample {
// Configure Hibernate store.
cacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(CacheHibernatePersonStore.class));
+ cacheCfg.setCacheStoreSessionListenerFactories(new Factory<CacheStoreSessionListener>() {
+ @Override public CacheStoreSessionListener create() {
+ CacheHibernateStoreSessionListener lsnr = new CacheHibernateStoreSessionListener();
+
+ lsnr.setHibernateConfigurationPath(HIBERNATE_CFG);
+
+ return lsnr;
+ }
+ });
+
cacheCfg.setReadThrough(true);
cacheCfg.setWriteThrough(true);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
index 856512b..6eb0386 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
@@ -19,7 +19,6 @@ package org.apache.ignite.examples.datagrid.store.jdbc;
import org.apache.ignite.*;
import org.apache.ignite.cache.store.*;
-import org.apache.ignite.cache.store.jdbc.*;
import org.apache.ignite.examples.datagrid.store.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.resources.*;
@@ -70,7 +69,7 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
System.out.println(">>> Loading key: " + key);
try {
- Connection conn = connection();
+ Connection conn = ses.attachment();
try (PreparedStatement st = conn.prepareStatement("select * from PERSONS where id=?")) {
st.setString(1, key.toString());
@@ -97,7 +96,7 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
System.out.println(">>> Putting [key=" + key + ", val=" + val + ']');
try {
- Connection conn = connection();
+ Connection conn = ses.attachment();
int updated;
@@ -134,7 +133,7 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
System.out.println(">>> Removing key: " + key);
try {
- Connection conn = connection();
+ Connection conn = ses.attachment();
try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) {
st.setLong(1, (Long)key);
@@ -154,7 +153,7 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
final int entryCnt = (Integer)args[0];
- Connection conn = connection();
+ Connection conn = ses.attachment();
try (
PreparedStatement st = conn.prepareStatement("select * from PERSONS");
@@ -176,13 +175,4 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
throw new CacheLoaderException("Failed to load values from cache store.", e);
}
}
-
- /**
- * Gets JDBC connection attached to current session.
- *
- * @return Connection.
- */
- private Connection connection() {
- return ses.<String, Connection>properties().get(CacheStoreSessionJdbcListener.JDBC_CONN_KEY);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java
index 82e1079..74e262c 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java
@@ -77,7 +77,7 @@ public class CacheJdbcStoreExample {
// Configure JDBC session listener.
cacheCfg.setCacheStoreSessionListenerFactories(new Factory<CacheStoreSessionListener>() {
@Override public CacheStoreSessionListener create() {
- CacheStoreSessionJdbcListener lsnr = new CacheStoreSessionJdbcListener();
+ CacheJdbcStoreSessionListener lsnr = new CacheJdbcStoreSessionListener();
lsnr.setDataSource(JdbcConnectionPool.create("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1", "", ""));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java
index 640d4a3..980c6df 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java
@@ -19,6 +19,7 @@ package org.apache.ignite.cache.store;
import org.apache.ignite.resources.*;
import org.apache.ignite.transactions.*;
+import org.jetbrains.annotations.*;
import java.util.*;
@@ -51,6 +52,10 @@ public interface CacheStoreSession {
*/
public boolean isWithinTransaction();
+ public void attach(@Nullable Object attachment);
+
+ @Nullable public <T> T attachment();
+
/**
* Gets current session properties. You can add properties directly to the
* returned map.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java
index 8b7cd8f..b319e55 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java
@@ -52,20 +52,20 @@ import javax.sql.*;
* details):
* <ul>
* <li>
- * {@link CacheStoreSessionJdbcListener} - JDBC-based session
+ * {@link CacheJdbcStoreSessionListener} - JDBC-based session
* listener. For each session it gets a new JDBC connection from
* provided {@link DataSource} and commits (or rolls back) it
* when session ends.
* </li>
* <li>
- * {@ignitelink org.apache.ignite.cache.store.spring.CacheStoreSessionSpringListener} -
+ * {@ignitelink org.apache.ignite.cache.store.spring.CacheSpringStoreSessionListener} -
* session listener based on Spring transaction management.
* It starts a new DB transaction for each session and commits
* (or rolls back) it when session ends. If there is no ongoing
* cache transaction, this listener is no-op.
* </li>
* <li>
- * <@ignitelink org.apache.ignite.cache.store.hibernate.CacheStoreSessionHibernateListener} -
+ * <@ignitelink org.apache.ignite.cache.store.hibernate.CacheHibernateStoreSessionListener} -
* Hibernate-based session listener. It creates a new Hibernate
* session for each Ignite session. If there is an ongoing cache
* transaction, a corresponding Hibernate transaction is created
@@ -89,7 +89,7 @@ import javax.sql.*;
* cache participates in transaction.
* </li>
* </ul>
- * For example, here is how global {@link CacheStoreSessionJdbcListener}
+ * For example, here is how global {@link CacheJdbcStoreSessionListener}
* can be configured in Spring XML configuration file:
* <pre name="code" class="xml">
* <bean class="org.apache.ignite.configuration.IgniteConfiguration">
@@ -99,7 +99,7 @@ import javax.sql.*;
* <list>
* <bean class="javax.cache.configuration.FactoryBuilder$SingletonFactory">
* <constructor-arg>
- * <bean class="org.apache.ignite.cache.store.jdbc.CacheStoreSessionJdbcListener">
+ * <bean class="org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListener">
* <!-- Inject external data source. -->
* <property name="dataSource" ref="jdbc-data-source"/>
* </bean>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreSessionListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreSessionListener.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreSessionListener.java
new file mode 100644
index 0000000..c59e86f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreSessionListener.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lifecycle.*;
+
+import javax.cache.*;
+import javax.cache.integration.*;
+import javax.sql.*;
+import java.sql.*;
+
+/**
+ * Cache store session listener based on JDBC connection.
+ * <p>
+ * For each session this listener gets a new JDBC connection
+ * from provided {@link DataSource} and commits (or rolls
+ * back) it when session ends.
+ * <p>
+ * The connection is saved as a store session
+ * {@link CacheStoreSession#attachment() attachment}.
+ * The listener guarantees that the connection will be
+ * available for any store operation. If there is an
+ * ongoing cache transaction, all operations within this
+ * transaction will be committed or rolled back only when
+ * session ends.
+ * <p>
+ * As an example, here is how the {@link CacheStore#write(Cache.Entry)}
+ * method can be implemented if {@link CacheJdbcStoreSessionListener}
+ * is configured:
+ * <pre name="code" class="java">
+ * private static class Store extends CacheStoreAdapter<Integer, Integer> {
+ * @CacheStoreSessionResource
+ * private CacheStoreSession ses;
+ *
+ * @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) throws CacheWriterException {
+ * // Get connection from the current session.
+ * Connection conn = ses.attachment();
+ *
+ * // Execute update SQL query.
+ * try {
+ * conn.createStatement().executeUpdate("...");
+ * }
+ * catch (SQLException e) {
+ * throw new CacheWriterException("Failed to update the store.", e);
+ * }
+ * }
+ * }
+ * </pre>
+ * JDBC connection will be automatically created by the listener
+ * at the start of the session and closed when it ends.
+ */
+public class CacheJdbcStoreSessionListener implements CacheStoreSessionListener, LifecycleAware {
+ /** Data source. */
+ private DataSource dataSrc;
+
+ /**
+ * Sets data source.
+ * <p>
+ * This is a required parameter. If data source is not set,
+ * exception will be thrown on startup.
+ *
+ * @param dataSrc Data source.
+ */
+ public void setDataSource(DataSource dataSrc) {
+ this.dataSrc = dataSrc;
+ }
+
+ /**
+ * Gets data source.
+ *
+ * @return Data source.
+ */
+ public DataSource getDataSource() {
+ return dataSrc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteException {
+ if (dataSrc == null)
+ throw new IgniteException("Data source is required by " + getClass().getSimpleName() + '.');
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws IgniteException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSessionStart(CacheStoreSession ses) {
+ if (ses.attachment() == null) {
+ try {
+ Connection conn = dataSrc.getConnection();
+
+ conn.setAutoCommit(false);
+
+ ses.attach(conn);
+ }
+ catch (SQLException e) {
+ throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
+ Connection conn = ses.attachment();
+
+ if (conn != null) {
+ ses.attach(null);
+
+ try {
+ if (commit)
+ conn.commit();
+ else
+ conn.rollback();
+ }
+ catch (SQLException e) {
+ throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e);
+ }
+ finally {
+ U.closeQuiet(conn);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java
deleted file mode 100644
index 7920fae..0000000
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.jdbc;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.store.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lifecycle.*;
-
-import javax.cache.*;
-import javax.cache.integration.*;
-import javax.sql.*;
-import java.sql.*;
-import java.util.*;
-
-/**
- * Cache store session listener based on JDBC connection.
- * <p>
- * For each session this listener gets a new JDBC connection
- * from provided {@link DataSource} and commits (or rolls
- * back) it when session ends.
- * <p>
- * The connection is stored in store session
- * {@link CacheStoreSession#properties() properties} and can
- * be accessed at any moment by {@link #JDBC_CONN_KEY} key.
- * The listener guarantees that the connection will be
- * available for any store operation. If there is an
- * ongoing cache transaction, all operations within this
- * transaction will be committed or rolled back only when
- * session ends.
- * <p>
- * As an example, here is how the {@link CacheStore#write(Cache.Entry)}
- * method can be implemented if {@link CacheStoreSessionJdbcListener}
- * is configured:
- * <pre name="code" class="java">
- * private static class Store extends CacheStoreAdapter<Integer, Integer> {
- * @CacheStoreSessionResource
- * private CacheStoreSession ses;
- *
- * @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) throws CacheWriterException {
- * // Get connection from the current session.
- * Connection conn = ses.<String, Connection>properties().get(CacheStoreSessionJdbcListener.JDBC_CONN_KEY);
- *
- * // Execute update SQL query.
- * try {
- * conn.createStatement().executeUpdate("...");
- * }
- * catch (SQLException e) {
- * throw new CacheWriterException("Failed to update the store.", e);
- * }
- * }
- * }
- * </pre>
- * JDBC connection will be automatically created by the listener
- * at the start of the session and closed when it ends.
- */
-public class CacheStoreSessionJdbcListener implements CacheStoreSessionListener, LifecycleAware {
- /** Session key for JDBC connection. */
- public static final String JDBC_CONN_KEY = "__jdbc_conn_";
-
- /** Data source. */
- private DataSource dataSrc;
-
- /**
- * Sets data source.
- * <p>
- * This is a required parameter. If data source is not set,
- * exception will be thrown on startup.
- *
- * @param dataSrc Data source.
- */
- public void setDataSource(DataSource dataSrc) {
- this.dataSrc = dataSrc;
- }
-
- /**
- * Gets data source.
- *
- * @return Data source.
- */
- public DataSource getDataSource() {
- return dataSrc;
- }
-
- /** {@inheritDoc} */
- @Override public void start() throws IgniteException {
- if (dataSrc == null)
- throw new IgniteException("Data source is required by " + getClass().getSimpleName() + '.');
- }
-
- /** {@inheritDoc} */
- @Override public void stop() throws IgniteException {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void onSessionStart(CacheStoreSession ses) {
- Map<String, Connection> props = ses.properties();
-
- if (!props.containsKey(JDBC_CONN_KEY)) {
- try {
- Connection conn = dataSrc.getConnection();
-
- conn.setAutoCommit(false);
-
- props.put(JDBC_CONN_KEY, conn);
- }
- catch (SQLException e) {
- throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e);
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
- Connection conn = ses.<String, Connection>properties().remove(JDBC_CONN_KEY);
-
- if (conn != null) {
- try {
- if (commit)
- conn.commit();
- else
- conn.rollback();
- }
- catch (SQLException e) {
- throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e);
- }
- finally {
- U.closeQuiet(conn);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index 8096291..11d711c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -855,6 +855,9 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
private Map<Object, Object> props;
/** */
+ private Object attachment;
+
+ /** */
private boolean started;
/** */
@@ -887,6 +890,20 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
}
/**
+ * @param attachment Attachment.
+ */
+ private void attach(Object attachment) {
+ this.attachment = attachment;
+ }
+
+ /**
+ * @return Attachment.
+ */
+ private Object attachment() {
+ return attachment;
+ }
+
+ /**
* @return Cache name.
*/
private String cacheName() {
@@ -954,6 +971,21 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
}
/** {@inheritDoc} */
+ @Override public void attach(@Nullable Object attachment) {
+ SessionData ses0 = sesHolder.get();
+
+ if (ses0 != null)
+ ses0.attach(attachment);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public <T> T attachment() {
+ SessionData ses0 = sesHolder.get();
+
+ return ses0 != null ? (T)ses0.attachment() : null;
+ }
+
+ /** {@inheritDoc} */
@Override public <K1, V1> Map<K1, V1> properties() {
SessionData ses0 = sesHolder.get();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreSessionListenerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreSessionListenerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreSessionListenerSelfTest.java
new file mode 100644
index 0000000..64af249
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreSessionListenerSelfTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc;
+
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.h2.jdbcx.*;
+
+import javax.cache.*;
+import javax.cache.configuration.*;
+import javax.cache.integration.*;
+import java.sql.*;
+import java.util.*;
+
+/**
+ * Tests for {@link CacheJdbcStoreSessionListener}.
+ */
+public class CacheJdbcStoreSessionListenerSelfTest extends CacheStoreSessionListenerAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected Factory<? extends CacheStore<Integer, Integer>> storeFactory() {
+ return new Factory<CacheStore<Integer, Integer>>() {
+ @Override public CacheStore<Integer, Integer> create() {
+ return new Store();
+ }
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Factory<CacheStoreSessionListener> sessionListenerFactory() {
+ return new Factory<CacheStoreSessionListener>() {
+ @Override public CacheStoreSessionListener create() {
+ CacheJdbcStoreSessionListener lsnr = new CacheJdbcStoreSessionListener();
+
+ lsnr.setDataSource(JdbcConnectionPool.create(URL, "", ""));
+
+ return lsnr;
+ }
+ };
+ }
+
+ /**
+ */
+ private static class Store extends CacheStoreAdapter<Integer, Integer> {
+ /** */
+ private static String SES_CONN_KEY = "ses_conn";
+
+ /** */
+ @CacheStoreSessionResource
+ private CacheStoreSession ses;
+
+ /** {@inheritDoc} */
+ @Override public void loadCache(IgniteBiInClosure<Integer, Integer> clo, Object... args) {
+ loadCacheCnt.incrementAndGet();
+
+ checkConnection();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Integer load(Integer key) throws CacheLoaderException {
+ loadCnt.incrementAndGet();
+
+ checkConnection();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry)
+ throws CacheWriterException {
+ writeCnt.incrementAndGet();
+
+ checkConnection();
+
+ if (write.get()) {
+ Connection conn = ses.attachment();
+
+ try {
+ String table;
+
+ switch (ses.cacheName()) {
+ case "cache1":
+ table = "Table1";
+
+ break;
+
+ case "cache2":
+ if (fail.get())
+ throw new CacheWriterException("Expected failure.");
+
+ table = "Table2";
+
+ break;
+
+ default:
+ throw new CacheWriterException("Wring cache: " + ses.cacheName());
+ }
+
+ PreparedStatement stmt = conn.prepareStatement(
+ "INSERT INTO " + table + " (key, value) VALUES (?, ?)");
+
+ stmt.setInt(1, entry.getKey());
+ stmt.setInt(2, entry.getValue());
+
+ stmt.executeUpdate();
+ }
+ catch (SQLException e) {
+ throw new CacheWriterException(e);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(Object key) throws CacheWriterException {
+ deleteCnt.incrementAndGet();
+
+ checkConnection();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sessionEnd(boolean commit) {
+ assertNull(ses.attachment());
+ }
+
+ /**
+ */
+ private void checkConnection() {
+ Connection conn = ses.attachment();
+
+ assertNotNull(conn);
+
+ try {
+ assertFalse(conn.isClosed());
+ assertFalse(conn.getAutoCommit());
+ }
+ catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+
+ verifySameInstance(conn);
+ }
+
+ /**
+ * @param conn Connection.
+ */
+ private void verifySameInstance(Connection conn) {
+ Map<String, Connection> props = ses.properties();
+
+ Connection sesConn = props.get(SES_CONN_KEY);
+
+ if (sesConn == null)
+ props.put(SES_CONN_KEY, conn);
+ else {
+ assertSame(conn, sesConn);
+
+ reuseCnt.incrementAndGet();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListenerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListenerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListenerSelfTest.java
deleted file mode 100644
index e4dac88..0000000
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListenerSelfTest.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.jdbc;
-
-import org.apache.ignite.cache.store.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.resources.*;
-import org.h2.jdbcx.*;
-
-import javax.cache.*;
-import javax.cache.configuration.*;
-import javax.cache.integration.*;
-import java.sql.*;
-import java.util.*;
-
-/**
- * Tests for {@link CacheStoreSessionJdbcListener}.
- */
-public class CacheStoreSessionJdbcListenerSelfTest extends CacheStoreSessionListenerAbstractSelfTest {
- /** {@inheritDoc} */
- @Override protected Factory<? extends CacheStore<Integer, Integer>> storeFactory() {
- return new Factory<CacheStore<Integer, Integer>>() {
- @Override public CacheStore<Integer, Integer> create() {
- return new Store();
- }
- };
- }
-
- /** {@inheritDoc} */
- @Override protected Factory<CacheStoreSessionListener> sessionListenerFactory() {
- return new Factory<CacheStoreSessionListener>() {
- @Override public CacheStoreSessionListener create() {
- CacheStoreSessionJdbcListener lsnr = new CacheStoreSessionJdbcListener();
-
- lsnr.setDataSource(JdbcConnectionPool.create(URL, "", ""));
-
- return lsnr;
- }
- };
- }
-
- /**
- */
- private static class Store extends CacheStoreAdapter<Integer, Integer> {
- /** */
- private static String SES_CONN_KEY = "ses_conn";
-
- /** */
- @CacheStoreSessionResource
- private CacheStoreSession ses;
-
- /** {@inheritDoc} */
- @Override public void loadCache(IgniteBiInClosure<Integer, Integer> clo, Object... args) {
- loadCacheCnt.incrementAndGet();
-
- checkConnection();
- }
-
- /** {@inheritDoc} */
- @Override public Integer load(Integer key) throws CacheLoaderException {
- loadCnt.incrementAndGet();
-
- checkConnection();
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry)
- throws CacheWriterException {
- writeCnt.incrementAndGet();
-
- checkConnection();
-
- if (write.get()) {
- Connection conn = connection();
-
- try {
- String table;
-
- switch (ses.cacheName()) {
- case "cache1":
- table = "Table1";
-
- break;
-
- case "cache2":
- if (fail.get())
- throw new CacheWriterException("Expected failure.");
-
- table = "Table2";
-
- break;
-
- default:
- throw new CacheWriterException("Wring cache: " + ses.cacheName());
- }
-
- PreparedStatement stmt = conn.prepareStatement(
- "INSERT INTO " + table + " (key, value) VALUES (?, ?)");
-
- stmt.setInt(1, entry.getKey());
- stmt.setInt(2, entry.getValue());
-
- stmt.executeUpdate();
- }
- catch (SQLException e) {
- throw new CacheWriterException(e);
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public void delete(Object key) throws CacheWriterException {
- deleteCnt.incrementAndGet();
-
- checkConnection();
- }
-
- /** {@inheritDoc} */
- @Override public void sessionEnd(boolean commit) {
- assertNull(connection());
- }
-
- /**
- */
- private void checkConnection() {
- Connection conn = connection();
-
- assertNotNull(conn);
-
- try {
- assertFalse(conn.isClosed());
- assertFalse(conn.getAutoCommit());
- }
- catch (SQLException e) {
- throw new RuntimeException(e);
- }
-
- verifySameInstance(conn);
- }
-
- /**
- * @param conn Connection.
- */
- private void verifySameInstance(Connection conn) {
- Map<String, Connection> props = ses.properties();
-
- Connection sesConn = props.get(SES_CONN_KEY);
-
- if (sesConn == null)
- props.put(SES_CONN_KEY, conn);
- else {
- assertSame(conn, sesConn);
-
- reuseCnt.incrementAndGet();
- }
- }
-
- /**
- * @return Connection.
- */
- private Connection connection() {
- return ses.<String, Connection>properties().get(CacheStoreSessionJdbcListener.JDBC_CONN_KEY);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestCacheSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestCacheSession.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestCacheSession.java
index 0709880..bc7bf7d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestCacheSession.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestCacheSession.java
@@ -34,6 +34,9 @@ public class TestCacheSession implements CacheStoreSession {
/** */
private Map<Object, Object> props;
+ /** */
+ private Object attachment;
+
/**
*
* @param tx Transaction.
@@ -55,6 +58,17 @@ public class TestCacheSession implements CacheStoreSession {
}
/** {@inheritDoc} */
+ @Override public void attach(@Nullable Object attachment) {
+ this.attachment = attachment;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Nullable @Override public <T> T attachment() {
+ return (T)attachment;
+ }
+
+ /** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public <K, V> Map<K, V> properties() {
if (props == null)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestThreadLocalCacheSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestThreadLocalCacheSession.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestThreadLocalCacheSession.java
index 2bbcf1b..2047600 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestThreadLocalCacheSession.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestThreadLocalCacheSession.java
@@ -54,6 +54,22 @@ public class TestThreadLocalCacheSession implements CacheStoreSession {
}
/** {@inheritDoc} */
+ @Override public void attach(@Nullable Object attachment) {
+ TestCacheSession ses = sesHolder.get();
+
+ if (ses != null)
+ ses.attach(attachment);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Nullable @Override public <T> T attachment() {
+ TestCacheSession ses = sesHolder.get();
+
+ return ses!= null ? (T)ses.attachment() : null;
+ }
+
+ /** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public <K, V> Map<K, V> properties() {
TestCacheSession ses = sesHolder.get();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index afb67f5..60c7a0a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -131,7 +131,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
suite.addTestSuite(CacheOffheapMapEntrySelfTest.class);
- suite.addTestSuite(CacheStoreSessionJdbcListenerSelfTest.class);
+ suite.addTestSuite(CacheJdbcStoreSessionListenerSelfTest.class);
return suite;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheHibernateStoreSessionListener.java
----------------------------------------------------------------------
diff --git a/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheHibernateStoreSessionListener.java b/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheHibernateStoreSessionListener.java
new file mode 100644
index 0000000..61f7265
--- /dev/null
+++ b/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheHibernateStoreSessionListener.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.hibernate;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lifecycle.*;
+import org.apache.ignite.resources.*;
+import org.hibernate.*;
+import org.hibernate.cfg.*;
+
+import javax.cache.integration.*;
+import java.io.*;
+import java.net.*;
+
+/**
+ * Hibernate-based cache store session listener.
+ * <p>
+ * This listener creates a new Hibernate session for each store
+ * session. If there is an ongoing cache transaction, a corresponding
+ * Hibernate transaction is created as well.
+ * <p>
+ * The Hibernate session is saved as a store session
+ * {@link CacheStoreSession#attachment() attachment}.
+ * The listener guarantees that the session will be
+ * available for any store operation. If there is an
+ * ongoing cache transaction, all operations within this
+ * transaction will share a DB transaction.
+ * <p>
+ * As an example, here is how the {@link CacheStore#write(javax.cache.Cache.Entry)}
+ * method can be implemented if {@link CacheHibernateStoreSessionListener}
+ * is configured:
+ * <pre name="code" class="java">
+ * private static class Store extends CacheStoreAdapter<Integer, Integer> {
+ * @CacheStoreSessionResource
+ * private CacheStoreSession ses;
+ *
+ * @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) throws CacheWriterException {
+ * // Get Hibernate session from the current store session.
+ * Session hibSes = ses.attachment();
+ *
+ * // Persist the value.
+ * hibSes.persist(entry.getValue());
+ * }
+ * }
+ * </pre>
+ * Hibernate session will be automatically created by the listener
+ * at the start of the session and closed when it ends.
+ * <p>
+ * {@link CacheHibernateStoreSessionListener} requires that either
+ * {@link #setSessionFactory(SessionFactory)} session factory}
+ * or {@link #setHibernateConfigurationPath(String) Hibernate configuration file}
+ * is provided. If non of them is set, exception is thrown. Is both are provided,
+ * session factory will be used.
+ */
+public class CacheHibernateStoreSessionListener implements CacheStoreSessionListener, LifecycleAware {
+ /** Hibernate session factory. */
+ private SessionFactory sesFactory;
+
+ /** Hibernate configuration file path. */
+ private String hibernateCfgPath;
+
+ /** Logger. */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** Whether to close session on stop. */
+ private boolean closeSesOnStop;
+
+ /**
+ * Sets Hibernate session factory.
+ * <p>
+ * Either session factory or configuration file is required.
+ * If none is provided, exception will be thrown on startup.
+ *
+ * @param sesFactory Session factory.
+ */
+ public void setSessionFactory(SessionFactory sesFactory) {
+ this.sesFactory = sesFactory;
+ }
+
+ /**
+ * Gets Hibernate session factory.
+ *
+ * @return Session factory.
+ */
+ public SessionFactory getSessionFactory() {
+ return sesFactory;
+ }
+
+ /**
+ * Sets hibernate configuration path.
+ * <p>
+ * Either session factory or configuration file is required.
+ * If none is provided, exception will be thrown on startup.
+ *
+ * @param hibernateCfgPath Hibernate configuration path.
+ */
+ public void setHibernateConfigurationPath(String hibernateCfgPath) {
+ this.hibernateCfgPath = hibernateCfgPath;
+ }
+
+ /**
+ * Gets hibernate configuration path.
+ *
+ * @return Hibernate configuration path.
+ */
+ public String getHibernateConfigurationPath() {
+ return hibernateCfgPath;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("deprecation")
+ @Override public void start() throws IgniteException {
+ if (sesFactory == null && F.isEmpty(hibernateCfgPath))
+ throw new IgniteException("Either session factory or Hibernate configuration file is required by " +
+ getClass().getSimpleName() + '.');
+
+ if (!F.isEmpty(hibernateCfgPath)) {
+ if (sesFactory == null) {
+ try {
+ URL url = new URL(hibernateCfgPath);
+
+ sesFactory = new Configuration().configure(url).buildSessionFactory();
+ }
+ catch (MalformedURLException ignored) {
+ // No-op.
+ }
+
+ if (sesFactory == null) {
+ File cfgFile = new File(hibernateCfgPath);
+
+ if (cfgFile.exists())
+ sesFactory = new Configuration().configure(cfgFile).buildSessionFactory();
+ }
+
+ if (sesFactory == null)
+ sesFactory = new Configuration().configure(hibernateCfgPath).buildSessionFactory();
+
+ if (sesFactory == null)
+ throw new IgniteException("Failed to resolve Hibernate configuration file: " + hibernateCfgPath);
+
+ closeSesOnStop = true;
+ }
+ else
+ U.warn(log, "Hibernate configuration file configured in " + getClass().getSimpleName() +
+ " will be ignored (session factory is already set).");
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws IgniteException {
+ if (closeSesOnStop && sesFactory != null && !sesFactory.isClosed())
+ sesFactory.close();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSessionStart(CacheStoreSession ses) {
+ if (ses.attachment() == null) {
+ try {
+ Session hibSes = sesFactory.openSession();
+
+ ses.attach(hibSes);
+
+ if (ses.isWithinTransaction())
+ hibSes.beginTransaction();
+ }
+ catch (HibernateException e) {
+ throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
+ Session hibSes = ses.attachment();
+
+ if (hibSes != null) {
+ ses.attach(null);
+
+ try {
+ Transaction tx = hibSes.getTransaction();
+
+ if (commit) {
+ hibSes.flush();
+
+ if (tx.isActive())
+ tx.commit();
+ }
+ else if (tx.isActive())
+ tx.rollback();
+ }
+ catch (HibernateException e) {
+ throw new CacheWriterException("Failed to end store session [tx=" + ses.transaction() + ']', e);
+ }
+ finally {
+ hibSes.close();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java
----------------------------------------------------------------------
diff --git a/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java b/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java
deleted file mode 100644
index ea1214a..0000000
--- a/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.hibernate;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.store.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lifecycle.*;
-import org.apache.ignite.resources.*;
-import org.hibernate.*;
-import org.hibernate.cfg.*;
-
-import javax.cache.integration.*;
-import java.io.*;
-import java.net.*;
-import java.util.*;
-
-/**
- * Hibernate-based cache store session listener.
- * <p>
- * This listener creates a new Hibernate session for each store
- * session. If there is an ongoing cache transaction, a corresponding
- * Hibernate transaction is created as well.
- * <p>
- * The Hibernate session is stored in store session
- * {@link CacheStoreSession#properties() properties} and can
- * be accessed at any moment by {@link #HIBERNATE_SES_KEY} key.
- * The listener guarantees that the session will be
- * available for any store operation. If there is an
- * ongoing cache transaction, all operations within this
- * transaction will share a DB transaction.
- * <p>
- * As an example, here is how the {@link CacheStore#write(javax.cache.Cache.Entry)}
- * method can be implemented if {@link CacheStoreSessionHibernateListener}
- * is configured:
- * <pre name="code" class="java">
- * private static class Store extends CacheStoreAdapter<Integer, Integer> {
- * @CacheStoreSessionResource
- * private CacheStoreSession ses;
- *
- * @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) throws CacheWriterException {
- * // Get Hibernate session from the current store session.
- * Session hibSes = ses.<String, Session>properties().get(CacheStoreSessionHibernateListener.HIBERNATE_SES_KEY);
- *
- * // Persist the value.
- * hibSes.persist(entry.getValue());
- * }
- * }
- * </pre>
- * Hibernate session will be automatically created by the listener
- * at the start of the session and closed when it ends.
- * <p>
- * {@link CacheStoreSessionHibernateListener} requires that either
- * {@link #setSessionFactory(SessionFactory)} session factory}
- * or {@link #setHibernateConfigurationPath(String) Hibernate configuration file}
- * is provided. If non of them is set, exception is thrown. Is both are provided,
- * session factory will be used.
- */
-public class CacheStoreSessionHibernateListener implements CacheStoreSessionListener, LifecycleAware {
- /** Session key for JDBC connection. */
- public static final String HIBERNATE_SES_KEY = "__hibernate_ses_";
-
- /** Hibernate session factory. */
- private SessionFactory sesFactory;
-
- /** Hibernate configuration file path. */
- private String hibernateCfgPath;
-
- /** Logger. */
- @LoggerResource
- private IgniteLogger log;
-
- /** Whether to close session on stop. */
- private boolean closeSesOnStop;
-
- /**
- * Sets Hibernate session factory.
- * <p>
- * Either session factory or configuration file is required.
- * If none is provided, exception will be thrown on startup.
- *
- * @param sesFactory Session factory.
- */
- public void setSessionFactory(SessionFactory sesFactory) {
- this.sesFactory = sesFactory;
- }
-
- /**
- * Gets Hibernate session factory.
- *
- * @return Session factory.
- */
- public SessionFactory getSessionFactory() {
- return sesFactory;
- }
-
- /**
- * Sets hibernate configuration path.
- * <p>
- * Either session factory or configuration file is required.
- * If none is provided, exception will be thrown on startup.
- *
- * @param hibernateCfgPath Hibernate configuration path.
- */
- public void setHibernateConfigurationPath(String hibernateCfgPath) {
- this.hibernateCfgPath = hibernateCfgPath;
- }
-
- /**
- * Gets hibernate configuration path.
- *
- * @return Hibernate configuration path.
- */
- public String getHibernateConfigurationPath() {
- return hibernateCfgPath;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("deprecation")
- @Override public void start() throws IgniteException {
- if (sesFactory == null && F.isEmpty(hibernateCfgPath))
- throw new IgniteException("Either session factory or Hibernate configuration file is required by " +
- getClass().getSimpleName() + '.');
-
- if (!F.isEmpty(hibernateCfgPath)) {
- if (sesFactory == null) {
- try {
- URL url = new URL(hibernateCfgPath);
-
- sesFactory = new Configuration().configure(url).buildSessionFactory();
- }
- catch (MalformedURLException ignored) {
- // No-op.
- }
-
- if (sesFactory == null) {
- File cfgFile = new File(hibernateCfgPath);
-
- if (cfgFile.exists())
- sesFactory = new Configuration().configure(cfgFile).buildSessionFactory();
- }
-
- if (sesFactory == null)
- sesFactory = new Configuration().configure(hibernateCfgPath).buildSessionFactory();
-
- if (sesFactory == null)
- throw new IgniteException("Failed to resolve Hibernate configuration file: " + hibernateCfgPath);
-
- closeSesOnStop = true;
- }
- else
- U.warn(log, "Hibernate configuration file configured in " + getClass().getSimpleName() +
- " will be ignored (session factory is already set).");
- }
- }
-
- /** {@inheritDoc} */
- @Override public void stop() throws IgniteException {
- if (closeSesOnStop && sesFactory != null && !sesFactory.isClosed())
- sesFactory.close();
- }
-
- /** {@inheritDoc} */
- @Override public void onSessionStart(CacheStoreSession ses) {
- Map<String, Session> props = ses.properties();
-
- if (!props.containsKey(HIBERNATE_SES_KEY)) {
- try {
- Session hibSes = sesFactory.openSession();
-
- props.put(HIBERNATE_SES_KEY, hibSes);
-
- if (ses.isWithinTransaction())
- hibSes.beginTransaction();
- }
- catch (HibernateException e) {
- throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e);
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
- Session hibSes = ses.<String, Session>properties().remove(HIBERNATE_SES_KEY);
-
- if (hibSes != null) {
- try {
- Transaction tx = hibSes.getTransaction();
-
- if (commit) {
- hibSes.flush();
-
- if (tx.isActive())
- tx.commit();
- }
- else if (tx.isActive())
- tx.rollback();
- }
- catch (HibernateException e) {
- throw new CacheWriterException("Failed to end store session [tx=" + ses.transaction() + ']', e);
- }
- finally {
- hibSes.close();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheHibernateStoreSessionListenerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheHibernateStoreSessionListenerSelfTest.java b/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheHibernateStoreSessionListenerSelfTest.java
new file mode 100644
index 0000000..c30e216
--- /dev/null
+++ b/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheHibernateStoreSessionListenerSelfTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.hibernate;
+
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cache.store.jdbc.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.hibernate.*;
+import org.hibernate.cfg.Configuration;
+
+import javax.cache.Cache;
+import javax.cache.configuration.*;
+import javax.cache.integration.*;
+import javax.persistence.*;
+import java.io.*;
+import java.util.*;
+
+/**
+ * Tests for {@link CacheJdbcStoreSessionListener}.
+ */
+public class CacheHibernateStoreSessionListenerSelfTest extends CacheStoreSessionListenerAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected Factory<? extends CacheStore<Integer, Integer>> storeFactory() {
+ return new Factory<CacheStore<Integer, Integer>>() {
+ @Override public CacheStore<Integer, Integer> create() {
+ return new Store();
+ }
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Factory<CacheStoreSessionListener> sessionListenerFactory() {
+ return new Factory<CacheStoreSessionListener>() {
+ @Override public CacheStoreSessionListener create() {
+ CacheHibernateStoreSessionListener lsnr = new CacheHibernateStoreSessionListener();
+
+ SessionFactory sesFactory = new Configuration().
+ setProperty("hibernate.connection.url", URL).
+ addAnnotatedClass(Table1.class).
+ addAnnotatedClass(Table2.class).
+ buildSessionFactory();
+
+ lsnr.setSessionFactory(sesFactory);
+
+ return lsnr;
+ }
+ };
+ }
+
+ /**
+ */
+ private static class Store extends CacheStoreAdapter<Integer, Integer> {
+ /** */
+ private static String SES_CONN_KEY = "ses_conn";
+
+ /** */
+ @CacheStoreSessionResource
+ private CacheStoreSession ses;
+
+ /** {@inheritDoc} */
+ @Override public void loadCache(IgniteBiInClosure<Integer, Integer> clo, Object... args) {
+ loadCacheCnt.incrementAndGet();
+
+ checkSession();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Integer load(Integer key) throws CacheLoaderException {
+ loadCnt.incrementAndGet();
+
+ checkSession();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry)
+ throws CacheWriterException {
+ writeCnt.incrementAndGet();
+
+ checkSession();
+
+ if (write.get()) {
+ Session hibSes = ses.attachment();
+
+ switch (ses.cacheName()) {
+ case "cache1":
+ hibSes.save(new Table1(entry.getKey(), entry.getValue()));
+
+ break;
+
+ case "cache2":
+ if (fail.get())
+ throw new CacheWriterException("Expected failure.");
+
+ hibSes.save(new Table2(entry.getKey(), entry.getValue()));
+
+ break;
+
+ default:
+ throw new CacheWriterException("Wring cache: " + ses.cacheName());
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(Object key) throws CacheWriterException {
+ deleteCnt.incrementAndGet();
+
+ checkSession();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sessionEnd(boolean commit) {
+ assertNull(ses.attachment());
+ }
+
+ /**
+ */
+ private void checkSession() {
+ Session hibSes = ses.attachment();
+
+ assertNotNull(hibSes);
+
+ assertTrue(hibSes.isOpen());
+
+ Transaction tx = hibSes.getTransaction();
+
+ assertNotNull(tx);
+
+ if (ses.isWithinTransaction())
+ assertTrue(tx.isActive());
+ else
+ assertFalse(tx.isActive());
+
+ verifySameInstance(hibSes);
+ }
+
+ /**
+ * @param hibSes Session.
+ */
+ private void verifySameInstance(Session hibSes) {
+ Map<String, Session> props = ses.properties();
+
+ Session sesConn = props.get(SES_CONN_KEY);
+
+ if (sesConn == null)
+ props.put(SES_CONN_KEY, hibSes);
+ else {
+ assertSame(hibSes, sesConn);
+
+ reuseCnt.incrementAndGet();
+ }
+ }
+ }
+
+ /**
+ */
+ @Entity
+ @Table(name = "Table1")
+ private static class Table1 implements Serializable {
+ /** */
+ @Id @GeneratedValue
+ @Column(name = "id")
+ private Integer id;
+
+ /** */
+ @Column(name = "key")
+ private int key;
+
+ /** */
+ @Column(name = "value")
+ private int value;
+
+ /**
+ * @param key Key.
+ * @param value Value.
+ */
+ private Table1(int key, int value) {
+ this.key = key;
+ this.value = value;
+ }
+ }
+
+ /**
+ */
+ @Entity
+ @Table(name = "Table2")
+ private static class Table2 implements Serializable {
+ /** */
+ @Id @GeneratedValue
+ @Column(name = "id")
+ private Integer id;
+
+ /** */
+ @Column(name = "key")
+ private int key;
+
+ /** */
+ @Column(name = "value")
+ private int value;
+
+ /**
+ * @param key Key.
+ * @param value Value.
+ */
+ private Table2(int key, int value) {
+ this.key = key;
+ this.value = value;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListenerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListenerSelfTest.java b/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListenerSelfTest.java
deleted file mode 100644
index a9d465e..0000000
--- a/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListenerSelfTest.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.hibernate;
-
-import org.apache.ignite.cache.store.*;
-import org.apache.ignite.cache.store.jdbc.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.resources.*;
-import org.hibernate.*;
-import org.hibernate.cfg.Configuration;
-
-import javax.cache.Cache;
-import javax.cache.configuration.*;
-import javax.cache.integration.*;
-import javax.persistence.*;
-import java.io.*;
-import java.util.*;
-
-/**
- * Tests for {@link CacheStoreSessionJdbcListener}.
- */
-public class CacheStoreSessionHibernateListenerSelfTest extends CacheStoreSessionListenerAbstractSelfTest {
- /** {@inheritDoc} */
- @Override protected Factory<? extends CacheStore<Integer, Integer>> storeFactory() {
- return new Factory<CacheStore<Integer, Integer>>() {
- @Override public CacheStore<Integer, Integer> create() {
- return new Store();
- }
- };
- }
-
- /** {@inheritDoc} */
- @Override protected Factory<CacheStoreSessionListener> sessionListenerFactory() {
- return new Factory<CacheStoreSessionListener>() {
- @Override public CacheStoreSessionListener create() {
- CacheStoreSessionHibernateListener lsnr = new CacheStoreSessionHibernateListener();
-
- SessionFactory sesFactory = new Configuration().
- setProperty("hibernate.connection.url", URL).
- addAnnotatedClass(Table1.class).
- addAnnotatedClass(Table2.class).
- buildSessionFactory();
-
- lsnr.setSessionFactory(sesFactory);
-
- return lsnr;
- }
- };
- }
-
- /**
- */
- private static class Store extends CacheStoreAdapter<Integer, Integer> {
- /** */
- private static String SES_CONN_KEY = "ses_conn";
-
- /** */
- @CacheStoreSessionResource
- private CacheStoreSession ses;
-
- /** {@inheritDoc} */
- @Override public void loadCache(IgniteBiInClosure<Integer, Integer> clo, Object... args) {
- loadCacheCnt.incrementAndGet();
-
- checkSession();
- }
-
- /** {@inheritDoc} */
- @Override public Integer load(Integer key) throws CacheLoaderException {
- loadCnt.incrementAndGet();
-
- checkSession();
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry)
- throws CacheWriterException {
- writeCnt.incrementAndGet();
-
- checkSession();
-
- if (write.get()) {
- Session hibSes = session();
-
- switch (ses.cacheName()) {
- case "cache1":
- hibSes.save(new Table1(entry.getKey(), entry.getValue()));
-
- break;
-
- case "cache2":
- if (fail.get())
- throw new CacheWriterException("Expected failure.");
-
- hibSes.save(new Table2(entry.getKey(), entry.getValue()));
-
- break;
-
- default:
- throw new CacheWriterException("Wring cache: " + ses.cacheName());
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public void delete(Object key) throws CacheWriterException {
- deleteCnt.incrementAndGet();
-
- checkSession();
- }
-
- /** {@inheritDoc} */
- @Override public void sessionEnd(boolean commit) {
- assertNull(session());
- }
-
- /**
- */
- private void checkSession() {
- Session hibSes = session();
-
- assertNotNull(hibSes);
-
- assertTrue(hibSes.isOpen());
-
- Transaction tx = hibSes.getTransaction();
-
- assertNotNull(tx);
-
- if (ses.isWithinTransaction())
- assertTrue(tx.isActive());
- else
- assertFalse(tx.isActive());
-
- verifySameInstance(hibSes);
- }
-
- /**
- * @param hibSes Session.
- */
- private void verifySameInstance(Session hibSes) {
- Map<String, Session> props = ses.properties();
-
- Session sesConn = props.get(SES_CONN_KEY);
-
- if (sesConn == null)
- props.put(SES_CONN_KEY, hibSes);
- else {
- assertSame(hibSes, sesConn);
-
- reuseCnt.incrementAndGet();
- }
- }
-
- /**
- * @return Connection.
- */
- private Session session() {
- return ses.<String, Session>properties().get(CacheStoreSessionHibernateListener.HIBERNATE_SES_KEY);
- }
- }
-
- /**
- */
- @Entity
- @Table(name = "Table1")
- private static class Table1 implements Serializable {
- /** */
- @Id @GeneratedValue
- @Column(name = "id")
- private Integer id;
-
- /** */
- @Column(name = "key")
- private int key;
-
- /** */
- @Column(name = "value")
- private int value;
-
- /**
- * @param key Key.
- * @param value Value.
- */
- private Table1(int key, int value) {
- this.key = key;
- this.value = value;
- }
- }
-
- /**
- */
- @Entity
- @Table(name = "Table2")
- private static class Table2 implements Serializable {
- /** */
- @Id @GeneratedValue
- @Column(name = "id")
- private Integer id;
-
- /** */
- @Column(name = "key")
- private int key;
-
- /** */
- @Column(name = "value")
- private int value;
-
- /**
- * @param key Key.
- * @param value Value.
- */
- private Table2(int key, int value) {
- this.key = key;
- this.value = value;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/hibernate/src/test/java/org/apache/ignite/testsuites/IgniteHibernateTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hibernate/src/test/java/org/apache/ignite/testsuites/IgniteHibernateTestSuite.java b/modules/hibernate/src/test/java/org/apache/ignite/testsuites/IgniteHibernateTestSuite.java
index ed782e2..655e801 100644
--- a/modules/hibernate/src/test/java/org/apache/ignite/testsuites/IgniteHibernateTestSuite.java
+++ b/modules/hibernate/src/test/java/org/apache/ignite/testsuites/IgniteHibernateTestSuite.java
@@ -41,7 +41,7 @@ public class IgniteHibernateTestSuite extends TestSuite {
suite.addTestSuite(CacheHibernateBlobStoreNodeRestartTest.class);
- suite.addTestSuite(CacheStoreSessionHibernateListenerSelfTest.class);
+ suite.addTestSuite(CacheHibernateStoreSessionListenerSelfTest.class);
return suite;
}