You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/03 03:03:35 UTC
[26/50] [abbrv] ignite git commit: IGNITE-264 - Fixing tests WIP.
IGNITE-264 - Fixing tests WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f9511aff
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f9511aff
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f9511aff
Branch: refs/heads/ignite-264
Commit: f9511aff95fd6fecff3da3bc70143d3e74e4aaaf
Parents: a733984
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Thu Aug 13 18:28:00 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Thu Aug 13 18:28:00 2015 -0700
----------------------------------------------------------------------
.../near/GridNearOptimisticTxPrepareFuture.java | 33 ++--
.../GridNearPessimisticTxPrepareFuture.java | 8 +-
.../near/GridNearTxFinishFuture.java | 98 +++++------
.../cache/distributed/near/GridNearTxLocal.java | 43 +----
.../dht/GridCacheTxNodeFailureSelfTest.java | 165 ++++++++++++++++---
5 files changed, 230 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9511aff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 95e1847..28069b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -118,8 +118,13 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
*/
void onError(@Nullable UUID nodeId, @Nullable Iterable<GridDistributedTxMapping> mappings, Throwable e) {
if (X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, ClusterTopologyException.class)) {
- if (tx.onePhaseCommit())
+ if (tx.onePhaseCommit()) {
tx.markForBackupCheck();
+
+ onComplete();
+
+ return;
+ }
}
if (err.compareAndSet(null, e)) {
@@ -189,17 +194,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
this.err.compareAndSet(null, err);
- if (err == null)
- tx.state(PREPARED);
-
- if (super.onDone(tx, err)) {
- // Don't forget to clean up.
- cctx.mvcc().removeFuture(this);
-
- return true;
- }
-
- return false;
+ return onComplete();
}
/**
@@ -213,10 +208,20 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
/**
* Completeness callback.
*/
- private void onComplete() {
- if (super.onDone(tx, err.get()))
+ private boolean onComplete() {
+ Throwable err0 = err.get();
+
+ if (err0 == null || tx.needCheckBackup())
+ tx.state(PREPARED);
+
+ if (super.onDone(tx, err0)) {
// Don't forget to clean up.
cctx.mvcc().removeFuture(this);
+
+ return true;
+ }
+
+ return false;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9511aff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 6de46f4..6ac1033 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -242,7 +242,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
err = this.err.get();
- if (err == null)
+ if (err == null || tx.needCheckBackup())
tx.state(PREPARED);
if (super.onDone(tx, err)) {
@@ -320,9 +320,13 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
* @param e Error.
*/
void onNodeLeft(ClusterTopologyCheckedException e) {
- if (tx.onePhaseCommit())
+ if (tx.onePhaseCommit()) {
tx.markForBackupCheck();
+ // Do not fail future for one-phase transaction right away.
+ onDone(tx);
+ }
+
onError(e);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9511aff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 1e16982..95f5149 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
+import org.apache.ignite.transactions.*;
import org.jetbrains.annotations.*;
import java.util.*;
@@ -227,29 +228,46 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
/** {@inheritDoc} */
- @Override public boolean onDone(IgniteInternalTx tx, Throwable err) {
+ @Override public boolean onDone(IgniteInternalTx tx0, Throwable err) {
if ((initialized() || err != null)) {
- if (this.tx.onePhaseCommit() && (this.tx.state() == COMMITTING)) {
+ if (tx.needCheckBackup()) {
+ assert tx.onePhaseCommit();
+
+ if (err != null)
+ err = new TransactionRollbackException("Failed to commit transaction.", err);
+
+ try {
+ tx.finish(err == null);
+ }
+ catch (IgniteCheckedException e) {
+ if (err != null)
+ err.addSuppressed(e);
+ else
+ err = e;
+ }
+ }
+
+ if (tx.onePhaseCommit()) {
finishOnePhase();
- this.tx.tmFinish(err == null);
+ tx.tmFinish(err == null);
}
Throwable th = this.err.get();
- if (super.onDone(tx, th != null ? th : err)) {
+ if (super.onDone(tx0, th != null ? th : err)) {
if (error() instanceof IgniteTxHeuristicCheckedException) {
- AffinityTopologyVersion topVer = this.tx.topologyVersion();
+ AffinityTopologyVersion topVer = tx.topologyVersion();
- for (IgniteTxEntry e : this.tx.writeMap().values()) {
+ for (IgniteTxEntry e : tx.writeMap().values()) {
GridCacheContext cacheCtx = e.context();
try {
if (e.op() != NOOP && !cacheCtx.affinity().localNode(e.key(), topVer)) {
- GridCacheEntryEx Entry = cacheCtx.cache().peekEx(e.key());
+ GridCacheEntryEx entry = cacheCtx.cache().peekEx(e.key());
- if (Entry != null)
- Entry.invalidate(null, this.tx.xidVersion());
+ if (entry != null)
+ entry.invalidate(null, tx.xidVersion());
}
}
catch (Throwable t) {
@@ -297,13 +315,24 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
* Initializes future.
*/
void finish() {
- if (tx.onePhaseCommit()) {
- if (commit) {
- if (tx.needCheckBackup())
- checkBackup();
- else if (needFinishOnePhase()) {
+ if (tx.needCheckBackup()) {
+ assert tx.onePhaseCommit();
+
+ checkBackup();
+
+ // If checkBackup is set, it means that primary node has crashed and we will not need to send
+ // finish request to it, so we can mark future as initialized.
+ markInitialized();
+ }
+
+ try {
+ if (tx.finish(commit) || (!commit && tx.state() == UNKNOWN)) {
+ if ((tx.onePhaseCommit() && needFinishOnePhase()) || (!tx.onePhaseCommit() && mappings != null))
finish(mappings.values());
+ markInitialized();
+
+ if (!isSync()) {
boolean complete = true;
for (IgniteInternalFuture<?> f : pending())
@@ -315,40 +344,16 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
onComplete();
}
}
-
- markInitialized();
-
- return;
+ else
+ onError(new IgniteCheckedException("Failed to commit transaction: " + CU.txString(tx)));
}
+ catch (Error | RuntimeException e) {
+ onError(e);
- if (mappings != null) {
- finish(mappings.values());
-
- markInitialized();
-
- if (!isSync()) {
- boolean complete = true;
-
- for (IgniteInternalFuture<?> f : pending())
- // Mini-future in non-sync mode gets done when message gets sent.
- if (isMini(f) && !f.isDone())
- complete = false;
-
- if (complete)
- onComplete();
- }
+ throw e;
}
- else {
- assert !commit;
-
- try {
- tx.rollback();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to rollback empty transaction: " + tx, e);
- }
-
- markInitialized();
+ catch (IgniteCheckedException e) {
+ onError(e);
}
}
@@ -641,8 +646,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
void onResult(GridDhtTxFinishResponse res) {
assert backup != null;
- if (res.checkCommittedError() != null)
+ if (res.checkCommittedError() != null) {
onDone(res.checkCommittedError());
+ }
else
onDone(tx);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9511aff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index c40ac5e..0421309 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -712,7 +712,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
cctx.mvcc().addFuture(fut);
- IgniteInternalFuture<?> prepareFut = prepFut.get();
+ final IgniteInternalFuture<?> prepareFut = prepFut.get();
prepareFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> f) {
@@ -720,24 +720,15 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
try {
// Make sure that here are no exceptions.
- if (!needCheckBackup()) {
- f.get();
-
- if (finish(true))
- fut0.finish();
- else
- fut0.onError(new IgniteCheckedException("Failed to commit transaction: " +
- CU.txString(GridNearTxLocal.this)));
- }
- else {
- assert onePhaseCommit();
+ prepareFut.get();
- fut0.finish();
- }
+ fut0.finish();
}
catch (Error | RuntimeException e) {
commitErr.compareAndSet(null, e);
+ fut0.onError(e);
+
throw e;
}
catch (IgniteCheckedException e) {
@@ -779,15 +770,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
log.debug("Got optimistic tx failure [tx=" + this + ", err=" + e + ']');
}
- try {
- if (finish(false) || state() == UNKNOWN)
- fut.finish();
- else
- fut.onError(new IgniteCheckedException("Failed to gracefully rollback transaction: " + CU.txString(this)));
- }
- catch (IgniteCheckedException e) {
- fut.onError(e);
- }
+ fut.finish();
}
else {
prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
@@ -803,19 +786,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
GridNearTxFinishFuture fut0 = rollbackFut.get();
- try {
- if (finish(false) || state() == UNKNOWN)
- fut0.finish();
- else
- fut0.onError(new IgniteCheckedException("Failed to gracefully rollback transaction: " +
- CU.txString(GridNearTxLocal.this)));
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to gracefully rollback transaction: " +
- CU.txString(GridNearTxLocal.this), e);
-
- fut0.onError(e);
- }
+ fut0.finish();
}
});
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9511aff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
index 773ec25..bca3b6f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
@@ -23,6 +23,10 @@ import org.apache.ignite.cache.affinity.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.extensions.communication.*;
import org.apache.ignite.spi.*;
import org.apache.ignite.spi.communication.tcp.*;
@@ -30,6 +34,7 @@ import org.apache.ignite.testframework.*;
import org.apache.ignite.testframework.junits.common.*;
import org.apache.ignite.transactions.*;
+import javax.cache.*;
import java.util.*;
import java.util.concurrent.*;
@@ -68,34 +73,94 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testPrimaryNodeFailureBackupCommitPessimistic() throws Exception {
- checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, false);
+ checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, false, true);
}
/**
* @throws Exception If failed.
*/
public void testPrimaryNodeFailureBackupCommitOptimistic() throws Exception {
- checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, false);
+ checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, false, true);
}
/**
* @throws Exception If failed.
*/
public void testPrimaryNodeFailureBackupCommitPessimisticOnBackup() throws Exception {
- checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, true);
+ checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, true, true);
}
/**
* @throws Exception If failed.
*/
public void testPrimaryNodeFailureBackupCommitOptimisticOnBackup() throws Exception {
- checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, true);
+ checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, true, true);
}
/**
* @throws Exception If failed.
*/
- private void checkPrimaryNodeFailureBackupCommit(final TransactionConcurrency conc, boolean backup) throws Exception {
+ public void testPrimaryNodeFailureBackupRollbackPessimistic() throws Exception {
+ checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPrimaryNodeFailureBackupRollbackOptimistic() throws Exception {
+ checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPrimaryNodeFailureBackupRollbackPessimisticOnBackup() throws Exception {
+ checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, true, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPrimaryNodeFailureBackupRollbackOptimisticOnBackup() throws Exception {
+ checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, true, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPrimaryNodeFailureBackupCommitImplicit() throws Exception {
+ checkPrimaryNodeFailureBackupCommit(null, false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPrimaryNodeFailureBackupCommitImplicitOnBackup() throws Exception {
+ checkPrimaryNodeFailureBackupCommit(null, true, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPrimaryNodeFailureBackupRollbackImplicit() throws Exception {
+ checkPrimaryNodeFailureBackupCommit(null, false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPrimaryNodeFailureBackupRollbackImplicitOnBackup() throws Exception {
+ checkPrimaryNodeFailureBackupCommit(null, true, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void checkPrimaryNodeFailureBackupCommit(
+ final TransactionConcurrency conc,
+ boolean backup,
+ final boolean commit
+ ) throws Exception {
startGrids(gridCount());
awaitPartitionMapExchange();
@@ -111,25 +176,79 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest {
final CountDownLatch commitLatch = new CountDownLatch(1);
- if (!backup) {
- communication(2).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class));
- communication(3).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class));
+ if (!commit) {
+ communication(1).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareRequest.class));
+ }
+ else {
+ if (!backup) {
+ communication(2).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class));
+ communication(3).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class));
+ }
+ else
+ communication(0).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class));
}
- else
- communication(0).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class));
IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
- try (Transaction tx = ignite.transactions().txStart(conc, REPEATABLE_READ)) {
- cache.put(key, key);
+ if (conc != null) {
+ try (Transaction tx = ignite.transactions().txStart(conc, REPEATABLE_READ)) {
+ cache.put(key, key);
+
+ Transaction asyncTx = (Transaction)tx.withAsync();
+
+ asyncTx.commit();
- Transaction asyncTx = (Transaction)tx.withAsync();
+ commitLatch.countDown();
- asyncTx.commit();
+ try {
+ IgniteFuture<Object> fut = asyncTx.future();
+
+ fut.get();
+
+ if (!commit) {
+ error("Transaction has been committed");
+
+ fail("Transaction has been committed: " + tx);
+ }
+ }
+ catch (TransactionRollbackException e) {
+ if (commit) {
+ error(e.getMessage(), e);
+
+ fail("Failed to commit: " + e);
+ }
+ else
+ assertTrue(X.hasCause(e, TransactionRollbackException.class));
+ }
+ }
+ }
+ else {
+ IgniteCache<Object, Object> cache0 = cache.withAsync();
+
+ cache0.put(key, key);
+
+ Thread.sleep(1000);
commitLatch.countDown();
- asyncTx.future().get();
+ try {
+ cache0.future().get();
+
+ if (!commit) {
+ error("Transaction has been committed");
+
+ fail("Transaction has been committed.");
+ }
+ }
+ catch (CacheException e) {
+ if (commit) {
+ error(e.getMessage(), e);
+
+ fail("Failed to commit: " + e);
+ }
+ else
+ assertTrue(X.hasCause(e, TransactionRollbackException.class));
+ }
}
return null;
@@ -140,8 +259,11 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest {
stopGrid(1);
- // No exception should happen since transaction is committed on the backup node.
+ // Check that thread successfully finished.
fut.get();
+
+ // Check there are no hanging transactions.
+ assertEquals(0, ((IgniteEx)ignite).context().cache().context().tm().idMapSize());
}
finally {
stopAllGrids();
@@ -194,9 +316,14 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
- if (!bannedClasses.contains(msg.getClass()))
- super.sendMessage(node, msg);
+ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException {
+ GridIoMessage ioMsg = (GridIoMessage)msg;
+
+ if (!bannedClasses.contains(ioMsg.message().getClass())) {
+ super.sendMessage(node, msg, ackClosure);
+
+ U.debug(">>> Sending message: " + msg);
+ }
}
}
}