You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/17 14:50:05 UTC
[08/16] ignite git commit: Internal cache API cleanup.
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 35d0776..77272e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -17,6 +17,25 @@
package org.apache.ignite.internal.processors.igfs;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteException;
@@ -47,7 +66,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.processors.cache.GridCacheInternal;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.igfs.client.IgfsClientAbstractCallable;
import org.apache.ignite.internal.processors.igfs.client.meta.IgfsClientMetaIdsForPathCallable;
import org.apache.ignite.internal.processors.igfs.client.meta.IgfsClientMetaInfoForPathCallable;
@@ -76,26 +95,6 @@ import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
-import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.EntryProcessorResult;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.CountDownLatch;
-
import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_RENAMED;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_RENAMED;
@@ -604,7 +603,7 @@ public class IgfsMetaManager extends IgfsManager {
assert fileId != null;
- try (IgniteInternalTx tx = startTx()) {
+ try (GridNearTxLocal tx = startTx()) {
// Lock file ID for this transaction.
IgfsEntryInfo oldInfo = info(fileId);
@@ -1005,7 +1004,7 @@ public class IgfsMetaManager extends IgfsManager {
srcPathIds.addExistingIds(lockIds, relaxed);
dstPathIds.addExistingIds(lockIds, relaxed);
- try (IgniteInternalTx tx = startTx()) {
+ try (GridNearTxLocal tx = startTx()) {
// Obtain the locks.
final Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds);
@@ -1145,7 +1144,7 @@ public class IgfsMetaManager extends IgfsManager {
IgniteUuid trashId = IgfsUtils.randomTrashId();
- try (IgniteInternalTx tx = startTx()) {
+ try (GridNearTxLocal tx = startTx()) {
// NB: We may lock root because its id is less than any other id:
final IgfsEntryInfo rootInfo = lockIds(IgfsUtils.ROOT_ID, trashId).get(IgfsUtils.ROOT_ID);
@@ -1268,7 +1267,7 @@ public class IgfsMetaManager extends IgfsManager {
allIds.add(trashId);
- try (IgniteInternalTx tx = startTx()) {
+ try (GridNearTxLocal tx = startTx()) {
// Lock participants.
Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(allIds);
@@ -1345,7 +1344,7 @@ public class IgfsMetaManager extends IgfsManager {
assert listing != null;
validTxState(false);
- try (IgniteInternalTx tx = startTx()) {
+ try (GridNearTxLocal tx = startTx()) {
Collection<IgniteUuid> res = new HashSet<>();
// Obtain all necessary locks in one hop.
@@ -1431,7 +1430,7 @@ public class IgfsMetaManager extends IgfsManager {
try {
validTxState(false);
- try (IgniteInternalTx tx = startTx()) {
+ try (GridNearTxLocal tx = startTx()) {
Map<IgniteUuid, IgfsEntryInfo> infos = lockIds(parentId, id);
IgfsEntryInfo victim = infos.get(id);
@@ -1517,7 +1516,7 @@ public class IgfsMetaManager extends IgfsManager {
try {
validTxState(false);
- try (IgniteInternalTx tx = startTx()) {
+ try (GridNearTxLocal tx = startTx()) {
IgfsEntryInfo info = updatePropertiesNonTx(fileId, props);
tx.commit();
@@ -1551,7 +1550,7 @@ public class IgfsMetaManager extends IgfsManager {
if (log.isDebugEnabled())
log.debug("Reserve file space: " + fileId);
- try (IgniteInternalTx tx = startTx()) {
+ try (GridNearTxLocal tx = startTx()) {
// Lock file ID for this transaction.
IgfsEntryInfo oldInfo = info(fileId);
@@ -1596,7 +1595,7 @@ public class IgfsMetaManager extends IgfsManager {
if (log.isDebugEnabled())
log.debug("Update file info [fileId=" + fileId + ", proc=" + proc + ']');
- try (IgniteInternalTx tx = startTx()) {
+ try (GridNearTxLocal tx = startTx()) {
// Lock file ID for this transaction.
IgfsEntryInfo oldInfo = info(fileId);
@@ -1658,7 +1657,7 @@ public class IgfsMetaManager extends IgfsManager {
pathIds.addSurrogateIds(lockIds);
// Start TX.
- try (IgniteInternalTx tx = startTx()) {
+ try (GridNearTxLocal tx = startTx()) {
final Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds);
if (!pathIds.verifyIntegrity(lockInfos, relaxed))
@@ -1709,7 +1708,7 @@ public class IgfsMetaManager extends IgfsManager {
try {
validTxState(false);
- try (IgniteInternalTx tx = startTx()) {
+ try (GridNearTxLocal tx = startTx()) {
Object prev = val != null ? metaCache.getAndPut(sampling, val) : metaCache.getAndRemove(sampling);
tx.commit();
@@ -2602,7 +2601,7 @@ public class IgfsMetaManager extends IgfsManager {
pathIds.add(idsForPath(path));
// Start pessimistic.
- try (IgniteInternalTx tx = startTx()) {
+ try (GridNearTxLocal tx = startTx()) {
// Lock the very first existing parents and possibly the leaf as well.
Map<IgfsPath, IgfsPath> pathToParent = new HashMap<>();
@@ -2788,7 +2787,7 @@ public class IgfsMetaManager extends IgfsManager {
*
* @return Transaction.
*/
- private IgniteInternalTx startTx() {
+ private GridNearTxLocal startTx() {
return metaCache.txStartEx(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);
}
@@ -2817,7 +2816,7 @@ public class IgfsMetaManager extends IgfsManager {
pathIds.addExistingIds(lockIds, relaxed);
// Start TX.
- try (IgniteInternalTx tx = startTx()) {
+ try (GridNearTxLocal tx = startTx()) {
Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds);
if (secondaryFs != null && isRetryForSecondary(pathIds, lockInfos))
@@ -2927,7 +2926,7 @@ public class IgfsMetaManager extends IgfsManager {
pathIds.addSurrogateIds(lockIds);
// Start TX.
- try (IgniteInternalTx tx = startTx()) {
+ try (GridNearTxLocal tx = startTx()) {
Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds);
if (!pathIds.verifyIntegrity(lockInfos, relaxed))
@@ -3034,7 +3033,7 @@ public class IgfsMetaManager extends IgfsManager {
}
// Start TX.
- try (IgniteInternalTx tx = startTx()) {
+ try (GridNearTxLocal tx = startTx()) {
Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds);
if (secondaryCtx != null && isRetryForSecondary(pathIds, lockInfos))
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 41dbdd0..a680a88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -66,15 +66,16 @@ import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
import org.apache.ignite.internal.processors.cache.CacheIteratorConverter;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.query.CacheQuery;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.marshaller.MappingAcceptedMessage;
import org.apache.ignite.internal.processors.marshaller.MappingProposedMessage;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.GridEmptyIterator;
import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.SerializableTransient;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -92,7 +93,6 @@ import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.internal.util.SerializableTransient;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.JobContextResource;
import org.apache.ignite.resources.LoggerResource;
@@ -945,7 +945,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
else
nodes = null;
- try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
GridServiceAssignmentsKey key = new GridServiceAssignmentsKey(cfg.getName());
GridServiceAssignments oldAssigns = (GridServiceAssignments)cache.get(key);
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
index 53e6add..c4d8a79 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
@@ -88,6 +88,8 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
blockedMsgs.add(new T2<>(node, ioMsg));
+ notifyAll();
+
return;
}
}
@@ -137,6 +139,33 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
}
/**
+     * @param cls Exact class of the blocked message to wait for.
+     * @param nodeName Instance name of the node recorded with the blocked message.
+     * @throws InterruptedException If interrupted while waiting.
+     */
+    public void waitForMessage(Class<?> cls, String nodeName) throws InterruptedException {
+        synchronized (this) {
+            while (!hasMessage(cls, nodeName)) // Re-check the condition after every wake-up.
+                wait(); // Woken by the notifyAll() issued when a message is added to blockedMsgs.
+        }
+    }
+
+    /**
+     * @param cls Message class; compared by identity, so subclasses do not match.
+     * @param nodeName Node instance name, must not be {@code null}.
+     * @return {@code True} if a blocked message of this class is recorded for this node.
+     */
+    private boolean hasMessage(Class<?> cls, String nodeName) {
+        for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) {
+            if (msg.get2().message().getClass() == cls &&
+                nodeName.equals(msg.get1().attribute(ATTR_IGNITE_INSTANCE_NAME)))
+                return true;
+        }
+
+        return false;
+    }
+
+ /**
* @param blockP Message block predicate.
*/
public void blockMessages(IgnitePredicate<GridIoMessage> blockP) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestStore.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestStore.java
index 3622964..eb30927 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestStore.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestStore.java
@@ -338,8 +338,6 @@ public final class GridCacheTestStore implements CacheStore<Integer, String> {
txs.add(tx);
- assertTrue("Unexpected tx class: " + tx.getClass(), tx instanceof TransactionProxy);
-
IgniteInternalTx tx0 = GridTestUtils.getFieldValue(tx, "tx");
if (!tx0.local())
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
index 6e8c2a1..4fd4989 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
@@ -33,8 +33,8 @@ import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
@@ -210,7 +210,7 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
protected void checkStartTxSuccess(final IgniteInternalCache<Object, Object> cache) throws Exception {
- try (final IgniteInternalTx tx = CU.txStartInternal(cache.context(), cache, PESSIMISTIC, READ_COMMITTED)) {
+ try (final GridNearTxLocal tx = CU.txStartInternal(cache.context(), cache, PESSIMISTIC, READ_COMMITTED)) {
assert tx != null;
sleepForTxFailure();
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java
index 17570aa..f821a45 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java
@@ -18,21 +18,18 @@
package org.apache.ignite.internal.processors.cache.distributed;
import java.util.Map;
-import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
@@ -84,7 +81,7 @@ public class IgniteCacheSystemTransactionsSelfTest extends GridCacheAbstractSelf
utilityCache.getAndPutIfAbsent("2", "2");
- try (IgniteInternalTx itx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal itx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
assertEquals(null, utilityCache.get("1"));
assertEquals("2", utilityCache.get("2"));
assertEquals(null, utilityCache.get("3"));
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java
index b0aa67a..9ac9e31 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java
@@ -105,6 +105,11 @@ public class IgniteTxCachePrimarySyncTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 15 * 60_000; // 900_000; NOTE(review): presumably milliseconds (15 minutes) - confirm.
+    }
+
+ /** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
index 702a883..91e3b26 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
@@ -37,7 +37,7 @@ import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
@@ -169,14 +169,14 @@ public abstract class IgniteTxOriginatingNodeFailureAbstractSelfTest extends Gri
TransactionProxyImpl tx = (TransactionProxyImpl)txIgniteNode.transactions().txStart();
- IgniteInternalTx txEx = tx.tx();
+ GridNearTxLocal txEx = tx.tx();
assertTrue(txEx.optimistic());
cache.putAll(map);
try {
- txEx.prepareAsync().get(3, TimeUnit.SECONDS);
+ txEx.prepareNearTxLocal().get(3, TimeUnit.SECONDS);
}
catch (IgniteFutureTimeoutCheckedException ignored) {
info("Failed to wait for prepare future completion: " + partial);
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
index 3c1ae8e..4997b20 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
@@ -42,7 +42,7 @@ import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
@@ -349,7 +349,7 @@ public abstract class IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest
TransactionProxyImpl txProxy = (TransactionProxyImpl)tx;
- IgniteInternalTx txEx = txProxy.tx();
+ GridNearTxLocal txEx = txProxy.tx();
assertTrue(txEx.pessimistic());
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
index a0a5627..7ca3914 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractTest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
@@ -210,11 +211,11 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
log.info("Start prepare.");
- IgniteInternalTx txEx = ((TransactionProxyImpl)tx).tx();
+ GridNearTxLocal txEx = ((TransactionProxyImpl)tx).tx();
commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2.
- IgniteInternalFuture<?> prepFut = txEx.prepareAsync();
+ IgniteInternalFuture<?> prepFut = txEx.prepareNearTxLocal();
waitPrepared(ignite(1));
@@ -371,11 +372,11 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
log.info("Start prepare.");
- IgniteInternalTx txEx = ((TransactionProxyImpl)tx).tx();
+ GridNearTxLocal txEx = ((TransactionProxyImpl)tx).tx();
commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2.
- IgniteInternalFuture<?> prepFut = txEx.prepareAsync();
+ IgniteInternalFuture<?> prepFut = txEx.prepareNearTxLocal();
waitPrepared(ignite(1));
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java
new file mode 100644
index 0000000..cfe9029
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
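+/**
+ * Tests tx recovery and rollback scenarios, e.g. stopping the tx near node while its finish request is blocked.
+ */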
+public class IgniteCacheTxRecoveryRollbackTest extends GridCommonAbstractTest {
+    /** IP finder installed into the discovery SPI of every node configured by this test. */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Static map that is cleared in {@code afterTest()}. */
+    private static ConcurrentHashMap<Object, Object> storeMap = new ConcurrentHashMap<>();
+
+    /** Value passed to {@code setClientMode} when a node configuration is created. */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setConsistentId(gridName); // Consistent ID is the grid name itself.
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
+
+        cfg.setCommunicationSpi(commSpi); // Recording SPI provides blockMessages()/waitForMessage() used by the tests.
+
+        cfg.setClientMode(client); // Tests flip the 'client' field before starting client nodes.
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        try { // Fail the test if any started node still reports transactions.
+            for (Ignite node : G.allGrids()) {
+                Collection<IgniteInternalTx> txs = ((IgniteKernal)node).context().cache().context().tm().txs();
+
+                assertTrue("Unfinished txs [node=" + node.name() + ", txs=" + txs + ']', txs.isEmpty());
+            }
+        }
+        finally { // Cleanup runs even when the check above fails.
+            stopAllGrids();
+
+            storeMap.clear();
+
+            super.afterTest();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNearTx1Implicit() throws Exception {
+        nearTx1(null); // null concurrency: the put is done without an explicit transaction.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNearTx1Optimistic() throws Exception {
+        nearTx1(OPTIMISTIC); // Explicit OPTIMISTIC, REPEATABLE_READ transaction.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNearTx1Pessimistic() throws Exception {
+        nearTx1(PESSIMISTIC); // Explicit PESSIMISTIC, REPEATABLE_READ transaction.
+    }
+
+    /**
+     * Stops the tx near node (client2); the near cache tx on client1 must then be either
+     * committed by the primary or invalidated.
+     *
+     * @param concurrency Tx concurrency or {@code null} for implicit transaction.
+     * @throws Exception If failed.
+     */
+    private void nearTx1(final TransactionConcurrency concurrency) throws Exception {
+        startGrids(4);
+
+        Ignite srv0 = grid(0);
+
+        final IgniteCache<Integer, Integer> srvCache = srv0.createCache(cacheConfiguration(2, false, false));
+
+        awaitPartitionMapExchange();
+
+        client = true; // Nodes configured from here on are started in client mode.
+
+        Ignite client1 = startGrid(4);
+        final Ignite client2 = startGrid(5);
+
+        final Integer key = primaryKey(srv0.cache(null)); // NOTE(review): presumably a key whose primary is srv0 - confirm.
+
+        final IgniteCache<Integer, Integer> cache1 =
+            client1.createNearCache(null, new NearCacheConfiguration<Integer, Integer>());
+
+        final IgniteCache<Integer, Integer> cache2 =
+            client2.createNearCache(null, new NearCacheConfiguration<Integer, Integer>());
+
+        cache1.put(key, 1); // Initial value written through client1's near cache.
+
+        final Integer newVal = 2;
+
+        testSpi(client2).blockMessages(GridNearTxFinishRequest.class, srv0.name()); // Hold client2's finish request for srv0.
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                log.info("Start put, concurrency: " + concurrency);
+
+                if (concurrency != null) {
+                    try (Transaction tx = client2.transactions().txStart(concurrency, REPEATABLE_READ)) {
+                        cache2.put(key, newVal);
+
+                        tx.commit();
+                    }
+                }
+                else
+                    cache2.put(key, newVal);
+
+                return null;
+            }
+        });
+
+        U.sleep(500);
+
+        assertFalse(fut.isDone()); // The put must still be pending while the finish request is blocked.
+
+        testSpi(client2).waitForMessage(GridNearTxFinishRequest.class, srv0.name());
+
+        stopGrid(client2.name()); // Stop the tx near node while its finish request is still blocked.
+
+        try {
+            fut.get();
+        }
+        catch (IgniteCheckedException ignore) {
+            // Ignored on purpose: the put may fail once client2 has been stopped.
+        }
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return newVal.equals(srvCache.get(key)) && newVal.equals(cache1.get(key));
+            }
+        }, 5000); // The boolean result is not checked here.
+
+        checkData(F.asMap(key, newVal));
+    }
+
+ /**
+ * Runs the {@link #nearTx2(TransactionConcurrency)} scenario with {@code null} concurrency,
+ * i.e. with an implicit transaction.
+ *
+ * @throws Exception If failed.
+ */
+ public void testNearTx2Implicit() throws Exception {
+ nearTx2(null);
+ }
+
+ /**
+ * Runs the {@link #nearTx2(TransactionConcurrency)} scenario with an explicit {@code OPTIMISTIC} transaction.
+ *
+ * @throws Exception If failed.
+ */
+ public void testNearTx2Optimistic() throws Exception {
+ nearTx2(OPTIMISTIC);
+ }
+
+ /**
+ * Runs the {@link #nearTx2(TransactionConcurrency)} scenario with an explicit {@code PESSIMISTIC} transaction.
+ *
+ * @throws Exception If failed.
+ */
+ public void testNearTx2Pessimistic() throws Exception {
+ nearTx2(PESSIMISTIC);
+ }
+
+ /**
+ * Stop both tx near node (client2) and primary node, near cache tx on client1 is invalidated.
+ * <p>
+ * Scenario:
+ * <ol>
+ * <li>Start 4 nodes and create a transactional cache with 2 backups and no store from node 0.</li>
+ * <li>Set the {@code client} flag, start nodes 4 and 5 (client1 and client2) and create a near cache
+ * on each of them.</li>
+ * <li>Put value 1 through client1 for the key returned by {@code primaryKey(...)} for node 0's cache.</li>
+ * <li>Block {@code GridNearTxFinishRequest} messages from client2 to node 0, and on node 0 block every
+ * message whose payload is a {@code GridDhtTxFinishRequest}.</li>
+ * <li>Asynchronously put value 2 through client2, inside an explicit {@code REPEATABLE_READ} transaction
+ * when {@code concurrency} is not {@code null}.</li>
+ * <li>After 500 ms assert that the put is still pending, call {@code waitForMessage(...)} for the blocked
+ * near finish request, then stop client2 and node 0. An {@link IgniteCheckedException} from the
+ * pending put is ignored.</li>
+ * <li>Wait up to 5 seconds until both node 1's cache and client1's near cache return value 2, then
+ * check that value on all nodes still started via {@link #checkData(Map)}.</li>
+ * </ol>
+ *
+ * @param concurrency Tx concurrency or {@code null} for implicit transaction.
+ * @throws Exception If failed.
+ */
+ private void nearTx2(final TransactionConcurrency concurrency) throws Exception {
+ startGrids(4);
+
+ Ignite srv0 = grid(0);
+
+ srv0.createCache(cacheConfiguration(2, false, false));
+
+ awaitPartitionMapExchange();
+
+ client = true;
+
+ Ignite client1 = startGrid(4);
+ final Ignite client2 = startGrid(5);
+
+ final Integer key = primaryKey(srv0.cache(null));
+
+ final IgniteCache<Integer, Integer> cache1 =
+ client1.createNearCache(null, new NearCacheConfiguration<Integer, Integer>());
+
+ final IgniteCache<Integer, Integer> cache2 =
+ client2.createNearCache(null, new NearCacheConfiguration<Integer, Integer>());
+
+ cache1.put(key, 1);
+
+ final Integer newVal = 2;
+
+ testSpi(client2).blockMessages(GridNearTxFinishRequest.class, srv0.name());
+
+ testSpi(srv0).blockMessages(new IgnitePredicate<GridIoMessage>() {
+ @Override public boolean apply(GridIoMessage msg) {
+ return msg.message() instanceof GridDhtTxFinishRequest;
+ }
+ });
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ log.info("Start put, concurrency: " + concurrency);
+
+ if (concurrency != null) {
+ try (Transaction tx = client2.transactions().txStart(concurrency, REPEATABLE_READ)) {
+ cache2.put(key, newVal);
+
+ tx.commit();
+ }
+ }
+ else
+ cache2.put(key, newVal);
+
+ return null;
+ }
+ });
+
+ U.sleep(500);
+
+ assertFalse(fut.isDone());
+
+ testSpi(client2).waitForMessage(GridNearTxFinishRequest.class, srv0.name());
+
+ stopGrid(client2.name());
+ stopGrid(srv0.name());
+
+ try {
+ fut.get();
+ }
+ catch (IgniteCheckedException ignore) {
+ // No-op.
+ }
+
+ final IgniteCache<Integer, Integer> srvCache = grid(1).cache(null);
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return newVal.equals(srvCache.get(key)) && newVal.equals(cache1.get(key));
+ }
+ }, 5000);
+
+ checkData(F.asMap(key, newVal));
+ }
+
+ /**
+ * Runs {@link #txWithStore(TransactionConcurrency, boolean)} with {@code null} concurrency
+ * and the write-through flag set.
+ *
+ * @throws Exception If failed.
+ */
+ public void testTxWithStoreImplicit() throws Exception {
+ txWithStore(null, true);
+ }
+
+ /**
+ * Runs {@link #txWithStore(TransactionConcurrency, boolean)} with {@code OPTIMISTIC} concurrency
+ * and the write-through flag set.
+ *
+ * @throws Exception If failed.
+ */
+ public void testTxWithStoreOptimistic() throws Exception {
+ txWithStore(OPTIMISTIC, true);
+ }
+
+ /**
+ * Runs {@link #txWithStore(TransactionConcurrency, boolean)} with {@code PESSIMISTIC} concurrency
+ * and the write-through flag set.
+ *
+ * @throws Exception If failed.
+ */
+ public void testTxWithStorePessimistic() throws Exception {
+ txWithStore(PESSIMISTIC, true);
+ }
+
+ /**
+ * Runs {@link #txWithStore(TransactionConcurrency, boolean)} with {@code null} concurrency
+ * and the write-through flag cleared.
+ *
+ * @throws Exception If failed.
+ */
+ public void testTxWithStoreNoWriteThroughImplicit() throws Exception {
+ txWithStore(null, false);
+ }
+
+ /**
+ * Runs {@link #txWithStore(TransactionConcurrency, boolean)} with {@code OPTIMISTIC} concurrency
+ * and the write-through flag cleared.
+ *
+ * @throws Exception If failed.
+ */
+ public void testTxWithStoreNoWriteThroughOptimistic() throws Exception {
+ txWithStore(OPTIMISTIC, false);
+ }
+
+ /**
+ * Runs {@link #txWithStore(TransactionConcurrency, boolean)} with {@code PESSIMISTIC} concurrency
+ * and the write-through flag cleared.
+ *
+ * @throws Exception If failed.
+ */
+ public void testTxWithStoreNoWriteThroughPessimistic() throws Exception {
+ txWithStore(PESSIMISTIC, false);
+ }
+
+ /**
+ * Stops a client node while its put is waiting for a blocked {@code GridNearTxPrepareResponse}
+ * and checks which value the remaining nodes read afterwards.
+ * <p>
+ * Scenario:
+ * <ol>
+ * <li>Start 4 nodes and create a transactional cache with 1 backup and a {@code TestStoreFactory}
+ * store from node 0, using the given {@code writeThrough} flag.</li>
+ * <li>Put value 1 from node 0 for the key returned by {@code primaryKey(...)} for that cache.</li>
+ * <li>Set the {@code client} flag and start node 4. The local variable {@code client} declared for it
+ * shadows the {@code client} field for the rest of the method.</li>
+ * <li>Block {@code GridNearTxPrepareResponse} messages from node 0 to the client and asynchronously
+ * put value 2 from the client.</li>
+ * <li>After 500 ms assert that the put is still pending, call {@code waitForMessage(...)} for the blocked
+ * message type and stop the client. An {@link IgniteCheckedException} from the pending put is ignored.</li>
+ * <li>After a further one second pause expect value 1 on all started nodes when {@code writeThrough}
+ * is {@code true} and value 2 otherwise.</li>
+ * </ol>
+ * <p>
+ * NOTE(review): {@code concurrency} is not referenced anywhere in the method body; the put is always
+ * issued without an explicit transaction, so the Optimistic/Pessimistic test variants run the same
+ * scenario as the Implicit ones. Confirm whether this is intended.
+ *
+ * @param concurrency Tx concurrency or {@code null} for implicit transaction.
+ * @param writeThrough Store write through flag.
+ * @throws Exception If failed.
+ */
+ private void txWithStore(final TransactionConcurrency concurrency, boolean writeThrough) throws Exception {
+ startGrids(4);
+
+ Ignite srv0 = grid(0);
+
+ IgniteCache<Integer, Integer> srv0Cache = srv0.createCache(cacheConfiguration(1, true, writeThrough));
+
+ awaitPartitionMapExchange();
+
+ final Integer key = primaryKey(srv0Cache);
+
+ srv0Cache.put(key, 1);
+
+ client = true;
+
+ Ignite client = startGrid(4);
+
+ testSpi(srv0).blockMessages(GridNearTxPrepareResponse.class, client.name());
+
+ final IgniteCache<Integer, Integer> clientCache = client.cache(null);
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ log.info("Start put");
+
+ clientCache.put(key, 2);
+
+ return null;
+ }
+ });
+
+ U.sleep(500);
+
+ assertFalse(fut.isDone());
+
+ testSpi(srv0).waitForMessage(GridNearTxPrepareResponse.class, client.name());
+
+ stopGrid(client.name());
+
+ try {
+ fut.get();
+ }
+ catch (IgniteCheckedException ignore) {
+ // No-op.
+ }
+
+ U.sleep(1000);
+
+ if (writeThrough)
+ checkData(F.asMap(key, 1));
+ else
+ checkData(F.asMap(key, 2));
+ }
+
+ /**
+ * Returns the communication SPI from the node's configuration, cast to the recording test SPI.
+ *
+ * @param node Node.
+ * @return Node communication SPI.
+ */
+ private TestRecordingCommunicationSpi testSpi(Ignite node) {
+ IgniteConfiguration nodeCfg = node.configuration();
+
+ return (TestRecordingCommunicationSpi)nodeCfg.getCommunicationSpi();
+ }
+
+ /**
+ * Builds the configuration of a transactional, {@code FULL_SYNC} cache with {@code ASYNC} rebalancing.
+ *
+ * @param backups Number of backups.
+ * @param store Cache store flag, if {@code true} a {@code TestStoreFactory} is set as store factory.
+ * @param writeThrough Store write through flag, applied only when {@code store} is {@code true}.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration<Integer, Integer> cacheConfiguration(int backups, boolean store, boolean writeThrough) {
+ CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>();
+
+ cfg.setBackups(backups);
+ cfg.setAtomicityMode(TRANSACTIONAL);
+ cfg.setRebalanceMode(ASYNC);
+ cfg.setWriteSynchronizationMode(FULL_SYNC);
+
+ // Nothing else to configure for a cache without a store.
+ if (!store)
+ return cfg;
+
+ cfg.setCacheStoreFactory(new TestStoreFactory());
+ cfg.setWriteThrough(writeThrough);
+
+ return cfg;
+ }
+
+ /**
+ * Verifies that every started node reads the expected value for each of the expected keys.
+ *
+ * @param expData Expected cache data.
+ */
+ private void checkData(Map<Integer, Integer> expData) {
+ assert !expData.isEmpty();
+
+ List<Ignite> allNodes = G.allGrids();
+
+ assertFalse(allNodes.isEmpty());
+
+ for (Ignite ignite : allNodes) {
+ IgniteCache<Integer, Integer> nodeCache = ignite.cache(null);
+
+ for (Map.Entry<Integer, Integer> exp : expData.entrySet()) {
+ Integer key = exp.getKey();
+
+ String errMsg = "Invalid value [key=" + key + ", node=" + ignite.name() + ']';
+
+ assertEquals(errMsg, exp.getValue(), nodeCache.get(key));
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> {
+ /** */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public CacheStore<Object, Object> create() {
+ return new CacheStoreAdapter() {
+ @Override public Object load(Object key) throws CacheLoaderException {
+ return storeMap.get(key);
+ }
+
+ @Override public void write(Cache.Entry entry) throws CacheWriterException {
+ log.info("Store write [key=" + entry.getKey() + ", val=" + entry.getValue() + ']');
+
+ storeMap.put(entry.getKey(), entry.getValue());
+ }
+
+ @Override public void delete(Object key) throws CacheWriterException {
+ log.info("Store delete [key=" + key + ']');
+
+ storeMap.remove(key);
+ }
+ };
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxSalvageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxSalvageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxSalvageSelfTest.java
index 1517672..3e56b00 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxSalvageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxSalvageSelfTest.java
@@ -28,7 +28,6 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -153,7 +152,7 @@ public class GridCachePartitionedTxSalvageSelfTest extends GridCommonAbstractTes
* Check whether caches has no transactions after salvage timeout.
*
* @param mode Transaction mode (PESSIMISTIC, OPTIMISTIC).
- * @param prepare Whether to prepare transaction state (i.e. call {@link IgniteInternalTx#prepare()}).
+ * @param prepare Whether to prepare transaction state (i.e. call {@link GridNearTxLocal#prepare()}).
* @throws Exception If failed.
*/
private void checkSalvageAfterTimeout(TransactionConcurrency mode, boolean prepare) throws Exception {
@@ -172,7 +171,7 @@ public class GridCachePartitionedTxSalvageSelfTest extends GridCommonAbstractTes
*
* @param mode Transaction mode (PESSIMISTIC, OPTIMISTIC).
* @param prepare Whether to prepare transaction state
- * (i.e. call {@link IgniteInternalTx#prepare()}).
+ * (i.e. call {@link GridNearTxLocal#prepare()}).
* @throws Exception If failed.
*/
private void checkSalvageBeforeTimeout(TransactionConcurrency mode, boolean prepare) throws Exception {
@@ -198,7 +197,7 @@ public class GridCachePartitionedTxSalvageSelfTest extends GridCommonAbstractTes
* Start new transaction on the grid(0) and put some keys to it.
*
* @param mode Transaction mode (PESSIMISTIC, OPTIMISTIC).
- * @param prepare Whether to prepare transaction state (i.e. call {@link IgniteInternalTx#prepare()}).
+ * @param prepare Whether to prepare transaction state (i.e. call {@link GridNearTxLocal#prepare()}).
* @throws Exception If failed.
*/
private void startTxAndPutKeys(final TransactionConcurrency mode, final boolean prepare) throws Exception {
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java
index 005e4a5..212675b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -111,9 +112,6 @@ public class TxOptimisticDeadlockDetectionTest extends GridCommonAbstractTest {
cfg.setClientMode(client);
- // Test spi blocks message send, this can cause hang with striped pool.
- cfg.setStripedPoolSize(-1);
-
return cfg;
}
@@ -555,15 +553,25 @@ public class TxOptimisticDeadlockDetectionTest extends GridCommonAbstractTest {
GridCacheVersion txId = req.version();
- if (TX_IDS.contains(txId)) {
- while (TX_IDS.size() < TX_CNT) {
- try {
- U.sleep(50);
- }
- catch (IgniteInterruptedCheckedException e) {
- e.printStackTrace();
+ if (TX_IDS.contains(txId) && TX_IDS.size() < TX_CNT) {
+ GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ while (TX_IDS.size() < TX_CNT) {
+ try {
+ U.sleep(50);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ TestCommunicationSpi.super.sendMessage(node, msg, ackC);
+
+ return null;
}
- }
+ });
+
+ return;
}
}
else if (msg0 instanceof GridNearTxPrepareResponse) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java
index fba78c8..16596ed 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java
@@ -34,7 +34,7 @@ import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
@@ -118,7 +118,7 @@ public class IgfsBlockMessageSystemPoolStarvationSelfTest extends IgfsCommonAbst
@Override public Void call() throws Exception {
GridCacheAdapter dataCache = dataCache(attacker);
- try (IgniteInternalTx tx = dataCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = dataCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
dataCache.put(DATA_KEY, 0);
txStartLatch.countDown();
@@ -257,7 +257,7 @@ public class IgfsBlockMessageSystemPoolStarvationSelfTest extends IgfsCommonAbst
cfg.setLocalHost("127.0.0.1");
cfg.setConnectorConfiguration(null);
- cfg.setStripedPoolSize(0);
+ cfg.setStripedPoolSize(2);
cfg.setSystemThreadPoolSize(2);
cfg.setRebalanceThreadPoolSize(1);
cfg.setPublicThreadPoolSize(1);
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java
index 0513786..1901283 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java
@@ -44,7 +44,8 @@ public class IgniteCacheRestartTestSuite2 extends TestSuite {
suite.addTestSuite(IgniteCachePutAllRestartTest.class);
suite.addTestSuite(GridCachePutAllFailoverSelfTest.class);
- suite.addTestSuite(IgniteBinaryMetadataUpdateNodeRestartTest.class);
+ // TODO IGNITE-4768.
+ //suite.addTestSuite(IgniteBinaryMetadataUpdateNodeRestartTest.class);
suite.addTestSuite(IgniteCacheGetRestartTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
index 7363c7c..7bd7797 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheCo
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePartitionedPrimaryNodeFailureRecoveryTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheTxRecoveryRollbackTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearTxPessimisticOriginatingNodeFailureSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedTxOriginatingNodeFailureSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedTxPessimisticOriginatingNodeFailureSelfTest;
@@ -54,6 +55,8 @@ public class IgniteCacheTxRecoverySelfTestSuite extends TestSuite {
suite.addTestSuite(GridCacheNearTxPessimisticOriginatingNodeFailureSelfTest.class);
suite.addTestSuite(GridCacheReplicatedTxPessimisticOriginatingNodeFailureSelfTest.class);
+ suite.addTestSuite(IgniteCacheTxRecoveryRollbackTest.class);
+
return suite;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/HibernateReadWriteAccessStrategy.java
----------------------------------------------------------------------
diff --git a/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/HibernateReadWriteAccessStrategy.java b/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/HibernateReadWriteAccessStrategy.java
index bbb1d4e..1e2aded 100644
--- a/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/HibernateReadWriteAccessStrategy.java
+++ b/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/HibernateReadWriteAccessStrategy.java
@@ -22,8 +22,8 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.GridLeanSet;
-import org.apache.ignite.transactions.Transaction;
import org.hibernate.cache.CacheException;
import org.hibernate.cache.spi.access.AccessType;
import org.hibernate.cache.spi.access.SoftLock;
@@ -250,15 +250,15 @@ public class HibernateReadWriteAccessStrategy extends HibernateAccessStrategyAda
if (ctx.unlocked(key)) { // Finish transaction if last key is unlocked.
txCtx.remove();
- Transaction tx = cache.tx();
+ GridNearTxLocal tx = cache.tx();
assert tx != null;
try {
- tx.commit();
+ tx.proxy().commit();
}
finally {
- tx.close();
+ tx.proxy().close();
}
assert cache.tx() == null;
@@ -275,10 +275,10 @@ public class HibernateReadWriteAccessStrategy extends HibernateAccessStrategyAda
if (ctx != null) {
txCtx.remove();
- Transaction tx = cache.tx();
+ GridNearTxLocal tx = cache.tx();
if (tx != null)
- tx.rollback();
+ tx.proxy().rollback();
}
}
catch (IgniteException e) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java
----------------------------------------------------------------------
diff --git a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java
index f581ebb..5047491 100644
--- a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java
+++ b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java
@@ -27,6 +27,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.jta.CacheTmLookup;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.TransactionConfiguration;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.jetbrains.annotations.Nullable;
@@ -151,7 +152,7 @@ public class CacheJtaManager extends CacheJtaManagerAdapter {
Transaction jtaTx = jtaTm.getTransaction();
if (jtaTx != null) {
- IgniteInternalTx tx = cctx.tm().userTx();
+ GridNearTxLocal tx = cctx.tm().userTx();
if (tx == null) {
TransactionConfiguration tCfg = cctx.kernalContext().config()
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java
----------------------------------------------------------------------
diff --git a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java
index f43981e..649f7c4 100644
--- a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java
+++ b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java
@@ -27,7 +27,7 @@ import javax.transaction.xa.Xid;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.transactions.TransactionState;
@@ -51,7 +51,7 @@ final class CacheJtaResource implements XAResource, Synchronization {
private static final Xid[] NO_XID = new Xid[] {};
/** Cache transaction. */
- private IgniteInternalTx cacheTx;
+ private GridNearTxLocal cacheTx;
/** */
private Xid xid;
@@ -60,7 +60,7 @@ final class CacheJtaResource implements XAResource, Synchronization {
* @param cacheTx Cache jta.
* @param ctx Kernal context.
*/
- public CacheJtaResource(IgniteInternalTx cacheTx, GridKernalContext ctx) {
+ CacheJtaResource(GridNearTxLocal cacheTx, GridKernalContext ctx) {
assert cacheTx != null;
assert ctx != null;
@@ -291,7 +291,7 @@ final class CacheJtaResource implements XAResource, Synchronization {
*
* @return {@code true} if jta was already committed or rolled back.
*/
- public boolean isFinished() {
+ boolean isFinished() {
TransactionState state = cacheTx.state();
return state == COMMITTED || state == ROLLED_BACK;