You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/02 12:37:49 UTC

[19/41] incubator-ignite git commit: IGNITE-891 - Cache store improvements

IGNITE-891 - Cache store improvements


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e47f85ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e47f85ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e47f85ac

Branch: refs/heads/ignite-218
Commit: e47f85acae51f609e0bd1a1465c38d4a1bc96576
Parents: 4a55d29
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Fri May 22 00:02:07 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Fri May 22 00:02:07 2015 -0700

----------------------------------------------------------------------
 .../apache/ignite/cache/store/CacheStore.java   |  2 +
 .../cache/store/CacheStoreSessionListener.java  | 99 +++++++++++++++++++-
 .../jdbc/CacheStoreSessionJdbcListener.java     | 44 ++++++++-
 .../configuration/CacheConfiguration.java       |  6 ++
 .../configuration/IgniteConfiguration.java      |  7 ++
 .../CacheStoreSessionHibernateListener.java     | 48 +++++++++-
 .../spring/CacheStoreSessionSpringListener.java | 32 ++++++-
 7 files changed, 230 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e47f85ac/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
index d018298..5bfdda1 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
@@ -94,6 +94,8 @@ public interface CacheStore<K, V> extends CacheLoader<K, V>, CacheWriter<K, V> {
      * @throws CacheWriterException If commit or rollback failed. Note that commit failure in some cases
      *      may bring cache transaction into {@link TransactionState#UNKNOWN} which will
      *      consequently cause all transacted entries to be invalidated.
+     * @deprecated Use {@link CacheStoreSessionListener} instead (refer to its JavaDoc for details).
      */
+    @Deprecated
     public void sessionEnd(boolean commit) throws CacheWriterException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e47f85ac/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java
index cba66c3..8b7cd8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java
@@ -17,12 +17,104 @@
 
 package org.apache.ignite.cache.store;
 
+import org.apache.ignite.cache.store.jdbc.*;
+import org.apache.ignite.configuration.*;
+
+import javax.cache.configuration.*;
+import javax.sql.*;
+
 /**
- * Store session listener.
+ * Cache store session listener that allows to implement callbacks
+ * for session lifecycle.
+ * <p>
+ * The most common use case for session listeners is database
+ * connection and transaction management. Store can be invoked one
+ * or several times during one session, depending on whether it's
+ * executed within cache transaction or not. In any case, you have
+ * to create a connection when session is started and commit it or
+ * rollback when session is finished.
+ * <p>
+ * Cache store session listener allows to implement this and other
+ * scenarios providing two callback methods:
+ * <ul>
+ *     <li>
+ *         {@link #onSessionStart(CacheStoreSession)} - called
+ *         before any store operation within a session is invoked.
+ *     </li>
+ *     <li>
+ *         {@link #onSessionEnd(CacheStoreSession, boolean)} - called
+ *         after all operations within a session are invoked.
+ *     </li>
+ * </ul>
+ * <h2>Implementations</h2>
+ * Ignite provides several out-of-the-box implementations
+ * of session listener (refer to individual JavaDocs for more
+ * details):
+ * <ul>
+ *     <li>
+ *         {@link CacheStoreSessionJdbcListener} - JDBC-based session
+ *         listener. For each session it gets a new JDBC connection from
+ *         provided {@link DataSource} and commits (or rolls back) it
+ *         when session ends.
+ *     </li>
+ *     <li>
+ *         {@ignitelink org.apache.ignite.cache.store.spring.CacheStoreSessionSpringListener} -
+ *         session listener based on Spring transaction management.
+ *         It starts a new DB transaction for each session and commits
+ *         (or rolls back) it when session ends. If there is no ongoing
+ *         cache transaction, this listener is no-op.
+ *     </li>
+ *     <li>
+ *         {@ignitelink org.apache.ignite.cache.store.hibernate.CacheStoreSessionHibernateListener} -
+ *         Hibernate-based session listener. It creates a new Hibernate
+ *         session for each Ignite session. If there is an ongoing cache
+ *         transaction, a corresponding Hibernate transaction is created
+ *         as well.
+ *     </li>
+ * </ul>
+ * <h2>Configuration</h2>
+ * There are two ways to configure a session listener:
+ * <ul>
+ *     <li>
+ *         Provide a global listener for all caches via
+ *         {@link IgniteConfiguration#setCacheStoreSessionListenerFactories(Factory[])}
+ *         configuration property. This will be called for any store
+ *         session, regardless of which caches participate in the
+ *         transaction.
+ *     </li>
+ *     <li>
+ *         Provide a listener for a particular cache via
+ *         {@link CacheConfiguration#setCacheStoreSessionListenerFactories(Factory[])}
+ *         configuration property. This will be called only if the
+ *         cache participates in transaction.
+ *     </li>
+ * </ul>
+ * For example, here is how global {@link CacheStoreSessionJdbcListener}
+ * can be configured in Spring XML configuration file:
+ * <pre name="code" class="xml">
+ * &lt;bean class="org.apache.ignite.configuration.IgniteConfiguration"&gt;
+ *     ...
+ *
+ *     &lt;property name="CacheStoreSessionListenerFactories"&gt;
+ *         &lt;list&gt;
+ *             &lt;bean class="javax.cache.configuration.FactoryBuilder$SingletonFactory"&gt;
+ *                 &lt;constructor-arg&gt;
+ *                     &lt;bean class="org.apache.ignite.cache.store.jdbc.CacheStoreSessionJdbcListener"&gt;
+ *                         &lt;!-- Inject external data source. --&gt;
+ *                         &lt;property name="dataSource" ref="jdbc-data-source"/&gt;
+ *                     &lt;/bean&gt;
+ *                 &lt;/constructor-arg&gt;
+ *             &lt;/bean&gt;
+ *         &lt;/list&gt;
+ *     &lt;/property&gt;
+ * &lt;/bean&gt;
+ * </pre>
  */
 public interface CacheStoreSessionListener {
     /**
      * On session start callback.
+     * <p>
+     * Called before any store operation within a session is invoked.
      *
      * @param ses Current session.
      */
@@ -30,9 +122,12 @@ public interface CacheStoreSessionListener {
 
     /**
      * On session end callback.
+     * <p>
+     * Called after all operations within a session are invoked.
      *
      * @param ses Current session.
-     * @param commit {@code True} if transaction should commit, {@code false} for rollback.
+     * @param commit {@code True} if persistence store transaction
+     *      should commit, {@code false} for rollback.
      */
     public void onSessionEnd(CacheStoreSession ses, boolean commit);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e47f85ac/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java
index e4cd617..7920fae 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cache.store.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lifecycle.*;
 
+import javax.cache.*;
 import javax.cache.integration.*;
 import javax.sql.*;
 import java.sql.*;
@@ -29,6 +30,44 @@ import java.util.*;
 
 /**
  * Cache store session listener based on JDBC connection.
+ * <p>
+ * For each session this listener gets a new JDBC connection
+ * from provided {@link DataSource} and commits (or rolls
+ * back) it when session ends.
+ * <p>
+ * The connection is stored in store session
+ * {@link CacheStoreSession#properties() properties} and can
+ * be accessed at any moment by {@link #JDBC_CONN_KEY} key.
+ * The listener guarantees that the connection will be
+ * available for any store operation. If there is an
+ * ongoing cache transaction, all operations within this
+ * transaction will be committed or rolled back only when
+ * session ends.
+ * <p>
+ * As an example, here is how the {@link CacheStore#write(Cache.Entry)}
+ * method can be implemented if {@link CacheStoreSessionJdbcListener}
+ * is configured:
+ * <pre name="code" class="java">
+ * private static class Store extends CacheStoreAdapter&lt;Integer, Integer&gt; {
+ *     &#64;CacheStoreSessionResource
+ *     private CacheStoreSession ses;
+ *
+ *     &#64;Override public void write(Cache.Entry&lt;? extends Integer, ? extends Integer&gt; entry) throws CacheWriterException {
+ *         // Get connection from the current session.
+ *         Connection conn = ses.&lt;String, Connection&gt;properties().get(CacheStoreSessionJdbcListener.JDBC_CONN_KEY);
+ *
+ *         // Execute update SQL query.
+ *         try {
+ *             conn.createStatement().executeUpdate("...");
+ *         }
+ *         catch (SQLException e) {
+ *             throw new CacheWriterException("Failed to update the store.", e);
+ *         }
+ *     }
+ * }
+ * </pre>
+ * JDBC connection will be automatically created by the listener
+ * at the start of the session and closed when it ends.
  */
 public class CacheStoreSessionJdbcListener implements CacheStoreSessionListener, LifecycleAware {
     /** Session key for JDBC connection. */
@@ -39,12 +78,13 @@ public class CacheStoreSessionJdbcListener implements CacheStoreSessionListener,
 
     /**
      * Sets data source.
+     * <p>
+     * This is a required parameter. If data source is not set,
+     * exception will be thrown on startup.
      *
      * @param dataSrc Data source.
      */
     public void setDataSource(DataSource dataSrc) {
-        A.notNull(dataSrc, "dataSrc");
-
         this.dataSrc = dataSrc;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e47f85ac/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 33a5711c..0018218 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -1742,6 +1742,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
      * Gets cache store session listener factories.
      *
      * @return Cache store session listener factories.
+     * @see CacheStoreSessionListener
      */
     public Factory<? extends CacheStoreSessionListener>[] getCacheStoreSessionListenerFactories() {
         return storeSesLsnrs;
@@ -1749,9 +1750,14 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
 
     /**
      * Cache store session listener factories.
+     * <p>
+     * These listeners override global listeners provided in
+     * {@link IgniteConfiguration#setCacheStoreSessionListenerFactories(Factory[])}
+     * configuration property.
      *
      * @param storeSesLsnrs Cache store session listener factories.
      * @return {@code this} for chaining.
+     * @see CacheStoreSessionListener
      */
     public CacheConfiguration setCacheStoreSessionListenerFactories(
         Factory<? extends CacheStoreSessionListener>... storeSesLsnrs) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e47f85ac/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 96ac7e3..5d4c98f 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -2249,6 +2249,7 @@ public class IgniteConfiguration {
      * Gets cache store session listener factories.
      *
      * @return Cache store session listener factories.
+     * @see CacheStoreSessionListener
      */
     public Factory<CacheStoreSessionListener>[] getCacheStoreSessionListenerFactories() {
         return storeSesLsnrs;
@@ -2256,9 +2257,15 @@ public class IgniteConfiguration {
 
     /**
      * Cache store session listener factories.
+     * <p>
+     * These are global store session listeners, so they are applied to
+     * all caches. If you need to override listeners for a
+     * particular cache, use {@link CacheConfiguration#setCacheStoreSessionListenerFactories(Factory[])}
+     * configuration property.
      *
      * @param storeSesLsnrs Cache store session listener factories.
      * @return {@code this} for chaining.
+     * @see CacheStoreSessionListener
      */
     public IgniteConfiguration setCacheStoreSessionListenerFactories(
         Factory<CacheStoreSessionListener>... storeSesLsnrs) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e47f85ac/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java
----------------------------------------------------------------------
diff --git a/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java b/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java
index fe0960e..ea1214a 100644
--- a/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java
+++ b/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java
@@ -32,7 +32,45 @@ import java.net.*;
 import java.util.*;
 
 /**
- * Cache store session listener based on Hibernate session.
+ * Hibernate-based cache store session listener.
+ * <p>
+ * This listener creates a new Hibernate session for each store
+ * session. If there is an ongoing cache transaction, a corresponding
+ * Hibernate transaction is created as well.
+ * <p>
+ * The Hibernate session is stored in store session
+ * {@link CacheStoreSession#properties() properties} and can
+ * be accessed at any moment by {@link #HIBERNATE_SES_KEY} key.
+ * The listener guarantees that the session will be
+ * available for any store operation. If there is an
+ * ongoing cache transaction, all operations within this
+ * transaction will share a DB transaction.
+ * <p>
+ * As an example, here is how the {@link CacheStore#write(javax.cache.Cache.Entry)}
+ * method can be implemented if {@link CacheStoreSessionHibernateListener}
+ * is configured:
+ * <pre name="code" class="java">
+ * private static class Store extends CacheStoreAdapter&lt;Integer, Integer&gt; {
+ *     &#64;CacheStoreSessionResource
+ *     private CacheStoreSession ses;
+ *
+ *     &#64;Override public void write(Cache.Entry&lt;? extends Integer, ? extends Integer&gt; entry) throws CacheWriterException {
+ *         // Get Hibernate session from the current store session.
+ *         Session hibSes = ses.&lt;String, Session&gt;properties().get(CacheStoreSessionHibernateListener.HIBERNATE_SES_KEY);
+ *
+ *         // Persist the value.
+ *         hibSes.persist(entry.getValue());
+ *     }
+ * }
+ * </pre>
+ * Hibernate session will be automatically created by the listener
+ * at the start of the session and closed when it ends.
+ * <p>
+ * {@link CacheStoreSessionHibernateListener} requires that either
+ * {@link #setSessionFactory(SessionFactory) session factory}
+ * or {@link #setHibernateConfigurationPath(String) Hibernate configuration file}
+ * is provided. If neither is set, an exception is thrown. If both are provided,
+ * the session factory will be used.
  */
 public class CacheStoreSessionHibernateListener implements CacheStoreSessionListener, LifecycleAware {
     /** Session key for JDBC connection. */
@@ -53,12 +91,13 @@ public class CacheStoreSessionHibernateListener implements CacheStoreSessionList
 
     /**
      * Sets Hibernate session factory.
+     * <p>
+     * Either session factory or configuration file is required.
+     * If none is provided, exception will be thrown on startup.
      *
      * @param sesFactory Session factory.
      */
     public void setSessionFactory(SessionFactory sesFactory) {
-        A.notNull(sesFactory, "sesFactory");
-
         this.sesFactory = sesFactory;
     }
 
@@ -73,6 +112,9 @@ public class CacheStoreSessionHibernateListener implements CacheStoreSessionList
 
     /**
      * Sets hibernate configuration path.
+     * <p>
+     * Either session factory or configuration file is required.
+     * If none is provided, exception will be thrown on startup.
      *
      * @param hibernateCfgPath Hibernate configuration path.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e47f85ac/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java b/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java
index 2fab4f0..e5201ba 100644
--- a/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java
+++ b/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lifecycle.*;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.transactions.*;
+import org.springframework.jdbc.core.*;
 import org.springframework.jdbc.datasource.*;
 import org.springframework.transaction.*;
 import org.springframework.transaction.support.*;
@@ -31,7 +32,28 @@ import javax.cache.integration.*;
 import javax.sql.*;
 
 /**
- * Cache store session listener based on Spring cache manager.
+ * Cache store session listener based on Spring transaction management.
+ * <p>
+ * This listener starts a new DB transaction for each session and commits
+ * or rolls it back when session ends. If there is no ongoing
+ * cache transaction, this listener is no-op.
+ * <p>
+ * Store implementation can use any Spring APIs like {@link JdbcTemplate}
+ * and others. The listener will guarantee that if there is an
+ * ongoing cache transaction, all store operations within this
+ * transaction will be automatically enlisted in the same database
+ * transaction.
+ * <p>
+ * {@link CacheStoreSessionSpringListener} requires that either
+ * {@link #setTransactionManager(PlatformTransactionManager) transaction manager}
+ * or {@link #setDataSource(DataSource) data source} is configured. If neither is
+ * provided, an exception is thrown. If both are provided, the data source will be
+ * ignored.
+ * <p>
+ * If there is a transaction, a {@link TransactionStatus} object will be stored
+ * in store session {@link CacheStoreSession#properties() properties} and can be
+ * accessed at any moment by {@link #TX_STATUS_KEY} key. This can be used to
+ * acquire current DB transaction status.
  */
 public class CacheStoreSessionSpringListener implements CacheStoreSessionListener, LifecycleAware {
     /** Session key for transaction status. */
@@ -52,6 +74,9 @@ public class CacheStoreSessionSpringListener implements CacheStoreSessionListene
 
     /**
      * Sets transaction manager.
+     * <p>
+     * Either transaction manager or data source is required.
+     * If none is provided, exception will be thrown on startup.
      *
      * @param txMgr Transaction manager.
      */
@@ -70,6 +95,9 @@ public class CacheStoreSessionSpringListener implements CacheStoreSessionListene
 
     /**
      * Sets data source.
+     * <p>
+     * Either transaction manager or data source is required.
+     * If none is provided, exception will be thrown on startup.
      *
      * @param dataSrc Data source.
      */
@@ -88,6 +116,8 @@ public class CacheStoreSessionSpringListener implements CacheStoreSessionListene
 
     /**
      * Sets propagation behavior.
+     * <p>
+     * This parameter is optional.
      *
      * @param propagation Propagation behavior.
      */