You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2017/08/15 18:00:04 UTC
[03/29] ignite git commit: ignite-5712 Context switching for
optimistic transactions
ignite-5712 Context switching for optimistic transactions
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e255a564
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e255a564
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e255a564
Branch: refs/heads/ignite-5947
Commit: e255a564985a12113984ec02f15a4443495b8ffc
Parents: 3fdf453
Author: Nikolay Izhikov <ni...@gmail.com>
Authored: Wed Aug 2 11:52:44 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Aug 2 18:55:36 2017 +0300
----------------------------------------------------------------------
.../ignite/tests/utils/TestTransaction.java | 10 +
.../cache/GridCacheSharedContext.java | 24 +
.../cache/distributed/near/GridNearTxLocal.java | 51 ++
.../store/GridCacheStoreManagerAdapter.java | 10 +
.../cache/transactions/IgniteTxAdapter.java | 20 +-
.../cache/transactions/IgniteTxManager.java | 74 ++
.../cache/transactions/IgniteTxMap.java | 2 +-
.../transactions/TransactionProxyImpl.java | 46 +-
.../apache/ignite/transactions/Transaction.java | 14 +
.../ignite/transactions/TransactionState.java | 7 +-
...ptimisticTxSuspendResumeMultiServerTest.java | 30 +
.../IgniteOptimisticTxSuspendResumeTest.java | 751 +++++++++++++++++++
.../IgnitePessimisticTxSuspendResumeTest.java | 91 +++
.../ignite/testframework/GridTestUtils.java | 26 +
.../cache/GridAbstractCacheStoreSelfTest.java | 10 +
.../testsuites/IgniteCacheTestSuite6.java | 42 ++
.../processors/cache/jta/CacheJtaManager.java | 5 +-
.../processors/cache/jta/CacheJtaResource.java | 28 +-
.../GridJtaTransactionManagerSelfTest.java | 208 +++++
.../ignite/testsuites/IgniteJtaTestSuite.java | 5 +-
20 files changed, 1438 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e255a564/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
index 4a03d25..e587bd7 100644
--- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
@@ -140,4 +140,14 @@ public class TestTransaction implements Transaction {
@Override public IgniteFuture<Void> rollbackAsync() throws IgniteException {
return null;
}
+
+ /** {@inheritDoc} */
+ @Override public void suspend() throws IgniteException{
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void resume() throws IgniteException {
+ // No-op.
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e255a564/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 5387cc8..3fb63dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -944,6 +944,30 @@ public class GridCacheSharedContext<K, V> {
}
/**
+ * Suspends transaction. It could be resumed later. Supported only for optimistic transactions.
+ *
+ * @param tx Transaction to suspend.
+ * @throws IgniteCheckedException If suspension failed.
+ */
+ public void suspendTx(GridNearTxLocal tx) throws IgniteCheckedException {
+ tx.txState().awaitLastFut(this);
+
+ tx.suspend();
+ }
+
+ /**
+ * Resumes transaction if it was previously suspended.
+ *
+ * @param tx Transaction to resume.
+ * @throws IgniteCheckedException If resume failed.
+ */
+ public void resumeTx(GridNearTxLocal tx) throws IgniteCheckedException {
+ tx.txState().awaitLastFut(this);
+
+ tx.resume();
+ }
+
+ /**
* @return Store session listeners.
*/
@Nullable public Collection<CacheStoreSessionListener> storeSessionListeners() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/e255a564/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 81e5ca8..2fb0ff3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -105,12 +105,14 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRA
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER;
import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_NOT_EMPTY_VER;
+import static org.apache.ignite.transactions.TransactionState.ACTIVE;
import static org.apache.ignite.transactions.TransactionState.COMMITTED;
import static org.apache.ignite.transactions.TransactionState.COMMITTING;
import static org.apache.ignite.transactions.TransactionState.PREPARED;
import static org.apache.ignite.transactions.TransactionState.PREPARING;
import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK;
+import static org.apache.ignite.transactions.TransactionState.SUSPENDED;
import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
/**
@@ -2851,6 +2853,47 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
}
/**
+ * Suspends transaction. It could be resumed later. Supported only for optimistic transactions.
+ *
+ * @throws IgniteCheckedException If the transaction is in an incorrect state, or timed out.
+ */
+ public void suspend() throws IgniteCheckedException {
+ if (log.isDebugEnabled())
+ log.debug("Suspend near local tx: " + this);
+
+ if (pessimistic())
+ throw new UnsupportedOperationException("Suspension is not supported for pessimistic transactions.");
+
+ if (threadId() != Thread.currentThread().getId())
+ throw new IgniteCheckedException("Only thread started transaction can suspend it.");
+
+ synchronized (this) {
+ checkValid();
+
+ cctx.tm().suspendTx(this);
+ }
+ }
+
+ /**
+ * Resumes transaction (possibly in another thread) if it was previously suspended.
+ *
+ * @throws IgniteCheckedException If the transaction is in an incorrect state, or timed out.
+ */
+ public void resume() throws IgniteCheckedException {
+ if (log.isDebugEnabled())
+ log.debug("Resume near local tx: " + this);
+
+ if (pessimistic())
+ throw new UnsupportedOperationException("Resume is not supported for pessimistic transactions.");
+
+ synchronized (this) {
+ checkValid();
+
+ cctx.tm().resumeTx(this);
+ }
+ }
+
+ /**
* @param maps Mappings.
*/
void addEntryMapping(@Nullable Collection<GridDistributedTxMapping> maps) {
@@ -3952,6 +3995,14 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
}
/**
+ * @param threadId Id of the thread that becomes the new owner of the transaction. No state check is
+ * performed here and no exception is thrown; called from IgniteTxManager#resumeTx during resume.
+ */
+ public void threadId(long threadId) {
+ this.threadId = threadId;
+ }
+
+ /**
* Post-lock closure.
*
* @param <T> Return type.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e255a564/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index c02e2c7..bb16ad1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -1358,6 +1358,11 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
}
/** {@inheritDoc} */
+ @Override public void suspend() throws IgniteException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
@Override public long timeout() {
return tx.timeout();
}
@@ -1403,6 +1408,11 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
}
/** {@inheritDoc} */
+ @Override public void resume() throws IgniteException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteAsyncSupport withAsync() {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e255a564/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 91ce3ce..61ca78c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -97,6 +97,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARED;
import static org.apache.ignite.transactions.TransactionState.PREPARING;
import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK;
+import static org.apache.ignite.transactions.TransactionState.SUSPENDED;
/**
* Managed transaction adapter.
@@ -977,10 +978,10 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
switch (state) {
case ACTIVE: {
- valid = false;
+ valid = prev == SUSPENDED;
break;
- } // Active is initial state and cannot be transitioned to.
+ }
case PREPARING: {
valid = prev == ACTIVE;
@@ -1025,15 +1026,20 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
case MARKED_ROLLBACK: {
- valid = prev == ACTIVE || prev == PREPARING || prev == PREPARED;
+ valid = prev == ACTIVE || prev == PREPARING || prev == PREPARED || prev == SUSPENDED;
break;
}
case ROLLING_BACK: {
- valid =
- prev == ACTIVE || prev == MARKED_ROLLBACK || prev == PREPARING ||
- prev == PREPARED || (prev == COMMITTING && local() && !dht());
+ valid = prev == ACTIVE || prev == MARKED_ROLLBACK || prev == PREPARING ||
+ prev == PREPARED || prev == SUSPENDED || (prev == COMMITTING && local() && !dht());
+
+ break;
+ }
+
+ case SUSPENDED: {
+ valid = prev == ACTIVE;
break;
}
@@ -1064,7 +1070,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
if (valid) {
// Seal transactions maps.
- if (state != ACTIVE)
+ if (state != ACTIVE && state != SUSPENDED)
seal();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e255a564/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 7d612ec..a427da3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -113,6 +113,7 @@ import static org.apache.ignite.transactions.TransactionState.COMMITTING;
import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK;
import static org.apache.ignite.transactions.TransactionState.PREPARED;
import static org.apache.ignite.transactions.TransactionState.PREPARING;
+import static org.apache.ignite.transactions.TransactionState.SUSPENDED;
import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
@@ -2240,6 +2241,79 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
+ * Suspends transaction.
+ * Should not be used directly. Use tx.suspend() instead.
+ *
+ * @param tx Transaction to be suspended.
+ *
+ * @see #resumeTx(GridNearTxLocal)
+ * @see GridNearTxLocal#suspend()
+ * @see GridNearTxLocal#resume()
+ */
+ public void suspendTx(final GridNearTxLocal tx) throws IgniteCheckedException {
+ assert tx != null && !tx.system() : tx;
+
+ if (!tx.state(SUSPENDED)) {
+ throw new IgniteCheckedException("Trying to suspend transaction with incorrect state "
+ + "[expected=" + ACTIVE + ", actual=" + tx.state() + ']');
+ }
+
+ clearThreadMap(tx);
+
+ transactionMap(tx).remove(tx.xidVersion(), tx);
+ }
+
+ /**
+ * Resume transaction in current thread.
+ * Please don't use directly. Use tx.resume() instead.
+ *
+ * @param tx Transaction to be resumed.
+ *
+ * @see #suspendTx(GridNearTxLocal)
+ * @see GridNearTxLocal#suspend()
+ * @see GridNearTxLocal#resume()
+ */
+ public void resumeTx(GridNearTxLocal tx) throws IgniteCheckedException {
+ assert tx != null && !tx.system() : tx;
+ assert !threadMap.containsValue(tx) : tx;
+ assert !transactionMap(tx).containsValue(tx) : tx;
+ assert !haveSystemTxForThread(Thread.currentThread().getId());
+
+ if(!tx.state(ACTIVE)) {
+ throw new IgniteCheckedException("Trying to resume transaction with incorrect state "
+ + "[expected=" + SUSPENDED + ", actual=" + tx.state() + ']');
+ }
+
+ long threadId = Thread.currentThread().getId();
+
+ if (threadMap.putIfAbsent(threadId, tx) != null)
+ throw new IgniteCheckedException("Thread already start a transaction.");
+
+ if (transactionMap(tx).putIfAbsent(tx.xidVersion(), tx) != null)
+ throw new IgniteCheckedException("Thread already start a transaction.");
+
+ tx.threadId(threadId);
+ }
+
+ /**
+ * @param threadId Thread id.
+ * @return True if the thread has a system transaction, false otherwise.
+ */
+ private boolean haveSystemTxForThread(long threadId) {
+ if (!sysThreadMap.isEmpty()) {
+ for (GridCacheContext cacheCtx : cctx.cache().context().cacheContexts()) {
+ if (!cacheCtx.systemTx())
+ continue;
+
+ if (sysThreadMap.containsKey(new TxThreadKey(threadId, cacheCtx.cacheId())))
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
* Timeout object for node failure handler.
*/
private final class NodeFailureTimeoutObject extends GridTimeoutObjectAdapter {
http://git-wip-us.apache.org/repos/asf/ignite/blob/e255a564/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java
index 429c995..6b79550 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java
@@ -190,4 +190,4 @@ public class IgniteTxMap extends AbstractMap<IgniteTxKey, IgniteTxEntry> impleme
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
throw new IllegalStateException("Transaction view map should never be serialized: " + this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e255a564/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
index 8750cab..f25fc36 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
@@ -44,6 +44,8 @@ import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionState;
+import static org.apache.ignite.transactions.TransactionState.SUSPENDED;
+
/**
* Cache transaction proxy.
*/
@@ -98,6 +100,18 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
* Enters a call.
*/
private void enter() {
+ enter(false);
+ }
+
+ /**
+ * Enters a call.
+ *
+ * @param resume Flag indicating that a resume operation is in progress.
+ */
+ private void enter(boolean resume) {
+ if (!resume && state() == SUSPENDED)
+ throw new IgniteException("Tx in SUSPENDED state. All operations except resume are prohibited.");
+
if (cctx.deploymentEnabled())
cctx.deploy().onEnter();
@@ -204,6 +218,21 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
}
/** {@inheritDoc} */
+ @Override public void suspend() throws IgniteException {
+ enter();
+
+ try {
+ cctx.suspendTx(tx);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ leave();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public long timeout(long timeout) {
return tx.timeout(timeout);
}
@@ -333,6 +362,21 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
}
}
+ /** {@inheritDoc} */
+ @Override public void resume() throws IgniteException {
+ enter(true);
+
+ try {
+ cctx.resumeTx(tx);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ leave();
+ }
+ }
+
/**
* @param res Result to convert to finished future.
*/
@@ -377,4 +421,4 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
@Override public String toString() {
return S.toString(TransactionProxyImpl.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e255a564/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
index 57a2b00..a1b4d78 100644
--- a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
+++ b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
@@ -272,4 +272,18 @@ public interface Transaction extends AutoCloseable, IgniteAsyncSupport {
* @throws IgniteException If rollback failed.
*/
public IgniteFuture<Void> rollbackAsync() throws IgniteException;
+
+ /**
+ * Resume transaction if it was previously suspended. <strong>Supported only for optimistic transactions.</strong>
+ *
+ * @throws IgniteException If resume failed.
+ */
+ public void resume() throws IgniteException;
+
+ /**
+ * Suspends transaction. It could be resumed later. <strong>Supported only for optimistic transactions.</strong>
+ *
+ * @throws IgniteException If suspension failed.
+ */
+ public void suspend() throws IgniteException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/e255a564/modules/core/src/main/java/org/apache/ignite/transactions/TransactionState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/transactions/TransactionState.java b/modules/core/src/main/java/org/apache/ignite/transactions/TransactionState.java
index 1980242..d01c0fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/transactions/TransactionState.java
+++ b/modules/core/src/main/java/org/apache/ignite/transactions/TransactionState.java
@@ -48,7 +48,10 @@ public enum TransactionState {
ROLLED_BACK,
/** Transaction rollback failed or is otherwise unknown state. */
- UNKNOWN;
+ UNKNOWN,
+
+ /** Transaction has been suspended by user. */
+ SUSPENDED;
/** Enumerated values. */
private static final TransactionState[] VALS = values();
@@ -62,4 +65,4 @@ public enum TransactionState {
@Nullable public static TransactionState fromOrdinal(int ord) {
return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e255a564/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeMultiServerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeMultiServerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeMultiServerTest.java
new file mode 100644
index 0000000..a6318d4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeMultiServerTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+/**
+ *
+ */
+public class IgniteOptimisticTxSuspendResumeMultiServerTest extends IgniteOptimisticTxSuspendResumeTest {
+ /**
+ * @return Number of server nodes.
+ */
+ protected int serversNumber() {
+ return 4;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e255a564/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java
new file mode 100644
index 0000000..d16aebd
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java
@@ -0,0 +1,751 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.apache.ignite.transactions.TransactionTimeoutException;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionState.ACTIVE;
+import static org.apache.ignite.transactions.TransactionState.COMMITTED;
+import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK;
+import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
+import static org.apache.ignite.transactions.TransactionState.SUSPENDED;
+
+/**
+ *
+ */
+public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest {
+ /** Transaction timeout. */
+ private static final long TX_TIMEOUT = 100;
+
+ /** Future timeout */
+ private static final int FUT_TIMEOUT = 5000;
+
+ private boolean client = false;
+
+ /**
+ * List of closures, each executing a transaction operation that is prohibited in the suspended state.
+ */
+ private static final List<CI1Exc<Transaction>> SUSPENDED_TX_PROHIBITED_OPS = Arrays.asList(
+ new CI1Exc<Transaction>() {
+ @Override public void applyx(Transaction tx) throws Exception {
+ tx.suspend();
+ }
+ },
+ new CI1Exc<Transaction>() {
+ @Override public void applyx(Transaction tx) throws Exception {
+ tx.close();
+ }
+ },
+ new CI1Exc<Transaction>() {
+ @Override public void applyx(Transaction tx) throws Exception {
+ tx.commit();
+ }
+ },
+ new CI1Exc<Transaction>() {
+ @Override public void applyx(Transaction tx) throws Exception {
+ tx.commitAsync().get(FUT_TIMEOUT);
+ }
+ },
+ new CI1Exc<Transaction>() {
+ @Override public void applyx(Transaction tx) throws Exception {
+ tx.rollback();
+ }
+ },
+ new CI1Exc<Transaction>() {
+ @Override public void applyx(Transaction tx) throws Exception {
+ tx.rollbackAsync().get(FUT_TIMEOUT);
+ }
+ },
+ new CI1Exc<Transaction>() {
+ @Override public void applyx(Transaction tx) throws Exception {
+ tx.setRollbackOnly();
+ }
+ }
+ );
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrids(serversNumber());
+
+ if (serversNumber() > 1) {
+ client = true;
+
+ startGrid(serversNumber());
+
+ startGrid(serversNumber() + 1);
+
+ client = false;
+ }
+
+ awaitPartitionMapExchange();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids(true);
+ }
+
+ /**
+ * @return Number of server nodes.
+ */
+ protected int serversNumber() {
+ return 1;
+ }
+
+ /**
+ * Test for transaction starting in one thread, continuing in another.
+ *
+ * @throws Exception If failed.
+ */
+ public void testResumeTxInAnotherThread() throws Exception {
+ executeTestForAllCaches(new CI2Exc<Ignite, IgniteCache<Integer, Integer>>() {
+ @Override public void applyx(Ignite ignite, final IgniteCache<Integer, Integer> cache) throws Exception {
+ for (TransactionIsolation isolation : TransactionIsolation.values()) {
+ final Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation);
+
+ final AtomicInteger cntr = new AtomicInteger(0);
+
+ cache.put(-1, -1);
+ cache.put(cntr.get(), cntr.getAndIncrement());
+
+ tx.suspend();
+
+ assertEquals(SUSPENDED, tx.state());
+
+ assertNull("Thread already have tx", ignite.transactions().tx());
+
+ assertNull(cache.get(-1));
+ assertNull(cache.get(cntr.get()));
+
+ for (int i = 0; i < 10; i++) {
+ GridTestUtils.runAsync(new Runnable() {
+ @Override public void run() {
+ assertEquals(SUSPENDED, tx.state());
+
+ tx.resume();
+
+ assertEquals(ACTIVE, tx.state());
+
+ cache.put(cntr.get(), cntr.getAndIncrement());
+
+ tx.suspend();
+ }
+ }).get(FUT_TIMEOUT);
+ }
+
+ tx.resume();
+
+ cache.remove(-1);
+
+ tx.commit();
+
+ assertEquals(COMMITTED, tx.state());
+
+ for (int i = 0; i < cntr.get(); i++)
+ assertEquals(i, (int)cache.get(i));
+
+ assertFalse(cache.containsKey(-1));
+
+ cache.removeAll();
+ }
+ }
+ });
+ }
+
+ /**
+ * Test for transaction starting in one thread, continuing in another, and resuming in initiating thread.
+ * Cache operations performed for a couple of caches.
+ *
+ * @throws Exception If failed.
+ */
+ public void testCrossCacheTxInAnotherThread() throws Exception {
+ executeTestForAllCaches(new CI2Exc<Ignite, IgniteCache<Integer, Integer>>() {
+ @Override public void applyx(Ignite ignite, final IgniteCache<Integer, Integer> cache) throws Exception {
+ for (TransactionIsolation isolation : TransactionIsolation.values()) {
+ final IgniteCache<Integer, Integer> otherCache =
+ ignite.getOrCreateCache(cacheConfiguration(PARTITIONED, 0, false).setName("otherCache"));
+
+ final Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation);
+
+ final AtomicInteger cntr = new AtomicInteger(0);
+
+ cache.put(-1, -1);
+ otherCache.put(-1, -1);
+
+ tx.suspend();
+
+ for (int i = 0; i < 10; i++) {
+ GridTestUtils.runAsync(new Runnable() {
+ @Override public void run() {
+ tx.resume();
+
+ assertEquals(ACTIVE, tx.state());
+
+ cache.put(cntr.get(), cntr.get());
+ otherCache.put(cntr.get(), cntr.getAndIncrement());
+
+ tx.suspend();
+ }
+ }).get(FUT_TIMEOUT);
+ }
+
+ tx.resume();
+
+ cache.remove(-1);
+ otherCache.remove(-1);
+
+ tx.commit();
+
+ assertEquals(COMMITTED, tx.state());
+
+ for (int i = 0; i < cntr.get(); i++) {
+ assertEquals(i, (int)cache.get(i));
+ assertEquals(i, (int)otherCache.get(i));
+ }
+
+ assertFalse(cache.containsKey(-1));
+ assertFalse(otherCache.containsKey(-1));
+
+ cache.removeAll();
+ otherCache.removeAll();
+ }
+ }
+ });
+ }
+
+ /**
+ * Test for transaction rollback.
+ *
+ * @throws Exception If failed.
+ */
+ public void testTxRollback() throws Exception {
+ executeTestForAllCaches(new CI2Exc<Ignite, IgniteCache<Integer, Integer>>() {
+ @Override public void applyx(Ignite ignite, final IgniteCache<Integer, Integer> cache) throws Exception {
+ for (TransactionIsolation isolation : TransactionIsolation.values()) {
+ final Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation);
+
+ cache.put(1, 1);
+ cache.put(2, 2);
+
+ tx.suspend();
+
+ assertNull("There is no transaction for current thread", ignite.transactions().tx());
+
+ assertEquals(SUSPENDED, tx.state());
+
+ GridTestUtils.runAsync(new Runnable() {
+ @Override public void run() {
+ tx.resume();
+
+ assertEquals(ACTIVE, tx.state());
+
+ cache.put(3, 3);
+
+ tx.rollback();
+ }
+ }).get(FUT_TIMEOUT);
+
+ assertEquals(ROLLED_BACK, tx.state());
+
+ assertFalse(cache.containsKey(1));
+ assertFalse(cache.containsKey(2));
+ assertFalse(cache.containsKey(3));
+
+ cache.removeAll();
+ }
+ }
+ });
+ }
+
+ /**
+ * Test for starting and suspending transactions, and then resuming and committing in another thread.
+ *
+ * @throws Exception If failed.
+ */
+ public void testMultiTxSuspendResume() throws Exception {
+ executeTestForAllCaches(new CI2Exc<Ignite, IgniteCache<Integer, Integer>>() {
+ @Override public void applyx(Ignite ignite, final IgniteCache<Integer, Integer> cache) throws Exception {
+ for (TransactionIsolation isolation : TransactionIsolation.values()) {
+ final List<Transaction> clientTxs = new ArrayList<>();
+
+ for (int i = 0; i < 10; i++) {
+ Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation);
+
+ cache.put(i, i);
+
+ tx.suspend();
+
+ clientTxs.add(tx);
+ }
+
+ GridTestUtils.runMultiThreaded(new CI1Exc<Integer>() {
+ public void applyx(Integer idx) throws Exception {
+ Transaction tx = clientTxs.get(idx);
+
+ assertEquals(SUSPENDED, tx.state());
+
+ tx.resume();
+
+ assertEquals(ACTIVE, tx.state());
+
+ tx.commit();
+ }
+ }, 10, "th-suspend");
+
+ for (int i = 0; i < 10; i++)
+ assertEquals(i, (int)cache.get(i));
+
+ cache.removeAll();
+ }
+ }
+ });
+ }
+
+ /**
+ * Test checking that all operations (except resume) on a suspended transaction are prohibited from another thread.
+ *
+ * @throws Exception If failed.
+ */
+ public void testOpsProhibitedOnSuspendedTxFromOtherThread() throws Exception {
+ executeTestForAllCaches(new CI2Exc<Ignite, IgniteCache<Integer, Integer>>() {
+ @Override public void applyx(Ignite ignite, final IgniteCache<Integer, Integer> cache) throws Exception {
+ for (final CI1Exc<Transaction> txOperation : SUSPENDED_TX_PROHIBITED_OPS) {
+ for (TransactionIsolation isolation : TransactionIsolation.values()) {
+ final Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation);
+
+ cache.put(1, 1);
+
+ tx.suspend();
+
+ multithreaded(new RunnableX() {
+ @Override public void runx() throws Exception {
+ GridTestUtils.assertThrowsWithCause(txOperation, tx, IgniteException.class);
+ }
+ }, 1);
+
+ tx.resume();
+ tx.close();
+
+ assertNull(cache.get(1));
+ }
+ }
+ }
+ });
+ }
+
+ /**
+ * Test checking that all operations (except resume) on a suspended transaction are prohibited.
+ *
+ * @throws Exception If failed.
+ */
+ public void testOpsProhibitedOnSuspendedTx() throws Exception {
+ executeTestForAllCaches(new CI2Exc<Ignite, IgniteCache<Integer, Integer>>() {
+ @Override public void applyx(Ignite ignite, final IgniteCache<Integer, Integer> cache) throws Exception {
+ for (CI1Exc<Transaction> txOperation : SUSPENDED_TX_PROHIBITED_OPS) {
+ for (TransactionIsolation isolation : TransactionIsolation.values()) {
+ Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation);
+
+ cache.put(1, 1);
+
+ tx.suspend();
+
+ GridTestUtils.assertThrowsWithCause(txOperation, tx, IgniteException.class);
+
+ tx.resume();
+ tx.close();
+
+ assertNull(cache.get(1));
+ }
+ }
+ }
+ });
+ }
+
+ /**
+ * Test checking timeout on resumed transaction.
+ *
+ * @throws Exception If failed.
+ */
+ public void testTxTimeoutOnResumed() throws Exception {
+ executeTestForAllCaches(new CI2Exc<Ignite, IgniteCache<Integer, Integer>>() {
+ @Override public void applyx(Ignite ignite, final IgniteCache<Integer, Integer> cache) throws Exception {
+ for (TransactionIsolation isolation : TransactionIsolation.values()) {
+ final Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation, TX_TIMEOUT, 0);
+
+ cache.put(1, 1);
+
+ tx.suspend();
+
+ Thread.sleep(TX_TIMEOUT * 2);
+
+ GridTestUtils.assertThrowsWithCause(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ tx.resume();
+
+ return null;
+ }
+ }, TransactionTimeoutException.class);
+
+ assertEquals(MARKED_ROLLBACK, tx.state());
+
+ tx.close();
+ }
+ }
+ });
+ }
+
+ /**
+ * Test checking timeout on suspended transaction.
+ *
+ * @throws Exception If failed.
+ */
+ public void testTxTimeoutOnSuspend() throws Exception {
+ executeTestForAllCaches(new CI2Exc<Ignite, IgniteCache<Integer, Integer>>() {
+ @Override public void applyx(Ignite ignite, final IgniteCache<Integer, Integer> cache) throws Exception {
+ for (TransactionIsolation isolation : TransactionIsolation.values()) {
+ final Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation, TX_TIMEOUT, 0);
+
+ cache.put(1, 1);
+
+ Thread.sleep(TX_TIMEOUT * 2);
+
+ GridTestUtils.assertThrowsWithCause(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ tx.suspend();
+
+ return null;
+ }
+ }, TransactionTimeoutException.class);
+
+ assertEquals(MARKED_ROLLBACK, tx.state());
+
+ tx.close();
+
+ assertNull(cache.get(1));
+ }
+ }
+ });
+ }
+
+ /**
+ * Test starts one transaction and suspends it, then starts another transaction that writes
+ * the same key and commits.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSuspendTxAndStartNew() throws Exception {
+ executeTestForAllCaches(new CI2Exc<Ignite, IgniteCache<Integer, Integer>>() {
+ @Override public void applyx(Ignite ignite, final IgniteCache<Integer, Integer> cache) throws Exception {
+ for (TransactionIsolation tx1Isolation : TransactionIsolation.values()) {
+ for (TransactionIsolation tx2Isolation : TransactionIsolation.values()) {
+ Transaction tx1 = ignite.transactions().txStart(OPTIMISTIC, tx1Isolation);
+
+ cache.put(1, 1);
+
+ tx1.suspend();
+
+ assertFalse(cache.containsKey(1));
+
+ Transaction tx2 = ignite.transactions().txStart(OPTIMISTIC, tx2Isolation);
+
+ cache.put(1, 2);
+
+ tx2.commit();
+
+ assertEquals(2, (int)cache.get(1));
+
+ tx1.resume();
+
+ assertEquals(1, (int)cache.get(1));
+
+ tx1.close();
+
+ cache.removeAll();
+ }
+ }
+ }
+ });
+ }
+
+ /**
+ * Test starts one transaction and suspends it, then starts another transaction that writes
+ * the same key without committing.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSuspendTxAndStartNewWithoutCommit() throws Exception {
+ executeTestForAllCaches(new CI2Exc<Ignite, IgniteCache<Integer, Integer>>() {
+ @Override public void applyx(Ignite ignite, final IgniteCache<Integer, Integer> cache) throws Exception {
+ for (TransactionIsolation tx1Isolation : TransactionIsolation.values()) {
+ for (TransactionIsolation tx2Isolation : TransactionIsolation.values()) {
+ Transaction tx1 = ignite.transactions().txStart(OPTIMISTIC, tx1Isolation);
+
+ cache.put(1, 1);
+
+ tx1.suspend();
+
+ assertFalse(cache.containsKey(1));
+
+ Transaction tx2 = ignite.transactions().txStart(OPTIMISTIC, tx2Isolation);
+
+ cache.put(1, 2);
+
+ tx2.suspend();
+
+ assertFalse(cache.containsKey(1));
+
+ tx1.resume();
+
+ assertEquals(1, (int)cache.get(1));
+
+ tx1.suspend();
+
+ tx2.resume();
+
+ assertEquals(2, (int)cache.get(1));
+
+ tx2.rollback();
+
+ tx1.resume();
+ tx1.rollback();
+
+ cache.removeAll();
+ }
+ }
+ }
+ });
+ }
+
+ /**
+ * Test that a transaction can be resumed and completed if the topology changed while it was suspended.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSuspendTxAndResumeAfterTopologyChange() throws Exception {
+ executeTestForAllCaches(new CI2Exc<Ignite, IgniteCache<Integer, Integer>>() {
+ @Override public void applyx(Ignite ignite, final IgniteCache<Integer, Integer> cache) throws Exception {
+ for (TransactionIsolation isolation : TransactionIsolation.values()) {
+ Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation);
+
+ cache.put(1, 1);
+
+ tx.suspend();
+
+ assertEquals(SUSPENDED, tx.state());
+
+ try (IgniteEx g = startGrid(serversNumber() + 3)) {
+ tx.resume();
+
+ assertEquals(ACTIVE, tx.state());
+
+ assertEquals(1, (int)cache.get(1));
+
+ tx.commit();
+
+ assertEquals(1, (int)cache.get(1));
+ }
+
+ cache.removeAll();
+ }
+ }
+ });
+ }
+
+ /**
+ * @return Cache configurations to test.
+ */
+ private List<CacheConfiguration<Integer, Integer>> cacheConfigurations() {
+ List<CacheConfiguration<Integer, Integer>> cfgs = new ArrayList<>();
+
+ cfgs.add(cacheConfiguration(PARTITIONED, 0, false));
+ cfgs.add(cacheConfiguration(PARTITIONED, 1, false));
+ cfgs.add(cacheConfiguration(PARTITIONED, 1, true));
+ cfgs.add(cacheConfiguration(REPLICATED, 0, false));
+
+ return cfgs;
+ }
+
+ /**
+ * @param cacheMode Cache mode.
+ * @param backups Number of backups.
+ * @param nearCache If {@code true} near cache is enabled.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration<Integer, Integer> cacheConfiguration(
+ CacheMode cacheMode,
+ int backups,
+ boolean nearCache) {
+ CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+
+ ccfg.setCacheMode(cacheMode);
+ ccfg.setAtomicityMode(TRANSACTIONAL);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+ if (cacheMode == PARTITIONED)
+ ccfg.setBackups(backups);
+
+ if (nearCache)
+ ccfg.setNearConfiguration(new NearCacheConfiguration<Integer, Integer>());
+
+ return ccfg;
+ }
+
+ /**
+ * @param c Closure.
+ * @throws Exception If failed.
+ */
+ private void executeTestForAllCaches(CI2<Ignite, IgniteCache<Integer, Integer>> c) throws Exception {
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ ignite(0).createCache(ccfg);
+
+ log.info("Run test for cache [cache=" + ccfg.getCacheMode() +
+ ", backups=" + ccfg.getBackups() +
+ ", near=" + (ccfg.getNearConfiguration() != null) + "]");
+
+ int srvNum = serversNumber();
+ if (serversNumber() > 1) {
+ ignite(serversNumber() + 1).createNearCache(ccfg.getName(), new NearCacheConfiguration<>());
+ srvNum += 2;
+ }
+
+ try {
+ for (int i = 0; i < srvNum; i++) {
+ Ignite ignite = ignite(i);
+
+ log.info("Run test for node [node=" + i + ", client=" + ignite.configuration().isClientMode() + ']');
+
+ c.apply(ignite, ignite.<Integer, Integer>cache(ccfg.getName()));
+ }
+ }
+ finally {
+ ignite(0).destroyCache(ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * Closure with 2 parameters that can throw any exception.
+ *
+ * @param <E1> Type of first closure parameter.
+ * @param <E2> Type of second closure parameter.
+ */
+ public static abstract class CI2Exc<E1, E2> implements CI2<E1, E2> {
+ /**
+ * Closure body.
+ *
+ * @param e1 First closure argument.
+ * @param e2 Second closure argument.
+ * @throws Exception If failed.
+ */
+ public abstract void applyx(E1 e1, E2 e2) throws Exception;
+
+ /** {@inheritDoc} */
+ @Override public void apply(E1 e1, E2 e2) {
+ try {
+ applyx(e1, e2);
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Closure that can throw any exception.
+ *
+ * @param <T> Type of closure parameter.
+ */
+ public static abstract class CI1Exc<T> implements CI1<T> {
+ /**
+ * Closure body.
+ *
+ * @param o Closure argument.
+ * @throws Exception If failed.
+ */
+ public abstract void applyx(T o) throws Exception;
+
+ /** {@inheritDoc} */
+ @Override public void apply(T o) {
+ try {
+ applyx(o);
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Runnable that can throw any exception.
+ */
+ public static abstract class RunnableX implements Runnable {
+ /**
+ * Closure body.
+ *
+ * @throws Exception If failed.
+ */
+ public abstract void runx() throws Exception;
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ try {
+ runx();
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e255a564/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgnitePessimisticTxSuspendResumeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgnitePessimisticTxSuspendResumeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgnitePessimisticTxSuspendResumeTest.java
new file mode 100644
index 0000000..57a1470
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgnitePessimisticTxSuspendResumeTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.concurrent.Callable;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
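+ * Test checking that suspending a pessimistic transaction fails with {@link UnsupportedOperationException}.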
+ */
+public class IgnitePessimisticTxSuspendResumeTest extends GridCommonAbstractTest {
+ /**
+ * Creates new cache configuration.
+ *
+ * @return New cache configuration.
+ */
+ protected CacheConfiguration<Integer, String> getCacheConfiguration() {
+ CacheConfiguration<Integer, String> cacheCfg = defaultCacheConfiguration();
+
+ cacheCfg.setCacheMode(PARTITIONED);
+
+ return cacheCfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setClientMode(false);
+ cfg.setCacheConfiguration(getCacheConfiguration());
+
+ return cfg;
+ }
+
+ /**
+ * Test for suspension on pessimistic transaction.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSuspendPessimisticTx() throws Exception {
+ try (Ignite g = startGrid()) {
+ IgniteCache<Integer, String> cache = jcache();
+
+ IgniteTransactions txs = g.transactions();
+
+ for (TransactionIsolation isolation : TransactionIsolation.values()) {
+ final Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, isolation);
+
+ cache.put(1, "1");
+
+ GridTestUtils.assertThrowsWithCause(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ tx.suspend();
+
+ return null;
+ }
+ }, UnsupportedOperationException.class);
+
+ tx.close();
+
+ assertNull(cache.get(1));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e255a564/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index cbcbaee..585c759 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -426,6 +426,32 @@ public final class GridTestUtils {
}
/**
+ * Checks whether closure throws exception, which is itself of a specified
+ * class, or has a cause of the specified class.
+ *
+ * @param call Closure.
+ * @param p Parameter passed to closure.
+ * @param cls Expected class.
+ * @return Thrown throwable.
+ */
+ public static <P> Throwable assertThrowsWithCause(IgniteInClosure<P> call, P p, Class<? extends Throwable> cls) {
+ assert call != null;
+ assert cls != null;
+
+ try {
+ call.apply(p);
+ }
+ catch (Throwable e) {
+ if (!X.hasCause(e, cls))
+ fail("Exception is neither of a specified class, nor has a cause of the specified class: " + cls, e);
+
+ return e;
+ }
+
+ throw new AssertionError("Exception has not been thrown.");
+ }
+
+ /**
* Throw assertion error with specified error message and initialized cause.
*
* @param msg Error message.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e255a564/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/GridAbstractCacheStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/GridAbstractCacheStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/GridAbstractCacheStoreSelfTest.java
index c5673b3..f764212 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/GridAbstractCacheStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/GridAbstractCacheStoreSelfTest.java
@@ -579,6 +579,16 @@ public abstract class GridAbstractCacheStoreSelfTest<T extends CacheStore<Object
}
/** {@inheritDoc} */
+ @Override public void suspend() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void resume() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteFuture<Void> rollbackAsync() throws IgniteException {
return null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e255a564/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
new file mode 100644
index 0000000..90190d0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeMultiServerTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgnitePessimisticTxSuspendResumeTest;
+
+/**
+ * Test suite.
+ */
+public class IgniteCacheTestSuite6 extends TestSuite {
+ /**
+ * @return IgniteCache test suite.
+ * @throws Exception Thrown in case of the failure.
+ */
+ public static TestSuite suite() throws Exception {
+ TestSuite suite = new TestSuite("IgniteCache Test Suite part 6");
+
+ suite.addTestSuite(IgniteOptimisticTxSuspendResumeTest.class);
+ suite.addTestSuite(IgniteOptimisticTxSuspendResumeMultiServerTest.class);
+ suite.addTestSuite(IgnitePessimisticTxSuspendResumeTest.class);
+
+ return suite;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e255a564/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java
----------------------------------------------------------------------
diff --git a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java
index 5047491..dd5f6b7 100644
--- a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java
+++ b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java
@@ -28,10 +28,11 @@ import org.apache.ignite.cache.jta.CacheTmLookup;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.transactions.TransactionState.SUSPENDED;
+
/**
* Implementation of {@link CacheJtaManagerAdapter}.
*/
@@ -147,7 +148,7 @@ public class CacheJtaManager extends CacheJtaManagerAdapter {
if (jtaTm != null) {
CacheJtaResource rsrc = this.rsrc.get();
- if (rsrc == null || rsrc.isFinished()) {
+ if (rsrc == null || rsrc.isFinished() || rsrc.cacheTx().state() == SUSPENDED) {
try {
Transaction jtaTx = jtaTm.getTransaction();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e255a564/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java
----------------------------------------------------------------------
diff --git a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java
index 649f7c4..e29c44b 100644
--- a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java
+++ b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java
@@ -71,12 +71,21 @@ final class CacheJtaResource implements XAResource, Synchronization {
}
/** {@inheritDoc} */
- @Override public void start(Xid xid, int flags) {
+ @Override public void start(Xid xid, int flags) throws XAException {
if (log.isDebugEnabled())
log.debug("XA resource start(...) [xid=" + xid + ", flags=<" + flags(flags) + ">]");
// Simply save global transaction id.
this.xid = xid;
+
+ if ((flags & TMRESUME) == TMRESUME) {
+ try {
+ cacheTx.resume();
+ }
+ catch (IgniteCheckedException e) {
+ throwException("Failed to resume cache transaction: " + e.getMessage(), e);
+ }
+ }
}
/**
@@ -128,7 +137,7 @@ final class CacheJtaResource implements XAResource, Synchronization {
}
/** {@inheritDoc} */
- @Override public void end(Xid xid, int flags) {
+ @Override public void end(Xid xid, int flags) throws XAException {
assert this.xid.equals(xid);
if (log.isDebugEnabled())
@@ -136,6 +145,14 @@ final class CacheJtaResource implements XAResource, Synchronization {
if ((flags & TMFAIL) > 0)
cacheTx.setRollbackOnly();
+ else if ((flags & TMSUSPEND) == TMSUSPEND) {
+ try {
+ cacheTx.suspend();
+ }
+ catch (IgniteCheckedException e) {
+ throwException("Failed to suspend cache transaction: " + e.getMessage(), e);
+ }
+ }
}
/** {@inheritDoc} */
@@ -297,6 +314,13 @@ final class CacheJtaResource implements XAResource, Synchronization {
return state == COMMITTED || state == ROLLED_BACK;
}
+ /**
+ * @return Internal cache transaction.
+ */
+ GridNearTxLocal cacheTx() {
+ return cacheTx;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheJtaResource.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e255a564/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java b/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java
new file mode 100644
index 0000000..a181068
--- /dev/null
+++ b/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import javax.cache.configuration.Factory;
+import javax.transaction.Transaction;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.TransactionConfiguration;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.objectweb.jotm.Current;
+import org.objectweb.jotm.Jotm;
+import org.objectweb.transaction.jta.TransactionManager;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.transactions.TransactionState.ACTIVE;
+
+/**
+ * JTA Tx Manager test.
+ */
+public class GridJtaTransactionManagerSelfTest extends GridCommonAbstractTest {
+ /** Java Open Transaction Manager facade. */
+ private static Jotm jotm;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName).
+ setCacheConfiguration(defaultCacheConfiguration().setCacheMode(PARTITIONED));
+
+ cfg.getTransactionConfiguration().setTxManagerFactory(new Factory<TransactionManager>() {
+ private static final long serialVersionUID = 0L;
+
+ @Override public TransactionManager create() {
+ return jotm.getTransactionManager();
+ }
+ });
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ jotm = new Jotm(true, false);
+
+ Current.setAppServer(false);
+
+ startGrid();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+
+ jotm.stop();
+ }
+
+ /**
+ * Test for switching tx context by JTA Manager.
+ *
+ * @throws Exception If failed.
+ */
+ public void testJtaTxContextSwitch() throws Exception {
+ for (TransactionIsolation isolation : TransactionIsolation.values()) {
+ TransactionConfiguration cfg = grid().context().config().getTransactionConfiguration();
+
+ cfg.setDefaultTxConcurrency(TransactionConcurrency.OPTIMISTIC);
+ cfg.setDefaultTxIsolation(isolation);
+
+ TransactionManager jtaTm = jotm.getTransactionManager();
+
+ IgniteCache<Integer, String> cache = jcache();
+
+ assertNull(grid().transactions().tx());
+
+ jtaTm.begin();
+
+ Transaction tx1 = jtaTm.getTransaction();
+
+ cache.put(1, Integer.toString(1));
+
+ assertNotNull(grid().transactions().tx());
+
+ assertEquals(ACTIVE, grid().transactions().tx().state());
+
+ assertEquals(Integer.toString(1), cache.get(1));
+
+ jtaTm.suspend();
+
+ assertNull(grid().transactions().tx());
+
+ assertNull(cache.get(1));
+
+ jtaTm.begin();
+
+ Transaction tx2 = jtaTm.getTransaction();
+
+ assertNotSame(tx1, tx2);
+
+ cache.put(2, Integer.toString(2));
+
+ assertNotNull(grid().transactions().tx());
+
+ assertEquals(ACTIVE, grid().transactions().tx().state());
+
+ assertEquals(Integer.toString(2), cache.get(2));
+
+ jtaTm.commit();
+
+ assertNull(grid().transactions().tx());
+
+ assertEquals(Integer.toString(2), cache.get(2));
+
+ jtaTm.resume(tx1);
+
+ assertNotNull(grid().transactions().tx());
+
+ assertEquals(ACTIVE, grid().transactions().tx().state());
+
+ cache.put(3, Integer.toString(3));
+
+ jtaTm.commit();
+
+ assertEquals("1", cache.get(1));
+ assertEquals("2", cache.get(2));
+ assertEquals("3", cache.get(3));
+
+ assertNull(grid().transactions().tx());
+
+ cache.removeAll();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testJtaTxContextSwitchWithExistingTx() throws Exception {
+ for (TransactionIsolation isolation : TransactionIsolation.values()) {
+ TransactionConfiguration cfg = grid().context().config().getTransactionConfiguration();
+
+ cfg.setDefaultTxConcurrency(TransactionConcurrency.OPTIMISTIC);
+ cfg.setDefaultTxIsolation(isolation);
+
+ TransactionManager jtaTm = jotm.getTransactionManager();
+
+ IgniteCache<Integer, String> cache = jcache();
+
+ jtaTm.begin();
+
+ Transaction tx1 = jtaTm.getTransaction();
+
+ cache.put(1, Integer.toString(1));
+
+ assertNotNull(grid().transactions().tx());
+
+ assertEquals(ACTIVE, grid().transactions().tx().state());
+
+ assertEquals(Integer.toString(1), cache.get(1));
+
+ jtaTm.suspend();
+
+ jtaTm.begin();
+
+ Transaction tx2 = jtaTm.getTransaction();
+
+ assertNotSame(tx1, tx2);
+
+ cache.put(2, Integer.toString(2));
+
+ try {
+ jtaTm.resume(tx1);
+
+ fail("jtaTm.resume shouldn't success.");
+ }
+ catch (IllegalStateException ignored) {
+ // No-op.
+ }
+ finally {
+ jtaTm.rollback(); //rolling back tx2
+ }
+
+ jtaTm.resume(tx1);
+ jtaTm.rollback();
+
+ cache.removeAll();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e255a564/modules/jta/src/test/java/org/apache/ignite/testsuites/IgniteJtaTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/jta/src/test/java/org/apache/ignite/testsuites/IgniteJtaTestSuite.java b/modules/jta/src/test/java/org/apache/ignite/testsuites/IgniteJtaTestSuite.java
index 4ae5df0..0775c1a 100644
--- a/modules/jta/src/test/java/org/apache/ignite/testsuites/IgniteJtaTestSuite.java
+++ b/modules/jta/src/test/java/org/apache/ignite/testsuites/IgniteJtaTestSuite.java
@@ -21,13 +21,14 @@ import junit.framework.TestSuite;
import org.apache.ignite.internal.processors.cache.CacheJndiTmFactorySelfTest;
import org.apache.ignite.internal.processors.cache.GridCacheJtaConfigurationValidationSelfTest;
import org.apache.ignite.internal.processors.cache.GridCacheJtaFactoryConfigValidationSelfTest;
+import org.apache.ignite.internal.processors.cache.GridJtaLifecycleAwareSelfTest;
+import org.apache.ignite.internal.processors.cache.GridJtaTransactionManagerSelfTest;
import org.apache.ignite.internal.processors.cache.GridPartitionedCacheJtaFactorySelfTest;
import org.apache.ignite.internal.processors.cache.GridPartitionedCacheJtaFactoryUseSyncSelfTest;
import org.apache.ignite.internal.processors.cache.GridPartitionedCacheJtaLookupClassNameSelfTest;
import org.apache.ignite.internal.processors.cache.GridReplicatedCacheJtaFactorySelfTest;
import org.apache.ignite.internal.processors.cache.GridReplicatedCacheJtaFactoryUseSyncSelfTest;
import org.apache.ignite.internal.processors.cache.GridReplicatedCacheJtaLookupClassNameSelfTest;
-import org.apache.ignite.internal.processors.cache.GridJtaLifecycleAwareSelfTest;
import org.apache.ignite.testframework.IgniteTestSuite;
/**
@@ -54,6 +55,8 @@ public class IgniteJtaTestSuite extends TestSuite {
suite.addTestSuite(GridCacheJtaConfigurationValidationSelfTest.class);
suite.addTestSuite(GridCacheJtaFactoryConfigValidationSelfTest.class);
+ suite.addTestSuite(GridJtaTransactionManagerSelfTest.class);
+
// Factory
suite.addTestSuite(CacheJndiTmFactorySelfTest.class);