You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/03 03:03:17 UTC
[08/50] [abbrv] ignite git commit: Merge branch sprint-3 into
ignite-264
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --cc modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 8467382,9551680..de9d112
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@@ -139,12 -141,8 +141,12 @@@ public class MessageCodeGenerator
MessageCodeGenerator gen = new MessageCodeGenerator(srcDir);
- gen.generateAll(true);
+ gen.generateAndWrite(DataStreamerEntry.class);
+// gen.generateAndWrite(GridDistributedUnlockRequest.class);
+// gen.generateAndWrite(GridNearUnlockRequest.class);
+// gen.generateAndWrite(GridDhtUnlockRequest.class);
+//
// gen.generateAndWrite(GridDistributedLockRequest.class);
// gen.generateAndWrite(GridDistributedLockResponse.class);
// gen.generateAndWrite(GridNearLockRequest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
index 18bcc05,b0f5b2e..ca6ca1a
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
@@@ -553,10 -650,15 +554,10 @@@ public final class GridCacheMvcc
*
* @param ver Version to mark as ready.
* @param mappedVer Mapped dht version.
- * @param committedVers Committed versions.
- * @param rolledBackVers Rolled back versions.
- * @param pending Pending dht versions that are not owned and which version is less then mapped.
* @return Lock owner after reassignment.
*/
- @Nullable public GridCacheMvccCandidate<K> readyNearLocal(GridCacheVersion ver, GridCacheVersion mappedVer) {
- GridCacheMvccCandidate<K> cand = candidate(locs, ver);
- @Nullable public GridCacheMvccCandidate readyNearLocal(GridCacheVersion ver, GridCacheVersion mappedVer,
- Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledBackVers,
- Collection<GridCacheVersion> pending) {
++ @Nullable public GridCacheMvccCandidate readyNearLocal(GridCacheVersion ver, GridCacheVersion mappedVer) {
+ GridCacheMvccCandidate cand = candidate(locs, ver);
if (cand != null) {
assert cand.nearLocal() : "Near local candidate is not marked as near local: " + cand;
@@@ -610,10 -730,16 +611,10 @@@
* Sets remote candidate to done.
*
* @param ver Version.
- * @param pending Pending versions.
- * @param committed Committed versions.
- * @param rolledback Rolledback versions.
* @return Lock owner.
*/
- @Nullable public GridCacheMvccCandidate<K> doneRemote(
+ @Nullable public GridCacheMvccCandidate doneRemote(
- GridCacheVersion ver,
- Collection<GridCacheVersion> pending,
- Collection<GridCacheVersion> committed,
- Collection<GridCacheVersion> rolledback) {
+ GridCacheVersion ver) {
assert ver != null;
if (log.isDebugEnabled())
@@@ -708,13 -843,9 +709,13 @@@
}
}
+ // No assignment can happen in near local cache when remote candidate is present.
+ if (cctx.isNear() && firstRmt != null)
+ return;
+
if (locs != null) {
- for (ListIterator<GridCacheMvccCandidate<K>> it = locs.listIterator(); it.hasNext(); ) {
- GridCacheMvccCandidate<K> cand = it.next();
+ for (ListIterator<GridCacheMvccCandidate> it = locs.listIterator(); it.hasNext(); ) {
+ GridCacheMvccCandidate cand = it.next();
if (cand.owner())
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
index 429023b,6d3007b..f6bcc14
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
@@@ -154,9 -153,10 +154,13 @@@ public class GridCacheOptimisticCheckPr
add(fut);
- GridCacheOptimisticCheckPreparedTxRequest<K, V>
- req = new GridCacheOptimisticCheckPreparedTxRequest<>(tx,
- nodeTransactions(id), futureId(), fut.futureId(), false);
- GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest(tx,
- nodeTransactions(id),
- futureId(),
- fut.futureId());
++ GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest(
++ tx,
++ nodeTransactions(id),
++ futureId(),
++ fut.futureId(),
++ false
++ );
try {
cctx.io().send(id, req, tx.ioPolicy());
@@@ -176,8 -176,8 +180,13 @@@
add(fut);
- GridCacheOptimisticCheckPreparedTxRequest<K, V> req = new GridCacheOptimisticCheckPreparedTxRequest<>(
- tx, nodeTransactions(nodeId), futureId(), fut.futureId(), false);
+ GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest(
- tx, nodeTransactions(nodeId), futureId(), fut.futureId());
++ tx,
++ nodeTransactions(nodeId),
++ futureId(),
++ fut.futureId(),
++ false
++ );
try {
cctx.io().send(nodeId, req, tx.ioPolicy());
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
index 4b61b7e,e83db66..677d2dc
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
@@@ -65,13 -62,8 +65,13 @@@ public class GridCacheOptimisticCheckPr
* @param futId Future ID.
* @param miniId Mini future ID.
*/
- public GridCacheOptimisticCheckPreparedTxRequest(IgniteInternalTx tx, int txNum, IgniteUuid futId,
- IgniteUuid miniId) {
+ public GridCacheOptimisticCheckPreparedTxRequest(
- IgniteInternalTx<K, V> tx,
++ IgniteInternalTx tx,
+ int txNum,
+ IgniteUuid futId,
+ IgniteUuid miniId,
+ boolean nearCheck
+ ) {
super(tx.xidVersion(), 0);
nearXidVer = tx.nearXidVersion();
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxResponse.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
index cb7207d,a1d83d5..8b424ee
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
@@@ -163,13 -170,13 +169,13 @@@ public class GridDistributedCacheEntry
GridCacheVersion ver,
long timeout,
boolean tx,
- boolean implicitSingle,
- @Nullable GridCacheVersion owned) throws GridDistributedLockCancelledException,
+ boolean implicitSingle
+ ) throws GridDistributedLockCancelledException,
GridCacheEntryRemovedException {
- GridCacheMvccCandidate<K> prev;
- GridCacheMvccCandidate<K> owner;
+ GridCacheMvccCandidate prev;
+ GridCacheMvccCandidate owner;
- V val;
+ CacheObject val;
synchronized (this) {
// Check removed locks prior to obsolete flag.
@@@ -499,12 -512,14 +505,12 @@@
*
* @throws GridCacheEntryRemovedException If entry is removed.
*/
- @Nullable public GridCacheMvccCandidate<K> readyNearLock(GridCacheVersion ver, GridCacheVersion mapped)
- @Nullable public GridCacheMvccCandidate readyNearLock(GridCacheVersion ver, GridCacheVersion mapped,
- Collection<GridCacheVersion> committed,
- Collection<GridCacheVersion> rolledBack,
- Collection<GridCacheVersion> pending) throws GridCacheEntryRemovedException {
++ @Nullable public GridCacheMvccCandidate readyNearLock(GridCacheVersion ver, GridCacheVersion mapped)
+ throws GridCacheEntryRemovedException {
- GridCacheMvccCandidate<K> prev = null;
- GridCacheMvccCandidate<K> owner = null;
+ GridCacheMvccCandidate prev = null;
+ GridCacheMvccCandidate owner = null;
- V val;
+ CacheObject val;
synchronized (this) {
checkObsolete();
@@@ -547,14 -609,40 +553,14 @@@
* @throws GridCacheEntryRemovedException If entry has been removed.
* @return Owner.
*/
- @Nullable public GridCacheMvccCandidate<K> doneRemote(
+ @Nullable public GridCacheMvccCandidate doneRemote(
GridCacheVersion lockVer,
GridCacheVersion baseVer,
- Collection<GridCacheVersion> committedVers,
- Collection<GridCacheVersion> rolledbackVers,
- boolean sysInvalidate) throws GridCacheEntryRemovedException {
- return doneRemote(lockVer, baseVer, Collections.<GridCacheVersion>emptySet(), committedVers,
- rolledbackVers, sysInvalidate);
- }
-
- /**
- *
- * @param lockVer Done version.
- * @param baseVer Base version.
- * @param pendingVers Pending versions that are less than lock version.
- * @param committedVers Completed versions for reordering.
- * @param rolledbackVers Rolled back versions for reordering.
- * @param sysInvalidate Flag indicating if this entry is done from invalidated transaction (in case of tx
- * salvage). In this case all locks before salvaged lock will marked as used and corresponding
- * transactions will be invalidated.
- * @throws GridCacheEntryRemovedException If entry has been removed.
- * @return Owner.
- */
- @Nullable public GridCacheMvccCandidate doneRemote(
- GridCacheVersion lockVer,
- GridCacheVersion baseVer,
- @Nullable Collection<GridCacheVersion> pendingVers,
- Collection<GridCacheVersion> committedVers,
- Collection<GridCacheVersion> rolledbackVers,
boolean sysInvalidate) throws GridCacheEntryRemovedException {
- GridCacheMvccCandidate<K> prev = null;
- GridCacheMvccCandidate<K> owner = null;
+ GridCacheMvccCandidate prev = null;
+ GridCacheMvccCandidate owner = null;
- V val;
+ CacheObject val;
synchronized (this) {
checkObsolete();
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
index 1da0e0f,37b34e7..58a93ee
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
@@@ -284,19 -295,27 +260,21 @@@ public class GridDistributedLockReques
/** {@inheritDoc}
* @param ctx*/
- @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- if (keyBytes == null && keys != null)
- keyBytes = marshalCollection(keys, ctx);
+ GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+ prepareMarshalCacheObjects(keys, cctx);
-
- if (grpLockKey != null)
- grpLockKey.prepareMarshal(cctx);
}
/** {@inheritDoc} */
- @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
+ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (keys == null)
- keys = unmarshalCollection(keyBytes, ctx, ldr);
+ GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+ finishUnmarshalCacheObjects(keys, cctx, ldr);
-
- if (grpLockKey != null)
- grpLockKey.finishUnmarshal(cctx, ldr);
}
/** {@inheritDoc} */
@@@ -344,8 -369,8 +322,8 @@@
writer.incrementState();
- case 14:
+ case 11:
- if (!writer.writeCollection("keyBytes", keyBytes, MessageCollectionItemType.BYTE_ARR))
+ if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
@@@ -446,8 -485,8 +424,8 @@@
reader.incrementState();
- case 14:
+ case 11:
- keyBytes = reader.readCollection("keyBytes", MessageCollectionItemType.BYTE_ARR);
+ keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
index 0560234,b5e8d61..0fb1854
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
@@@ -170,10 -164,15 +164,10 @@@ public class GridDistributedLockRespons
/**
* @param idx Candidates index.
* @param cands Collection of candidates.
- * @param committedVers Committed versions relative to lock version.
- * @param rolledbackVers Rolled back versions relative to lock version.
*/
- public void setCandidates(int idx, Collection<GridCacheMvccCandidate<K>> cands) {
- public void setCandidates(int idx, Collection<GridCacheMvccCandidate> cands,
- Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers) {
++ public void setCandidates(int idx, Collection<GridCacheMvccCandidate> cands) {
assert idx >= 0;
- completedVersions(committedVers, rolledbackVers);
-
candidatesByIndex(idx, cands);
}
@@@ -298,8 -255,8 +250,8 @@@
writer.incrementState();
- case 10:
+ case 8:
- if (!writer.writeCollection("valBytes", valBytes, MessageCollectionItemType.MSG))
+ if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
@@@ -336,8 -293,8 +288,8 @@@
reader.incrementState();
- case 10:
+ case 8:
- valBytes = reader.readCollection("valBytes", MessageCollectionItemType.MSG);
+ vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
index fbdcde5,9672a75..18ae318
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
@@@ -57,9 -63,18 +62,12 @@@ public class GridDistributedTxFinishReq
/** Expected txSize. */
private int txSize;
- /** Group lock key. */
- private IgniteTxKey grpLockKey;
-
- /** System transaction flag. */
+ /** System flag. */
private boolean sys;
+ /** IO policy. */
+ private GridIoPolicy plc;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@@ -74,8 -89,13 +82,9 @@@
* @param commitVer Commit version.
* @param commit Commit flag.
* @param invalidate Invalidate flag.
- * @param sys System transaction flag.
+ * @param sys System flag.
+ * @param plc IO policy.
- * @param baseVer Base version.
- * @param committedVers Committed versions.
- * @param rolledbackVers Rolled back versions.
* @param txSize Expected transaction size.
- * @param grpLockKey Group lock key if this is a group-lock transaction.
*/
public GridDistributedTxFinishRequest(
GridCacheVersion xidVer,
@@@ -85,9 -105,14 +94,10 @@@
boolean commit,
boolean invalidate,
boolean sys,
+ GridIoPolicy plc,
boolean syncCommit,
boolean syncRollback,
- GridCacheVersion baseVer,
- Collection<GridCacheVersion> committedVers,
- Collection<GridCacheVersion> rolledbackVers,
- int txSize,
- @Nullable IgniteTxKey grpLockKey
+ int txSize
) {
super(xidVer, 0);
assert xidVer != null;
@@@ -98,9 -123,14 +108,10 @@@
this.commit = commit;
this.invalidate = invalidate;
this.sys = sys;
+ this.plc = plc;
this.syncCommit = syncCommit;
this.syncRollback = syncRollback;
- this.baseVer = baseVer;
this.txSize = txSize;
- this.grpLockKey = grpLockKey;
-
- completedVersions(committedVers, rolledbackVers);
}
/**
@@@ -292,15 -401,27 +310,10 @@@
reader.incrementState();
- case 14:
- byte plcOrd;
-
- plcOrd = reader.readByte("plc");
-
- if (!reader.isLastRead())
- return false;
-
- plc = GridIoPolicy.fromOrdinal(plcOrd);
-
- reader.incrementState();
-
- case 15:
+ case 10:
syncCommit = reader.readBoolean("syncCommit");
--
-- if (!reader.isLastRead())
-- return false;
--
-- reader.incrementState();
--
- case 16:
++
+ case 11:
syncRollback = reader.readBoolean("syncRollback");
if (!reader.isLastRead())
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index 2e99ef5,ec02e6e..1ecbb4e
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@@ -85,12 -78,28 +78,16 @@@ public class GridDistributedTxPrepareRe
/** DHT versions to verify. */
@GridToStringInclude
@GridDirectTransient
- private Map<IgniteTxKey<K>, GridCacheVersion> dhtVers;
+ private Map<IgniteTxKey, GridCacheVersion> dhtVers;
- /** Serialized map. */
- @GridToStringExclude
- private byte[] dhtVersBytes;
+ /** */
+ @GridDirectCollection(IgniteTxKey.class)
+ private Collection<IgniteTxKey> dhtVerKeys;
+
+ /** */
+ @GridDirectCollection(GridCacheVersion.class)
+ private Collection<GridCacheVersion> dhtVerVals;
- /** Group lock key, if any. */
- @GridToStringInclude
- @GridDirectTransient
- private IgniteTxKey grpLockKey;
-
- /** Group lock key bytes. */
- @GridToStringExclude
- private byte[] grpLockKeyBytes;
-
- /** Partition lock flag. */
- private boolean partLock;
-
/** Expected transaction size. */
private int txSize;
@@@ -122,9 -136,11 +122,9 @@@
* @param onePhaseCommit One phase commit flag.
*/
public GridDistributedTxPrepareRequest(
- IgniteInternalTx<K, V> tx,
- @Nullable Collection<IgniteTxEntry<K, V>> reads,
- Collection<IgniteTxEntry<K, V>> writes,
+ IgniteInternalTx tx,
+ @Nullable Collection<IgniteTxEntry> reads,
+ Collection<IgniteTxEntry> writes,
- IgniteTxKey grpLockKey,
- boolean partLock,
Map<UUID, Collection<UUID>> txNodes,
boolean onePhaseCommit
) {
@@@ -261,119 -301,66 +269,60 @@@
/** {@inheritDoc}
* @param ctx*/
- @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- if (writes != null) {
+ if (writes != null)
marshalTx(writes, ctx);
- writesBytes = new ArrayList<>(writes.size());
-
- for (IgniteTxEntry<K, V> e : writes)
- writesBytes.add(ctx.marshaller().marshal(e));
- }
-
- if (reads != null) {
+ if (reads != null)
marshalTx(reads, ctx);
- readsBytes = new ArrayList<>(reads.size());
- if (grpLockKey != null && grpLockKeyBytes == null)
- grpLockKeyBytes = ctx.marshaller().marshal(grpLockKey);
-
+ if (dhtVers != null) {
+ for (IgniteTxKey key : dhtVers.keySet()) {
+ GridCacheContext cctx = ctx.cacheContext(key.cacheId());
- for (IgniteTxEntry<K, V> e : reads)
- readsBytes.add(ctx.marshaller().marshal(e));
- }
+ key.prepareMarshal(cctx);
+ }
- if (dhtVers != null && dhtVersBytes == null)
- dhtVersBytes = ctx.marshaller().marshal(dhtVers);
+ dhtVerKeys = dhtVers.keySet();
+ dhtVerVals = dhtVers.values();
+ }
if (txNodes != null)
txNodesBytes = ctx.marshaller().marshal(txNodes);
}
/** {@inheritDoc} */
- @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
+ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (writesBytes != null) {
- writes = new ArrayList<>(writesBytes.size());
-
- for (byte[] arr : writesBytes)
- writes.add(ctx.marshaller().<IgniteTxEntry<K, V>>unmarshal(arr, ldr));
-
+ if (writes != null)
unmarshalTx(writes, false, ctx, ldr);
- }
-
- if (readsBytes != null) {
- reads = new ArrayList<>(readsBytes.size());
-
- for (byte[] arr : readsBytes)
- reads.add(ctx.marshaller().<IgniteTxEntry<K, V>>unmarshal(arr, ldr));
+ if (reads != null)
unmarshalTx(reads, false, ctx, ldr);
- }
- if (dhtVersBytes != null && dhtVers == null)
- dhtVers = ctx.marshaller().unmarshal(dhtVersBytes, ldr);
- if (grpLockKeyBytes != null && grpLockKey == null)
- grpLockKey = ctx.marshaller().unmarshal(grpLockKeyBytes, ldr);
-
+ if (dhtVerKeys != null && dhtVers == null) {
+ assert dhtVerVals != null;
+ assert dhtVerKeys.size() == dhtVerVals.size();
- if (txNodesBytes != null)
- txNodes = ctx.marshaller().unmarshal(txNodesBytes, ldr);
- }
+ Iterator<IgniteTxKey> keyIt = dhtVerKeys.iterator();
+ Iterator<GridCacheVersion> verIt = dhtVerVals.iterator();
- /**
- *
- * @param out Output.
- * @param col Set to write.
- * @throws IOException If write failed.
- */
- private void writeCollection(ObjectOutput out, Collection<IgniteTxEntry<K, V>> col) throws IOException {
- boolean empty = F.isEmpty(col);
-
- if (!empty) {
- out.writeInt(col.size());
-
- for (IgniteTxEntry<K, V> e : col) {
- V val = e.value();
- boolean hasWriteVal = e.hasWriteValue();
- boolean hasReadVal = e.hasReadValue();
-
- try {
- // Don't serialize value if invalidate is set to true.
- if (invalidate)
- e.value(null, false, false);
-
- out.writeObject(e);
- }
- finally {
- // Set original value back.
- e.value(val, hasWriteVal, hasReadVal);
- }
- }
- }
- else
- out.writeInt(-1);
- }
-
- /**
- * @param in Input.
- * @return Deserialized set.
- * @throws IOException If deserialization failed.
- * @throws ClassNotFoundException If deserialized class could not be found.
- */
- @SuppressWarnings({"unchecked"})
- @Nullable private Collection<IgniteTxEntry<K, V>> readCollection(ObjectInput in) throws IOException,
- ClassNotFoundException {
- List<IgniteTxEntry<K, V>> col = null;
+ dhtVers = U.newHashMap(dhtVerKeys.size());
- int size = in.readInt();
+ while (keyIt.hasNext()) {
+ IgniteTxKey key = keyIt.next();
- // Check null flag.
- if (size != -1) {
- col = new ArrayList<>(size);
+ key.finishUnmarshal(ctx.cacheContext(key.cacheId()), ldr);
- for (int i = 0; i < size; i++)
- col.add((IgniteTxEntry<K, V>)in.readObject());
+ dhtVers.put(key, verIt.next());
+ }
}
- return col == null ? Collections.<IgniteTxEntry<K,V>>emptyList() : col;
+ if (txNodesBytes != null)
+ txNodes = ctx.marshaller().unmarshal(txNodesBytes, ldr);
}
/** {@inheritDoc} */
@@@ -403,7 -396,13 +358,7 @@@
writer.incrementState();
- case 8:
- case 11:
- if (!writer.writeByteArray("grpLockKeyBytes", grpLockKeyBytes))
- return false;
-
- writer.incrementState();
-
+ case 12:
if (!writer.writeBoolean("invalidate", invalidate))
return false;
@@@ -421,8 -420,14 +376,8 @@@
writer.incrementState();
- case 11:
- if (!writer.writeCollection("readsBytes", readsBytes, MessageCollectionItemType.BYTE_ARR))
- case 15:
- if (!writer.writeBoolean("partLock", partLock))
- return false;
-
- writer.incrementState();
-
+ case 16:
+ if (!writer.writeByte("plc", plc != null ? (byte)plc.ordinal() : -1))
return false;
writer.incrementState();
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index e6c5893,ceb8a7c..7ae4835
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@@ -188,9 -195,23 +192,13 @@@ public class GridDistributedTxRemoteAda
// No-op.
}
- /**
- * Adds group lock key to remote transaction.
- *
- * @param key Key.
- */
- public void groupLockKey(IgniteTxKey key) {
- if (grpLockKey == null)
- grpLockKey = key;
- }
-
/** {@inheritDoc} */
- @Override public GridTuple<V> peek(GridCacheContext<K, V> cacheCtx, boolean failFast, K key,
- IgnitePredicate<Cache.Entry<K, V>>[] filter) throws GridCacheFilterFailedException {
+ @Override public <K, V> GridTuple<CacheObject> peek(GridCacheContext cacheCtx,
+ boolean failFast,
+ KeyCacheObject key,
+ CacheEntryPredicate[] filter)
+ throws GridCacheFilterFailedException
+ {
assert false : "Method peek can only be called on user transaction: " + this;
throw new IllegalStateException("Method peek can only be called on user transaction: " + this);
@@@ -218,16 -239,19 +226,16 @@@
/**
* @param baseVer Base version.
- * @param committedVers Committed versions.
- * @param rolledbackVers Rolled back versions.
*/
- @Override public void doneRemote(GridCacheVersion baseVer, Collection<GridCacheVersion> committedVers,
- Collection<GridCacheVersion> rolledbackVers, Collection<GridCacheVersion> pendingVers) {
+ @Override public void doneRemote(GridCacheVersion baseVer) {
if (readMap != null && !readMap.isEmpty()) {
- for (IgniteTxEntry<K, V> txEntry : readMap.values())
+ for (IgniteTxEntry txEntry : readMap.values())
- doneRemote(txEntry, baseVer, committedVers, rolledbackVers, pendingVers);
+ doneRemote(txEntry, baseVer);
}
if (writeMap != null && !writeMap.isEmpty()) {
- for (IgniteTxEntry<K, V> txEntry : writeMap.values())
+ for (IgniteTxEntry txEntry : writeMap.values())
- doneRemote(txEntry, baseVer, committedVers, rolledbackVers, pendingVers);
+ doneRemote(txEntry, baseVer);
}
}
@@@ -236,10 -260,15 +244,10 @@@
*
* @param txEntry Entry.
* @param baseVer Base version for completed versions.
- * @param committedVers Completed versions relative to base version.
- * @param rolledbackVers Rolled back versions relative to base version.
- * @param pendingVers Pending versions.
*/
- private void doneRemote(IgniteTxEntry<K, V> txEntry, GridCacheVersion baseVer) {
- private void doneRemote(IgniteTxEntry txEntry, GridCacheVersion baseVer,
- Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers,
- Collection<GridCacheVersion> pendingVers) {
++ private void doneRemote(IgniteTxEntry txEntry, GridCacheVersion baseVer) {
while (true) {
- GridDistributedCacheEntry<K, V> entry = (GridDistributedCacheEntry<K, V>)txEntry.cached();
+ GridDistributedCacheEntry entry = (GridDistributedCacheEntry)txEntry.cached();
try {
// Handle explicit locks.
@@@ -409,9 -438,9 +416,9 @@@
assert txEntry != null : "Missing transaction entry for tx: " + this;
while (true) {
- GridCacheEntryEx<K, V> entry = txEntry.cached();
- GridCacheEntryEx Entry = txEntry.cached();
++ GridCacheEntryEx entry = txEntry.cached();
- assert Entry != null : "Missing cached entry for transaction entry: " + txEntry;
+ assert entry != null : "Missing cached entry for transaction entry: " + txEntry;
try {
GridCacheVersion ver = txEntry.explicitVersion() != null ? txEntry.explicitVersion() : xidVer;
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 98fbd47,a38a51e..be200d7
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@@ -304,27 -329,7 +329,27 @@@ public class GridClientPartitionTopolog
}
/** {@inheritDoc} */
+ @Override public GridDhtPartitionState partitionState(UUID nodeId, int part) {
+ lock.readLock().lock();
+
+ try {
+ GridDhtPartitionMap partMap = node2part.get(nodeId);
+
+ if (partMap != null) {
+ GridDhtPartitionState state = partMap.get(part);
+
+ return state == null ? EVICTED : state;
+ }
+
+ return EVICTED;
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
- @Override public Collection<ClusterNode> nodes(int p, long topVer) {
+ @Override public Collection<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
lock.readLock().lock();
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 53c13ba,718e565..1af96e9
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@@ -741,14 -742,16 +742,14 @@@ public final class GridDhtLockFuture<K
if (log.isDebugEnabled())
log.debug("Mapping entry for DHT lock future: " + this);
- boolean hasRmtNodes = false;
-
// Assign keys to primary nodes.
- for (GridDhtCacheEntry<K, V> entry : entries) {
+ for (GridDhtCacheEntry entry : entries) {
try {
while (true) {
try {
- hasRmtNodes = cctx.dhtMap(nearNodeId, topVer, entry, log, dhtMap, null);
+ cctx.dhtMap(nearNodeId, topVer, entry, log, dhtMap, null);
- GridCacheMvccCandidate<K> cand = entry.mappings(lockVer);
+ GridCacheMvccCandidate cand = entry.mappings(lockVer);
// Possible in case of lock cancellation.
if (cand == null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
index d4c0350,0574f17..41b5489
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
@@@ -19,11 -19,13 +19,11 @@@ package org.apache.ignite.internal.proc
import org.apache.ignite.*;
import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.tostring.*;
- import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.extensions.communication.*;
@@@ -57,8 -54,16 +52,8 @@@ public class GridDhtLockRequest extend
/** Mini future ID. */
private IgniteUuid miniId;
- /** Owner mapped version, if any. */
- @GridToStringInclude
- @GridDirectTransient
- private Map<KeyCacheObject, GridCacheVersion> owned;
-
- /** Owner mapped version bytes. */
- private byte[] ownedBytes;
-
/** Topology version. */
- private long topVer;
+ private AffinityTopologyVersion topVer;
/** Subject ID. */
private UUID subjId;
@@@ -278,18 -294,23 +257,17 @@@
/** {@inheritDoc}
* @param ctx*/
- @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- assert F.isEmpty(nearKeys) || !F.isEmpty(nearKeyBytes);
+ prepareMarshalCacheObjects(nearKeys, ctx.cacheContext(cacheId));
-
- if (owned != null)
- ownedBytes = CU.marshal(ctx, owned);
}
/** {@inheritDoc} */
- @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
+ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (nearKeys == null && nearKeyBytes != null)
- nearKeys = unmarshalCollection(nearKeyBytes, ctx, ldr);
+ finishUnmarshalCacheObjects(nearKeys, ctx.cacheContext(cacheId), ldr);
-
- if (ownedBytes != null)
- owned = ctx.marshaller().unmarshal(ownedBytes, ldr);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 40fde60,c33ffcb..899c580
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@@ -572,27 -602,7 +602,27 @@@ class GridDhtPartitionTopologyImpl<K, V
}
/** {@inheritDoc} */
+ @Override public GridDhtPartitionState partitionState(UUID nodeId, int part) {
+ lock.readLock().lock();
+
+ try {
+ GridDhtPartitionMap partMap = node2part.get(nodeId);
+
+ if (partMap != null) {
+ GridDhtPartitionState state = partMap.get(part);
+
+ return state == null ? EVICTED : state;
+ }
+
+ return EVICTED;
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
- @Override public Collection<ClusterNode> nodes(int p, long topVer) {
+ @Override public Collection<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
lock.readLock().lock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 6a4ff97,737e3ed..fa679ef
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@@ -666,19 -675,31 +670,19 @@@ public abstract class GridDhtTransactio
IgniteInternalFuture<Object> keyFut = null;
- if (req.onePhaseCommit()) {
- boolean forceKeys = req.hasTransforms() || req.filter() != null;
-
- if (!forceKeys) {
- for (int i = 0; i < req.keysCount() && !forceKeys; i++)
- forceKeys |= req.returnValue(i);
- }
-
- if (forceKeys)
- keyFut = ctx.dht().dhtPreloader().request(keys, req.topologyVersion());
- }
-
if (keyFut == null)
- keyFut = new GridFinishedFutureEx<>();
+ keyFut = new GridFinishedFuture<>();
- return new GridEmbeddedFuture<>(true, keyFut,
- new C2<Object, Exception, IgniteInternalFuture<GridNearLockResponse<K,V>>>() {
- @Override public IgniteInternalFuture<GridNearLockResponse<K, V>> apply(Object o, Exception exx) {
+ return new GridEmbeddedFuture<>(keyFut,
+ new C2<Object, Exception, IgniteInternalFuture<GridNearLockResponse>>() {
+ @Override public IgniteInternalFuture<GridNearLockResponse> apply(Object o, Exception exx) {
if (exx != null)
- return new GridDhtFinishedFuture<>(ctx.kernalContext(), exx);
+ return new GridDhtFinishedFuture<>(exx);
- IgnitePredicate<Cache.Entry<K, V>>[] filter = filter0;
+ CacheEntryPredicate[] filter = filter0;
// Set message into thread context.
- GridDhtTxLocal<K, V> tx = null;
+ GridDhtTxLocal tx = null;
try {
int cnt = keys.size();
@@@ -808,20 -832,22 +814,21 @@@
if (log.isDebugEnabled())
log.debug("Performing DHT lock [tx=" + tx + ", entries=" + entries + ']');
- IgniteInternalFuture<GridCacheReturn<V>> txFut = tx.lockAllAsync(
+ IgniteInternalFuture<GridCacheReturn> txFut = tx.lockAllAsync(
cacheCtx,
entries,
- req.onePhaseCommit(),
req.messageId(),
req.txRead(),
+ req.needReturnValue(),
req.accessTtl());
- final GridDhtTxLocal<K, V> t = tx;
+ final GridDhtTxLocal t = tx;
- return new GridDhtEmbeddedFuture<>(
+ return new GridDhtEmbeddedFuture(
txFut,
- new C2<GridCacheReturn<V>, Exception, IgniteInternalFuture<GridNearLockResponse<K, V>>>() {
- @Override public IgniteInternalFuture<GridNearLockResponse<K, V>> apply(
- GridCacheReturn<V> o, Exception e) {
+ new C2<GridCacheReturn, Exception, IgniteInternalFuture<GridNearLockResponse>>() {
+ @Override public IgniteInternalFuture<GridNearLockResponse> apply(
+ GridCacheReturn o, Exception e) {
if (e != null)
e = U.unwrap(e);
@@@ -943,10 -968,17 +949,10 @@@
req.version(), req.futureId(), req.miniId(), tx != null && tx.onePhaseCommit(), entries.size(), err);
if (err == null) {
- res.pending(localDhtPendingVersions(entries, mappedVer));
-
- // We have to add completed versions for cases when nearLocal and remote transactions
- // execute concurrently.
- res.completedVersions(ctx.tm().committedVersions(req.version()),
- ctx.tm().rolledbackVersions(req.version()));
-
int i = 0;
- for (ListIterator<GridCacheEntryEx<K, V>> it = entries.listIterator(); it.hasNext();) {
- GridCacheEntryEx<K, V> e = it.next();
+ for (ListIterator<GridCacheEntryEx> it = entries.listIterator(); it.hasNext();) {
+ GridCacheEntryEx e = it.next();
assert e != null;
@@@ -1330,13 -1399,16 +1336,13 @@@
}
}
- Collection<GridCacheVersion> committed = ctx.tm().committedVersions(ver);
- Collection<GridCacheVersion> rolledback = ctx.tm().rolledbackVersions(ver);
-
// Backups.
- for (Map.Entry<ClusterNode, List<T2<K, byte[]>>> entry : dhtMap.entrySet()) {
+ for (Map.Entry<ClusterNode, List<KeyCacheObject>> entry : dhtMap.entrySet()) {
ClusterNode n = entry.getKey();
- List<T2<K, byte[]>> keyBytes = entry.getValue();
+ List<KeyCacheObject> keyBytes = entry.getValue();
- GridDhtUnlockRequest<K, V> req = new GridDhtUnlockRequest<>(ctx.cacheId(), keyBytes.size());
+ GridDhtUnlockRequest req = new GridDhtUnlockRequest(ctx.cacheId(), keyBytes.size());
req.version(dhtVer);
@@@ -1347,9 -1419,11 +1353,9 @@@
keyBytes = nearMap.get(n);
if (keyBytes != null)
- for (T2<K, byte[]> key : keyBytes)
- req.addNearKey(key.get1(), key.get2(), ctx.shared());
+ for (KeyCacheObject key : keyBytes)
+ req.addNearKey(key, ctx.shared());
- req.completedVersions(committed, rolledback);
-
ctx.io().send(n, req, ctx.ioPolicy());
}
catch (ClusterTopologyCheckedException ignore) {
@@@ -1373,9 -1447,11 +1379,9 @@@
req.version(dhtVer);
try {
- for (T2<K, byte[]> key : keyBytes)
- req.addNearKey(key.get1(), key.get2(), ctx.shared());
+ for (KeyCacheObject key : keyBytes)
+ req.addNearKey(key, ctx.shared());
- req.completedVersions(committed, rolledback);
-
ctx.io().send(n, req, ctx.ioPolicy());
}
catch (ClusterTopologyCheckedException ignore) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index 60211e5,d20a7c3..0e9bdff
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@@ -17,12 -17,17 +17,18 @@@
package org.apache.ignite.internal.processors.cache.distributed.dht;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.affinity.*;
+ import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.transactions.*;
+ import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.extensions.communication.*;
+ import org.apache.ignite.transactions.*;
import org.jetbrains.annotations.*;
import java.io.*;
@@@ -49,10 -54,12 +55,10 @@@ public class GridDhtTxFinishRequest ext
private boolean sysInvalidate;
/** Topology version. */
- private long topVer;
+ private AffinityTopologyVersion topVer;
- /** Pending versions with order less than one for this message (needed for commit ordering). */
- @GridToStringInclude
- @GridDirectCollection(GridCacheVersion.class)
- private Collection<GridCacheVersion> pendingVers;
+ /** Check comitted flag. */
+ private boolean checkCommitted;
/** One phase commit write version. */
private GridCacheVersion writeVer;
@@@ -108,7 -125,9 +115,19 @@@
@Nullable UUID subjId,
int taskNameHash
) {
- super(xidVer, futId, commitVer, threadId, commit, invalidate, sys, syncCommit, syncRollback, txSize);
- super(xidVer, futId, commitVer, threadId, commit, invalidate, sys, plc, syncCommit, syncRollback, baseVer,
- committedVers, rolledbackVers, txSize, grpLockKey);
++ super(
++ xidVer,
++ futId,
++ commitVer,
++ threadId,
++ commit,
++ invalidate,
++ sys,
++ plc,
++ syncCommit,
++ syncRollback,
++ txSize
++ );
assert miniId != null;
assert nearNodeId != null;
@@@ -185,24 -205,20 +204,31 @@@
}
/**
- * @return Topology version.
+ * @return Check committed flag.
*/
+ @Override public AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /**
- * Gets versions of not acquired locks with version less then one of transaction being committed.
- *
- * @return Versions of locks for entries participating in transaction that have not been acquired yet
- * have version less then one of transaction being committed.
++ * @return Check committed flag.
++ */
+ public boolean checkCommitted() {
+ return checkCommitted;
+ }
+
+ /**
+ * @param checkCommitted Check committed flag.
*/
- public Collection<GridCacheVersion> pendingVersions() {
- return pendingVers == null ? Collections.<GridCacheVersion>emptyList() : pendingVers;
+ public void checkCommitted(boolean checkCommitted) {
+ this.checkCommitted = checkCommitted;
+ }
+
+ /**
+ * @return Topology version.
+ */
+ @Override public long topologyVersion() {
+ return topVer;
}
/** {@inheritDoc} */
@@@ -249,7 -259,13 +269,7 @@@
writer.incrementState();
- case 19:
- case 23:
- if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
+ case 24:
if (!writer.writeUuid("subjId", subjId))
return false;
@@@ -331,7 -339,15 +343,7 @@@
reader.incrementState();
- case 19:
- case 23:
- pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
+ case 24:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index c9be37a,d76a730..cc15429
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@@ -80,9 -87,11 +81,9 @@@ public abstract class GridDhtTxLocalAda
* @param isolation Isolation.
* @param timeout Timeout.
* @param txSize Expected transaction size.
- * @param grpLockKey Group lock key if this is a group-lock transaction.
- * @param partLock If this is a group-lock transaction and the whole partition should be locked.
*/
protected GridDhtTxLocalAdapter(
- GridCacheSharedContext<K, V> cctx,
+ GridCacheSharedContext cctx,
GridCacheVersion xidVer,
boolean implicit,
boolean implicitSingle,
@@@ -97,8 -106,10 +99,23 @@@
@Nullable UUID subjId,
int taskNameHash
) {
- super(cctx, xidVer, implicit, implicitSingle, sys, concurrency, isolation, timeout, invalidate, storeEnabled,
- onePhaseCommit, txSize, subjId, taskNameHash);
- super(cctx, xidVer, implicit, implicitSingle, sys, plc, concurrency, isolation, timeout, invalidate,
- storeEnabled, txSize, grpLockKey, partLock, subjId, taskNameHash);
++ super(
++ cctx,
++ xidVer,
++ implicit,
++ implicitSingle,
++ sys,
++ plc,
++ concurrency,
++ isolation,
++ timeout,
++ invalidate,
++ storeEnabled,
++ onePhaseCommit,
++ txSize,
++ subjId,
++ taskNameHash
++ );
assert cctx != null;
@@@ -478,11 -502,13 +481,12 @@@
* @return Lock future.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- IgniteInternalFuture<GridCacheReturn<V>> lockAllAsync(
- GridCacheContext<K, V> cacheCtx,
- List<GridCacheEntryEx<K, V>> entries,
+ IgniteInternalFuture<GridCacheReturn> lockAllAsync(
+ GridCacheContext cacheCtx,
+ List<GridCacheEntryEx> entries,
- boolean onePhaseCommit,
long msgId,
final boolean read,
+ final boolean needRetVal,
long accessTtl
) {
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 33ee64c,db2742e..1b097b6
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@@ -294,10 -292,10 +292,10 @@@ public final class GridDhtTxPrepareFutu
boolean hasFilters = !F.isEmptyOrNulls(txEntry.filters()) && !F.isAlwaysTrue(txEntry.filters());
- if (hasFilters || retVal || txEntry.op() == GridCacheOperation.DELETE) {
+ if (hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM) {
cached.unswap(true, retVal);
- V val = cached.innerGet(
+ CacheObject val = cached.innerGet(
tx,
/*swap*/true,
/*read through*/(retVal || hasFilters) && cacheCtx.config().isLoadPreviousValue(),
@@@ -311,9 -309,11 +309,11 @@@
null,
null);
- if (retVal) {
+ if (retVal || txEntry.op() == TRANSFORM) {
if (!F.isEmpty(txEntry.entryProcessors())) {
- K key = txEntry.key();
+ invoke = true;
+
+ KeyCacheObject key = txEntry.key();
Object procRes = null;
Exception err = null;
@@@ -335,11 -336,8 +336,10 @@@
}
}
+ txEntry.entryProcessorCalculatedValue(val);
+
if (err != null || procRes != null)
- ret.addEntryProcessResult(key,
- err == null ? new CacheInvokeResult<>(procRes) : new CacheInvokeResult<>(err));
+ ret.addEntryProcessResult(txEntry.context(), key, null, procRes, err);
else
ret.invokeResult(true);
}
@@@ -413,24 -411,27 +413,24 @@@
if (log.isDebugEnabled())
log.debug("Marking all local candidates as ready: " + this);
- Iterable<IgniteTxEntry> checkEntries = tx.groupLock() ?
- Collections.singletonList(tx.groupLockEntry()) : writes;
+ Iterable<IgniteTxEntry<K, V>> checkEntries = writes;
- for (IgniteTxEntry<K, V> txEntry : checkEntries) {
- GridCacheContext<K, V> cacheCtx = txEntry.context();
+ for (IgniteTxEntry txEntry : checkEntries) {
+ GridCacheContext cacheCtx = txEntry.context();
if (cacheCtx.isLocal())
continue;
- GridDistributedCacheEntry<K, V> entry = (GridDistributedCacheEntry<K, V>)txEntry.cached();
+ GridDistributedCacheEntry entry = (GridDistributedCacheEntry)txEntry.cached();
if (entry == null) {
- entry = (GridDistributedCacheEntry<K, V>)cacheCtx.cache().entryEx(txEntry.key());
+ entry = (GridDistributedCacheEntry)cacheCtx.cache().entryEx(txEntry.key());
- txEntry.cached(entry, txEntry.keyBytes());
+ txEntry.cached(entry);
}
- if (tx.optimistic() && txEntry.explicitVersion() == null) {
- if (!tx.groupLock() || tx.groupLockKey().equals(entry.txKey()))
- lockKeys.add(txEntry.txKey());
- }
+ if (tx.optimistic() && txEntry.explicitVersion() == null)
+ lockKeys.add(txEntry.txKey());
while (true) {
try {
@@@ -561,15 -560,9 +563,15 @@@
/**
* @throws IgniteCheckedException If failed to send response.
*/
- private void sendPrepareResponse(GridNearTxPrepareResponse<K, V> res) throws IgniteCheckedException {
+ private void sendPrepareResponse(GridNearTxPrepareResponse res) throws IgniteCheckedException {
- if (!tx.nearNodeId().equals(cctx.localNodeId()))
+ if (!tx.nearNodeId().equals(cctx.localNodeId())) {
+ Throwable err = this.err.get();
+
+ if (err != null && err instanceof IgniteFutureCancelledException)
+ return;
+
cctx.io().send(tx.nearNodeId(), res, tx.ioPolicy());
+ }
else {
assert completeCb != null;
@@@ -605,39 -604,15 +607,39 @@@
}
/**
+ * Checks if transaction involves a near-enabled cache on originating node.
+ *
+ * @return {@code True} if originating node has a near cache enabled and that cache participates in
+ * the transaction.
+ */
+ private boolean originatingNodeHasNearCache() {
+ ClusterNode node = cctx.discovery().node(tx.originatingNodeId());
+
+ if (node == null)
+ return false;
+
+ GridCacheAttributes[] attrs = node.attribute(IgniteNodeAttributes.ATTR_CACHE);
+
+ for (GridCacheAttributes attr : attrs) {
+ if (attr.nearCacheEnabled()) {
+ if (tx.activeCacheIds().contains(CU.cacheId(attr.cacheName())))
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
* @param res Response being sent.
*/
- private void addDhtValues(GridNearTxPrepareResponse<K, V> res) {
+ private void addDhtValues(GridNearTxPrepareResponse res) {
// Interceptor on near node needs old values to execute callbacks.
if (!F.isEmpty(writes)) {
- for (IgniteTxEntry<K, V> e : writes) {
- IgniteTxEntry<K, V> txEntry = tx.entry(e.txKey());
+ for (IgniteTxEntry e : writes) {
+ IgniteTxEntry txEntry = tx.entry(e.txKey());
- GridCacheContext<K, V> cacheCtx = txEntry.context();
+ GridCacheContext cacheCtx = txEntry.context();
assert txEntry != null : "Missing tx entry for key [tx=" + tx + ", key=" + e.txKey() + ']';
@@@ -790,30 -741,31 +768,33 @@@
return;
try {
+ // We are holding transaction-level locks for entries here, so we can get next write version.
onEntriesLocked();
+ tx.writeVersion(cctx.versions().next(tx.topologyVersion()));
+
{
- Map<UUID, GridDistributedTxMapping<K, V>> futDhtMap = new HashMap<>();
- Map<UUID, GridDistributedTxMapping<K, V>> futNearMap = new HashMap<>();
+ Map<UUID, GridDistributedTxMapping> futDhtMap = new HashMap<>();
+ Map<UUID, GridDistributedTxMapping> futNearMap = new HashMap<>();
- boolean hasRemoteNodes = false;
-
// Assign keys to primary nodes.
if (!F.isEmpty(writes)) {
- for (IgniteTxEntry<K, V> write : writes)
+ for (IgniteTxEntry write : writes)
- hasRemoteNodes |= map(tx.entry(write.txKey()), futDhtMap, futNearMap);
+ map(tx.entry(write.txKey()), futDhtMap, futNearMap);
}
if (!F.isEmpty(reads)) {
- for (IgniteTxEntry<K, V> read : reads)
+ for (IgniteTxEntry read : reads)
- hasRemoteNodes |= map(tx.entry(read.txKey()), futDhtMap, futNearMap);
+ map(tx.entry(read.txKey()), futDhtMap, futNearMap);
}
-
- tx.needsCompletedVersions(hasRemoteNodes);
}
+ // We are holding transaction-level locks for entries here, so we can get next write version.
+ if (tx.onePhaseCommit())
+ tx.writeVersion(tx.nearXidVersion());
+ else
+ tx.writeVersion(cctx.versions().next(tx.topologyVersion()));
+
if (isDone())
return;
@@@ -866,11 -820,11 +847,11 @@@
GridCacheContext<K, V> cacheCtx = cached.context();
if (entry.explicitVersion() == null) {
- GridCacheMvccCandidate<K> added = cached.candidate(version());
+ GridCacheMvccCandidate added = cached.candidate(version());
- assert added != null || entry.groupLockEntry() : "Null candidate for non-group-lock entry " +
+ assert added != null : "Null candidate for non-group-lock entry " +
"[added=" + added + ", entry=" + entry + ']';
- assert added == null || added.dhtLocal() : "Got non-dht-local candidate for prepare future" +
+ assert added.dhtLocal() : "Got non-dht-local candidate for prepare future" +
"[added=" + added + ", entry=" + entry + ']';
if (added != null && added.ownerVersion() != null)
@@@ -954,13 -910,13 +935,13 @@@
tx.subjectId(),
tx.taskNameHash());
- for (IgniteTxEntry<K, V> entry : nearMapping.writes()) {
+ for (IgniteTxEntry entry : nearMapping.writes()) {
try {
- GridCacheMvccCandidate<K> added = entry.cached().candidate(version());
+ GridCacheMvccCandidate added = entry.cached().candidate(version());
- assert added != null || entry.groupLockEntry() : "Null candidate for non-group-lock entry " +
+ assert added != null : "Null candidate for non-group-lock entry " +
"[added=" + added + ", entry=" + entry + ']';
- assert added == null || added.dhtLocal() : "Got non-dht-local candidate for prepare future" +
+ assert added.dhtLocal() : "Got non-dht-local candidate for prepare future" +
"[added=" + added + ", entry=" + entry + ']';
if (added != null && added.ownerVersion() != null)
@@@ -1001,10 -957,9 +982,10 @@@
* @return {@code True} if mapped.
*/
private boolean map(
- IgniteTxEntry<K, V> entry,
- Map<UUID, GridDistributedTxMapping<K, V>> futDhtMap,
- Map<UUID, GridDistributedTxMapping<K, V>> futNearMap
+ IgniteTxEntry entry,
+ Map<UUID, GridDistributedTxMapping> futDhtMap,
- Map<UUID, GridDistributedTxMapping> futNearMap) {
++ Map<UUID, GridDistributedTxMapping> futNearMap
+ ) {
if (entry.cached().isLocal())
return false;
@@@ -1071,33 -1026,16 +1052,33 @@@
* @param locMap Exclude map.
* @return {@code True} if mapped.
*/
- private boolean map(IgniteTxEntry entry, Iterable<ClusterNode> nodes,
- Map<UUID, GridDistributedTxMapping> globalMap, Map<UUID, GridDistributedTxMapping> locMap) {
+ private boolean map(
- IgniteTxEntry<K, V> entry,
++ IgniteTxEntry entry,
+ Iterable<ClusterNode> nodes,
- Map<UUID, GridDistributedTxMapping<K, V>> globalMap,
- Map<UUID, GridDistributedTxMapping<K, V>> locMap
++ Map<UUID, GridDistributedTxMapping> globalMap,
++ Map<UUID, GridDistributedTxMapping> locMap
+ ) {
boolean ret = false;
if (nodes != null) {
for (ClusterNode n : nodes) {
- GridDistributedTxMapping<K, V> global = globalMap.get(n.id());
+ GridDistributedTxMapping global = globalMap.get(n.id());
+ if (!F.isEmpty(entry.entryProcessors())) {
+ GridDhtPartitionState state = entry.context().topology().partitionState(n.id(),
+ entry.cached().partition());
+
+ if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) {
+ V procVal = entry.entryProcessorCalculatedValue();
+
+ entry.op(procVal == null ? DELETE : UPDATE);
+ entry.value(procVal, true, false);
+ entry.entryProcessors(null);
+ }
+ }
+
if (global == null)
- globalMap.put(n.id(), global = new GridDistributedTxMapping<>(n));
+ globalMap.put(n.id(), global = new GridDistributedTxMapping(n));
global.add(entry);
@@@ -1115,9 -1053,35 +1096,9 @@@
return ret;
}
- /**
- * Collects versions of pending candidates versions less than base.
- *
- * @param entries Tx entries to process.
- * @param baseVer Base version.
- * @return Collection of pending candidates versions.
- */
- private Collection<GridCacheVersion> localDhtPendingVersions(Iterable<IgniteTxEntry> entries,
- GridCacheVersion baseVer) {
- Collection<GridCacheVersion> lessPending = new GridLeanSet<>(5);
-
- for (IgniteTxEntry entry : entries) {
- try {
- for (GridCacheMvccCandidate cand : entry.cached().localCandidates()) {
- if (cand.version().isLess(baseVer))
- lessPending.add(cand.version());
- }
- }
- catch (GridCacheEntryRemovedException ignored) {
- // No-op, no candidates.
- }
- }
-
- return lessPending;
- }
-
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridDhtTxPrepareFuture.class, this, "super", super.toString());
+ return S.toString(GridDhtTxPrepareFuture.class, this, "xid", tx.xidVersion(), "super", super.toString());
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index 795764f,c033273..c5db487
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@@ -110,10 -114,12 +112,10 @@@ public class GridDhtTxPrepareRequest ex
public GridDhtTxPrepareRequest(
IgniteUuid futId,
IgniteUuid miniId,
- long topVer,
- GridDhtTxLocalAdapter<K, V> tx,
- Collection<IgniteTxEntry<K, V>> dhtWrites,
- Collection<IgniteTxEntry<K, V>> nearWrites,
+ AffinityTopologyVersion topVer,
+ GridDhtTxLocalAdapter tx,
+ Collection<IgniteTxEntry> dhtWrites,
+ Collection<IgniteTxEntry> nearWrites,
- IgniteTxKey grpLockKey,
- boolean partLock,
Map<UUID, Collection<UUID>> txNodes,
GridCacheVersion nearXidVer,
boolean last,
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index ed37ae4,ffbc0a2..9441b3a
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@@ -98,8 -103,8 +101,22 @@@ public class GridDhtTxRemote extends Gr
@Nullable UUID subjId,
int taskNameHash
) {
- super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize,
- subjId, taskNameHash);
- super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, plc, concurrency, isolation, invalidate, timeout,
- txSize, grpLockKey, subjId, taskNameHash);
++ super(
++ ctx,
++ nodeId,
++ rmtThreadId,
++ xidVer,
++ commitVer,
++ sys,
++ plc,
++ concurrency,
++ isolation,
++ invalidate,
++ timeout,
++ txSize,
++ subjId,
++ taskNameHash
++ );
assert nearNodeId != null;
assert rmtFutId != null;
@@@ -134,9 -139,10 +151,9 @@@
* @param timeout Timeout.
* @param ctx Cache context.
* @param txSize Expected transaction size.
- * @param grpLockKey Group lock key if transaction is group-lock.
*/
public GridDhtTxRemote(
- GridCacheSharedContext<K, V> ctx,
+ GridCacheSharedContext ctx,
UUID nearNodeId,
IgniteUuid rmtFutId,
UUID nodeId,
@@@ -154,8 -161,9 +172,22 @@@
@Nullable UUID subjId,
int taskNameHash
) {
- super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize,
- subjId, taskNameHash);
- super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, plc, concurrency, isolation, invalidate, timeout,
- txSize, grpLockKey, subjId, taskNameHash);
++ super(
++ ctx,
++ nodeId,
++ rmtThreadId,
++ xidVer,
++ commitVer,
++ sys,
++ plc,
++ concurrency,
++ isolation,
++ invalidate,
++ timeout,
++ txSize,
++ subjId,
++ taskNameHash
++ );
assert nearNodeId != null;
assert rmtFutId != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index a17d011,cdf1638..49c1fb6
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@@ -557,12 -575,17 +575,12 @@@ public class GridDhtColocatedCache<K, V
if (map == null || map.isEmpty())
return;
- for (Map.Entry<ClusterNode, GridNearUnlockRequest<K, V>> mapping : map.entrySet()) {
- Collection<GridCacheVersion> committed = ctx.tm().committedVersions(ver);
- Collection<GridCacheVersion> rolledback = ctx.tm().rolledbackVersions(ver);
-
+ for (Map.Entry<ClusterNode, GridNearUnlockRequest> mapping : map.entrySet()) {
ClusterNode n = mapping.getKey();
- GridDistributedUnlockRequest<K, V> req = mapping.getValue();
+ GridDistributedUnlockRequest req = mapping.getValue();
- if (!F.isEmpty(req.keyBytes()) || !F.isEmpty(req.keys())) {
+ if (!F.isEmpty(req.keys())) {
- req.completedVersions(committed, rolledback);
-
// We don't wait for reply to this message.
ctx.io().send(n, req, ctx.ioPolicy());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index bd51b8c,3087dff..a0ce0eb
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@@ -1029,8 -1028,12 +1026,8 @@@ public final class GridDhtColocatedLock
// If primary node left the grid before lock acquisition, fail the whole future.
throw newTopologyException(null, primary.id());
- if (inTx() && tx.groupLock() && !primary.isLocal())
- throw new IgniteCheckedException("Failed to start group lock transaction (local node is not primary for " +
- " key) [key=" + key + ", primaryNodeId=" + primary.id() + ']');
-
if (mapping == null || !primary.id().equals(mapping.node().id()))
- mapping = new GridNearLockMapping<>(primary, key);
+ mapping = new GridNearLockMapping(primary, key);
else
mapping.addKey(key);
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index bdb52d1,c3d138e..ce614cb
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@@ -1028,9 -1017,10 +1015,9 @@@ public final class GridNearLockFuture<K
// Lock is held at this point, so we can set the
// returned value if any.
- entry.resetFromPrimary(newVal, newBytes, lockVer, dhtVer, node.id());
+ entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id());
- entry.readyNearLock(lockVer, mappedVer, res.committedVersions(),
- res.rolledbackVersions(), res.pending());
+ entry.readyNearLock(lockVer, mappedVer);
if (inTx() && implicitTx() && tx.onePhaseCommit()) {
boolean pass = res.filterResult(i);
@@@ -1157,8 -1150,12 +1147,8 @@@
// If primary node left the grid before lock acquisition, fail the whole future.
throw newTopologyException(null, primary.id());
- if (inTx() && tx.groupLock() && !primary.isLocal())
- throw new IgniteCheckedException("Failed to start group lock transaction (local node is not primary for " +
- " key) [key=" + key + ", primaryNodeId=" + primary.id() + ']');
-
if (mapping == null || !primary.id().equals(mapping.node().id()))
- mapping = new GridNearLockMapping<>(primary, key);
+ mapping = new GridNearLockMapping(primary, key);
else
mapping.addKey(key);
@@@ -1390,10 -1374,11 +1367,10 @@@
if (inTx() && implicitTx() && tx.onePhaseCommit()) {
boolean pass = res.filterResult(i);
- tx.entry(cctx.txKey(k)).filters(pass ? CU.<K, V>empty() : CU.<K, V>alwaysFalse());
+ tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr());
}
- entry.readyNearLock(lockVer, mappedVer, res.committedVersions(), res.rolledbackVersions(),
- res.pending());
+ entry.readyNearLock(lockVer, mappedVer);
if (retval) {
if (readRecordable)
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
index 6bc286e,696cea4..c86e773
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
@@@ -18,9 -18,10 +18,9 @@@
package org.apache.ignite.internal.processors.cache.distributed.near;
import org.apache.ignite.*;
- import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
@@@ -141,9 -148,11 +140,9 @@@ public class GridNearLockRequest extend
isInvalidate,
timeout,
keyCnt,
- txSize,
- grpLockKey,
- partLock);
+ txSize);
- assert topVer > 0;
+ assert topVer.compareTo(AffinityTopologyVersion.ZERO) > 0;
this.topVer = topVer;
this.implicitTx = implicitTx;
@@@ -410,8 -463,8 +439,8 @@@
reader.incrementState();
- case 24:
+ case 20:
- filterBytes = reader.readObjectArray("filterBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
+ filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
if (!reader.isLastRead())
return false;
@@@ -474,8 -543,8 +503,8 @@@
reader.incrementState();
- case 34:
+ case 28:
- topVer = reader.readLong("topVer");
+ topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
----------------------------------------------------------------------