You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/13 09:08:22 UTC
[02/19] incubator-ignite git commit: # ignite-157-1
# ignite-157-1
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a238ce35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a238ce35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a238ce35
Branch: refs/heads/ignite-sprint-5
Commit: a238ce357fb0cb0c5378fbfc64341c3167843db5
Parents: 93876df
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 7 14:49:33 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 7 16:19:26 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMvccManager.java | 4 +-
.../cache/distributed/dht/GridDhtTxLocal.java | 32 ++++----
.../distributed/dht/GridDhtTxLocalAdapter.java | 27 +++++++
.../distributed/dht/GridDhtTxPrepareFuture.java | 81 ++++++++++----------
.../near/GridAbstractNearTxPrepareFuture.java | 3 +
.../near/GridNearOptimisticTxPrepareFuture.java | 13 +++-
.../GridNearPessimisticTxPrepareFuture.java | 15 +++-
.../cache/distributed/near/GridNearTxLocal.java | 43 +++++------
.../near/GridNearTxPrepareFuture.java | 20 ++---
.../cache/transactions/IgniteInternalTx.java | 4 +-
.../cache/transactions/IgniteTxAdapter.java | 2 +-
.../cache/transactions/IgniteTxHandler.java | 65 +++++++---------
.../cache/transactions/IgniteTxManager.java | 12 +--
.../cache/GridCacheAbstractFullApiSelfTest.java | 2 +-
...ePrimaryNodeFailureRecoveryAbstractTest.java | 4 +-
.../IgniteCacheFailoverTestSuite.java | 7 +-
16 files changed, 178 insertions(+), 156 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 0bb97a9..c05e4b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -510,7 +510,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
* @return Future.
*/
@SuppressWarnings({"unchecked"})
- @Nullable public <T> GridCacheFuture<T> future(GridCacheVersion ver, IgniteUuid futId) {
+ @Nullable public GridCacheFuture future(GridCacheVersion ver, IgniteUuid futId) {
Collection<? extends GridCacheFuture> futs = this.futs.get(ver);
if (futs != null)
@@ -519,7 +519,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
if (log.isDebugEnabled())
log.debug("Found future in futures map: " + fut);
- return (GridCacheFuture<T>)fut;
+ return fut;
}
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 07ced0d..614f520 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -284,7 +284,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<IgniteInternalTx> prepareAsync() {
+ @Override public IgniteInternalFuture<?> prepareAsync() {
if (optimistic()) {
assert isSystemInvalidate();
@@ -296,7 +296,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
nearMiniId,
null,
true,
- null,
null);
}
@@ -305,14 +304,13 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
if (fut == null) {
// Future must be created before any exception can be thrown.
- if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture<>(
+ if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture(
cctx,
this,
nearMiniId,
Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
true,
needReturnValue(),
- null,
null)))
return prepFut.get();
}
@@ -371,7 +369,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
* @param lastBackups IDs of backup nodes receiving last prepare request.
* @return Future that will be completed when locks are acquired.
*/
- public IgniteInternalFuture<IgniteInternalTx> prepareAsync(
+ public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync(
@Nullable Iterable<IgniteTxEntry> reads,
@Nullable Iterable<IgniteTxEntry> writes,
Map<IgniteTxKey, GridCacheVersion> verMap,
@@ -379,8 +377,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
IgniteUuid nearMiniId,
Map<UUID, Collection<UUID>> txNodes,
boolean last,
- Collection<UUID> lastBackups,
- IgniteInClosure<GridNearTxPrepareResponse> completeCb
+ Collection<UUID> lastBackups
) {
// In optimistic mode prepare still can be called explicitly from salvageTx.
GridDhtTxPrepareFuture fut = prepFut.get();
@@ -389,21 +386,20 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
init();
// Future must be created before any exception can be thrown.
- if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture<>(
+ if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture(
cctx,
this,
nearMiniId,
verMap,
last,
needReturnValue(),
- lastBackups,
- completeCb))) {
+ lastBackups))) {
GridDhtTxPrepareFuture f = prepFut.get();
assert f.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " +
"[futMiniId=" + f.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + f + ']';
- return f;
+ return chainOnePhasePrepare(f);
}
}
else {
@@ -411,7 +407,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
"[futMiniId=" + fut.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + fut + ']';
// Prepare was called explicitly.
- return fut;
+ return chainOnePhasePrepare(fut);
}
if (state() != PREPARING) {
@@ -475,7 +471,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
}
}
- return fut;
+ return chainOnePhasePrepare(fut);
}
/** {@inheritDoc} */
@@ -517,8 +513,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
}
}
else
- prep.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) {
+ prep.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> f) {
try {
f.get(); // Check for errors of a parent future.
@@ -605,8 +601,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
else {
prepFut.complete();
- prepFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) {
+ prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> f) {
try {
f.get(); // Check for errors of a parent future.
}
@@ -686,7 +682,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Nullable @Override public IgniteInternalFuture<IgniteInternalTx> currentPrepareFuture() {
+ @Nullable @Override public IgniteInternalFuture<?> currentPrepareFuture() {
return prepFut.get();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 08fcaf6..d886989 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.util.*;
@@ -885,6 +886,32 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
*/
protected abstract void clearPrepareFuture(GridDhtTxPrepareFuture fut);
+ /**
+ * @return {@code True} if transaction if finished on prepare step.
+ */
+ protected final boolean commitOnPrepare() {
+ return onePhaseCommit() && !near();
+ }
+
+ /**
+ * @param prepFut Prepare future.
+ * @return If transaction if finished on prepare step returns future which is completed after transaction finish.
+ */
+ protected final IgniteInternalFuture<GridNearTxPrepareResponse> chainOnePhasePrepare(
+ final GridDhtTxPrepareFuture prepFut) {
+ if (commitOnPrepare()) {
+ return finishFuture().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridNearTxPrepareResponse>() {
+ @Override public GridNearTxPrepareResponse applyx(IgniteInternalFuture<IgniteInternalTx> finishFut)
+ throws IgniteCheckedException
+ {
+ return prepFut.get();
+ }
+ });
+ }
+
+ return prepFut;
+ }
+
/** {@inheritDoc} */
@Override public void rollback() throws IgniteCheckedException {
try {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 3a1a80a..0e64726 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -50,19 +50,32 @@ import static org.apache.ignite.transactions.TransactionState.*;
*
*/
@SuppressWarnings("unchecked")
-public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFuture<IgniteInternalTx>
- implements GridCacheMvccFuture<IgniteInternalTx> {
+public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInternalTx, GridNearTxPrepareResponse>
+ implements GridCacheMvccFuture<GridNearTxPrepareResponse> {
/** */
private static final long serialVersionUID = 0L;
/** Logger reference. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+ /** */
+ private static final IgniteReducer<IgniteInternalTx, GridNearTxPrepareResponse> REDUCER =
+ new IgniteReducer<IgniteInternalTx, GridNearTxPrepareResponse>() {
+ @Override public boolean collect(IgniteInternalTx e) {
+ return true;
+ }
+
+ @Override public GridNearTxPrepareResponse reduce() {
+ // Nothing to aggregate.
+ return null;
+ }
+ };
+
/** Logger. */
private static IgniteLogger log;
/** Context. */
- private GridCacheSharedContext<K, V> cctx;
+ private GridCacheSharedContext<?, ?> cctx;
/** Future ID. */
private IgniteUuid futId;
@@ -128,15 +141,13 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
/** */
private boolean invoke;
- /** */
- private IgniteInClosure<GridNearTxPrepareResponse> completeCb;
-
/**
* @param cctx Context.
* @param tx Transaction.
* @param nearMiniId Near mini future id.
* @param dhtVerMap DHT versions map.
* @param last {@code True} if this is last prepare operation for node.
+ * @param retVal Return value flag.
* @param lastBackups IDs of backup nodes receiving last prepare request during this prepare.
*/
public GridDhtTxPrepareFuture(
@@ -146,19 +157,9 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
Map<IgniteTxKey, GridCacheVersion> dhtVerMap,
boolean last,
boolean retVal,
- Collection<UUID> lastBackups,
- IgniteInClosure<GridNearTxPrepareResponse> completeCb
+ Collection<UUID> lastBackups
) {
- super(cctx.kernalContext(), new IgniteReducer<IgniteInternalTx, IgniteInternalTx>() {
- @Override public boolean collect(IgniteInternalTx e) {
- return true;
- }
-
- @Override public IgniteInternalTx reduce() {
- // Nothing to aggregate.
- return tx;
- }
- });
+ super(REDUCER);
this.cctx = cctx;
this.tx = tx;
@@ -178,8 +179,6 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
this.retVal = retVal;
- this.completeCb = completeCb;
-
assert dhtMap != null;
assert nearMap != null;
}
@@ -382,7 +381,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
* @param t Error.
*/
public void onError(Throwable t) {
- onDone(tx, t);
+ onDone(null, t);
}
/**
@@ -479,7 +478,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
}
/** {@inheritDoc} */
- @Override public boolean onDone(IgniteInternalTx tx0, Throwable err) {
+ @Override public boolean onDone(GridNearTxPrepareResponse res0, Throwable err) {
assert err != null || (initialized() && !hasPending()) : "On done called for prepare future that has " +
"pending mini futures: " + this;
@@ -495,16 +494,15 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
// Must create prepare response before transaction is committed to grab correct return value.
final GridNearTxPrepareResponse res = createPrepareResponse();
- onComplete();
+ onComplete(res);
- if (!tx.near()) {
+ if (tx.commitOnPrepare()) {
if (tx.markFinalizing(IgniteInternalTx.FinalizationStatus.USER_FINISH)) {
IgniteInternalFuture<IgniteInternalTx> fut = this.err.get() == null ?
tx.commitAsync() : tx.rollbackAsync();
fut.listen(new CIX1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override
- public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) {
+ @Override public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) {
try {
if (replied.compareAndSet(false, true))
sendPrepareResponse(res);
@@ -530,15 +528,17 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
}
else {
if (replied.compareAndSet(false, true)) {
+ GridNearTxPrepareResponse res = createPrepareResponse();
+
try {
- sendPrepareResponse(createPrepareResponse());
+ sendPrepareResponse(res);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send prepare response for transaction: " + tx, e);
}
finally {
// Will call super.onDone().
- onComplete();
+ onComplete(res);
}
return true;
@@ -562,16 +562,12 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
}
/**
+ * @param res Response.
* @throws IgniteCheckedException If failed to send response.
*/
private void sendPrepareResponse(GridNearTxPrepareResponse res) throws IgniteCheckedException {
if (!tx.nearNodeId().equals(cctx.localNodeId()))
cctx.io().send(tx.nearNodeId(), res, tx.ioPolicy());
- else {
- assert completeCb != null;
-
- completeCb.apply(res);
- }
}
/**
@@ -616,10 +612,10 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
for (IgniteTxEntry e : writes) {
IgniteTxEntry txEntry = tx.entry(e.txKey());
- GridCacheContext cacheCtx = txEntry.context();
-
assert txEntry != null : "Missing tx entry for key [tx=" + tx + ", key=" + e.txKey() + ']';
+ GridCacheContext cacheCtx = txEntry.context();
+
while (true) {
try {
GridCacheEntryEx entry = txEntry.cached();
@@ -682,13 +678,14 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
/**
* Completeness callback.
*
+ * @param res Response.
* @return {@code True} if {@code done} flag was changed as a result of this call.
*/
- private boolean onComplete() {
+ private boolean onComplete(@Nullable GridNearTxPrepareResponse res) {
if (last || tx.isSystemInvalidate())
tx.state(PREPARED);
- if (super.onDone(tx, err.get())) {
+ if (super.onDone(res, err.get())) {
// Don't forget to clean up.
cctx.mvcc().removeFuture(this);
@@ -702,7 +699,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
* Completes this future.
*/
public void complete() {
- onComplete();
+ onComplete(null);
}
/**
@@ -717,7 +714,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
if (tx.empty()) {
tx.setRollbackOnly();
- onDone(tx);
+ onDone((GridNearTxPrepareResponse)null);
}
this.reads = reads;
@@ -821,7 +818,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
try {
GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached();
- GridCacheContext<K, V> cacheCtx = cached.context();
+ GridCacheContext<?, ?> cacheCtx = cached.context();
if (entry.explicitVersion() == null) {
GridCacheMvccCandidate added = cached.candidate(version());
@@ -977,7 +974,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
GridCacheContext cacheCtx = entry.context();
- GridDhtCacheAdapter<K, V> dht = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht();
+ GridDhtCacheAdapter<?, ?> dht = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht();
ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(entry);
@@ -1234,7 +1231,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
boolean rec = cctx.gridEvents().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED);
for (GridCacheEntryInfo info : res.preloadEntries()) {
- GridCacheContext<K, V> cacheCtx = cctx.cacheContext(info.cacheId());
+ GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(info.cacheId());
while (true) {
GridCacheEntryEx entry = cacheCtx.cache().entryEx(info.key());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridAbstractNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridAbstractNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridAbstractNearTxPrepareFuture.java
index 905f018..6f94f21 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridAbstractNearTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridAbstractNearTxPrepareFuture.java
@@ -153,6 +153,9 @@ public abstract class GridAbstractNearTxPrepareFuture extends GridCompoundIdenti
* @param res Response.
*/
protected final void onPrepareResponse(GridDistributedTxMapping m, GridNearTxPrepareResponse res) {
+ if (res == null)
+ return;
+
assert res.error() == null : res;
assert F.isEmpty(res.invalidPartitions()) : res;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 2fbca7b..110cca4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -535,9 +535,16 @@ public class GridNearOptimisticTxPrepareFuture extends GridAbstractNearTxPrepare
// At this point, if any new node joined, then it is
// waiting for this transaction to complete, so
// partition reassignments are not possible here.
- cctx.tm().txHandler().prepareTx(n.id(), tx, req, new CI1<GridNearTxPrepareResponse>() {
- @Override public void apply(GridNearTxPrepareResponse res) {
- fut.onResult(n.id(), res);
+ IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(n.id(), tx, req);
+
+ prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
+ @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
+ try {
+ fut.onResult(n.id(), prepFut.get());
+ }
+ catch (IgniteCheckedException e) {
+ fut.onResult(e);
+ }
}
});
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 84a4ab8..e3f24f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -203,9 +203,18 @@ public class GridNearPessimisticTxPrepareFuture extends GridAbstractNearTxPrepar
add(fut);
if (node.isLocal()) {
- cctx.tm().txHandler().prepareTx(node.id(), tx, req, new CI1<GridNearTxPrepareResponse>() {
- @Override public void apply(GridNearTxPrepareResponse res) {
- fut.onResult(res);
+ IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(node.id(),
+ tx,
+ req);
+
+ prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
+ @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
+ try {
+ fut.onResult(prepFut.get());
+ }
+ catch (IgniteCheckedException e) {
+ fut.onError(e);
+ }
}
});
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index f7a43bb..a003d19 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -61,8 +61,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/** Future. */
@GridToStringExclude
- private final AtomicReference<IgniteInternalFuture<IgniteInternalTx>> prepFut =
- new AtomicReference<>();
+ private final AtomicReference<IgniteInternalFuture<?>> prepFut = new AtomicReference<>();
/** */
@GridToStringExclude
@@ -682,7 +681,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<IgniteInternalTx> prepareAsync() {
+ @Override public IgniteInternalFuture<?> prepareAsync() {
GridAbstractNearTxPrepareFuture fut = (GridAbstractNearTxPrepareFuture)prepFut.get();
if (fut == null) {
@@ -719,10 +718,10 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
cctx.mvcc().addFuture(fut);
- IgniteInternalFuture<IgniteInternalTx> prepareFut = prepFut.get();
+ IgniteInternalFuture<?> prepareFut = prepFut.get();
- prepareFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) {
+ prepareFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> f) {
GridNearTxFinishFuture fut0 = commitFut.get();
try {
@@ -766,7 +765,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
cctx.mvcc().addFuture(fut);
- IgniteInternalFuture<IgniteInternalTx> prepFut = this.prepFut.get();
+ IgniteInternalFuture<?> prepFut = this.prepFut.get();
if (prepFut == null || prepFut.isDone()) {
try {
@@ -790,8 +789,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
}
else {
- prepFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) {
+ prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> f) {
try {
// Check for errors in prepare future.
f.get();
@@ -834,12 +833,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
* @return Future that will be completed when locks are acquired.
*/
@SuppressWarnings("TypeMayBeWeakened")
- public IgniteInternalFuture<IgniteInternalTx> prepareAsyncLocal(
+ public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsyncLocal(
@Nullable Collection<IgniteTxEntry> reads,
@Nullable Collection<IgniteTxEntry> writes,
Map<UUID, Collection<UUID>> txNodes, boolean last,
- Collection<UUID> lastBackups,
- IgniteInClosure<GridNearTxPrepareResponse> completeCb
+ Collection<UUID> lastBackups
) {
if (state() != PREPARING) {
if (timedOut())
@@ -854,15 +852,14 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
init();
- GridDhtTxPrepareFuture fut = new GridDhtTxPrepareFuture<>(
+ GridDhtTxPrepareFuture fut = new GridDhtTxPrepareFuture(
cctx,
this,
IgniteUuid.randomUuid(),
Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
last,
needReturnValue() && implicit(),
- lastBackups,
- completeCb);
+ lastBackups);
try {
// At this point all the entries passed in must be enlisted in transaction because this is an
@@ -901,7 +898,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
}
- return fut;
+ return chainOnePhasePrepare(fut);
}
/**
@@ -917,7 +914,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
if (pessimistic())
prepareAsync();
- IgniteInternalFuture<IgniteInternalTx> prep = prepFut.get();
+ IgniteInternalFuture<?> prep = prepFut.get();
// Do not create finish future if there are no remote nodes.
if (F.isEmpty(dhtMap) && F.isEmpty(nearMap)) {
@@ -953,8 +950,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
}
else
- prep.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) {
+ prep.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> f) {
try {
f.get(); // Check for errors of a parent future.
@@ -990,7 +987,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
cctx.mvcc().addFuture(fut);
- IgniteInternalFuture<IgniteInternalTx> prep = prepFut.get();
+ IgniteInternalFuture<?> prep = prepFut.get();
if (prep == null || prep.isDone()) {
try {
@@ -1006,8 +1003,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
fut.finish();
}
else
- prep.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) {
+ prep.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> f) {
try {
f.get(); // Check for errors of a parent future.
}
@@ -1200,7 +1197,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Nullable @Override public IgniteInternalFuture<IgniteInternalTx> currentPrepareFuture() {
+ @Nullable @Override public IgniteInternalFuture<?> currentPrepareFuture() {
return prepFut.get();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
index 962d973..9cf4aca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
@@ -648,11 +648,11 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
add(fut);
if (node.isLocal()) {
- cctx.tm().txHandler().prepareTx(node.id(), tx, req, new CI1<GridNearTxPrepareResponse>() {
- @Override public void apply(GridNearTxPrepareResponse res) {
- fut.onResult(node.id(), res);
- }
- });
+// cctx.tm().txHandler().prepareTx(node.id(), tx, req, new CI1<GridNearTxPrepareResponse>() {
+// @Override public void apply(GridNearTxPrepareResponse res) {
+// fut.onResult(node.id(), res);
+// }
+// });
}
else {
try {
@@ -755,11 +755,11 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
// At this point, if any new node joined, then it is
// waiting for this transaction to complete, so
// partition reassignments are not possible here.
- cctx.tm().txHandler().prepareTx(n.id(), tx, req, new CI1<GridNearTxPrepareResponse>() {
- @Override public void apply(GridNearTxPrepareResponse res) {
- fut.onResult(n.id(), res);
- }
- });
+// cctx.tm().txHandler().prepareTx(n.id(), tx, req, new CI1<GridNearTxPrepareResponse>() {
+// @Override public void apply(GridNearTxPrepareResponse res) {
+// fut.onResult(n.id(), res);
+// }
+// });
}
else {
assert !tx.groupLock() : "Got group lock transaction that is mapped on remote node [tx=" + tx +
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 8dc07cc..2bed843 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -551,7 +551,7 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
*
* @return Future for prepare step.
*/
- public IgniteInternalFuture<IgniteInternalTx> prepareAsync();
+ public IgniteInternalFuture<?> prepareAsync();
/**
* @param endVer End version (a.k.a. <tt>'tnc'</tt> or <tt>'transaction number counter'</tt>)
@@ -580,7 +580,7 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
/**
* @return Future for transaction prepare if prepare is in progress.
*/
- @Nullable public IgniteInternalFuture<IgniteInternalTx> currentPrepareFuture();
+ @Nullable public IgniteInternalFuture<?> currentPrepareFuture();
/**
* @param state Transaction state.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 82d68b3..64cc77f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1008,7 +1008,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
/** {@inheritDoc} */
- @Nullable @Override public IgniteInternalFuture<IgniteInternalTx> currentPrepareFuture() {
+ @Nullable @Override public IgniteInternalFuture<?> currentPrepareFuture() {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index d98b4ff..a403f28 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -58,9 +58,9 @@ public class IgniteTxHandler {
* @param req Request.
* @return Prepare future.
*/
- public IgniteInternalFuture<IgniteInternalTx> processNearTxPrepareRequest(final UUID nearNodeId,
+ public IgniteInternalFuture<?> processNearTxPrepareRequest(final UUID nearNodeId,
final GridNearTxPrepareRequest req) {
- return prepareTx(nearNodeId, null, req, null);
+ return prepareTx(nearNodeId, null, req);
}
/**
@@ -138,32 +138,28 @@ public class IgniteTxHandler {
* @param nearNodeId Near node ID that initiated transaction.
* @param locTx Optional local transaction.
* @param req Near prepare request.
- * @param completeCb Completion callback.
* @return Future for transaction.
*/
- public IgniteInternalFuture<IgniteInternalTx> prepareTx(
+ public IgniteInternalFuture<GridNearTxPrepareResponse> prepareTx(
UUID nearNodeId,
@Nullable GridNearTxLocal locTx,
- GridNearTxPrepareRequest req,
- @Nullable IgniteInClosure<GridNearTxPrepareResponse> completeCb
+ GridNearTxPrepareRequest req
) {
assert nearNodeId != null;
assert req != null;
if (locTx != null) {
- assert completeCb != null;
-
if (req.near()) {
// Make sure not to provide Near entries to DHT cache.
req.cloneEntries();
- return prepareNearTx(nearNodeId, req, completeCb);
+ return prepareNearTx(nearNodeId, req);
}
else
- return prepareColocatedTx(locTx, req, completeCb);
+ return prepareColocatedTx(locTx, req);
}
else
- return prepareNearTx(nearNodeId, req, null);
+ return prepareNearTx(nearNodeId, req);
}
/**
@@ -171,30 +167,27 @@ public class IgniteTxHandler {
*
* @param locTx Local transaction.
* @param req Near prepare request.
- * @param completeCb Completion callback.
* @return Prepare future.
*/
- private IgniteInternalFuture<IgniteInternalTx> prepareColocatedTx(
+ private IgniteInternalFuture<GridNearTxPrepareResponse> prepareColocatedTx(
final GridNearTxLocal locTx,
- final GridNearTxPrepareRequest req,
- final IgniteInClosure<GridNearTxPrepareResponse> completeCb
+ final GridNearTxPrepareRequest req
) {
IgniteInternalFuture<Object> fut = new GridFinishedFuture<>(); // TODO force preload keys.
return new GridEmbeddedFuture<>(
fut,
- new C2<Object, Exception, IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public IgniteInternalFuture<IgniteInternalTx> apply(Object o, Exception ex) {
+ new C2<Object, Exception, IgniteInternalFuture<GridNearTxPrepareResponse>>() {
+ @Override public IgniteInternalFuture<GridNearTxPrepareResponse> apply(Object o, Exception ex) {
if (ex != null)
throw new GridClosureException(ex);
- IgniteInternalFuture<IgniteInternalTx> fut = locTx.prepareAsyncLocal(
+ IgniteInternalFuture<GridNearTxPrepareResponse> fut = locTx.prepareAsyncLocal(
req.reads(),
req.writes(),
req.transactionNodes(),
req.last(),
- req.lastBackups(),
- completeCb);
+ req.lastBackups());
if (locTx.isRollbackOnly())
locTx.rollbackAsync();
@@ -202,18 +195,16 @@ public class IgniteTxHandler {
return fut;
}
},
- new C2<IgniteInternalTx, Exception, IgniteInternalTx>() {
- @Nullable @Override public IgniteInternalTx apply(IgniteInternalTx tx, Exception e) {
+ new C2<GridNearTxPrepareResponse, Exception, GridNearTxPrepareResponse>() {
+ @Nullable @Override public GridNearTxPrepareResponse apply(GridNearTxPrepareResponse res, Exception e) {
if (e != null) {
- // tx can be null of exception occurred.
- if (tx != null)
- tx.setRollbackOnly(); // Just in case.
+ locTx.setRollbackOnly(); // Just in case.
if (!(e instanceof IgniteTxOptimisticCheckedException))
- U.error(log, "Failed to prepare DHT transaction: " + tx, e);
+ U.error(log, "Failed to prepare transaction: " + locTx, e);
}
- return tx;
+ return res;
}
}
);
@@ -224,13 +215,11 @@ public class IgniteTxHandler {
*
* @param nearNodeId Near node ID that initiated transaction.
* @param req Near prepare request.
- * @param completeCb Completion callback.
* @return Prepare future.
*/
- private IgniteInternalFuture<IgniteInternalTx> prepareNearTx(
+ private IgniteInternalFuture<GridNearTxPrepareResponse> prepareNearTx(
final UUID nearNodeId,
- final GridNearTxPrepareRequest req,
- IgniteInClosure<GridNearTxPrepareResponse> completeCb
+ final GridNearTxPrepareRequest req
) {
ClusterNode nearNode = ctx.node(nearNodeId);
@@ -315,7 +304,7 @@ public class IgniteTxHandler {
if (req.returnValue())
tx.needReturnValue(true);
- IgniteInternalFuture<IgniteInternalTx> fut = tx.prepareAsync(
+ IgniteInternalFuture<GridNearTxPrepareResponse> fut = tx.prepareAsync(
req.reads(),
req.writes(),
req.dhtVersions(),
@@ -323,8 +312,7 @@ public class IgniteTxHandler {
req.miniId(),
req.transactionNodes(),
req.last(),
- req.lastBackups(),
- completeCb);
+ req.lastBackups());
if (tx.isRollbackOnly()) {
try {
@@ -337,8 +325,8 @@ public class IgniteTxHandler {
final GridDhtTxLocal tx0 = tx;
- fut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> txFut) {
+ fut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> txFut) {
try {
txFut.get();
}
@@ -354,7 +342,7 @@ public class IgniteTxHandler {
return fut;
}
else
- return new GridFinishedFuture<>((IgniteInternalTx)null);
+ return new GridFinishedFuture<>((GridNearTxPrepareResponse)null);
}
/**
@@ -399,8 +387,7 @@ public class IgniteTxHandler {
* @param res Response.
*/
private void processDhtTxPrepareResponse(UUID nodeId, GridDhtTxPrepareResponse res) {
- GridDhtTxPrepareFuture fut = (GridDhtTxPrepareFuture)ctx.mvcc().
- <IgniteInternalTx>future(res.version(), res.futureId());
+ GridDhtTxPrepareFuture fut = (GridDhtTxPrepareFuture)ctx.mvcc().future(res.version(), res.futureId());
if (fut == null) {
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 8a1d490..2122602 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1816,7 +1816,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (nearVer.equals(tx.nearXidVersion())) {
TransactionState state = tx.state();
- IgniteInternalFuture<IgniteInternalTx> prepFut = tx.currentPrepareFuture();
+ IgniteInternalFuture<?> prepFut = tx.currentPrepareFuture();
if (prepFut != null && !prepFut.isDone()) {
if (log.isDebugEnabled())
@@ -1828,8 +1828,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
final Collection<GridCacheVersion> processedVers0 = processedVers;
- prepFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> prepFut) {
+ prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> prepFut) {
if (log.isDebugEnabled())
log.debug("Transaction prepare future finished: " + tx);
@@ -2029,11 +2029,11 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (tx.state() == PREPARED)
commitIfPrepared(tx);
else {
- IgniteInternalFuture<IgniteInternalTx> prepFut = tx.currentPrepareFuture();
+ IgniteInternalFuture<?> prepFut = tx.currentPrepareFuture();
if (prepFut != null) {
- prepFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
+ prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
if (tx.state() == PREPARED)
commitIfPrepared(tx);
else if (tx.setRollbackOnly())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 4dc371c..a346b65 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -4343,7 +4343,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertEquals(val1, cacheSkipStore.invoke(key, new SetValueProcessor(val2)));
assertEquals(i, map.get(key));
- assertEquals(val2, cacheSkipStore.get(key));
+ assertEquals("For key " + key, val2, cacheSkipStore.get(key));
}
for (String key : keys) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
index ee2f16b..f996877 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
@@ -199,7 +199,7 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2.
- IgniteInternalFuture<IgniteInternalTx> prepFut = txEx.prepareAsync();
+ IgniteInternalFuture<?> prepFut = txEx.prepareAsync();
waitPrepared(ignite(1));
@@ -360,7 +360,7 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2.
- IgniteInternalFuture<IgniteInternalTx> prepFut = txEx.prepareAsync();
+ IgniteInternalFuture<?> prepFut = txEx.prepareAsync();
waitPrepared(ignite(1));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
index 529bd23..2acd6a3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
@@ -54,10 +54,9 @@ public class IgniteCacheFailoverTestSuite extends TestSuite {
suite.addTestSuite(GridCacheAtomicPrimaryWriteOrderFailoverSelfTest.class);
suite.addTestSuite(GridCacheAtomicReplicatedFailoverSelfTest.class);
- // TODO IGNITE-157.
- // suite.addTestSuite(GridCachePartitionedFailoverSelfTest.class);
- // suite.addTestSuite(GridCacheColocatedFailoverSelfTest.class);
- // suite.addTestSuite(GridCacheReplicatedFailoverSelfTest.class);
+ suite.addTestSuite(GridCachePartitionedFailoverSelfTest.class);
+ suite.addTestSuite(GridCacheColocatedFailoverSelfTest.class);
+ suite.addTestSuite(GridCacheReplicatedFailoverSelfTest.class);
suite.addTestSuite(IgniteCacheAtomicNodeJoinTest.class);
suite.addTestSuite(IgniteCacheTxNodeJoinTest.class);