You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/09 08:33:03 UTC
[14/24] incubator-ignite git commit: ignite-545: merge from
ignite-sprint-6
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 1c02356..eb8825e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -179,9 +179,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
/** */
protected int txSize;
- /** Group lock key, if any. */
- protected IgniteTxKey grpLockKey;
-
/** */
@GridToStringExclude
private AtomicReference<GridFutureAdapter<IgniteInternalTx>> finFut = new AtomicReference<>();
@@ -233,7 +230,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
* @param isolation Isolation.
* @param timeout Timeout.
* @param txSize Transaction size.
- * @param grpLockKey Group lock key if this is group-lock transaction.
*/
protected IgniteTxAdapter(
GridCacheSharedContext<?, ?> cctx,
@@ -249,7 +245,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
boolean invalidate,
boolean storeEnabled,
int txSize,
- @Nullable IgniteTxKey grpLockKey,
@Nullable UUID subjId,
int taskNameHash
) {
@@ -269,7 +264,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
this.invalidate = invalidate;
this.storeEnabled = storeEnabled;
this.txSize = txSize;
- this.grpLockKey = grpLockKey;
this.subjId = subjId;
this.taskNameHash = taskNameHash;
@@ -294,7 +288,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
* @param isolation Isolation.
* @param timeout Timeout.
* @param txSize Transaction size.
- * @param grpLockKey Group lock key if this is group-lock transaction.
*/
protected IgniteTxAdapter(
GridCacheSharedContext<?, ?> cctx,
@@ -308,7 +301,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
TransactionIsolation isolation,
long timeout,
int txSize,
- @Nullable IgniteTxKey grpLockKey,
@Nullable UUID subjId,
int taskNameHash
) {
@@ -323,7 +315,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
this.isolation = isolation;
this.timeout = timeout;
this.txSize = txSize;
- this.grpLockKey = grpLockKey;
this.subjId = subjId;
this.taskNameHash = taskNameHash;
@@ -387,30 +378,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
/** {@inheritDoc} */
@Override public Collection<IgniteTxEntry> optimisticLockEntries() {
- if (!groupLock())
- return writeEntries();
- else {
- if (!F.isEmpty(invalidParts)) {
- assert invalidParts.size() == 1 : "Only one partition expected for group lock transaction " +
- "[tx=" + this + ", invalidParts=" + invalidParts + ']';
- assert groupLockEntry() == null : "Group lock key should be rejected " +
- "[tx=" + this + ", groupLockEntry=" + groupLockEntry() + ']';
- assert F.isEmpty(writeMap()) : "All entries should be rejected for group lock transaction " +
- "[tx=" + this + ", writes=" + writeMap() + ']';
-
- return Collections.emptyList();
- }
-
- IgniteTxEntry grpLockEntry = groupLockEntry();
-
- assert grpLockEntry != null || (near() && !local()):
- "Group lock entry was not enlisted into transaction [tx=" + this +
- ", grpLockKey=" + groupLockKey() + ']';
-
- return grpLockEntry == null ?
- Collections.<IgniteTxEntry>emptyList() :
- Collections.singletonList(grpLockEntry);
- }
+ return writeEntries();
}
/** {@inheritDoc} */
@@ -482,16 +450,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
cctx.tm().uncommitTx(this);
}
- /**
- * This method uses unchecked assignment to cast group lock key entry to transaction generic signature.
- *
- * @return Group lock tx entry.
- */
- @SuppressWarnings("unchecked")
- public IgniteTxEntry groupLockEntry() {
- return this.entry(groupLockKey());
- }
-
/** {@inheritDoc} */
@Override public UUID otherNodeId() {
return null;
@@ -603,16 +561,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
public abstract boolean isStarted();
/** {@inheritDoc} */
- @Override public boolean groupLock() {
- return grpLockKey != null;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteTxKey groupLockKey() {
- return grpLockKey;
- }
-
- /** {@inheritDoc} */
@Override public int size() {
return txSize;
}
@@ -798,9 +746,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
GridCacheVersion explicit = txEntry == null ? null : txEntry.explicitVersion();
- assert !txEntry.groupLockEntry() || groupLock() : "Can not have group-locked tx entries in " +
- "non-group-lock transactions [txEntry=" + txEntry + ", tx=" + this + ']';
-
return local() && !cacheCtx.isDht() ?
entry.lockedByThread(threadId()) || (explicit != null && entry.lockedBy(explicit)) :
// If candidate is not there, then lock was explicit.
@@ -817,9 +762,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
GridCacheVersion explicit = txEntry == null ? null : txEntry.explicitVersion();
- assert !txEntry.groupLockEntry() || groupLock() : "Can not have group-locked tx entries in " +
- "non-group-lock transactions [txEntry=" + txEntry + ", tx=" + this + ']';
-
return local() && !cacheCtx.isDht() ?
entry.lockedByThreadUnsafe(threadId()) || (explicit != null && entry.lockedByUnsafe(explicit)) :
// If candidate is not there, then lock was explicit.
@@ -1008,7 +950,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
/** {@inheritDoc} */
- @Nullable @Override public IgniteInternalFuture<IgniteInternalTx> currentPrepareFuture() {
+ @Nullable @Override public IgniteInternalFuture<?> currentPrepareFuture() {
return null;
}
@@ -1554,7 +1496,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
/** {@inheritDoc} */
@Override public String toString() {
return GridToStringBuilder.toString(IgniteTxAdapter.class, this,
- "duration", (U.currentTimeMillis() - startTime) + "ms", "grpLock", groupLock(),
+ "duration", (U.currentTimeMillis() - startTime) + "ms",
"onePhaseCommit", onePhaseCommit);
}
@@ -1779,16 +1721,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
/** {@inheritDoc} */
- @Override public boolean groupLock() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public IgniteTxKey groupLockKey() {
- throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
- }
-
- /** {@inheritDoc} */
@Override public boolean markPreparing() {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
@@ -1964,7 +1896,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
/** {@inheritDoc} */
- @Nullable @Override public <K, V> GridTuple<CacheObject> peek(GridCacheContext ctx,
+ @Nullable @Override public GridTuple<CacheObject> peek(GridCacheContext ctx,
boolean failFast,
KeyCacheObject key,
@Nullable CacheEntryPredicate[] filter) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 0d7aeaf..247d350 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -137,9 +137,6 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
@GridDirectTransient
private boolean locMapped;
- /** Group lock entry flag. */
- private boolean grpLock;
-
/** Expiry policy. */
@GridDirectTransient
private ExpiryPolicy expiryPlc;
@@ -277,22 +274,6 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
}
/**
- * @return {@code True} if this entry was added in group lock transaction and
- * this is not a group lock entry.
- */
- public boolean groupLockEntry() {
- return grpLock;
- }
-
- /**
- * @param grpLock {@code True} if this entry was added in group lock transaction and
- * this is not a group lock entry.
- */
- public void groupLockEntry(boolean grpLock) {
- this.grpLock = grpLock;
- }
-
- /**
* @param ctx Context.
* @return Clean copy of this entry.
*/
@@ -311,7 +292,6 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
cp.ttl = ttl;
cp.conflictExpireTime = conflictExpireTime;
cp.explicitVer = explicitVer;
- cp.grpLock = grpLock;
cp.conflictVer = conflictVer;
cp.expiryPlc = expiryPlc;
cp.flags = flags;
@@ -851,30 +831,24 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
writer.incrementState();
case 7:
- if (!writer.writeBoolean("grpLock", grpLock))
- return false;
-
- writer.incrementState();
-
- case 8:
if (!writer.writeMessage("key", key))
return false;
writer.incrementState();
- case 9:
+ case 8:
if (!writer.writeByteArray("transformClosBytes", transformClosBytes))
return false;
writer.incrementState();
- case 10:
+ case 9:
if (!writer.writeLong("ttl", ttl))
return false;
writer.incrementState();
- case 11:
+ case 10:
if (!writer.writeMessage("val", val))
return false;
@@ -950,14 +924,6 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
reader.incrementState();
case 7:
- grpLock = reader.readBoolean("grpLock");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 8:
key = reader.readMessage("key");
if (!reader.isLastRead())
@@ -965,7 +931,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
reader.incrementState();
- case 9:
+ case 8:
transformClosBytes = reader.readByteArray("transformClosBytes");
if (!reader.isLastRead())
@@ -973,7 +939,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
reader.incrementState();
- case 10:
+ case 9:
ttl = reader.readLong("ttl");
if (!reader.isLastRead())
@@ -981,7 +947,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
reader.incrementState();
- case 11:
+ case 10:
val = reader.readMessage("val");
if (!reader.isLastRead())
@@ -1001,7 +967,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 12;
+ return 11;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 6b45fee..f466bf2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -31,7 +31,6 @@ import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;
import java.util.*;
@@ -53,9 +52,14 @@ public class IgniteTxHandler {
/** Shared cache context. */
private GridCacheSharedContext<?, ?> ctx;
- public IgniteInternalFuture<IgniteInternalTx> processNearTxPrepareRequest(final UUID nearNodeId,
+ /**
+ * @param nearNodeId Node ID.
+ * @param req Request.
+ * @return Prepare future.
+ */
+ public IgniteInternalFuture<?> processNearTxPrepareRequest(final UUID nearNodeId,
final GridNearTxPrepareRequest req) {
- return prepareTx(nearNodeId, null, req, null);
+ return prepareTx(nearNodeId, null, req);
}
/**
@@ -114,16 +118,16 @@ public class IgniteTxHandler {
}
});
- ctx.io().addHandler(0, GridCacheOptimisticCheckPreparedTxRequest.class,
- new CI2<UUID, GridCacheOptimisticCheckPreparedTxRequest>() {
- @Override public void apply(UUID nodeId, GridCacheOptimisticCheckPreparedTxRequest req) {
+ ctx.io().addHandler(0, GridCacheTxRecoveryRequest.class,
+ new CI2<UUID, GridCacheTxRecoveryRequest>() {
+ @Override public void apply(UUID nodeId, GridCacheTxRecoveryRequest req) {
processCheckPreparedTxRequest(nodeId, req);
}
});
- ctx.io().addHandler(0, GridCacheOptimisticCheckPreparedTxResponse.class,
- new CI2<UUID, GridCacheOptimisticCheckPreparedTxResponse>() {
- @Override public void apply(UUID nodeId, GridCacheOptimisticCheckPreparedTxResponse res) {
+ ctx.io().addHandler(0, GridCacheTxRecoveryResponse.class,
+ new CI2<UUID, GridCacheTxRecoveryResponse>() {
+ @Override public void apply(UUID nodeId, GridCacheTxRecoveryResponse res) {
processCheckPreparedTxResponse(nodeId, res);
}
});
@@ -135,29 +139,26 @@ public class IgniteTxHandler {
* @param req Near prepare request.
* @return Future for transaction.
*/
- public IgniteInternalFuture<IgniteInternalTx> prepareTx(
+ public IgniteInternalFuture<GridNearTxPrepareResponse> prepareTx(
UUID nearNodeId,
@Nullable GridNearTxLocal locTx,
- GridNearTxPrepareRequest req,
- @Nullable IgniteInClosure<GridNearTxPrepareResponse> completeCb
+ GridNearTxPrepareRequest req
) {
assert nearNodeId != null;
assert req != null;
if (locTx != null) {
- assert completeCb != null;
-
if (req.near()) {
// Make sure not to provide Near entries to DHT cache.
req.cloneEntries();
- return prepareNearTx(nearNodeId, req, completeCb);
+ return prepareNearTx(nearNodeId, req);
}
else
- return prepareColocatedTx(locTx, req, completeCb);
+ return prepareColocatedTx(locTx, req);
}
else
- return prepareNearTx(nearNodeId, req, null);
+ return prepareNearTx(nearNodeId, req);
}
/**
@@ -167,28 +168,25 @@ public class IgniteTxHandler {
* @param req Near prepare request.
* @return Prepare future.
*/
- private IgniteInternalFuture<IgniteInternalTx> prepareColocatedTx(
+ private IgniteInternalFuture<GridNearTxPrepareResponse> prepareColocatedTx(
final GridNearTxLocal locTx,
- final GridNearTxPrepareRequest req,
- final IgniteInClosure<GridNearTxPrepareResponse> completeCb
+ final GridNearTxPrepareRequest req
) {
-
IgniteInternalFuture<Object> fut = new GridFinishedFuture<>(); // TODO force preload keys.
return new GridEmbeddedFuture<>(
fut,
- new C2<Object, Exception, IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public IgniteInternalFuture<IgniteInternalTx> apply(Object o, Exception ex) {
+ new C2<Object, Exception, IgniteInternalFuture<GridNearTxPrepareResponse>>() {
+ @Override public IgniteInternalFuture<GridNearTxPrepareResponse> apply(Object o, Exception ex) {
if (ex != null)
throw new GridClosureException(ex);
- IgniteInternalFuture<IgniteInternalTx> fut = locTx.prepareAsyncLocal(
+ IgniteInternalFuture<GridNearTxPrepareResponse> fut = locTx.prepareAsyncLocal(
req.reads(),
req.writes(),
req.transactionNodes(),
req.last(),
- req.lastBackups(),
- completeCb);
+ req.lastBackups());
if (locTx.isRollbackOnly())
locTx.rollbackAsync();
@@ -196,18 +194,16 @@ public class IgniteTxHandler {
return fut;
}
},
- new C2<IgniteInternalTx, Exception, IgniteInternalTx>() {
- @Nullable @Override public IgniteInternalTx apply(IgniteInternalTx tx, Exception e) {
+ new C2<GridNearTxPrepareResponse, Exception, GridNearTxPrepareResponse>() {
+ @Nullable @Override public GridNearTxPrepareResponse apply(GridNearTxPrepareResponse res, Exception e) {
if (e != null) {
- // tx can be null of exception occurred.
- if (tx != null)
- tx.setRollbackOnly(); // Just in case.
+ locTx.setRollbackOnly(); // Just in case.
if (!(e instanceof IgniteTxOptimisticCheckedException))
- U.error(log, "Failed to prepare DHT transaction: " + tx, e);
+ U.error(log, "Failed to prepare transaction: " + locTx, e);
}
- return tx;
+ return res;
}
}
);
@@ -220,10 +216,9 @@ public class IgniteTxHandler {
* @param req Near prepare request.
* @return Prepare future.
*/
- private IgniteInternalFuture<IgniteInternalTx> prepareNearTx(
+ private IgniteInternalFuture<GridNearTxPrepareResponse> prepareNearTx(
final UUID nearNodeId,
- final GridNearTxPrepareRequest req,
- IgniteInClosure<GridNearTxPrepareResponse> completeCb
+ final GridNearTxPrepareRequest req
) {
ClusterNode nearNode = ctx.node(nearNodeId);
@@ -276,8 +271,6 @@ public class IgniteTxHandler {
req.isInvalidate(),
false,
req.txSize(),
- req.groupLockKey(),
- req.partitionLock(),
req.transactionNodes(),
req.subjectId(),
req.taskNameHash()
@@ -308,7 +301,7 @@ public class IgniteTxHandler {
if (req.returnValue())
tx.needReturnValue(true);
- IgniteInternalFuture<IgniteInternalTx> fut = tx.prepareAsync(
+ IgniteInternalFuture<GridNearTxPrepareResponse> fut = tx.prepareAsync(
req.reads(),
req.writes(),
req.dhtVersions(),
@@ -316,8 +309,7 @@ public class IgniteTxHandler {
req.miniId(),
req.transactionNodes(),
req.last(),
- req.lastBackups(),
- completeCb);
+ req.lastBackups());
if (tx.isRollbackOnly()) {
try {
@@ -330,8 +322,8 @@ public class IgniteTxHandler {
final GridDhtTxLocal tx0 = tx;
- fut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> txFut) {
+ fut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> txFut) {
try {
txFut.get();
}
@@ -347,7 +339,7 @@ public class IgniteTxHandler {
return fut;
}
else
- return new GridFinishedFuture<>((IgniteInternalTx)null);
+ return new GridFinishedFuture<>((GridNearTxPrepareResponse)null);
}
/**
@@ -355,7 +347,7 @@ public class IgniteTxHandler {
* @param res Response.
*/
private void processNearTxPrepareResponse(UUID nodeId, GridNearTxPrepareResponse res) {
- GridNearTxPrepareFuture fut = (GridNearTxPrepareFuture)ctx.mvcc()
+ GridNearTxPrepareFutureAdapter fut = (GridNearTxPrepareFutureAdapter)ctx.mvcc()
.<IgniteInternalTx>future(res.version(), res.futureId());
if (fut == null) {
@@ -392,8 +384,7 @@ public class IgniteTxHandler {
* @param res Response.
*/
private void processDhtTxPrepareResponse(UUID nodeId, GridDhtTxPrepareResponse res) {
- GridDhtTxPrepareFuture fut = (GridDhtTxPrepareFuture)ctx.mvcc().
- <IgniteInternalTx>future(res.version(), res.futureId());
+ GridDhtTxPrepareFuture fut = (GridDhtTxPrepareFuture)ctx.mvcc().future(res.version(), res.futureId());
if (fut == null) {
if (log.isDebugEnabled())
@@ -437,6 +428,7 @@ public class IgniteTxHandler {
/**
* @param nodeId Node ID.
+ * @param locTx Local transaction.
* @param req Request.
* @return Future.
*/
@@ -560,8 +552,6 @@ public class IgniteTxHandler {
req.isInvalidate(),
req.storeEnabled(),
req.txSize(),
- req.groupLockKey(),
- false,
null,
req.subjectId(),
req.taskNameHash()));
@@ -1008,7 +998,6 @@ public class IgniteTxHandler {
req.isInvalidate(),
req.timeout(),
req.writes() != null ? Math.max(req.writes().size(), req.txSize()) : req.txSize(),
- req.groupLockKey(),
req.nearXidVersion(),
req.transactionNodes(),
req.subjectId(),
@@ -1094,6 +1083,7 @@ public class IgniteTxHandler {
}
/**
+ * @param cacheCtx Context.
* @param key Key
* @param ver Version.
* @throws IgniteCheckedException If invalidate failed.
@@ -1141,7 +1131,6 @@ public class IgniteTxHandler {
req.timeout(),
req.nearWrites(),
req.txSize(),
- req.groupLockKey(),
req.subjectId(),
req.taskNameHash()
);
@@ -1178,12 +1167,13 @@ public class IgniteTxHandler {
* @param req Request.
*/
protected void processCheckPreparedTxRequest(final UUID nodeId,
- final GridCacheOptimisticCheckPreparedTxRequest req)
+ final GridCacheTxRecoveryRequest req)
{
if (log.isDebugEnabled())
log.debug("Processing check prepared transaction requests [nodeId=" + nodeId + ", req=" + req + ']');
- IgniteInternalFuture<Boolean> fut = ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions());
+ IgniteInternalFuture<Boolean> fut = req.nearTxCheck() ? ctx.tm().txCommitted(req.nearXidVersion()) :
+ ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions());
if (fut == null || fut.isDone()) {
boolean prepared;
@@ -1225,10 +1215,10 @@ public class IgniteTxHandler {
* @param prepared {@code True} if all transaction prepared or committed.
*/
private void sendCheckPreparedResponse(UUID nodeId,
- GridCacheOptimisticCheckPreparedTxRequest req,
+ GridCacheTxRecoveryRequest req,
boolean prepared) {
- GridCacheOptimisticCheckPreparedTxResponse res =
- new GridCacheOptimisticCheckPreparedTxResponse(req.version(), req.futureId(), req.miniId(), prepared);
+ GridCacheTxRecoveryResponse res =
+ new GridCacheTxRecoveryResponse(req.version(), req.futureId(), req.miniId(), prepared);
try {
if (log.isDebugEnabled())
@@ -1250,11 +1240,11 @@ public class IgniteTxHandler {
* @param nodeId Node ID.
* @param res Response.
*/
- protected void processCheckPreparedTxResponse(UUID nodeId, GridCacheOptimisticCheckPreparedTxResponse res) {
+ protected void processCheckPreparedTxResponse(UUID nodeId, GridCacheTxRecoveryResponse res) {
if (log.isDebugEnabled())
log.debug("Processing check prepared transaction response [nodeId=" + nodeId + ", res=" + res + ']');
- GridCacheOptimisticCheckPreparedTxFuture fut = (GridCacheOptimisticCheckPreparedTxFuture)ctx.mvcc().
+ GridCacheTxRecoveryFuture fut = (GridCacheTxRecoveryFuture)ctx.mvcc().
<Boolean>future(res.version(), res.futureId());
if (fut == null) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index dfce09c..d57786e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -86,9 +86,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
/** Base for completed versions. */
private GridCacheVersion completedBase;
- /** Flag indicating partition lock in group lock transaction. */
- private boolean partLock;
-
/** Flag indicating that transformed values should be sent to remote nodes. */
private boolean sndTransformedVals;
@@ -123,8 +120,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
* @param isolation Isolation.
* @param timeout Timeout.
* @param txSize Expected transaction size.
- * @param grpLockKey Group lock key if this is a group-lock transaction.
- * @param partLock {@code True} if this is a group-lock transaction and lock is acquired for whole partition.
*/
protected IgniteTxLocalAdapter(
GridCacheSharedContext cctx,
@@ -139,17 +134,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
boolean invalidate,
boolean storeEnabled,
int txSize,
- @Nullable IgniteTxKey grpLockKey,
- boolean partLock,
@Nullable UUID subjId,
int taskNameHash
) {
super(cctx, xidVer, implicit, implicitSingle, /*local*/true, sys, plc, concurrency, isolation, timeout,
- invalidate, storeEnabled, txSize, grpLockKey, subjId, taskNameHash);
-
- assert !partLock || grpLockKey != null;
-
- this.partLock = partLock;
+ invalidate, storeEnabled, txSize, subjId, taskNameHash);
minVer = xidVer;
}
@@ -182,11 +171,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
/** {@inheritDoc} */
- @Override public boolean partitionLock() {
- return partLock;
- }
-
- /** {@inheritDoc} */
@Override public Throwable commitError() {
return commitErr.get();
}
@@ -330,7 +314,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
/** {@inheritDoc} */
@SuppressWarnings({"RedundantTypeArguments"})
- @Nullable @Override public <K, V> GridTuple<CacheObject> peek(
+ @Nullable @Override public GridTuple<CacheObject> peek(
GridCacheContext cacheCtx,
boolean failFast,
KeyCacheObject key,
@@ -499,7 +483,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
CacheStoreManager store = store();
if (store != null && store.isWriteThrough() && storeEnabled() &&
- (!internal() || groupLock()) && (near() || store.isWriteToStoreFromDht())) {
+ !internal() && (near() || store.isWriteToStoreFromDht())) {
try {
if (writeEntries != null) {
Map<Object, IgniteBiTuple<Object, GridCacheVersion>> putMap = null;
@@ -679,9 +663,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
if (!empty || colocated())
cctx.tm().addCommittedTx(this);
- if (groupLock())
- addGroupTxMapping(writeSet());
-
if (!empty) {
batchStoreCommit(writeMap().values());
@@ -738,7 +719,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
// in order to keep near entries on backup nodes until
// backup remote transaction completes.
if (cacheCtx.isNear()) {
- ((GridNearCacheEntry)cached).recordDhtVersion(txEntry.dhtVersion());
+ if (txEntry.op() == CREATE || txEntry.op() == UPDATE ||
+ txEntry.op() == DELETE || txEntry.op() == TRANSFORM)
+ ((GridNearCacheEntry)cached).recordDhtVersion(txEntry.dhtVersion());
if ((txEntry.op() == CREATE || txEntry.op() == UPDATE) &&
txEntry.conflictExpireTime() == CU.EXPIRE_TIME_CALCULATE) {
@@ -909,10 +892,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
log.debug("Ignoring READ entry when committing: " + txEntry);
}
else {
- assert !groupLock() || txEntry.groupLockEntry() || ownsLock(txEntry.cached()):
- "Transaction does not own lock for group lock entry during commit [tx=" +
- this + ", txEntry=" + txEntry + ']';
-
if (conflictCtx == null || !conflictCtx.isUseOld()) {
if (txEntry.ttl() != CU.TTL_NOT_CHANGED)
cached.updateTtl(null, txEntry.ttl());
@@ -927,7 +906,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
// we are not changing obsolete entries.
// (innerSet and innerRemove will throw an exception
// if an entry is obsolete).
- if (txEntry.op() != READ && !txEntry.groupLockEntry())
+ if (txEntry.op() != READ)
checkCommitLocks(cached);
// Break out of while loop.
@@ -996,7 +975,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
else {
CacheStoreManager store = store();
- if (store != null && (!internal() || groupLock())) {
+ if (store != null && !internal()) {
try {
store.sessionEnd(this, true);
}
@@ -1102,7 +1081,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
CacheStoreManager store = store();
if (store != null && (near() || store.isWriteToStoreFromDht())) {
- if (!internal() || groupLock())
+ if (!internal())
store.sessionEnd(this, false);
}
}
@@ -1152,8 +1131,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
cacheCtx.checkSecurity(SecurityPermission.CACHE_READ);
- groupLockSanityCheck(cacheCtx, keys);
-
boolean single = keysCnt == 1;
Collection<KeyCacheObject> lockKeys = null;
@@ -1185,7 +1162,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
cacheCtx.addResult(map, key, val, skipVals, keepCacheObjects, deserializePortable, false);
}
else {
- assert txEntry.op() == TRANSFORM || (groupLock() && !txEntry.groupLockEntry());
+ assert txEntry.op() == TRANSFORM;
while (true) {
try {
@@ -1263,7 +1240,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
CacheObject val = null;
- if (!pessimistic() || readCommitted() || groupLock() && !skipVals) {
+ if (!pessimistic() || readCommitted() && !skipVals) {
IgniteCacheExpiryPolicy accessPlc =
optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null;
@@ -1311,8 +1288,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
null,
skipStore);
- if (groupLock())
- txEntry.groupLockEntry(true);
// As optimization, mark as checked immediately
// for non-pessimistic if value is not null.
@@ -1527,7 +1502,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
nextVer = cctx.versions().next(topologyVersion());
while (true) {
- assert txEntry != null || readCommitted() || groupLock() || skipVals;
+ assert txEntry != null || readCommitted() || skipVals;
GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached();
@@ -1544,8 +1519,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
log.debug("Got removed entry in transaction getAll method " +
"(will try again): " + e);
- if (pessimistic() && !readCommitted() && !isRollbackOnly() &&
- (!groupLock() || F.eq(e.key(), groupLockKey()))) {
+ if (pessimistic() && !readCommitted() && !isRollbackOnly()) {
U.error(log, "Inconsistent transaction state (entry got removed while " +
"holding lock) [entry=" + e + ", tx=" + IgniteTxLocalAdapter.this + "]");
@@ -1563,7 +1537,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
// In pessimistic mode, we should always be able to set.
assert set || !pessimistic();
- if (readCommitted() || groupLock() || skipVals) {
+ if (readCommitted() || skipVals) {
cacheCtx.evicts().touch(e, topologyVersion());
if (visibleVal != null) {
@@ -1654,7 +1628,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
return new GridFinishedFuture<>(retMap);
// Handle locks.
- if (pessimistic() && !readCommitted() && !groupLock() && !skipVals) {
+ if (pessimistic() && !readCommitted() && !skipVals) {
if (expiryPlc == null)
expiryPlc = cacheCtx.expiry();
@@ -1760,7 +1734,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
}
- if (!missed.isEmpty() && (cacheCtx.isReplicated() || cacheCtx.isLocal())) {
+ if (!missed.isEmpty() && cacheCtx.isLocal()) {
return checkMissed(cacheCtx,
retMap,
missed,
@@ -1811,7 +1785,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
}
else {
- assert optimistic() || readCommitted() || groupLock() || skipVals;
+ assert optimistic() || readCommitted() || skipVals;
final Collection<KeyCacheObject> redos = new ArrayList<>();
@@ -2036,11 +2010,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
if (invokeMap != null)
transform = true;
- groupLockSanityCheck(cacheCtx, keys);
-
for (Object key : keys) {
if (key == null) {
- setRollbackOnly();
+ rollback();
throw new NullPointerException("Null key.");
}
@@ -2191,15 +2163,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
drVer,
skipStore);
- if (!implicit() && readCommitted())
+ if (!implicit() && readCommitted() && !cacheCtx.offheapTiered())
cacheCtx.evicts().touch(entry, topologyVersion());
- if (groupLock() && !lockOnly)
- txEntry.groupLockEntry(true);
-
enlisted.add(cacheKey);
- if ((!pessimistic() && !implicit()) || (groupLock() && !lockOnly)) {
+ if (!pessimistic() && !implicit()) {
txEntry.markValid();
if (old == null) {
@@ -2644,7 +2613,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
null,
opCtx != null && opCtx.skipStore());
- if (pessimistic() && !groupLock()) {
+ if (pessimistic()) {
// Loose all skipped.
final Set<KeyCacheObject> loaded = loadFut.get();
@@ -2867,7 +2836,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
// Acquire locks only after having added operation to the write set.
// Otherwise, during rollback we will not know whether locks need
// to be rolled back.
- if (pessimistic() && !groupLock()) {
+ if (pessimistic()) {
// Loose all skipped.
final Collection<KeyCacheObject> passedKeys = F.view(enlisted, F0.notIn(loadFut.get()));
@@ -2934,19 +2903,17 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
assert loadFut.isDone();
return nonInterruptable(commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
- @Override
- public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
+ @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
throws IgniteCheckedException {
txFut.get();
- return (GridCacheReturn)implicitRes;
+ return implicitRes;
}
}));
}
else
return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Set<KeyCacheObject>>, GridCacheReturn>() {
- @Override
- public GridCacheReturn applyx(IgniteInternalFuture<Set<KeyCacheObject>> f)
+ @Override public GridCacheReturn applyx(IgniteInternalFuture<Set<KeyCacheObject>> f)
throws IgniteCheckedException {
f.get();
@@ -2987,108 +2954,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
/**
- * Adds key mapping to transaction.
- * @param keys Keys to add.
- */
- protected void addGroupTxMapping(Collection<IgniteTxKey> keys) {
- // No-op. This method is overriden in transactions that store key to remote node mapping
- // for commit.
- }
-
- /**
- * Checks that affinity keys are enlisted in group transaction on start.
- *
- * @param cacheCtx Cache context.
- * @param keys Keys to check.
- * @throws IgniteCheckedException If sanity check failed.
- */
- private <K> void groupLockSanityCheck(GridCacheContext cacheCtx, Iterable<? extends K> keys)
- throws IgniteCheckedException
- {
- if (groupLock() && cctx.kernalContext().config().isCacheSanityCheckEnabled()) {
- // Note that affinity is called without mapper on purpose.
- int affinityPart = cacheCtx.config().getAffinity().partition(grpLockKey.key());
-
- for (K key : keys) {
- if (partitionLock()) {
- int part = cacheCtx.affinity().partition(key);
-
- if (affinityPart != part)
- throw new IgniteCheckedException("Failed to enlist key into group-lock transaction (given " +
- "key does not belong to locked partition) [key=" + key + ", affinityPart=" + affinityPart +
- ", part=" + part + ", groupLockKey=" + grpLockKey + ']');
- }
- else {
- KeyCacheObject cacheKey =
- cacheCtx.toCacheKeyObject(cacheCtx.config().getAffinityMapper().affinityKey(key));
-
- IgniteTxKey affinityKey = cacheCtx.txKey(cacheKey);
-
- if (!grpLockKey.equals(affinityKey))
- throw new IgniteCheckedException("Failed to enlist key into group-lock transaction (affinity key was " +
- "not enlisted to transaction on start) [key=" + key + ", affinityKey=" + affinityKey +
- ", groupLockKey=" + grpLockKey + ']');
- }
- }
- }
- }
-
- /**
- * Performs keys locking for affinity-based group lock transactions.
- * @return Lock future.
- */
- @Override public <K> IgniteInternalFuture<?> groupLockAsync(GridCacheContext cacheCtx, Collection<K> keys) {
- assert groupLock();
-
- try {
- init();
-
- GridCacheReturn ret = new GridCacheReturn(localResult(), false);
-
- Collection<KeyCacheObject> enlisted = new ArrayList<>();
-
- Set<KeyCacheObject> skipped = enlistWrite(
- cacheCtx,
- keys,
- /** cached entry */null,
- /** expiry - leave unchanged */null,
- /** implicit */false,
- /** lookup map */null,
- /** invoke map */null,
- /** invoke arguments */null,
- /** retval */false,
- /** lock only */true,
- CU.empty0(),
- ret,
- enlisted,
- null,
- null,
- cacheCtx.skipStore()
- ).get();
-
- // No keys should be skipped with empty filter.
- assert F.isEmpty(skipped);
-
- // Lock group key in pessimistic mode only.
- return pessimistic() ?
- cacheCtx.cache().txLockAsync(enlisted,
- lockTimeout(),
- this,
- false,
- false,
- isolation,
- isInvalidate(),
- -1L) :
- new GridFinishedFuture<>();
- }
- catch (IgniteCheckedException e) {
- setRollbackOnly();
-
- return new GridFinishedFuture<Object>(e);
- }
- }
-
- /**
* Initializes read map.
*
* @return {@code True} if transaction was successfully started.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index 61041e1..14562ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -58,14 +58,9 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
public void userRollback() throws IgniteCheckedException;
/**
- * @return Group lock entry if this is a group-lock transaction.
- */
- @Nullable public IgniteTxEntry groupLockEntry();
-
- /**
* @param cacheCtx Cache context.
* @param keys Keys to get.
- * @param cached Cached entry if this method is called from entry wrapper.
+ * @param cached Cached entry if this method is called from entry wrapper
* Cached entry is passed if and only if there is only one key in collection of keys.
* @param deserializePortable Deserialize portable flag.
* @param skipVals Skip values flag.
@@ -144,20 +139,6 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
Map<KeyCacheObject, GridCacheVersion> drMap);
/**
- * Performs keys locking for affinity-based group lock transactions.
- *
- * @param cacheCtx Cache context.
- * @param keys Keys to lock.
- * @return Lock future.
- */
- public <K> IgniteInternalFuture<?> groupLockAsync(GridCacheContext cacheCtx, Collection<K> keys);
-
- /**
- * @return {@code True} if keys from the same partition are allowed to be enlisted in group-lock transaction.
- */
- public boolean partitionLock();
-
- /**
* @return Return value for
*/
public GridCacheReturn implicitSingleResult();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index c494602..4666cca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -347,8 +347,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @param isolation Isolation.
* @param timeout transaction timeout.
* @param txSize Expected transaction size.
- * @param grpLockKey Group lock key if this is a group-lock transaction.
- * @param partLock {@code True} if partition is locked.
* @return New transaction.
*/
public IgniteTxLocalAdapter newTx(
@@ -359,9 +357,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
TransactionIsolation isolation,
long timeout,
boolean storeEnabled,
- int txSize,
- @Nullable IgniteTxKey grpLockKey,
- boolean partLock) {
+ int txSize) {
assert sysCacheCtx == null || sysCacheCtx.systemTx();
UUID subjId = null; // TODO GG-9141 how to get subj ID?
@@ -379,8 +375,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
timeout,
storeEnabled,
txSize,
- grpLockKey,
- partLock,
subjId,
taskNameHash);
@@ -639,6 +633,30 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @return Any transaction associated with the current thread.
+ */
+ public IgniteInternalTx anyActiveThreadTx() {
+ long threadId = Thread.currentThread().getId();
+
+ IgniteInternalTx tx = threadMap.get(threadId);
+
+ if (tx != null && tx.topologyVersionSnapshot() != null)
+ return tx;
+
+ for (GridCacheContext cacheCtx : cctx.cache().context().cacheContexts()) {
+ if (!cacheCtx.systemTx())
+ continue;
+
+ tx = sysThreadMap.get(new TxThreadKey(threadId, cacheCtx.cacheId()));
+
+ if (tx != null && tx.topologyVersionSnapshot() != null)
+ return tx;
+ }
+
+ return null;
+ }
+
+ /**
* @return Local transaction.
*/
@Nullable public IgniteInternalTx localTxx() {
@@ -727,14 +745,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
- * @param txId Transaction ID.
- * @return Transaction with given ID.
- */
- @Nullable public IgniteInternalTx txx(GridCacheVersion txId) {
- return idMap.get(txId);
- }
-
- /**
* Handles prepare stage of 2PC.
*
* @param tx Transaction to prepare.
@@ -1215,13 +1225,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
cctx.kernalContext().dataStructures().onTxCommitted(tx);
// 4. Unlock write resources.
- if (tx.groupLock())
- unlockGroupLocks(tx);
- else
- unlockMultiple(tx, tx.writeEntries());
+ unlockMultiple(tx, tx.writeEntries());
// 5. For pessimistic transaction, unlock read resources if required.
- if (tx.pessimistic() && !tx.readCommitted() && !tx.groupLock())
+ if (tx.pessimistic() && !tx.readCommitted())
unlockMultiple(tx, tx.readEntries());
// 6. Notify evictions.
@@ -1449,7 +1456,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @param tx Transaction to notify evictions for.
*/
private void notifyEvitions(IgniteInternalTx tx) {
- if (tx.internal() && !tx.groupLock())
+ if (tx.internal())
return;
for (IgniteTxEntry txEntry : tx.allEntries())
@@ -1625,51 +1632,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
- * Unlocks entries locked by group transaction.
- *
- * @param txx Transaction.
- */
- @SuppressWarnings("unchecked")
- private void unlockGroupLocks(IgniteInternalTx txx) {
- IgniteTxKey grpLockKey = txx.groupLockKey();
-
- assert grpLockKey != null;
-
- if (grpLockKey == null)
- return;
-
- IgniteTxEntry txEntry = txx.entry(grpLockKey);
-
- assert txEntry != null || (txx.near() && !txx.local());
-
- if (txEntry != null) {
- GridCacheContext cacheCtx = txEntry.context();
-
- // Group-locked entries must be locked.
- while (true) {
- try {
- GridCacheEntryEx entry = txEntry.cached();
-
- assert entry != null;
-
- entry.txUnlock(txx);
-
- break;
- }
- catch (GridCacheEntryRemovedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry in TM unlockGroupLocks(..) method (will retry): " + txEntry);
-
- GridCacheAdapter cache = cacheCtx.cache();
-
- // Renew cache entry.
- txEntry.cached(cache.entryEx(txEntry.key()));
- }
- }
- }
- }
-
- /**
* @param tx Owning transaction.
* @param entries Entries to unlock.
*/
@@ -1770,6 +1732,45 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @param ver Version.
+ * @return Future for flag indicating if transactions was committed.
+ */
+ public IgniteInternalFuture<Boolean> txCommitted(GridCacheVersion ver) {
+ final GridFutureAdapter<Boolean> resFut = new GridFutureAdapter<>();
+
+ final IgniteInternalTx tx = cctx.tm().tx(ver);
+
+ if (tx != null) {
+ assert tx.near() && tx.local() : tx;
+
+ if (log.isDebugEnabled())
+ log.debug("Found near transaction, will wait for completion: " + tx);
+
+ tx.finishFuture().listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
+ @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
+ TransactionState state = tx.state();
+
+ if (log.isDebugEnabled())
+ log.debug("Near transaction finished with state: " + state);
+
+ resFut.onDone(state == COMMITTED);
+ }
+ });
+
+ return resFut;
+ }
+
+ Boolean committed = completedVers.get(ver);
+
+ if (log.isDebugEnabled())
+ log.debug("Near transaction committed: " + committed);
+
+ resFut.onDone(committed != null && committed);
+
+ return resFut;
+ }
+
+ /**
* @param nearVer Near version ID.
* @param txNum Number of transactions.
* @param fut Result future.
@@ -1785,7 +1786,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (nearVer.equals(tx.nearXidVersion())) {
TransactionState state = tx.state();
- IgniteInternalFuture<IgniteInternalTx> prepFut = tx.currentPrepareFuture();
+ IgniteInternalFuture<?> prepFut = tx.currentPrepareFuture();
if (prepFut != null && !prepFut.isDone()) {
if (log.isDebugEnabled())
@@ -1797,8 +1798,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
final Collection<GridCacheVersion> processedVers0 = processedVers;
- prepFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> prepFut) {
+ prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> prepFut) {
if (log.isDebugEnabled())
log.debug("Transaction prepare future finished: " + tx);
@@ -1900,40 +1901,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
- * Gets local transaction for pessimistic tx recovery.
- *
- * @param nearXidVer Near tx ID.
- * @return Near local or colocated local transaction.
- */
- @Nullable public IgniteInternalTx localTxForRecovery(GridCacheVersion nearXidVer, boolean markFinalizing) {
- // First check if we have near transaction with this ID.
- IgniteInternalTx tx = idMap.get(nearXidVer);
-
- if (tx == null) {
- // Check all local transactions and mark them as waiting for recovery to prevent finish race.
- for (IgniteInternalTx txEx : idMap.values()) {
- if (nearXidVer.equals(txEx.nearXidVersion())) {
- if (!markFinalizing || !txEx.markFinalizing(RECOVERY_WAIT))
- tx = txEx;
- }
- }
- }
-
- // Either we found near transaction or one of transactions is being committed by user.
- // Wait for it and send reply.
- if (tx != null && tx.local())
- return tx;
-
- return null;
- }
-
- /**
* Commits or rolls back prepared transaction.
*
* @param tx Transaction.
* @param commit Whether transaction should be committed or rolled back.
*/
- public void finishOptimisticTxOnRecovery(final IgniteInternalTx tx, boolean commit) {
+ public void finishTxOnRecovery(final IgniteInternalTx tx, boolean commit) {
if (log.isDebugEnabled())
log.debug("Finishing prepared transaction [tx=" + tx + ", commit=" + commit + ']');
@@ -1958,67 +1931,28 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
- * Commits or rolls back pessimistic transaction.
+ * Commits transaction in case when node started transaction failed, but all related
+ * transactions were prepared (invalidates transaction if it is not fully prepared).
*
- * @param tx Transaction to finish.
- * @param commitInfo Commit information.
+ * @param tx Transaction.
*/
- public void finishPessimisticTxOnRecovery(final IgniteInternalTx tx, GridCacheCommittedTxInfo commitInfo) {
- if (!tx.markFinalizing(RECOVERY_FINISH)) {
- if (log.isDebugEnabled())
- log.debug("Will not try to finish pessimistic transaction (could not mark as finalizing): " + tx);
-
- return;
- }
-
- if (tx instanceof GridDistributedTxRemoteAdapter) {
- IgniteTxRemoteEx rmtTx = (IgniteTxRemoteEx)tx;
-
- rmtTx.doneRemote(tx.xidVersion(),
- Collections.<GridCacheVersion>emptyList(),
- Collections.<GridCacheVersion>emptyList(),
- Collections.<GridCacheVersion>emptyList());
- }
-
- try {
- tx.prepare();
-
- if (commitInfo != null) {
- for (IgniteTxEntry entry : commitInfo.recoveryWrites()) {
- IgniteTxEntry write = tx.writeMap().get(entry.txKey());
-
- if (write != null) {
- GridCacheEntryEx cached = write.cached();
-
- IgniteTxEntry recovered = entry.cleanCopy(write.context());
+ public void commitIfPrepared(IgniteInternalTx tx) {
+ assert tx instanceof GridDhtTxLocal || tx instanceof GridDhtTxRemote : tx;
+ assert !F.isEmpty(tx.transactionNodes()) : tx;
+ assert tx.nearXidVersion() != null : tx;
- if (cached == null || cached.detached())
- cached = write.context().cache().entryEx(entry.key(), tx.topologyVersion());
-
- recovered.cached(cached);
-
- tx.writeMap().put(entry.txKey(), recovered);
-
- continue;
- }
-
- // If write was not found, check read.
- IgniteTxEntry read = tx.readMap().remove(entry.txKey());
+ GridCacheTxRecoveryFuture fut = new GridCacheTxRecoveryFuture(
+ cctx,
+ tx,
+ tx.originatingNodeId(),
+ tx.transactionNodes());
- if (read != null)
- tx.writeMap().put(entry.txKey(), entry);
- }
+ cctx.mvcc().addFuture(fut);
- tx.commitAsync().listen(new CommitListener(tx));
- }
- else
- tx.rollbackAsync();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to prepare pessimistic transaction (will invalidate): " + tx, e);
+ if (log.isDebugEnabled())
+ log.debug("Checking optimistic transaction state on remote nodes [tx=" + tx + ", fut=" + fut + ']');
- salvageTx(tx);
- }
+ fut.prepare();
}
/**
@@ -2065,11 +1999,11 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (tx.state() == PREPARED)
commitIfPrepared(tx);
else {
- IgniteInternalFuture<IgniteInternalTx> prepFut = tx.currentPrepareFuture();
+ IgniteInternalFuture<?> prepFut = tx.currentPrepareFuture();
if (prepFut != null) {
- prepFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
+ prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
if (tx.state() == PREPARED)
commitIfPrepared(tx);
else if (tx.setRollbackOnly())
@@ -2091,31 +2025,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
cctx.kernalContext().gateway().readUnlock();
}
}
-
- /**
- * Commits optimistic transaction in case when node started transaction failed, but all related
- * transactions were prepared (invalidates transaction if it is not fully prepared).
- *
- * @param tx Transaction.
- */
- private void commitIfPrepared(IgniteInternalTx tx) {
- assert tx instanceof GridDhtTxLocal || tx instanceof GridDhtTxRemote : tx;
- assert !F.isEmpty(tx.transactionNodes()) : tx;
- assert tx.nearXidVersion() != null : tx;
-
- GridCacheOptimisticCheckPreparedTxFuture fut = new GridCacheOptimisticCheckPreparedTxFuture<>(
- cctx,
- tx,
- evtNodeId,
- tx.transactionNodes());
-
- cctx.mvcc().addFuture(fut);
-
- if (log.isDebugEnabled())
- log.debug("Checking optimistic transaction state on remote nodes [tx=" + tx + ", fut=" + fut + ']');
-
- fut.prepare();
- }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index a69e033..db3d350 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1420,6 +1420,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
GridDrType.DR_LOAD);
cctx.evicts().touch(entry, topVer);
+
+ CU.unwindEvicts(cctx);
}
catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) {
// No-op.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index 319b696..aa6427d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -202,7 +202,10 @@ public class IgfsDataManager extends IgfsManager {
/** {@inheritDoc} */
@Override protected void onKernalStart0() throws IgniteCheckedException {
+ igfsCtx.kernalContext().cache().getOrStartCache(igfsCtx.configuration().getDataCacheName());
dataCachePrj = igfsCtx.kernalContext().cache().internalCache(igfsCtx.configuration().getDataCacheName());
+
+ igfsCtx.kernalContext().cache().getOrStartCache(igfsCtx.configuration().getDataCacheName());
dataCache = igfsCtx.kernalContext().cache().internalCache(igfsCtx.configuration().getDataCacheName());
metrics = igfsCtx.igfs().localMetrics();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
index 250b3a0..1b2d3fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.managers.eventstorage.*;
import org.apache.ignite.internal.util.future.*;
@@ -155,6 +156,9 @@ public class IgfsDeleteWorker extends IgfsThread {
try {
info = meta.info(TRASH_ID);
}
+ catch(ClusterTopologyServerNotFoundException e) {
+ LT.warn(log, e, "Server nodes not found.");
+ }
catch (IgniteCheckedException e) {
U.error(log, "Cannot obtain trash directory info.", e);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 35ca8bb..e33e0d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -108,7 +108,7 @@ public class IgfsMetaManager extends IgfsManager {
/** {@inheritDoc} */
@Override protected void onKernalStart0() throws IgniteCheckedException {
- metaCache = igfsCtx.kernalContext().cache().cache(cfg.getMetaCacheName());
+ metaCache = igfsCtx.kernalContext().cache().getOrStartCache(cfg.getMetaCacheName());
assert metaCache != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index 2a915ec..4b0234f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.igfs;
import org.apache.ignite.*;
import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.util.typedef.*;
import java.lang.reflect.*;
@@ -50,9 +51,13 @@ public class IgfsUtils {
if (err0 != null)
// Dealing with a kind of IGFS error, wrap it once again, preserving message and root cause.
err0 = newIgfsException(err0.getClass(), err0.getMessage(), err0);
- else
- // Unknown error nature.
- err0 = new IgfsException("Generic IGFS error occurred.", err);
+ else {
+ if (err instanceof ClusterTopologyServerNotFoundException)
+ err0 = new IgfsException("Cache server nodes not found.", err);
+ else
+ // Unknown error nature.
+ err0 = new IgfsException("Generic IGFS error occurred.", err);
+ }
}
return err0;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
index ebedadb..a99c4c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
@@ -293,6 +293,23 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
}
/**
+ * Gets iterator over contents of the given space.
+ *
+ * @param spaceName Space name.
+ * @param c Key/value closure.
+ * @param part Partition.
+ * @return Iterator.
+ */
+ public <T> GridCloseableIterator<T> iterator(@Nullable String spaceName,
+ CX2<T2<Long, Integer>, T2<Long, Integer>, T> c, int part) {
+ assert c != null;
+
+ GridOffHeapPartitionedMap m = offheap(spaceName);
+
+ return m == null ? new GridEmptyCloseableIterator<T>() : m.iterator(c, part);
+ }
+
+ /**
* Gets number of elements in the given space.
*
* @param spaceName Space name. Optional.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index fe029eb..0bb820d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.query.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
import org.apache.ignite.spi.indexing.*;
import org.jetbrains.annotations.*;
@@ -148,6 +149,22 @@ public interface GridQueryIndexing {
public void unregisterCache(CacheConfiguration<?, ?> ccfg) throws IgniteCheckedException;
/**
+ * Checks if the given class can be mapped to a simple SQL type.
+ *
+ * @param cls Class.
+ * @return {@code true} If can.
+ */
+ public boolean isSqlType(Class<?> cls);
+
+ /**
+ * Checks if the given class is GEOMETRY.
+ *
+ * @param cls Class.
+ * @return {@code true} If this is geometry.
+ */
+ public boolean isGeometryClass(Class<?> cls);
+
+ /**
* Registers type if it was not known before or updates it otherwise.
*
* @param spaceName Space name.
@@ -178,8 +195,8 @@ public interface GridQueryIndexing {
* @param expirationTime Expiration time or 0 if never expires.
* @throws IgniteCheckedException If failed.
*/
- public void store(@Nullable String spaceName, GridQueryTypeDescriptor type, Object key, Object val, byte[] ver,
- long expirationTime) throws IgniteCheckedException;
+ public void store(@Nullable String spaceName, GridQueryTypeDescriptor type, CacheObject key, CacheObject val,
+ byte[] ver, long expirationTime) throws IgniteCheckedException;
/**
* Removes index entry by key.
@@ -189,7 +206,7 @@ public interface GridQueryIndexing {
* @param val Value.
* @throws IgniteCheckedException If failed.
*/
- public void remove(@Nullable String spaceName, Object key, Object val) throws IgniteCheckedException;
+ public void remove(@Nullable String spaceName, CacheObject key, CacheObject val) throws IgniteCheckedException;
/**
* Will be called when entry with given key is swapped.
@@ -198,7 +215,7 @@ public interface GridQueryIndexing {
* @param key Key.
* @throws IgniteCheckedException If failed.
*/
- public void onSwap(@Nullable String spaceName, Object key) throws IgniteCheckedException;
+ public void onSwap(@Nullable String spaceName, CacheObject key) throws IgniteCheckedException;
/**
* Will be called when entry with given key is unswapped.
@@ -206,10 +223,9 @@ public interface GridQueryIndexing {
* @param spaceName Space name.
* @param key Key.
* @param val Value.
- * @param valBytes Value bytes.
* @throws IgniteCheckedException If failed.
*/
- public void onUnswap(@Nullable String spaceName, Object key, Object val, byte[] valBytes) throws IgniteCheckedException;
+ public void onUnswap(@Nullable String spaceName, CacheObject key, CacheObject val) throws IgniteCheckedException;
/**
* Rebuilds all indexes of given type.
@@ -225,4 +241,11 @@ public interface GridQueryIndexing {
* @return Backup filter.
*/
public IndexingQueryFilter backupFilter();
+
+ /**
+ * Gets message factory.
+ *
+ * @return Message factory.
+ */
+ public MessageFactory messageFactory();
}