You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2020/12/24 14:41:39 UTC
[ignite] branch master updated: IGNITE-13708 Add thin client
support for Spring Transactions - Fixes #8556.
This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 9123e97 IGNITE-13708 Add thin client support for Spring Transactions - Fixes #8556.
9123e97 is described below
commit 9123e97d32c02a967088a087ace2248ca7f2fe7b
Author: Mikhail Petrov <pm...@gmail.com>
AuthorDate: Thu Dec 24 17:24:22 2020 +0300
IGNITE-13708 Add thin client support for Spring Transactions - Fixes #8556.
Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
.../transactions/proxy/ClientTransactionProxy.java | 60 ++++
.../proxy/ClientTransactionProxyFactory.java | 61 ++++
.../transactions/proxy/IgniteTransactionProxy.java | 60 ++++
.../proxy/IgniteTransactionProxyFactory.java | 62 +++++
.../transactions/proxy/TransactionProxy.java | 41 +++
.../proxy/TransactionProxyFactory.java | 27 ++
.../spring/AbstractSpringTransactionManager.java | 309 +++++++++++++++++++++
.../IgniteClientSpringTransactionManager.java | 117 ++++++++
.../spring/IgniteTransactionHolder.java | 9 +-
.../spring/SpringTransactionManager.java | 288 +++----------------
.../ignite/testsuites/IgniteSpringTestSuite.java | 2 +
.../GridSpringTransactionManagerAbstractTest.java | 4 +-
.../GridSpringTransactionManagerSelfTest.java | 7 +-
...SpringTransactionManagerSpringBeanSelfTest.java | 7 +-
.../spring/GridSpringTransactionService.java | 91 +++++-
.../IgniteClientSpringTransactionManagerTest.java | 118 ++++++++
16 files changed, 1002 insertions(+), 261 deletions(-)
diff --git a/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/ClientTransactionProxy.java b/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/ClientTransactionProxy.java
new file mode 100644
index 0000000..eca575b
--- /dev/null
+++ b/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/ClientTransactionProxy.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.transactions.proxy;
+
+import org.apache.ignite.client.ClientTransaction;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Represents {@link TransactionProxy} implementation that uses {@link ClientTransaction} to perform transaction
+ * operations.
+ */
+public class ClientTransactionProxy implements TransactionProxy {
+ /** */
+ private final ClientTransaction tx;
+
+ /** */
+ public ClientTransactionProxy(ClientTransaction tx) {
+ this.tx = tx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void commit() {
+ tx.commit();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void rollback() {
+ tx.rollback();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ tx.close();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean setRollbackOnly() {
+ throw new UnsupportedOperationException("Operation is not supported by thin client.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ClientTransactionProxy.class, this);
+ }
+}
diff --git a/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/ClientTransactionProxyFactory.java b/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/ClientTransactionProxyFactory.java
new file mode 100644
index 0000000..440867d
--- /dev/null
+++ b/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/ClientTransactionProxyFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.transactions.proxy;
+
+import org.apache.ignite.client.ClientTransactions;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+/**
+ * Represents {@link TransactionProxyFactory} implementation that uses Ignite thin client transaction facade to start
+ * new transaction.
+ */
+public class ClientTransactionProxyFactory implements TransactionProxyFactory {
+ /** */
+ private final ClientTransactions txs;
+
+ /** */
+ public ClientTransactionProxyFactory(ClientTransactions txs) {
+ this.txs = txs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public TransactionProxy txStart(
+ TransactionConcurrency concurrency,
+ TransactionIsolation isolation,
+ long timeout
+ ) {
+ return new ClientTransactionProxy(txs.txStart(concurrency, isolation, timeout));
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object other) {
+ if (this == other)
+ return true;
+
+ if (other == null || getClass() != other.getClass())
+ return false;
+
+ return txs.equals(((ClientTransactionProxyFactory)other).txs);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return txs.hashCode();
+ }
+}
diff --git a/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/IgniteTransactionProxy.java b/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/IgniteTransactionProxy.java
new file mode 100644
index 0000000..15eaaae
--- /dev/null
+++ b/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/IgniteTransactionProxy.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.transactions.proxy;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.transactions.Transaction;
+
+/**
+ * Represents {@link TransactionProxy} implementation that uses {@link Transaction} to perform transaction
+ * operations.
+ */
+public class IgniteTransactionProxy implements TransactionProxy {
+ /** */
+ private final Transaction tx;
+
+ /** */
+ public IgniteTransactionProxy(Transaction tx) {
+ this.tx = tx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void commit() {
+ tx.commit();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void rollback() {
+ tx.rollback();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ tx.close();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean setRollbackOnly() {
+ return tx.setRollbackOnly();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgniteTransactionProxy.class, this);
+ }
+}
diff --git a/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/IgniteTransactionProxyFactory.java b/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/IgniteTransactionProxyFactory.java
new file mode 100644
index 0000000..efc0c51
--- /dev/null
+++ b/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/IgniteTransactionProxyFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.transactions.proxy;
+
+import java.util.Objects;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+/**
+ * Represents {@link TransactionProxyFactory} implementation that uses Ignite node transaction facade to start new
+ * transaction.
+ */
+public class IgniteTransactionProxyFactory implements TransactionProxyFactory {
+ /** */
+ private final IgniteTransactions txs;
+
+ /** */
+ public IgniteTransactionProxyFactory(IgniteTransactions txs) {
+ this.txs = txs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public TransactionProxy txStart(
+ TransactionConcurrency concurrency,
+ TransactionIsolation isolation,
+ long timeout
+ ) {
+ return new IgniteTransactionProxy(txs.txStart(concurrency, isolation, timeout, 0));
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object other) {
+ if (this == other)
+ return true;
+
+ if (other == null || getClass() != other.getClass())
+ return false;
+
+ return txs.equals(((IgniteTransactionProxyFactory)other).txs);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return Objects.hash(txs);
+ }
+}
diff --git a/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/TransactionProxy.java b/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/TransactionProxy.java
new file mode 100644
index 0000000..a40a5bd
--- /dev/null
+++ b/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/TransactionProxy.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.transactions.proxy;
+
+/** Represents Ignite client-independent transaction operations. */
+public interface TransactionProxy extends AutoCloseable {
+ /** Commits this transaction. */
+ public void commit();
+
+ /** Rolls back this transaction. */
+ public void rollback();
+
+ /** Ends the transaction. Transaction will be rolled back if it has not been committed. */
+ @Override public void close();
+
+ /**
+ * Modify the transaction associated with the current thread such that the
+ * only possible outcome of the transaction is to roll back the
+ * transaction.
+ *
+ * @return {@code True} if rollback-only flag was set as a result of this operation,
+ * {@code false} if it was already set prior to this call or could not be set
+ * because transaction is already finishing up committing or rolling back.
+ */
+ public boolean setRollbackOnly();
+}
diff --git a/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/TransactionProxyFactory.java b/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/TransactionProxyFactory.java
new file mode 100644
index 0000000..5484723
--- /dev/null
+++ b/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/TransactionProxyFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.transactions.proxy;
+
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+/** Represents Ignite client-independent transaction factory. */
+public interface TransactionProxyFactory {
+ /** Starts transaction with specified concurrency, isolation and timeout. */
+ public TransactionProxy txStart(TransactionConcurrency concurrency, TransactionIsolation isolation, long timeout);
+}
diff --git a/modules/spring/src/main/java/org/apache/ignite/transactions/spring/AbstractSpringTransactionManager.java b/modules/spring/src/main/java/org/apache/ignite/transactions/spring/AbstractSpringTransactionManager.java
new file mode 100644
index 0000000..717baeb
--- /dev/null
+++ b/modules/spring/src/main/java/org/apache/ignite/transactions/spring/AbstractSpringTransactionManager.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.transactions.spring;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.transactions.proxy.TransactionProxy;
+import org.apache.ignite.internal.transactions.proxy.TransactionProxyFactory;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.springframework.context.ApplicationListener;
+import org.springframework.context.event.ContextRefreshedEvent;
+import org.springframework.transaction.CannotCreateTransactionException;
+import org.springframework.transaction.InvalidIsolationLevelException;
+import org.springframework.transaction.TransactionDefinition;
+import org.springframework.transaction.TransactionException;
+import org.springframework.transaction.TransactionSystemException;
+import org.springframework.transaction.support.AbstractPlatformTransactionManager;
+import org.springframework.transaction.support.DefaultTransactionStatus;
+import org.springframework.transaction.support.ResourceTransactionManager;
+import org.springframework.transaction.support.SmartTransactionObject;
+import org.springframework.transaction.support.TransactionSynchronizationManager;
+import org.springframework.transaction.support.TransactionSynchronizationUtils;
+
+/** Abstract implementation of Spring Transaction manager with omitted Ignite cluster access logic. */
+public abstract class AbstractSpringTransactionManager extends AbstractPlatformTransactionManager
+ implements ResourceTransactionManager, ApplicationListener<ContextRefreshedEvent>
+{
+ /** Transaction factory.*/
+ private TransactionProxyFactory txFactory;
+
+ /** Ignite logger. */
+ private IgniteLogger log;
+
+ /** Transaction concurrency level. */
+ private TransactionConcurrency txConcurrency;
+
+ /** Default transaction isolation. */
+ private TransactionIsolation dfltTxIsolation;
+
+ /** Default transaction timeout. */
+ private long dfltTxTimeout;
+
+ /**
+ * Gets transaction concurrency level.
+ *
+ * @return Transaction concurrency level.
+ */
+ public TransactionConcurrency getTransactionConcurrency() {
+ return txConcurrency;
+ }
+
+ /**
+ * Sets transaction concurrency level.
+ *
+ * @param txConcurrency transaction concurrency level.
+ */
+ public void setTransactionConcurrency(TransactionConcurrency txConcurrency) {
+ this.txConcurrency = txConcurrency;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onApplicationEvent(ContextRefreshedEvent evt) {
+ if (txConcurrency == null)
+ txConcurrency = defaultTransactionConcurrency();
+
+ dfltTxIsolation = defaultTransactionIsolation();
+
+ dfltTxTimeout = defaultTransactionTimeout();
+
+ log = log();
+
+ txFactory = createTransactionFactory();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Object doGetTransaction() throws TransactionException {
+ IgniteTransactionObject txObj = new IgniteTransactionObject();
+
+ txObj.setTransactionHolder(
+ (IgniteTransactionHolder)TransactionSynchronizationManager.getResource(txFactory), false);
+
+ return txObj;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
+ if (definition.getIsolationLevel() == TransactionDefinition.ISOLATION_READ_UNCOMMITTED)
+ throw new InvalidIsolationLevelException("Ignite does not support READ_UNCOMMITTED isolation level.");
+
+ IgniteTransactionObject txObj = (IgniteTransactionObject)transaction;
+ TransactionProxy tx = null;
+
+ try {
+ if (txObj.getTransactionHolder() == null || txObj.getTransactionHolder().isSynchronizedWithTransaction()) {
+ long timeout = dfltTxTimeout;
+
+ if (definition.getTimeout() > 0)
+ timeout = TimeUnit.SECONDS.toMillis(definition.getTimeout());
+
+ TransactionProxy newTx = txFactory.txStart(txConcurrency,
+ convertToIgniteIsolationLevel(definition.getIsolationLevel()), timeout);
+
+ if (log.isDebugEnabled())
+ log.debug("Started Ignite transaction: " + newTx);
+
+ txObj.setTransactionHolder(new IgniteTransactionHolder(newTx), true);
+ }
+
+ txObj.getTransactionHolder().setSynchronizedWithTransaction(true);
+ txObj.getTransactionHolder().setTransactionActive(true);
+
+ tx = txObj.getTransactionHolder().getTransaction();
+
+ // Bind the session holder to the thread.
+ if (txObj.isNewTransactionHolder())
+ TransactionSynchronizationManager.bindResource(txFactory, txObj.getTransactionHolder());
+ }
+ catch (Exception ex) {
+ if (tx != null)
+ tx.close();
+
+ throw new CannotCreateTransactionException("Could not create Ignite transaction", ex);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void doCommit(DefaultTransactionStatus status) throws TransactionException {
+ IgniteTransactionObject txObj = (IgniteTransactionObject)status.getTransaction();
+ TransactionProxy tx = txObj.getTransactionHolder().getTransaction();
+
+ if (status.isDebug() && log.isDebugEnabled())
+ log.debug("Committing Ignite transaction: " + tx);
+
+ try {
+ tx.commit();
+ }
+ catch (Exception e) {
+ throw new TransactionSystemException("Could not commit Ignite transaction", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
+ IgniteTransactionObject txObj = (IgniteTransactionObject)status.getTransaction();
+ TransactionProxy tx = txObj.getTransactionHolder().getTransaction();
+
+ if (status.isDebug() && log.isDebugEnabled())
+ log.debug("Rolling back Ignite transaction: " + tx);
+
+ try {
+ tx.rollback();
+ }
+ catch (Exception e) {
+ throw new TransactionSystemException("Could not rollback Ignite transaction", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void doSetRollbackOnly(DefaultTransactionStatus status) throws TransactionException {
+ IgniteTransactionObject txObj = (IgniteTransactionObject)status.getTransaction();
+ TransactionProxy tx = txObj.getTransactionHolder().getTransaction();
+
+ assert tx != null;
+
+ if (status.isDebug() && log.isDebugEnabled())
+ log.debug("Setting Ignite transaction rollback-only: " + tx);
+
+ tx.setRollbackOnly();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void doCleanupAfterCompletion(Object transaction) {
+ IgniteTransactionObject txObj = (IgniteTransactionObject)transaction;
+
+ // Remove the transaction holder from the thread, if exposed.
+ if (txObj.isNewTransactionHolder()) {
+ TransactionProxy tx = txObj.getTransactionHolder().getTransaction();
+ TransactionSynchronizationManager.unbindResource(txFactory);
+
+ if (log.isDebugEnabled())
+ log.debug("Releasing Ignite transaction: " + tx);
+ }
+
+ txObj.getTransactionHolder().clear();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean isExistingTransaction(Object transaction) throws TransactionException {
+ IgniteTransactionObject txObj = (IgniteTransactionObject)transaction;
+
+ return (txObj.getTransactionHolder() != null && txObj.getTransactionHolder().isTransactionActive());
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object getResourceFactory() {
+ return txFactory;
+ }
+
+ /**
+ * @param isolationLevel Spring isolation level.
+ * @return Ignite isolation level.
+ */
+ private TransactionIsolation convertToIgniteIsolationLevel(int isolationLevel) {
+ TransactionIsolation isolation = dfltTxIsolation;
+
+ switch (isolationLevel) {
+ case TransactionDefinition.ISOLATION_READ_COMMITTED:
+ isolation = TransactionIsolation.READ_COMMITTED;
+
+ break;
+
+ case TransactionDefinition.ISOLATION_REPEATABLE_READ:
+ isolation = TransactionIsolation.REPEATABLE_READ;
+
+ break;
+
+ case TransactionDefinition.ISOLATION_SERIALIZABLE:
+ isolation = TransactionIsolation.SERIALIZABLE;
+ }
+
+ return isolation;
+ }
+
+ /** @return Default transaction isolation. */
+ protected abstract TransactionIsolation defaultTransactionIsolation();
+
+ /** @return Default transaction timeout. */
+ protected abstract long defaultTransactionTimeout();
+
+ /** @return Default transaction concurrency. */
+ protected abstract TransactionConcurrency defaultTransactionConcurrency();
+
+ /** Creates instance of {@link TransactionProxyFactory} that will be used to start new Ignite transactions. */
+ protected abstract TransactionProxyFactory createTransactionFactory();
+
+ /** @return Ignite logger. */
+ protected abstract IgniteLogger log();
+
+ /**
+ * An object representing a managed Ignite transaction.
+ */
+ protected static class IgniteTransactionObject implements SmartTransactionObject {
+ /** */
+ private IgniteTransactionHolder txHolder;
+
+ /** */
+ private boolean newTxHolder;
+
+ /**
+ * Sets the resource holder being used to hold Ignite resources in the
+ * transaction.
+ *
+ * @param txHolder the transaction resource holder
+ * @param newTxHolder true if the holder was created for this transaction,
+ * false if it already existed
+ */
+ private void setTransactionHolder(IgniteTransactionHolder txHolder, boolean newTxHolder) {
+ this.txHolder = txHolder;
+ this.newTxHolder = newTxHolder;
+ }
+
+ /**
+ * Returns the resource holder being used to hold Ignite resources in the
+ * transaction.
+ *
+ * @return the transaction resource holder
+ */
+ protected IgniteTransactionHolder getTransactionHolder() {
+ return txHolder;
+ }
+
+ /**
+ * Returns true if the transaction holder was created for the current
+ * transaction and false if it existed prior to the transaction.
+ *
+ * @return true if the holder was created for this transaction, false if it
+ * already existed
+ */
+ private boolean isNewTransactionHolder() {
+ return newTxHolder;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isRollbackOnly() {
+ return txHolder.isRollbackOnly();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void flush() {
+ TransactionSynchronizationUtils.triggerFlush();
+ }
+ }
+}
diff --git a/modules/spring/src/main/java/org/apache/ignite/transactions/spring/IgniteClientSpringTransactionManager.java b/modules/spring/src/main/java/org/apache/ignite/transactions/spring/IgniteClientSpringTransactionManager.java
new file mode 100644
index 0000000..25eff87
--- /dev/null
+++ b/modules/spring/src/main/java/org/apache/ignite/transactions/spring/IgniteClientSpringTransactionManager.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.transactions.spring;
+
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.configuration.ClientTransactionConfiguration;
+import org.apache.ignite.internal.transactions.proxy.ClientTransactionProxyFactory;
+import org.apache.ignite.internal.transactions.proxy.TransactionProxyFactory;
+import org.apache.ignite.logger.NullLogger;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.springframework.context.event.ContextRefreshedEvent;
+import org.springframework.transaction.TransactionException;
+import org.springframework.transaction.support.DefaultTransactionStatus;
+
+/**
+ * Represents {@link AbstractSpringTransactionManager} implementation that uses thin client to access the cluster and
+ * manage transactions. It requires thin client instance to be set before manager use
+ * (see {@link #setClientInstance(IgniteClient)}).
+ *
+ * You can provide ignite client instance to a Spring configuration XML file, like below:
+ *
+ * <pre name="code" class="xml">
+ * <beans xmlns="http://www.springframework.org/schema/beans"
+ * xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ * xmlns:tx="http://www.springframework.org/schema/tx"
+ * xsi:schemaLocation="
+ * http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ * http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
+ * <-- Provide Ignite client instance. -->
+ * <bean id="transactionManager" class="org.apache.ignite.transactions.spring.IgniteClientSpringTransactionManager">
+ * <property name="clientInstance" ref="igniteClientBean"/>
+ * </bean>
+ *
+ * <-- Use annotation-driven transaction configuration. -->
+ * <tx:annotation-driven/>
+ * </beans>
+ * </pre>
+ *
+ * Note that the same thin client instance must be used to both initialize the transaction manager and perform
+ * transactional operations.
+ *
+ * @see SpringTransactionManager to configure Transaction Manager access to the cluster through the Ignite client node.
+ */
+public class IgniteClientSpringTransactionManager extends AbstractSpringTransactionManager {
+ /** No-op Ignite logger. */
+ private static final IgniteLogger NOOP_LOG = new NullLogger();
+
+ /** Thin client instance. */
+ private IgniteClient cli;
+
+ /** @return Thin client instance that is used for accessing the Ignite cluster. */
+ public IgniteClient getClientInstance() {
+ return cli;
+ }
+
+ /** Sets thin client instance that is used for accessing the Ignite cluster. */
+ public void setClientInstance(IgniteClient cli) {
+ this.cli = cli;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onApplicationEvent(ContextRefreshedEvent evt) {
+ if (cli == null) {
+ throw new IllegalArgumentException("Failed to obtain thin client instance for accessing the Ignite" +
+ " cluster. Check that 'clientInstance' property is set.");
+ }
+
+ super.onApplicationEvent(evt);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected TransactionIsolation defaultTransactionIsolation() {
+ return ClientTransactionConfiguration.DFLT_TX_ISOLATION;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long defaultTransactionTimeout() {
+ return ClientTransactionConfiguration.DFLT_TRANSACTION_TIMEOUT;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected TransactionConcurrency defaultTransactionConcurrency() {
+ return ClientTransactionConfiguration.DFLT_TX_CONCURRENCY;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected TransactionProxyFactory createTransactionFactory() {
+ return new ClientTransactionProxyFactory(cli.transactions());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteLogger log() {
+ return NOOP_LOG;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void doSetRollbackOnly(DefaultTransactionStatus status) throws TransactionException {
+ ((IgniteTransactionObject)status.getTransaction()).getTransactionHolder().setRollbackOnly();
+ }
+}
diff --git a/modules/spring/src/main/java/org/apache/ignite/transactions/spring/IgniteTransactionHolder.java b/modules/spring/src/main/java/org/apache/ignite/transactions/spring/IgniteTransactionHolder.java
index e2c7133..d0363a1 100644
--- a/modules/spring/src/main/java/org/apache/ignite/transactions/spring/IgniteTransactionHolder.java
+++ b/modules/spring/src/main/java/org/apache/ignite/transactions/spring/IgniteTransactionHolder.java
@@ -17,6 +17,7 @@
package org.apache.ignite.transactions.spring;
+import org.apache.ignite.internal.transactions.proxy.TransactionProxy;
import org.apache.ignite.transactions.Transaction;
import org.springframework.transaction.support.ResourceHolderSupport;
@@ -26,7 +27,7 @@ import org.springframework.transaction.support.ResourceHolderSupport;
*/
class IgniteTransactionHolder extends ResourceHolderSupport {
/** */
- private Transaction transaction;
+ private TransactionProxy transaction;
/** */
private boolean transactionActive;
@@ -36,7 +37,7 @@ class IgniteTransactionHolder extends ResourceHolderSupport {
*
* @param transaction the transaction to hold
*/
- IgniteTransactionHolder(Transaction transaction) {
+ IgniteTransactionHolder(TransactionProxy transaction) {
this.transaction = transaction;
}
@@ -54,7 +55,7 @@ class IgniteTransactionHolder extends ResourceHolderSupport {
*
* @param transaction the transaction
*/
- void setTransaction(Transaction transaction) {
+ void setTransaction(TransactionProxy transaction) {
this.transaction = transaction;
}
@@ -63,7 +64,7 @@ class IgniteTransactionHolder extends ResourceHolderSupport {
*
* @return the transaction or null
*/
- Transaction getTransaction() {
+ TransactionProxy getTransaction() {
return this.transaction;
}
diff --git a/modules/spring/src/main/java/org/apache/ignite/transactions/spring/SpringTransactionManager.java b/modules/spring/src/main/java/org/apache/ignite/transactions/spring/SpringTransactionManager.java
index 0d7056e..8736c6f 100644
--- a/modules/spring/src/main/java/org/apache/ignite/transactions/spring/SpringTransactionManager.java
+++ b/modules/spring/src/main/java/org/apache/ignite/transactions/spring/SpringTransactionManager.java
@@ -17,32 +17,23 @@
package org.apache.ignite.transactions.spring;
-import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSpring;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.TransactionConfiguration;
+import org.apache.ignite.internal.transactions.proxy.IgniteTransactionProxyFactory;
+import org.apache.ignite.internal.transactions.proxy.TransactionProxyFactory;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
-import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
-import org.springframework.transaction.CannotCreateTransactionException;
-import org.springframework.transaction.InvalidIsolationLevelException;
-import org.springframework.transaction.PlatformTransactionManager;
-import org.springframework.transaction.TransactionDefinition;
-import org.springframework.transaction.TransactionException;
-import org.springframework.transaction.TransactionSystemException;
-import org.springframework.transaction.support.AbstractPlatformTransactionManager;
-import org.springframework.transaction.support.DefaultTransactionStatus;
-import org.springframework.transaction.support.ResourceTransactionManager;
-import org.springframework.transaction.support.TransactionSynchronizationManager;
/**
* Implementation of Spring transaction abstraction based on Ignite transaction.
@@ -199,71 +190,29 @@ import org.springframework.transaction.support.TransactionSynchronizationManager
* }
* </pre>
*/
-public class SpringTransactionManager extends AbstractPlatformTransactionManager
- implements ResourceTransactionManager, PlatformTransactionManager, ApplicationListener<ContextRefreshedEvent>, ApplicationContextAware {
- /**
- * Logger.
- */
- private IgniteLogger log;
-
- /**
- * Transaction concurrency level.
- */
- private TransactionConcurrency transactionConcurrency;
-
- /**
- * Grid configuration file path.
- */
+public class SpringTransactionManager extends AbstractSpringTransactionManager implements ApplicationContextAware,
+ DisposableBean
+{
+ /** Grid configuration file path. */
private String cfgPath;
- /**
- * Ignite configuration.
- */
+ /** Ignite configuration. */
private IgniteConfiguration cfg;
- /**
- * Ignite instance name.
- */
+ /** Ignite instance name. */
private String igniteInstanceName;
- /**
- * Ignite instance.
- */
+ /** Ignite instance. */
private Ignite ignite;
- /** Spring context */
- private ApplicationContext springCtx;
+ /** Flag indicating that Ignite instance was not created inside current transaction manager. */
+ private boolean externalIgniteInstance;
- /** {@inheritDoc} */
- @Override public void setApplicationContext(ApplicationContext ctx) {
- this.springCtx = ctx;
- }
-
- /**
- * Constructs the transaction manager with no target Ignite instance. An
- * instance must be set before use.
- */
- public SpringTransactionManager() {
- setNestedTransactionAllowed(false);
- }
+ /** Ignite transactions configuration. */
+ private TransactionConfiguration txCfg;
- /**
- * Gets transaction concurrency level.
- *
- * @return Transaction concurrency level.
- */
- public TransactionConcurrency getTransactionConcurrency() {
- return transactionConcurrency;
- }
-
- /**
- * Sets transaction concurrency level.
- *
- * @param transactionConcurrency transaction concurrency level.
- */
- public void setTransactionConcurrency(TransactionConcurrency transactionConcurrency) {
- this.transactionConcurrency = transactionConcurrency;
- }
+ /** Spring context */
+ private ApplicationContext springCtx;
/**
* Gets configuration file path.
@@ -342,220 +291,69 @@ public class SpringTransactionManager extends AbstractPlatformTransactionManager
}
/** {@inheritDoc} */
- @Override public void onApplicationEvent(ContextRefreshedEvent event) {
+ @Override public void onApplicationEvent(ContextRefreshedEvent evt) {
if (ignite == null) {
if (cfgPath != null && cfg != null) {
throw new IllegalArgumentException("Both 'configurationPath' and 'configuration' are " +
"provided. Set only one of these properties if you need to start a Ignite node inside of " +
- "SpringCacheManager. If you already have a node running, omit both of them and set" +
+ "SpringTransactionManager. If you already have a node running, omit both of them and set" +
"'igniteInstanceName' property.");
}
try {
- if (cfgPath != null) {
+ if (cfgPath != null)
ignite = IgniteSpring.start(cfgPath, springCtx);
- }
else if (cfg != null)
ignite = IgniteSpring.start(cfg, springCtx);
- else
+ else {
ignite = Ignition.ignite(igniteInstanceName);
+
+ externalIgniteInstance = true;
+ }
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
- }
-
- if (transactionConcurrency == null)
- transactionConcurrency = ignite.configuration().getTransactionConfiguration().getDefaultTxConcurrency();
-
- log = ignite.log();
- }
-
- /** {@inheritDoc} */
- @Override protected Object doGetTransaction() throws TransactionException {
- IgniteTransactionObject txObj = new IgniteTransactionObject();
-
- txObj.setTransactionHolder(
- (IgniteTransactionHolder)TransactionSynchronizationManager.getResource(this.ignite), false);
-
- return txObj;
- }
-
- /** {@inheritDoc} */
- @Override protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
- if (definition.getIsolationLevel() == TransactionDefinition.ISOLATION_READ_UNCOMMITTED)
- throw new InvalidIsolationLevelException("Ignite does not support READ_UNCOMMITTED isolation level.");
-
- IgniteTransactionObject txObj = (IgniteTransactionObject)transaction;
- Transaction tx = null;
-
- try {
- if (txObj.getTransactionHolder() == null || txObj.getTransactionHolder().isSynchronizedWithTransaction()) {
- long timeout = ignite.configuration().getTransactionConfiguration().getDefaultTxTimeout();
-
- if (definition.getTimeout() > 0)
- timeout = TimeUnit.SECONDS.toMillis(definition.getTimeout());
-
- Transaction newTx = ignite.transactions().txStart(transactionConcurrency,
- convertToIgniteIsolationLevel(definition.getIsolationLevel()), timeout, 0);
-
- if (log.isDebugEnabled())
- log.debug("Started Ignite transaction: " + newTx);
-
- txObj.setTransactionHolder(new IgniteTransactionHolder(newTx), true);
- }
-
- txObj.getTransactionHolder().setSynchronizedWithTransaction(true);
- txObj.getTransactionHolder().setTransactionActive(true);
-
- tx = txObj.getTransactionHolder().getTransaction();
- // Bind the session holder to the thread.
- if (txObj.isNewTransactionHolder())
- TransactionSynchronizationManager.bindResource(this.ignite, txObj.getTransactionHolder());
+ txCfg = ignite.configuration().getTransactionConfiguration();
}
- catch (Exception ex) {
- if (tx != null)
- tx.close();
- throw new CannotCreateTransactionException("Could not create Ignite transaction", ex);
- }
+ super.onApplicationEvent(evt);
}
/** {@inheritDoc} */
- @Override protected void doCommit(DefaultTransactionStatus status) throws TransactionException {
- IgniteTransactionObject txObj = (IgniteTransactionObject)status.getTransaction();
- Transaction tx = txObj.getTransactionHolder().getTransaction();
-
- if (status.isDebug() && log.isDebugEnabled())
- log.debug("Committing Ignite transaction: " + tx);
-
- try {
- tx.commit();
- }
- catch (IgniteException e) {
- throw new TransactionSystemException("Could not commit Ignite transaction", e);
- }
+ @Override public void setApplicationContext(ApplicationContext ctx) throws BeansException {
+ this.springCtx = ctx;
}
/** {@inheritDoc} */
- @Override protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
- IgniteTransactionObject txObj = (IgniteTransactionObject)status.getTransaction();
- Transaction tx = txObj.getTransactionHolder().getTransaction();
-
- if (status.isDebug() && log.isDebugEnabled())
- log.debug("Rolling back Ignite transaction: " + tx);
-
- try {
- tx.rollback();
- }
- catch (IgniteException e) {
- throw new TransactionSystemException("Could not rollback Ignite transaction", e);
- }
+ @Override protected TransactionIsolation defaultTransactionIsolation() {
+ return txCfg.getDefaultTxIsolation();
}
/** {@inheritDoc} */
- @Override protected void doSetRollbackOnly(DefaultTransactionStatus status) throws TransactionException {
- IgniteTransactionObject txObj = (IgniteTransactionObject)status.getTransaction();
- Transaction tx = txObj.getTransactionHolder().getTransaction();
-
- assert tx != null;
-
- if (status.isDebug() && log.isDebugEnabled())
- log.debug("Setting Ignite transaction rollback-only: " + tx);
-
- tx.setRollbackOnly();
+ @Override protected long defaultTransactionTimeout() {
+ return txCfg.getDefaultTxTimeout();
}
/** {@inheritDoc} */
- @Override protected void doCleanupAfterCompletion(Object transaction) {
- IgniteTransactionObject txObj = (IgniteTransactionObject)transaction;
-
- // Remove the transaction holder from the thread, if exposed.
- if (txObj.isNewTransactionHolder()) {
- Transaction tx = txObj.getTransactionHolder().getTransaction();
- TransactionSynchronizationManager.unbindResource(this.ignite);
-
- if (log.isDebugEnabled())
- log.debug("Releasing Ignite transaction: " + tx);
- }
-
- txObj.getTransactionHolder().clear();
+ @Override protected IgniteLogger log() {
+ return ignite.log();
}
/** {@inheritDoc} */
- @Override protected boolean isExistingTransaction(Object transaction) throws TransactionException {
- IgniteTransactionObject txObj = (IgniteTransactionObject)transaction;
-
- return (txObj.getTransactionHolder() != null && txObj.getTransactionHolder().isTransactionActive());
+ @Override protected TransactionConcurrency defaultTransactionConcurrency() {
+ return txCfg.getDefaultTxConcurrency();
}
/** {@inheritDoc} */
- @Override public Object getResourceFactory() {
- return this.ignite;
- }
-
- /**
- * @param isolationLevel Spring isolation level.
- * @return Ignite isolation level.
- */
- private TransactionIsolation convertToIgniteIsolationLevel(int isolationLevel) {
- TransactionIsolation isolation = ignite.configuration().getTransactionConfiguration().getDefaultTxIsolation();
- switch (isolationLevel) {
- case TransactionDefinition.ISOLATION_READ_COMMITTED:
- isolation = TransactionIsolation.READ_COMMITTED;
- break;
- case TransactionDefinition.ISOLATION_REPEATABLE_READ:
- isolation = TransactionIsolation.REPEATABLE_READ;
- break;
- case TransactionDefinition.ISOLATION_SERIALIZABLE:
- isolation = TransactionIsolation.SERIALIZABLE;
- }
- return isolation;
+ @Override protected TransactionProxyFactory createTransactionFactory() {
+ return new IgniteTransactionProxyFactory(ignite.transactions());
}
- /**
- * An object representing a managed Ignite transaction.
- */
- private static class IgniteTransactionObject {
- /** */
- private IgniteTransactionHolder transactionHolder;
-
- /** */
- private boolean newTransactionHolder;
-
- /**
- * Sets the resource holder being used to hold Ignite resources in the
- * transaction.
- *
- * @param transactionHolder the transaction resource holder
- * @param newHolder true if the holder was created for this transaction,
- * false if it already existed
- */
- private void setTransactionHolder(IgniteTransactionHolder transactionHolder, boolean newHolder) {
- this.transactionHolder = transactionHolder;
- this.newTransactionHolder = newHolder;
- }
-
- /**
- * Returns the resource holder being used to hold Ignite resources in the
- * transaction.
- *
- * @return the transaction resource holder
- */
- private IgniteTransactionHolder getTransactionHolder() {
- return transactionHolder;
- }
-
- /**
- * Returns true if the transaction holder was created for the current
- * transaction and false if it existed prior to the transaction.
- *
- * @return true if the holder was created for this transaction, false if it
- * already existed
- */
- private boolean isNewTransactionHolder() {
- return newTransactionHolder;
- }
+ /** {@inheritDoc} */
+ @Override public void destroy() {
+ if (!externalIgniteInstance)
+ ignite.close();
}
}
diff --git a/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java b/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
index c259db8..312ae5c 100644
--- a/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
+++ b/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
@@ -44,6 +44,7 @@ import org.apache.ignite.spring.injection.IgniteSpringBeanSpringResourceInjectio
import org.apache.ignite.startup.cmdline.GridCommandLineLoaderTest;
import org.apache.ignite.transactions.spring.GridSpringTransactionManagerSelfTest;
import org.apache.ignite.transactions.spring.GridSpringTransactionManagerSpringBeanSelfTest;
+import org.apache.ignite.transactions.spring.IgniteClientSpringTransactionManagerTest;
import org.apache.ignite.transactions.spring.SpringTransactionManagerContextInjectionTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -80,6 +81,7 @@ import org.junit.runners.Suite;
GridSpringTransactionManagerSelfTest.class,
GridSpringTransactionManagerSpringBeanSelfTest.class,
+ IgniteClientSpringTransactionManagerTest.class,
GridServiceInjectionSpringResourceTest.class,
IgniteSpringBeanSpringResourceInjectionTest.class,
diff --git a/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerAbstractTest.java b/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerAbstractTest.java
index e40b2bc..eb7cd78 100644
--- a/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerAbstractTest.java
+++ b/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerAbstractTest.java
@@ -17,9 +17,9 @@
package org.apache.ignite.transactions.spring;
-import org.apache.ignite.IgniteCache;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.spring.GridSpringTransactionService.CacheProxy;
import org.junit.Test;
import org.springframework.transaction.IllegalTransactionStateException;
import org.springframework.transaction.InvalidIsolationLevelException;
@@ -33,7 +33,7 @@ public abstract class GridSpringTransactionManagerAbstractTest extends GridCommo
protected static final String CACHE_NAME = "testCache";
/** */
- public abstract IgniteCache<Integer, String> cache();
+ public abstract CacheProxy<Integer, String> cache();
/** */
public abstract GridSpringTransactionService service();
diff --git a/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerSelfTest.java b/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerSelfTest.java
index 1f85f5a..d5e2a8e 100644
--- a/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerSelfTest.java
+++ b/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerSelfTest.java
@@ -17,10 +17,11 @@
package org.apache.ignite.transactions.spring;
-import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.transactions.spring.GridSpringTransactionService.CacheProxy;
+import org.apache.ignite.transactions.spring.GridSpringTransactionService.IgniteCacheProxy;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.GenericXmlApplicationContext;
@@ -45,8 +46,8 @@ public class GridSpringTransactionManagerSelfTest extends GridSpringTransactionM
return cfg;
}
- @Override public IgniteCache<Integer, String> cache() {
- return grid().cache(CACHE_NAME);
+ @Override public CacheProxy<Integer, String> cache() {
+ return new IgniteCacheProxy<>(grid().cache(CACHE_NAME));
}
@Override public GridSpringTransactionService service() {
diff --git a/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerSpringBeanSelfTest.java b/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerSpringBeanSelfTest.java
index 6ed14cf..19774a6 100644
--- a/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerSpringBeanSelfTest.java
+++ b/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerSpringBeanSelfTest.java
@@ -18,7 +18,8 @@
package org.apache.ignite.transactions.spring;
import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
+import org.apache.ignite.transactions.spring.GridSpringTransactionService.CacheProxy;
+import org.apache.ignite.transactions.spring.GridSpringTransactionService.IgniteCacheProxy;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.GenericXmlApplicationContext;
@@ -30,8 +31,8 @@ public class GridSpringTransactionManagerSpringBeanSelfTest extends GridSpringTr
/** */
private GridSpringTransactionService service;
- @Override public IgniteCache<Integer, String> cache() {
- return ignite.cache(CACHE_NAME);
+ @Override public CacheProxy<Integer, String> cache() {
+ return new IgniteCacheProxy<>(ignite.cache(CACHE_NAME));
}
@Override public GridSpringTransactionService service() {
diff --git a/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionService.java b/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionService.java
index 1a4c2b6..4a0d02f 100644
--- a/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionService.java
+++ b/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionService.java
@@ -18,6 +18,8 @@
package org.apache.ignite.transactions.spring;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.client.ClientCache;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
@@ -26,12 +28,16 @@ import org.springframework.transaction.annotation.Transactional;
* Service.
*/
public class GridSpringTransactionService {
+ /** */
+ @Autowired
+ private GridSpringTransactionService self;
+
/**
* @param cache Cache.
* @param entryCnt Entries count.
*/
@Transactional
- public void put(IgniteCache<Integer, String> cache, int entryCnt) {
+ public void put(CacheProxy<Integer, String> cache, int entryCnt) {
for (int i = 0; i < entryCnt; i++)
cache.put(i, String.valueOf(i));
}
@@ -41,7 +47,7 @@ public class GridSpringTransactionService {
* @param entryCnt Entries count.
*/
@Transactional
- public void putWithError(IgniteCache<Integer, String> cache, int entryCnt) {
+ public void putWithError(CacheProxy<Integer, String> cache, int entryCnt) {
for (int i = 0; i < entryCnt; i++)
cache.put(i, String.valueOf(i));
@@ -52,7 +58,7 @@ public class GridSpringTransactionService {
* @param cache Cache.
*/
@Transactional(propagation = Propagation.MANDATORY)
- public void putWithMandatoryPropagation(IgniteCache<Integer, String> cache) {
+ public void putWithMandatoryPropagation(CacheProxy<Integer, String> cache) {
cache.put(1, "1");
}
@@ -60,7 +66,84 @@ public class GridSpringTransactionService {
* @param cache Cache.
*/
@Transactional(isolation = Isolation.READ_UNCOMMITTED)
- public void putWithUnsupportedIsolationLevel(IgniteCache<Integer, String> cache) {
+ public void putWithUnsupportedIsolationLevel(CacheProxy<Integer, String> cache) {
cache.put(1, "1");
}
+
+ /** */
+ @Transactional
+ public void putWithNestedError(CacheProxy<Integer, String> cache, int entryCnt) {
+ self.put(cache, entryCnt);
+
+ try {
+ self.putWithError(cache, entryCnt);
+ }
+ catch (Exception ignored) {
+ // No-op.
+ }
+ }
+
+ /** */
+ public static class ClientCacheProxy<K, V> implements CacheProxy<K, V> {
+ /** */
+ private final ClientCache<K, V> cliCache;
+
+ /** */
+ public ClientCacheProxy(ClientCache<K, V> cliCache) {
+ this.cliCache = cliCache;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void put(K key, V val) {
+ cliCache.put(key, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size() {
+ return cliCache.size();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeAll() {
+ cliCache.removeAll();
+ }
+ }
+
+ /** */
+ public static class IgniteCacheProxy<K, V> implements CacheProxy<K, V> {
+ /** */
+ private final IgniteCache<K, V> cache;
+
+ /** */
+ public IgniteCacheProxy(IgniteCache<K, V> cache) {
+ this.cache = cache;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void put(K key, V val) {
+ cache.put(key, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size() {
+ return cache.size();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeAll() {
+ cache.removeAll();
+ }
+ }
+
+ /** */
+ public static interface CacheProxy<K, V> {
+ /** */
+ public void put(K key, V val);
+
+ /** */
+ public int size();
+
+ /** */
+ public void removeAll();
+ }
}
diff --git a/modules/spring/src/test/java/org/apache/ignite/transactions/spring/IgniteClientSpringTransactionManagerTest.java b/modules/spring/src/test/java/org/apache/ignite/transactions/spring/IgniteClientSpringTransactionManagerTest.java
new file mode 100644
index 0000000..0d333ea
--- /dev/null
+++ b/modules/spring/src/test/java/org/apache/ignite/transactions/spring/IgniteClientSpringTransactionManagerTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.transactions.spring;
+
+import org.apache.ignite.Ignition;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.spring.GridSpringTransactionService.CacheProxy;
+import org.apache.ignite.transactions.spring.GridSpringTransactionService.ClientCacheProxy;
+import org.springframework.context.annotation.AnnotationConfigApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.transaction.UnexpectedRollbackException;
+import org.springframework.transaction.annotation.EnableTransactionManagement;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.configuration.ClientConnectorConfiguration.DFLT_PORT;
+
+/** Tests Spring Transactions manager implementation that uses thin client to access the Ignite cluster. */
+public class IgniteClientSpringTransactionManagerTest extends GridSpringTransactionManagerAbstractTest {
+ /** Spring application context. */
+ private static AnnotationConfigApplicationContext ctx;
+
+ /** Ignite thin client instance. */
+ private static IgniteClient cli;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setCacheConfiguration(new CacheConfiguration<>(CACHE_NAME)
+ .setAtomicityMode(TRANSACTIONAL));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGrid();
+
+ ctx = new AnnotationConfigApplicationContext(IgniteClientSpringTransactionManagerApplicationContext.class);
+ cli = ctx.getBean(IgniteClient.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ ctx.close();
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheProxy<Integer, String> cache() {
+ return new ClientCacheProxy<>(cli.cache(CACHE_NAME));
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridSpringTransactionService service() {
+ return ctx.getBean(GridSpringTransactionService.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testDoSetRollbackOnlyInExistingTransaction() {
+ GridTestUtils.assertThrowsAnyCause(
+ log,
+ () -> {
+ service().putWithNestedError(cache(), 1_000);
+
+ return null;
+ },
+ UnexpectedRollbackException.class,
+ "Transaction rolled back because it has been marked as rollback-only");
+
+ assertEquals(0, cache().size());
+ }
+
+ /** */
+ @Configuration
+ @EnableTransactionManagement
+ public static class IgniteClientSpringTransactionManagerApplicationContext {
+ /** */
+ @Bean
+ public GridSpringTransactionService transactionService() {
+ return new GridSpringTransactionService();
+ }
+
+ /** */
+ @Bean
+ public IgniteClient igniteClient() {
+ return Ignition.startClient(new ClientConfiguration().setAddresses("127.0.0.1:" + DFLT_PORT));
+ }
+
+ /** */
+ @Bean
+ public AbstractSpringTransactionManager transactionManager(IgniteClient cli) {
+ IgniteClientSpringTransactionManager mgr = new IgniteClientSpringTransactionManager();
+
+ mgr.setClientInstance(cli);
+
+ return mgr;
+ }
+ }
+}