You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/02 12:37:53 UTC
[23/41] incubator-ignite git commit: IGNITE-891 - Cache store
improvements
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListener.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListener.java b/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListener.java
new file mode 100644
index 0000000..81736cd
--- /dev/null
+++ b/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListener.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.spring;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lifecycle.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.transactions.*;
+import org.springframework.jdbc.core.*;
+import org.springframework.jdbc.datasource.*;
+import org.springframework.transaction.*;
+import org.springframework.transaction.support.*;
+
+import javax.cache.integration.*;
+import javax.sql.*;
+
+/**
+ * Cache store session listener based on Spring transaction management.
+ * <p>
+ * This listener starts a new DB transaction for each session and commits
+ * or rolls it back when session ends. If there is no ongoing
+ * cache transaction, this listener is no-op.
+ * <p>
+ * Store implementation can use any Spring APIs like {@link JdbcTemplate}
+ * and others. The listener will guarantee that if there is an
+ * ongoing cache transaction, all store operations within this
+ * transaction will be automatically enlisted in the same database
+ * transaction.
+ * <p>
+ * {@link CacheSpringStoreSessionListener} requires that either
+ * {@link #setTransactionManager(PlatformTransactionManager) transaction manager}
+ * or {@link #setDataSource(DataSource) data source} is configured. If none of them is
+ * provided, exception is thrown. If both are provided, data source will be
+ * ignored.
+ * <p>
+ * If there is a transaction, a {@link TransactionStatus} object will be saved
+ * as a store session {@link CacheStoreSession#attachment() attachment}. It
+ * can be used to acquire current DB transaction status.
+ */
+public class CacheSpringStoreSessionListener implements CacheStoreSessionListener, LifecycleAware {
+ /** Transaction manager. */
+ private PlatformTransactionManager txMgr;
+
+ /** Data source. */
+ private DataSource dataSrc;
+
+ /** Propagation behavior. */
+ private int propagation = TransactionDefinition.PROPAGATION_REQUIRED;
+
+ /** Logger. */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /**
+ * Sets transaction manager.
+ * <p>
+ * Either transaction manager or data source is required.
+ * If none is provided, exception will be thrown on startup.
+ *
+ * @param txMgr Transaction manager.
+ */
+ public void setTransactionManager(PlatformTransactionManager txMgr) {
+ this.txMgr = txMgr;
+ }
+
+ /**
+ * Gets transaction manager.
+ *
+ * @return Transaction manager.
+ */
+ public PlatformTransactionManager getTransactionManager() {
+ return txMgr;
+ }
+
+ /**
+ * Sets data source.
+ * <p>
+ * Either transaction manager or data source is required.
+ * If none is provided, exception will be thrown on startup.
+ *
+ * @param dataSrc Data source.
+ */
+ public void setDataSource(DataSource dataSrc) {
+ this.dataSrc = dataSrc;
+ }
+
+ /**
+ * Gets data source.
+ *
+ * @return Data source.
+ */
+ public DataSource getDataSource() {
+ return dataSrc;
+ }
+
+ /**
+ * Sets propagation behavior.
+ * <p>
+ * This parameter is optional.
+ *
+ * @param propagation Propagation behavior.
+ */
+ public void setPropagationBehavior(int propagation) {
+ this.propagation = propagation;
+ }
+
+ /**
+ * Gets propagation behavior.
+ *
+ * @return Propagation behavior.
+ */
+ public int getPropagationBehavior() {
+ return propagation;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteException {
+ if (txMgr == null && dataSrc == null)
+ throw new IgniteException("Either transaction manager or data source is required by " +
+ getClass().getSimpleName() + '.');
+
+ if (dataSrc != null) {
+ if (txMgr == null)
+ txMgr = new DataSourceTransactionManager(dataSrc);
+ else
+ U.warn(log, "Data source configured in " + getClass().getSimpleName() +
+ " will be ignored (transaction manager is already set).");
+ }
+
+ assert txMgr != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws IgniteException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSessionStart(CacheStoreSession ses) {
+ if (ses.isWithinTransaction()) {
+ try {
+ TransactionDefinition def = definition(ses.transaction(), ses.cacheName());
+
+ ses.attach(txMgr.getTransaction(def));
+ }
+ catch (TransactionException e) {
+ throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
+ if (ses.isWithinTransaction()) {
+ TransactionStatus tx = ses.attachment();
+
+ if (tx != null) {
+ ses.attach(null);
+
+ try {
+ if (commit)
+ txMgr.commit(tx);
+ else
+ txMgr.rollback(tx);
+ }
+ catch (TransactionException e) {
+ throw new CacheWriterException("Failed to end store session [tx=" + ses.transaction() + ']', e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Creates DB transaction definition (name, isolation, propagation, timeout) for ongoing cache transaction.
+ *
+ * @return DB transaction definition.
+ */
+ private TransactionDefinition definition(Transaction tx, String cacheName) {
+ assert tx != null;
+
+ DefaultTransactionDefinition def = new DefaultTransactionDefinition();
+
+ def.setName("Ignite Tx [cache=" + (cacheName != null ? cacheName : "<default>") + ", id=" + tx.xid() + ']');
+ def.setIsolationLevel(isolationLevel(tx.isolation()));
+ def.setPropagationBehavior(propagation);
+
+ long timeoutSec = (tx.timeout() + 500) / 1000;
+
+ if (timeoutSec > 0 && timeoutSec < Integer.MAX_VALUE)
+ def.setTimeout((int)timeoutSec);
+
+ return def;
+ }
+
+ /**
+ * Gets DB transaction isolation level based on ongoing cache transaction isolation.
+ *
+ * @param isolation Cache transaction isolation.
+ * @return DB transaction isolation.
+ */
+ private int isolationLevel(TransactionIsolation isolation) {
+ switch (isolation) {
+ case READ_COMMITTED:
+ return TransactionDefinition.ISOLATION_READ_COMMITTED;
+
+ case REPEATABLE_READ:
+ return TransactionDefinition.ISOLATION_REPEATABLE_READ;
+
+ case SERIALIZABLE:
+ return TransactionDefinition.ISOLATION_SERIALIZABLE;
+
+ default:
+ throw new IllegalStateException(); // Will never happen.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java b/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java
deleted file mode 100644
index e5201ba..0000000
--- a/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.spring;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.store.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lifecycle.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.transactions.*;
-import org.springframework.jdbc.core.*;
-import org.springframework.jdbc.datasource.*;
-import org.springframework.transaction.*;
-import org.springframework.transaction.support.*;
-
-import javax.cache.integration.*;
-import javax.sql.*;
-
-/**
- * Cache store session listener based on Spring transaction management.
- * <p>
- * This listener starts a new DB transaction for each session and commits
- * or rolls it back when session ends. If there is no ongoing
- * cache transaction, this listener is no-op.
- * <p>
- * Store implementation can use any Spring APIs like {@link JdbcTemplate}
- * and others. The listener will guarantee that if there is an
- * ongoing cache transaction, all store operations within this
- * transaction will be automatically enlisted in the same database
- * transaction.
- * <p>
- * {@link CacheStoreSessionSpringListener} requires that either
- * {@link #setTransactionManager(PlatformTransactionManager) transaction manager}
- * or {@link #setDataSource(DataSource) data source} is configured. If non of them is
- * provided, exception is thrown. Is both are provided, data source will be
- * ignored.
- * <p>
- * If there is a transaction, a {@link TransactionStatus} object will be stored
- * in store session {@link CacheStoreSession#properties() properties} and can be
- * accessed at any moment by {@link #TX_STATUS_KEY} key. This can be used to
- * acquire current DB transaction status.
- */
-public class CacheStoreSessionSpringListener implements CacheStoreSessionListener, LifecycleAware {
- /** Session key for transaction status. */
- public static final String TX_STATUS_KEY = "__spring_tx_status_";
-
- /** Transaction manager. */
- private PlatformTransactionManager txMgr;
-
- /** Data source. */
- private DataSource dataSrc;
-
- /** Propagation behavior. */
- private int propagation = TransactionDefinition.PROPAGATION_REQUIRED;
-
- /** Logger. */
- @LoggerResource
- private IgniteLogger log;
-
- /**
- * Sets transaction manager.
- * <p>
- * Either transaction manager or data source is required.
- * If none is provided, exception will be thrown on startup.
- *
- * @param txMgr Transaction manager.
- */
- public void setTransactionManager(PlatformTransactionManager txMgr) {
- this.txMgr = txMgr;
- }
-
- /**
- * Gets transaction manager.
- *
- * @return Transaction manager.
- */
- public PlatformTransactionManager getTransactionManager() {
- return txMgr;
- }
-
- /**
- * Sets data source.
- * <p>
- * Either transaction manager or data source is required.
- * If none is provided, exception will be thrown on startup.
- *
- * @param dataSrc Data source.
- */
- public void setDataSource(DataSource dataSrc) {
- this.dataSrc = dataSrc;
- }
-
- /**
- * Gets data source.
- *
- * @return Data source.
- */
- public DataSource getDataSource() {
- return dataSrc;
- }
-
- /**
- * Sets propagation behavior.
- * <p>
- * This parameter is optional.
- *
- * @param propagation Propagation behavior.
- */
- public void setPropagationBehavior(int propagation) {
- this.propagation = propagation;
- }
-
- /**
- * Gets propagation behavior.
- *
- * @return Propagation behavior.
- */
- public int getPropagationBehavior() {
- return propagation;
- }
-
- /** {@inheritDoc} */
- @Override public void start() throws IgniteException {
- if (txMgr == null && dataSrc == null)
- throw new IgniteException("Either transaction manager or data source is required by " +
- getClass().getSimpleName() + '.');
-
- if (dataSrc != null) {
- if (txMgr == null)
- txMgr = new DataSourceTransactionManager(dataSrc);
- else
- U.warn(log, "Data source configured in " + getClass().getSimpleName() +
- " will be ignored (transaction manager is already set).");
- }
-
- assert txMgr != null;
- }
-
- /** {@inheritDoc} */
- @Override public void stop() throws IgniteException {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void onSessionStart(CacheStoreSession ses) {
- if (ses.isWithinTransaction()) {
- try {
- TransactionDefinition def = definition(ses.transaction(), ses.cacheName());
-
- ses.properties().put(TX_STATUS_KEY, txMgr.getTransaction(def));
- }
- catch (TransactionException e) {
- throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e);
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
- if (ses.isWithinTransaction()) {
- TransactionStatus tx = ses.<String, TransactionStatus>properties().remove(TX_STATUS_KEY);
-
- if (tx != null) {
- try {
- if (commit)
- txMgr.commit(tx);
- else
- txMgr.rollback(tx);
- }
- catch (TransactionException e) {
- throw new CacheWriterException("Failed to end store session [tx=" + ses.transaction() + ']', e);
- }
- }
- }
- }
-
- /**
- * Gets DB transaction isolation level based on ongoing cache transaction isolation.
- *
- * @return DB transaction isolation.
- */
- private TransactionDefinition definition(Transaction tx, String cacheName) {
- assert tx != null;
-
- DefaultTransactionDefinition def = new DefaultTransactionDefinition();
-
- def.setName("Ignite Tx [cache=" + (cacheName != null ? cacheName : "<default>") + ", id=" + tx.xid() + ']');
- def.setIsolationLevel(isolationLevel(tx.isolation()));
- def.setPropagationBehavior(propagation);
-
- long timeoutSec = (tx.timeout() + 500) / 1000;
-
- if (timeoutSec > 0 && timeoutSec < Integer.MAX_VALUE)
- def.setTimeout((int)timeoutSec);
-
- return def;
- }
-
- /**
- * Gets DB transaction isolation level based on ongoing cache transaction isolation.
- *
- * @param isolation Cache transaction isolation.
- * @return DB transaction isolation.
- */
- private int isolationLevel(TransactionIsolation isolation) {
- switch (isolation) {
- case READ_COMMITTED:
- return TransactionDefinition.ISOLATION_READ_COMMITTED;
-
- case REPEATABLE_READ:
- return TransactionDefinition.ISOLATION_REPEATABLE_READ;
-
- case SERIALIZABLE:
- return TransactionDefinition.ISOLATION_SERIALIZABLE;
-
- default:
- throw new IllegalStateException(); // Will never happen.
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListenerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListenerSelfTest.java b/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListenerSelfTest.java
new file mode 100644
index 0000000..74f5c69
--- /dev/null
+++ b/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListenerSelfTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.spring;
+
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cache.store.jdbc.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.springframework.jdbc.core.*;
+import org.springframework.jdbc.datasource.*;
+import org.springframework.transaction.*;
+
+import javax.cache.*;
+import javax.cache.configuration.*;
+import javax.cache.integration.*;
+import javax.sql.*;
+import java.sql.*;
+import java.util.*;
+
+/**
+ * Tests for {@link CacheSpringStoreSessionListener}.
+ */
+public class CacheSpringStoreSessionListenerSelfTest extends CacheStoreSessionListenerAbstractSelfTest {
+ /** */
+ private static final DataSource DATA_SRC = new DriverManagerDataSource(URL);
+
+ /** {@inheritDoc} */
+ @Override protected Factory<? extends CacheStore<Integer, Integer>> storeFactory() {
+ return new Factory<CacheStore<Integer, Integer>>() {
+ @Override public CacheStore<Integer, Integer> create() {
+ return new Store(new JdbcTemplate(DATA_SRC));
+ }
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Factory<CacheStoreSessionListener> sessionListenerFactory() {
+ return new Factory<CacheStoreSessionListener>() {
+ @Override public CacheStoreSessionListener create() {
+ CacheSpringStoreSessionListener lsnr = new CacheSpringStoreSessionListener();
+
+ lsnr.setDataSource(DATA_SRC);
+
+ return lsnr;
+ }
+ };
+ }
+
+ /**
+ */
+ private static class Store extends CacheStoreAdapter<Integer, Integer> {
+ /** */
+ private static String SES_CONN_KEY = "ses_conn";
+
+ /** */
+ private final JdbcTemplate jdbc;
+
+ /** */
+ @CacheStoreSessionResource
+ private CacheStoreSession ses;
+
+ /**
+ * @param jdbc JDBC template.
+ */
+ private Store(JdbcTemplate jdbc) {
+ this.jdbc = jdbc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void loadCache(IgniteBiInClosure<Integer, Integer> clo, Object... args) {
+ loadCacheCnt.incrementAndGet();
+
+ checkTransaction();
+ checkConnection();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Integer load(Integer key) throws CacheLoaderException {
+ loadCnt.incrementAndGet();
+
+ checkTransaction();
+ checkConnection();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry)
+ throws CacheWriterException {
+ writeCnt.incrementAndGet();
+
+ checkTransaction();
+ checkConnection();
+
+ if (write.get()) {
+ String table;
+
+ switch (ses.cacheName()) {
+ case "cache1":
+ table = "Table1";
+
+ break;
+
+ case "cache2":
+ if (fail.get())
+ throw new CacheWriterException("Expected failure.");
+
+ table = "Table2";
+
+ break;
+
+ default:
+ throw new CacheWriterException("Wring cache: " + ses.cacheName());
+ }
+
+ jdbc.update("INSERT INTO " + table + " (key, value) VALUES (?, ?)",
+ entry.getKey(), entry.getValue());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(Object key) throws CacheWriterException {
+ deleteCnt.incrementAndGet();
+
+ checkTransaction();
+ checkConnection();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sessionEnd(boolean commit) {
+ assertNull(ses.attachment());
+ }
+
+ /**
+ */
+ private void checkTransaction() {
+ TransactionStatus tx = ses.attachment();
+
+ if (ses.isWithinTransaction()) {
+ assertNotNull(tx);
+ assertFalse(tx.isCompleted());
+ }
+ else
+ assertNull(tx);
+ }
+
+ /**
+ */
+ private void checkConnection() {
+ Connection conn = DataSourceUtils.getConnection(jdbc.getDataSource());
+
+ assertNotNull(conn);
+
+ try {
+ assertFalse(conn.isClosed());
+ assertEquals(!ses.isWithinTransaction(), conn.getAutoCommit());
+ }
+ catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+
+ verifySameInstance(conn);
+ }
+
+ /**
+ * @param conn Connection.
+ */
+ private void verifySameInstance(Connection conn) {
+ Map<String, Connection> props = ses.properties();
+
+ Connection sesConn = props.get(SES_CONN_KEY);
+
+ if (sesConn == null)
+ props.put(SES_CONN_KEY, conn);
+ else {
+ assertSame(conn, sesConn);
+
+ reuseCnt.incrementAndGet();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java b/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java
deleted file mode 100644
index 83ed249..0000000
--- a/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.spring;
-
-import org.apache.ignite.cache.store.*;
-import org.apache.ignite.cache.store.jdbc.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.resources.*;
-import org.springframework.jdbc.core.*;
-import org.springframework.jdbc.datasource.*;
-import org.springframework.transaction.*;
-
-import javax.cache.*;
-import javax.cache.configuration.*;
-import javax.cache.integration.*;
-import javax.sql.*;
-import java.sql.*;
-import java.util.*;
-
-/**
- * Tests for {@link CacheStoreSessionJdbcListener}.
- */
-public class CacheStoreSessionSpringListenerSelfTest extends CacheStoreSessionListenerAbstractSelfTest {
- /** */
- private static final DataSource DATA_SRC = new DriverManagerDataSource(URL);
-
- /** {@inheritDoc} */
- @Override protected Factory<? extends CacheStore<Integer, Integer>> storeFactory() {
- return new Factory<CacheStore<Integer, Integer>>() {
- @Override public CacheStore<Integer, Integer> create() {
- return new Store(new JdbcTemplate(DATA_SRC));
- }
- };
- }
-
- /** {@inheritDoc} */
- @Override protected Factory<CacheStoreSessionListener> sessionListenerFactory() {
- return new Factory<CacheStoreSessionListener>() {
- @Override public CacheStoreSessionListener create() {
- CacheStoreSessionSpringListener lsnr = new CacheStoreSessionSpringListener();
-
- lsnr.setDataSource(DATA_SRC);
-
- return lsnr;
- }
- };
- }
-
- /**
- */
- private static class Store extends CacheStoreAdapter<Integer, Integer> {
- /** */
- private static String SES_CONN_KEY = "ses_conn";
-
- /** */
- private final JdbcTemplate jdbc;
-
- /** */
- @CacheStoreSessionResource
- private CacheStoreSession ses;
-
- /**
- * @param jdbc JDBC template.
- */
- private Store(JdbcTemplate jdbc) {
- this.jdbc = jdbc;
- }
-
- /** {@inheritDoc} */
- @Override public void loadCache(IgniteBiInClosure<Integer, Integer> clo, Object... args) {
- loadCacheCnt.incrementAndGet();
-
- checkTransaction();
- checkConnection();
- }
-
- /** {@inheritDoc} */
- @Override public Integer load(Integer key) throws CacheLoaderException {
- loadCnt.incrementAndGet();
-
- checkTransaction();
- checkConnection();
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry)
- throws CacheWriterException {
- writeCnt.incrementAndGet();
-
- checkTransaction();
- checkConnection();
-
- if (write.get()) {
- String table;
-
- switch (ses.cacheName()) {
- case "cache1":
- table = "Table1";
-
- break;
-
- case "cache2":
- if (fail.get())
- throw new CacheWriterException("Expected failure.");
-
- table = "Table2";
-
- break;
-
- default:
- throw new CacheWriterException("Wring cache: " + ses.cacheName());
- }
-
- jdbc.update("INSERT INTO " + table + " (key, value) VALUES (?, ?)",
- entry.getKey(), entry.getValue());
- }
- }
-
- /** {@inheritDoc} */
- @Override public void delete(Object key) throws CacheWriterException {
- deleteCnt.incrementAndGet();
-
- checkTransaction();
- checkConnection();
- }
-
- /** {@inheritDoc} */
- @Override public void sessionEnd(boolean commit) {
- assertNull(transaction());
- }
-
- /**
- */
- private void checkTransaction() {
- TransactionStatus tx = transaction();
-
- if (ses.isWithinTransaction()) {
- assertNotNull(tx);
- assertFalse(tx.isCompleted());
- }
- else
- assertNull(tx);
- }
-
- /**
- * @return Transaction status.
- */
- private TransactionStatus transaction() {
- return ses.<String, TransactionStatus>properties().get(CacheStoreSessionSpringListener.TX_STATUS_KEY);
- }
-
- /**
- */
- private void checkConnection() {
- Connection conn = DataSourceUtils.getConnection(jdbc.getDataSource());
-
- assertNotNull(conn);
-
- try {
- assertFalse(conn.isClosed());
- assertEquals(!ses.isWithinTransaction(), conn.getAutoCommit());
- }
- catch (SQLException e) {
- throw new RuntimeException(e);
- }
-
- verifySameInstance(conn);
- }
-
- /**
- * @param conn Connection.
- */
- private void verifySameInstance(Connection conn) {
- Map<String, Connection> props = ses.properties();
-
- Connection sesConn = props.get(SES_CONN_KEY);
-
- if (sesConn == null)
- props.put(SES_CONN_KEY, conn);
- else {
- assertSame(conn, sesConn);
-
- reuseCnt.incrementAndGet();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java b/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
index 0b7e471..12dd494 100644
--- a/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
+++ b/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
@@ -48,7 +48,7 @@ public class IgniteSpringTestSuite extends TestSuite {
suite.addTest(new TestSuite(IgniteStartFromStreamConfigurationTest.class));
- suite.addTestSuite(CacheStoreSessionSpringListenerSelfTest.class);
+ suite.addTestSuite(CacheSpringStoreSessionListenerSelfTest.class);
return suite;
}