You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/10 13:55:25 UTC
[1/8] ignite git commit: ignite-4768
Repository: ignite
Updated Branches:
refs/heads/ignite-4768-1 4b4409189 -> 26316bb06
ignite-4768
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/784b171d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/784b171d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/784b171d
Branch: refs/heads/ignite-4768-1
Commit: 784b171deefbc573e7ec6b59128a274acc307946
Parents: 35dad8f
Author: sboikov <sb...@gridgain.com>
Authored: Thu Mar 9 15:45:55 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Mar 9 15:49:52 2017 +0300
----------------------------------------------------------------------
.../cache/distributed/GridDistributedTxPrepareRequest.java | 9 ---------
.../cache/distributed/dht/GridDhtTxPrepareFuture.java | 2 --
.../cache/distributed/dht/GridDhtTxPrepareRequest.java | 2 --
.../cache/distributed/near/GridNearLockFuture.java | 2 ++
.../near/GridNearOptimisticSerializableTxPrepareFuture.java | 1 -
.../distributed/near/GridNearOptimisticTxPrepareFuture.java | 1 -
.../near/GridNearPessimisticTxPrepareFuture.java | 9 ---------
.../cache/distributed/near/GridNearTxPrepareRequest.java | 2 --
8 files changed, 2 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/784b171d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index e30c456..329dc8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@ -74,9 +74,6 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
/** */
private static final int SYSTEM_TX_FLAG_MASK = 0x10;
- /** */
- private static final int MAPPING_KNOWN_FLAG_MASK = 0x20;
-
/** Collection to message converter. */
private static final C1<Collection<UUID>, UUIDCollectionMessage> COL_TO_MSG = new C1<Collection<UUID>, UUIDCollectionMessage>() {
@Override public UUIDCollectionMessage apply(Collection<UUID> uuids) {
@@ -177,7 +174,6 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
@Nullable Collection<IgniteTxEntry> reads,
Collection<IgniteTxEntry> writes,
Map<UUID, Collection<UUID>> txNodes,
- boolean mappingKnown,
boolean retVal,
boolean last,
boolean onePhaseCommit,
@@ -202,11 +198,6 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
setFlag(tx.isInvalidate(), INVALIDATE_FLAG_MASK);
setFlag(onePhaseCommit, ONE_PHASE_COMMIT_FLAG_MASK);
setFlag(last, LAST_REQ_FLAG_MASK);
- setFlag(mappingKnown, MAPPING_KNOWN_FLAG_MASK);
- }
-
- public final boolean mappingKnown() {
- return isFlag(MAPPING_KNOWN_FLAG_MASK);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/784b171d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 053c3b2..d093b4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1273,7 +1273,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
nearWrites,
txNodes,
tx.nearXidVersion(),
- false,
true,
tx.onePhaseCommit(),
tx.subjectId(),
@@ -1386,7 +1385,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
nearMapping.writes(),
tx.transactionNodes(),
tx.nearXidVersion(),
- false,
true,
tx.onePhaseCommit(),
tx.subjectId(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/784b171d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index e55d189..8c01302 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -128,7 +128,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
Collection<IgniteTxEntry> nearWrites,
Map<UUID, Collection<UUID>> txNodes,
GridCacheVersion nearXidVer,
- boolean mappingKnown,
boolean last,
boolean onePhaseCommit,
UUID subjId,
@@ -140,7 +139,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
null,
dhtWrites,
txNodes,
- mappingKnown,
retVal,
last,
onePhaseCommit,
http://git-wip-us.apache.org/repos/asf/ignite/blob/784b171d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 7b19884..ffc84d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -1181,6 +1181,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
req.filter(filter, cctx);
if (node.isLocal()) {
+ req.miniId(-1);
+
if (log.isDebugEnabled())
log.debug("Before locally locking near request: " + req);
http://git-wip-us.apache.org/repos/asf/ignite/blob/784b171d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index bb1609d..f8e0584 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -442,7 +442,6 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
m.writes(),
m.near(),
txMapping.transactionNodes(),
- false,
m.last(),
tx.onePhaseCommit(),
tx.needReturnValue() && tx.implicit(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/784b171d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 8bb79c1..4233002 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -500,7 +500,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
m.writes(),
m.near(),
txMapping.transactionNodes(),
- false,
m.last(),
tx.onePhaseCommit(),
tx.needReturnValue() && tx.implicit(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/784b171d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index fb2c2fd..ddee7b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -184,8 +184,6 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
*
*/
private void preparePessimistic() {
- boolean mappingKnown = true;
-
Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>();
AffinityTopologyVersion topVer = tx.topologyVersion();
@@ -202,9 +200,6 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
if (!cacheCtx.isLocal()) {
GridDhtPartitionTopology top = cacheCtx.topology();
- if (mappingKnown && (!top.rebalanceFinished(topVer) || cctx.discovery().hasNearCache(cacheCtx.cacheId(), topVer)))
- mappingKnown = false;
-
nodes = cacheCtx.topology().nodes(cacheCtx.affinity().partition(txEntry.key()), topVer);
}
else
@@ -237,9 +232,6 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
checkOnePhase(txMapping);
- if (mappingKnown && tx.onePhaseCommit())
- mappingKnown = false;
-
long timeout = tx.remainingTime();
if (timeout == -1) {
@@ -262,7 +254,6 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
m.writes(),
m.near(),
txMapping.transactionNodes(),
- mappingKnown,
true,
tx.onePhaseCommit(),
tx.needReturnValue() && tx.implicit(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/784b171d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 3eff9e5..cccc7b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -107,7 +107,6 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
Collection<IgniteTxEntry> writes,
boolean near,
Map<UUID, Collection<UUID>> txNodes,
- boolean mappingKnown,
boolean last,
boolean onePhaseCommit,
boolean retVal,
@@ -123,7 +122,6 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reads,
writes,
txNodes,
- mappingKnown,
retVal,
last,
onePhaseCommit,
[8/8] ignite git commit: ignite-4768
Posted by sb...@apache.org.
ignite-4768
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/26316bb0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/26316bb0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/26316bb0
Branch: refs/heads/ignite-4768-1
Commit: 26316bb06d18569676af91c3d41671ebe3346533
Parents: 7cf4b98
Author: sboikov <sb...@gridgain.com>
Authored: Fri Mar 10 13:14:39 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Mar 10 15:10:12 2017 +0300
----------------------------------------------------------------------
.../GridDistributedTxFinishRequest.java | 217 +++++++++++++----
.../GridDistributedTxFinishResponse.java | 74 +++++-
.../GridDistributedTxPrepareResponse.java | 30 ++-
.../distributed/dht/GridDhtTxFinishFuture.java | 9 +-
.../distributed/dht/GridDhtTxFinishRequest.java | 241 +++++--------------
.../dht/GridDhtTxFinishResponse.java | 79 +++---
.../cache/distributed/dht/GridDhtTxLocal.java | 9 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 8 +-
.../dht/GridDhtTxPrepareResponse.java | 38 +--
...arOptimisticSerializableTxPrepareFuture.java | 7 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 4 +-
.../GridNearPessimisticTxPrepareFuture.java | 5 +-
.../near/GridNearTxFinishFuture.java | 128 ++++++----
.../near/GridNearTxFinishRequest.java | 168 ++-----------
.../near/GridNearTxFinishResponse.java | 19 +-
.../near/GridNearTxPrepareFutureAdapter.java | 4 +-
.../near/GridNearTxPrepareResponse.java | 42 ++--
.../cache/transactions/IgniteTxHandler.java | 34 +--
18 files changed, 576 insertions(+), 540 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
index 3e47cc9..ab9f0ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
@@ -20,18 +20,24 @@ package org.apache.ignite.internal.processors.cache.distributed;
import java.io.Externalizable;
import java.nio.ByteBuffer;
import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAware;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
/**
* Transaction completion message.
*/
@@ -39,6 +45,27 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
/** */
private static final long serialVersionUID = 0L;
+ /** */
+ protected static final int WAIT_REMOTE_TX_FLAG_MASK = 0x01;
+
+ /** */
+ protected static final int CHECK_COMMITTED_FLAG_MASK = 0x02;
+
+ /** */
+ protected static final int NEED_RETURN_VALUE_FLAG_MASK = 0x04;
+
+ /** */
+ protected static final int SYS_INVALIDATE_FLAG_MASK = 0x08;
+
+ /** */
+ protected static final int EXPLICIT_LOCK_FLAG_MASK = 0x10;
+
+ /** */
+ protected static final int STORE_ENABLED_FLAG_MASK = 0x20;
+
+ /** Topology version. */
+ private AffinityTopologyVersion topVer;
+
/** Future ID. */
private IgniteUuid futId;
@@ -54,14 +81,6 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
/** Commit flag. */
private boolean commit;
- /** Sync commit flag. */
- @Deprecated
- private boolean syncCommit;
-
- /** Sync commit flag. */
- @Deprecated
- private boolean syncRollback;
-
/** Min version used as base for completed versions. */
private GridCacheVersion baseVer;
@@ -74,6 +93,18 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
/** IO policy. */
private byte plc;
+ /** Subject ID. */
+ private UUID subjId;
+
+ /** Task name hash. */
+ private int taskNameHash;
+
+ /** */
+ private byte flags;
+
+ /** Write synchronization mode. */
+ private CacheWriteSynchronizationMode syncMode;
+
/** Transient TX state. */
@GridDirectTransient
private IgniteTxState txState;
@@ -94,8 +125,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
* @param invalidate Invalidate flag.
* @param sys System transaction flag.
* @param plc IO policy.
- * @param syncCommit Sync commit flag.
- * @param syncRollback Sync rollback flag.
+ * @param syncMode Write synchronization mode.
* @param baseVer Base version.
* @param committedVers Committed versions.
* @param rolledbackVers Rolled back versions.
@@ -105,39 +135,93 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
public GridDistributedTxFinishRequest(
GridCacheVersion xidVer,
IgniteUuid futId,
+ @NotNull AffinityTopologyVersion topVer,
@Nullable GridCacheVersion commitVer,
long threadId,
boolean commit,
boolean invalidate,
boolean sys,
byte plc,
- boolean syncCommit,
- boolean syncRollback,
+ CacheWriteSynchronizationMode syncMode,
GridCacheVersion baseVer,
Collection<GridCacheVersion> committedVers,
Collection<GridCacheVersion> rolledbackVers,
+ @Nullable UUID subjId,
+ int taskNameHash,
int txSize,
boolean addDepInfo
) {
super(xidVer, 0, addDepInfo);
+
assert xidVer != null;
+ assert syncMode != null;
this.futId = futId;
+ this.topVer = topVer;
this.commitVer = commitVer;
this.threadId = threadId;
this.commit = commit;
this.invalidate = invalidate;
this.sys = sys;
this.plc = plc;
- this.syncCommit = syncCommit;
- this.syncRollback = syncRollback;
+ this.syncMode = syncMode;
this.baseVer = baseVer;
+ this.subjId = subjId;
+ this.taskNameHash = taskNameHash;
this.txSize = txSize;
completedVersions(committedVers, rolledbackVers);
}
/**
+ * @return Transaction write synchronization mode (can be null if the message was sent from old nodes).
+ */
+ public final CacheWriteSynchronizationMode syncMode() {
+ return syncMode;
+ }
+
+ /**
+ * Sets flag mask.
+ *
+ * @param flag Set or clear.
+ * @param mask Mask.
+ */
+ protected final void setFlag(boolean flag, int mask) {
+ flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+ }
+
+ /**
+ * Reads flag mask.
+ *
+ * @param mask Mask to read.
+ * @return Flag value.
+ */
+ protected final boolean isFlag(int mask) {
+ return (flags & mask) != 0;
+ }
+
+ /**
+ * @return Subject ID.
+ */
+ @Nullable public final UUID subjectId() {
+ return subjId;
+ }
+
+ /**
+ * @return Task name hash.
+ */
+ public final int taskNameHash() {
+ return taskNameHash;
+ }
+
+ /**
+ * @return Topology version.
+ */
+ @Override public final AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /**
* @return System transaction flag.
*/
public boolean system() {
@@ -188,27 +272,6 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
}
/**
- * @return Sync commit flag.
- */
- public boolean syncCommit() {
- return syncCommit;
- }
-
- /**
- * @param syncCommit Sync commit flag.
- */
- public void syncCommit(boolean syncCommit) {
- this.syncCommit = syncCommit;
- }
-
- /**
- * @return Sync rollback flag.
- */
- public boolean syncRollback() {
- return syncRollback;
- }
-
- /**
* @return Base version.
*/
public GridCacheVersion baseVersion() {
@@ -227,7 +290,9 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
* @return {@code True} if reply is required.
*/
public boolean replyRequired() {
- return commit ? syncCommit : syncRollback;
+ assert syncMode != null;
+
+ return syncMode == FULL_SYNC;
}
/** {@inheritDoc} */
@@ -279,48 +344,66 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
writer.incrementState();
case 10:
- if (!writer.writeIgniteUuid("futId", futId))
+ if (!writer.writeByte("flags", flags))
return false;
writer.incrementState();
case 11:
- if (!writer.writeBoolean("invalidate", invalidate))
+ if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
case 12:
- if (!writer.writeByte("plc", plc))
+ if (!writer.writeBoolean("invalidate", invalidate))
return false;
writer.incrementState();
case 13:
- if (!writer.writeBoolean("syncCommit", syncCommit))
+ if (!writer.writeByte("plc", plc))
return false;
writer.incrementState();
case 14:
- if (!writer.writeBoolean("syncRollback", syncRollback))
+ if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
case 15:
- if (!writer.writeBoolean("sys", sys))
+ if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
return false;
writer.incrementState();
case 16:
- if (!writer.writeLong("threadId", threadId))
+ if (!writer.writeBoolean("sys", sys))
return false;
writer.incrementState();
case 17:
+ if (!writer.writeInt("taskNameHash", taskNameHash))
+ return false;
+
+ writer.incrementState();
+
+ case 18:
+ if (!writer.writeLong("threadId", threadId))
+ return false;
+
+ writer.incrementState();
+
+ case 19:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ case 20:
if (!writer.writeInt("txSize", txSize))
return false;
@@ -367,7 +450,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
reader.incrementState();
case 10:
- futId = reader.readIgniteUuid("futId");
+ flags = reader.readByte("flags");
if (!reader.isLastRead())
return false;
@@ -375,7 +458,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
reader.incrementState();
case 11:
- invalidate = reader.readBoolean("invalidate");
+ futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
return false;
@@ -383,7 +466,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
reader.incrementState();
case 12:
- plc = reader.readByte("plc");
+ invalidate = reader.readBoolean("invalidate");
if (!reader.isLastRead())
return false;
@@ -391,7 +474,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
reader.incrementState();
case 13:
- syncCommit = reader.readBoolean("syncCommit");
+ plc = reader.readByte("plc");
if (!reader.isLastRead())
return false;
@@ -399,7 +482,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
reader.incrementState();
case 14:
- syncRollback = reader.readBoolean("syncRollback");
+ subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
return false;
@@ -407,15 +490,19 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
reader.incrementState();
case 15:
- sys = reader.readBoolean("sys");
+ byte syncModeOrd;
+
+ syncModeOrd = reader.readByte("syncMode");
if (!reader.isLastRead())
return false;
+ syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
+
reader.incrementState();
case 16:
- threadId = reader.readLong("threadId");
+ sys = reader.readBoolean("sys");
if (!reader.isLastRead())
return false;
@@ -423,6 +510,30 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
reader.incrementState();
case 17:
+ taskNameHash = reader.readInt("taskNameHash");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 18:
+ threadId = reader.readLong("threadId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 19:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 20:
txSize = reader.readInt("txSize");
if (!reader.isLastRead())
@@ -442,7 +553,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 18;
+ return 21;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
index 109d665..1f61033 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
@@ -20,10 +20,12 @@ package org.apache.ignite.internal.processors.cache.distributed;
import java.io.Externalizable;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -41,25 +43,59 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
/** Future ID. */
private IgniteUuid futId;
+ /** */
+ @GridToStringExclude
+ private byte flags;
+
+ /** */
+ private int part;
+
/**
- * Empty constructor required by {@link Externalizable}.
+ * Empty constructor required by {@link GridIoMessageFactory}.
*/
public GridDistributedTxFinishResponse() {
/* No-op. */
}
/**
+ * @param part Partition.
* @param txId Transaction id.
* @param futId Future ID.
*/
- public GridDistributedTxFinishResponse(GridCacheVersion txId, IgniteUuid futId) {
+ public GridDistributedTxFinishResponse(int part, GridCacheVersion txId, IgniteUuid futId) {
assert txId != null;
assert futId != null;
+ this.part = part;
this.txId = txId;
this.futId = futId;
}
+ /** {@inheritDoc} */
+ @Override public final int partition() {
+ return part;
+ }
+
+ /**
+ * Sets flag mask.
+ *
+ * @param flag Set or clear.
+ * @param mask Mask.
+ */
+ protected final void setFlag(boolean flag, int mask) {
+ flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+ }
+
+ /**
+ * Reads flag mask.
+ *
+ * @param mask Mask to read.
+ * @return Flag value.
+ */
+ protected final boolean isFlag(int mask) {
+ return (flags & mask) != 0;
+ }
+
/**
*
* @return Transaction id.
@@ -101,12 +137,24 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
switch (writer.state()) {
case 3:
- if (!writer.writeIgniteUuid("futId", futId))
+ if (!writer.writeByte("flags", flags))
return false;
writer.incrementState();
case 4:
+ if (!writer.writeIgniteUuid("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeInt("part", part))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
if (!writer.writeMessage("txId", txId))
return false;
@@ -129,7 +177,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
switch (reader.state()) {
case 3:
- futId = reader.readIgniteUuid("futId");
+ flags = reader.readByte("flags");
if (!reader.isLastRead())
return false;
@@ -137,6 +185,22 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
reader.incrementState();
case 4:
+ futId = reader.readIgniteUuid("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ part = reader.readInt("part");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
txId = reader.readMessage("txId");
if (!reader.isLastRead())
@@ -156,7 +220,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 5;
+ return 7;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
index c19b8c1..53a1391 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
@@ -184,6 +184,18 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
writer.incrementState();
+ case 8:
+ if (!writer.writeByte("flags", flags))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeInt("part", part))
+ return false;
+
+ writer.incrementState();
+
}
return true;
@@ -208,6 +220,22 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
reader.incrementState();
+ case 8:
+ flags = reader.readByte("flags");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ part = reader.readInt("part");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
}
return reader.afterMessageRead(GridDistributedTxPrepareResponse.class);
@@ -220,7 +248,7 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 8;
+ return 10;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 5d4e610..7011996 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -325,8 +325,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.system(),
tx.ioPolicy(),
tx.isSystemInvalidate(),
- sync,
- sync,
+ sync ? FULL_SYNC : tx.syncMode(),
tx.completedBase(),
tx.committedVersions(),
tx.rolledbackVersions(),
@@ -426,8 +425,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.system(),
tx.ioPolicy(),
tx.isSystemInvalidate(),
- sync,
- sync,
+ sync ? FULL_SYNC : tx.syncMode(),
tx.completedBase(),
tx.committedVersions(),
tx.rolledbackVersions(),
@@ -497,8 +495,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.system(),
tx.ioPolicy(),
tx.isSystemInvalidate(),
- sync,
- sync,
+ sync ? FULL_SYNC : tx.syncMode(),
tx.completedBase(),
tx.committedVersions(),
tx.rolledbackVersions(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index c618a18..7194848 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.UUID;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest;
@@ -43,12 +44,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** */
private static final long serialVersionUID = 0L;
- /** */
- public static final int WAIT_REMOTE_TX_FLAG_MASK = 0x01;
-
- /** */
- public static final int NEED_RETURN_VALUE_FLAG_MASK = 0x02;
-
/** Near node ID. */
private UUID nearNodeId;
@@ -58,20 +53,11 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** Mini future ID. */
private IgniteUuid miniId;
- /** System invalidation flag. */
- private boolean sysInvalidate;
-
- /** Topology version. */
- private AffinityTopologyVersion topVer;
-
/** Pending versions with order less than one for this message (needed for commit ordering). */
@GridToStringInclude
@GridDirectCollection(GridCacheVersion.class)
private Collection<GridCacheVersion> pendingVers;
- /** Check committed flag. */
- private boolean checkCommitted;
-
/** Partition update counter. */
@GridToStringInclude
@GridDirectCollection(Long.class)
@@ -80,15 +66,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** One phase commit write version. */
private GridCacheVersion writeVer;
- /** Subject ID. */
- private UUID subjId;
-
- /** Task name hash. */
- private int taskNameHash;
-
- /** */
- private byte flags;
-
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -110,8 +87,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
* @param sys System flag.
* @param plc IO policy.
* @param sysInvalidate System invalidation flag.
- * @param syncCommit Synchronous commit flag.
- * @param syncRollback Synchronous rollback flag.
+ * @param syncMode Write synchronization mode.
* @param baseVer Base version.
* @param committedVers Committed versions.
* @param rolledbackVers Rolled back versions.
@@ -135,8 +111,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
boolean sys,
byte plc,
boolean sysInvalidate,
- boolean syncCommit,
- boolean syncRollback,
+ CacheWriteSynchronizationMode syncMode,
GridCacheVersion baseVer,
Collection<GridCacheVersion> committedVers,
Collection<GridCacheVersion> rolledbackVers,
@@ -151,17 +126,19 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
super(
xidVer,
futId,
+ topVer,
commitVer,
threadId,
commit,
invalidate,
sys,
plc,
- syncCommit,
- syncRollback,
+ syncMode,
baseVer,
committedVers,
rolledbackVers,
+ subjId,
+ taskNameHash,
txSize,
addDepInfo);
@@ -170,16 +147,13 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
assert isolation != null;
this.pendingVers = pendingVers;
- this.topVer = topVer;
this.nearNodeId = nearNodeId;
this.isolation = isolation;
this.miniId = miniId;
- this.sysInvalidate = sysInvalidate;
- this.subjId = subjId;
- this.taskNameHash = taskNameHash;
needReturnValue(retVal);
waitRemoteTransactions(waitRemoteTxs);
+ systemInvalidate(sysInvalidate);
}
/**
@@ -196,8 +170,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
* @param sys System flag.
* @param plc IO policy.
* @param sysInvalidate System invalidation flag.
- * @param syncCommit Synchronous commit flag.
- * @param syncRollback Synchronous rollback flag.
+ * @param syncMode Write synchronization mode.
* @param baseVer Base version.
* @param committedVers Committed versions.
* @param rolledbackVers Rolled back versions.
@@ -222,8 +195,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
boolean sys,
byte plc,
boolean sysInvalidate,
- boolean syncCommit,
- boolean syncRollback,
+ CacheWriteSynchronizationMode syncMode,
GridCacheVersion baseVer,
Collection<GridCacheVersion> committedVers,
Collection<GridCacheVersion> rolledbackVers,
@@ -236,9 +208,30 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
boolean retVal,
boolean waitRemoteTxs
) {
- this(nearNodeId, futId, miniId, topVer, xidVer, commitVer, threadId, isolation, commit, invalidate, sys, plc,
- sysInvalidate, syncCommit, syncRollback, baseVer, committedVers, rolledbackVers, pendingVers, txSize,
- subjId, taskNameHash, addDepInfo, retVal, waitRemoteTxs);
+ this(nearNodeId,
+ futId,
+ miniId,
+ topVer,
+ xidVer,
+ commitVer,
+ threadId,
+ isolation,
+ commit,
+ invalidate,
+ sys,
+ plc,
+ sysInvalidate,
+ syncMode,
+ baseVer,
+ committedVers,
+ rolledbackVers,
+ pendingVers,
+ txSize,
+ subjId,
+ taskNameHash,
+ addDepInfo,
+ retVal,
+ waitRemoteTxs);
if (updateIdxs != null && !updateIdxs.isEmpty()) {
partUpdateCnt = new GridLongList(updateIdxs.size());
@@ -263,20 +256,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
}
/**
- * @return Subject ID.
- */
- @Nullable public UUID subjectId() {
- return subjId;
- }
-
- /**
- * @return Task name hash.
- */
- public int taskNameHash() {
- return taskNameHash;
- }
-
- /**
* @return Transaction isolation.
*/
public TransactionIsolation isolation() {
@@ -294,7 +273,14 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
* @return System invalidate flag.
*/
public boolean isSystemInvalidate() {
- return sysInvalidate;
+ return isFlag(SYS_INVALIDATE_FLAG_MASK);
+ }
+
+ /**
+ * @param sysInvalidate System invalidation flag.
+ */
+ private void systemInvalidate(boolean sysInvalidate) {
+ setFlag(sysInvalidate, SYS_INVALIDATE_FLAG_MASK);
}
/**
@@ -312,63 +298,45 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
}
/**
- * @return Topology version.
- */
- @Override public AffinityTopologyVersion topologyVersion() {
- return topVer;
- }
-
- /**
* @return Check committed flag.
*/
public boolean checkCommitted() {
- return checkCommitted;
+ return isFlag(CHECK_COMMITTED_FLAG_MASK);
}
/**
* @param checkCommitted Check committed flag.
*/
public void checkCommitted(boolean checkCommitted) {
- this.checkCommitted = checkCommitted;
+ setFlag(checkCommitted, CHECK_COMMITTED_FLAG_MASK);
}
/**
* @return {@code True}
*/
public boolean waitRemoteTransactions() {
- return (flags & WAIT_REMOTE_TX_FLAG_MASK) != 0;
+ return isFlag(WAIT_REMOTE_TX_FLAG_MASK);
}
/**
* @param waitRemoteTxs Wait remote transactions flag.
*/
- public void waitRemoteTransactions(boolean waitRemoteTxs) {
- if (waitRemoteTxs)
- flags = (byte)(flags | WAIT_REMOTE_TX_FLAG_MASK);
- else
- flags &= ~WAIT_REMOTE_TX_FLAG_MASK;
+ private void waitRemoteTransactions(boolean waitRemoteTxs) {
+ setFlag(waitRemoteTxs, WAIT_REMOTE_TX_FLAG_MASK);
}
/**
* @return Flag indicating whether transaction needs return value.
*/
public boolean needReturnValue() {
- return (flags & NEED_RETURN_VALUE_FLAG_MASK) != 0;
+ return isFlag(NEED_RETURN_VALUE_FLAG_MASK);
}
/**
* @param retVal Need return value.
*/
public void needReturnValue(boolean retVal) {
- if (retVal)
- flags = (byte)(flags | NEED_RETURN_VALUE_FLAG_MASK);
- else
- flags &= ~NEED_RETURN_VALUE_FLAG_MASK;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDhtTxFinishRequest.class, this, super.toString());
+ setFlag(retVal, NEED_RETURN_VALUE_FLAG_MASK);
}
/** {@inheritDoc} */
@@ -386,73 +354,37 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
}
switch (writer.state()) {
- case 18:
- if (!writer.writeBoolean("checkCommitted", checkCommitted))
- return false;
-
- writer.incrementState();
-
- case 19:
- if (!writer.writeByte("flags", flags))
- return false;
-
- writer.incrementState();
-
- case 20:
- if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
case 21:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1))
return false;
writer.incrementState();
case 22:
- if (!writer.writeUuid("nearNodeId", nearNodeId))
+ if (!writer.writeIgniteUuid("miniId", miniId))
return false;
writer.incrementState();
case 23:
- if (!writer.writeMessage("partUpdateCnt", partUpdateCnt))
+ if (!writer.writeUuid("nearNodeId", nearNodeId))
return false;
writer.incrementState();
case 24:
- if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("partUpdateCnt", partUpdateCnt))
return false;
writer.incrementState();
case 25:
- if (!writer.writeUuid("subjId", subjId))
+ if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 26:
- if (!writer.writeBoolean("sysInvalidate", sysInvalidate))
- return false;
-
- writer.incrementState();
-
- case 27:
- if (!writer.writeInt("taskNameHash", taskNameHash))
- return false;
-
- writer.incrementState();
-
- case 28:
- if (!writer.writeMessage("topVer", topVer))
- return false;
-
- writer.incrementState();
-
- case 29:
if (!writer.writeMessage("writeVer", writeVer))
return false;
@@ -474,23 +406,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
return false;
switch (reader.state()) {
- case 18:
- checkCommitted = reader.readBoolean("checkCommitted");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 19:
- flags = reader.readByte("flags");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 20:
+ case 21:
byte isolationOrd;
isolationOrd = reader.readByte("isolation");
@@ -502,16 +418,8 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 21:
- miniId = reader.readIgniteUuid("miniId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
case 22:
- nearNodeId = reader.readUuid("nearNodeId");
+ miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
return false;
@@ -519,7 +427,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
case 23:
- partUpdateCnt = reader.readMessage("partUpdateCnt");
+ nearNodeId = reader.readUuid("nearNodeId");
if (!reader.isLastRead())
return false;
@@ -527,7 +435,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
case 24:
- pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG);
+ partUpdateCnt = reader.readMessage("partUpdateCnt");
if (!reader.isLastRead())
return false;
@@ -535,7 +443,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
case 25:
- subjId = reader.readUuid("subjId");
+ pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -543,30 +451,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
case 26:
- sysInvalidate = reader.readBoolean("sysInvalidate");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 27:
- taskNameHash = reader.readInt("taskNameHash");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 28:
- topVer = reader.readMessage("topVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 29:
writeVer = reader.readMessage("writeVer");
if (!reader.isLastRead())
@@ -586,6 +470,11 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 30;
+ return 27;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtTxFinishRequest.class, this, super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
index 8fb1f4e..4808289 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
@@ -39,6 +39,12 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
/** */
private static final long serialVersionUID = 0L;
+ /** */
+ private static final int NEAR_RES_FLAG_MASK = 0x01;
+
+ /** Flag indicating if this is a check-committed response. */
+ private static final int CHECK_COMMITTED_FLAG_MASK = 0x02;
+
/** Mini future ID. */
private IgniteUuid miniId;
@@ -49,9 +55,6 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
/** Serialized error. */
private byte[] checkCommittedErrBytes;
- /** Flag indicating if this is a check-committed response. */
- private boolean checkCommitted;
-
/** Cache return value. */
private GridCacheReturn retVal;
@@ -63,12 +66,13 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
}
/**
+ * @param part Partition.
* @param xid Xid version.
* @param futId Future ID.
* @param miniId Mini future ID.
*/
- public GridDhtTxFinishResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId) {
- super(xid, futId);
+ public GridDhtTxFinishResponse(int part, GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId) {
+ super(part, xid, futId);
assert miniId != null;
@@ -76,6 +80,20 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
}
/**
+ * @return {@code True} if this is reply for near node.
+ */
+ public boolean nearNodeResponse() {
+ return isFlag(NEAR_RES_FLAG_MASK);
+ }
+
+ /**
+ * @param val {@code True} if this is reply for near node.
+ */
+ public void nearNodeResponse(boolean val) {
+ setFlag(val, NEAR_RES_FLAG_MASK);
+ }
+
+ /**
* @return Mini future ID.
*/
public IgniteUuid miniId() {
@@ -100,14 +118,14 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
* @return Check committed flag.
*/
public boolean checkCommitted() {
- return checkCommitted;
+ return isFlag(CHECK_COMMITTED_FLAG_MASK);
}
/**
* @param checkCommitted Check committed flag.
*/
public void checkCommitted(boolean checkCommitted) {
- this.checkCommitted = checkCommitted;
+ setFlag(checkCommitted, CHECK_COMMITTED_FLAG_MASK);
}
/** {@inheritDoc} */
@@ -158,11 +176,6 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
}
/** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDhtTxFinishResponse.class, this, super.toString());
- }
-
- /** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
@@ -177,25 +190,19 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
}
switch (writer.state()) {
- case 5:
- if (!writer.writeBoolean("checkCommitted", checkCommitted))
- return false;
-
- writer.incrementState();
-
- case 6:
+ case 7:
if (!writer.writeByteArray("checkCommittedErrBytes", checkCommittedErrBytes))
return false;
writer.incrementState();
- case 7:
+ case 8:
if (!writer.writeIgniteUuid("miniId", miniId))
return false;
writer.incrementState();
- case 8:
+ case 9:
if (!writer.writeMessage("retVal", retVal))
return false;
@@ -217,15 +224,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
return false;
switch (reader.state()) {
- case 5:
- checkCommitted = reader.readBoolean("checkCommitted");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
+ case 7:
checkCommittedErrBytes = reader.readByteArray("checkCommittedErrBytes");
if (!reader.isLastRead())
@@ -233,7 +232,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
reader.incrementState();
- case 7:
+ case 8:
miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
@@ -241,7 +240,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
reader.incrementState();
- case 8:
+ case 9:
retVal = reader.readMessage("retVal");
if (!reader.isLastRead())
@@ -261,6 +260,20 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 9;
+ return 10;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ StringBuilder flags = new StringBuilder();
+
+ if (checkCommitted())
+ appendFlag(flags, "checkComm");
+ if (nearNodeResponse())
+ appendFlag(flags, "nearRes");
+
+ return S.toString(GridDhtTxFinishResponse.class, this,
+ "flags", flags.toString(),
+ "super", super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 79c371c..c2c2918 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -641,8 +641,13 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
return;
}
- GridNearTxFinishResponse res = new GridNearTxFinishResponse(nearXidVer, threadId, nearFinFutId,
- nearFinMiniId, err);
+ GridNearTxFinishResponse res = new GridNearTxFinishResponse(
+ -1,
+ nearXidVer,
+ threadId,
+ nearFinFutId,
+ nearFinMiniId,
+ err);
try {
cctx.io().send(nearNodeId, res, ioPolicy());
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 7645d58..ca028f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1252,8 +1252,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (last) {
int miniId = 0;
- assert tx.transactionNodes() != null;
-
final long timeout = timeoutObj != null ? timeoutObj.timeout : 0;
// Create mini futures.
@@ -1288,7 +1286,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
futId,
- fut != null ? fut.futureId() : null,
+ fut != null ? fut.futureId() : 0,
nearFutId,
nearMiniId,
tx.topologyVersion(),
@@ -1315,8 +1313,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
GridCacheContext<?, ?> cacheCtx = cached.context();
// Do not invalidate near entry on originating transaction node.
- req.invalidateNearEntry(idx, !tx.nearNodeId().equals(n.id()) &&
- cached.readerId(n.id()) != null);
+ req.invalidateNearEntry(idx,
+ !tx.nearNodeId().equals(n.id()) && cached.readerId(n.id()) != null);
if (cached.isNewLocked()) {
List<ClusterNode> owners = cacheCtx.topology().owners(cached.partition(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
index e7153da..2fd72f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
@@ -247,11 +247,6 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
}
/** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDhtTxPrepareResponse.class, this, "super", super.toString());
- }
-
- /** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
@@ -266,31 +261,31 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
}
switch (writer.state()) {
- case 8:
+ case 10:
if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
- case 9:
+ case 11:
if (!writer.writeMap("invalidParts", invalidParts, MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR))
return false;
writer.incrementState();
- case 10:
+ case 12:
if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
- case 11:
+ case 13:
if (!writer.writeCollection("nearEvicted", nearEvicted, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 12:
+ case 14:
if (!writer.writeCollection("preloadEntries", preloadEntries, MessageCollectionItemType.MSG))
return false;
@@ -312,7 +307,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
return false;
switch (reader.state()) {
- case 8:
+ case 10:
futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
@@ -320,7 +315,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
reader.incrementState();
- case 9:
+ case 11:
invalidParts = reader.readMap("invalidParts", MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR, false);
if (!reader.isLastRead())
@@ -328,7 +323,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
reader.incrementState();
- case 10:
+ case 12:
miniId = reader.readInt("miniId");
if (!reader.isLastRead())
@@ -336,7 +331,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
reader.incrementState();
- case 11:
+ case 13:
nearEvicted = reader.readCollection("nearEvicted", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -344,7 +339,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
reader.incrementState();
- case 12:
+ case 14:
preloadEntries = reader.readCollection("preloadEntries", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -364,6 +359,18 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 13;
+ return 15;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ StringBuilder flags = new StringBuilder();
+
+ if (nearNodeResponse())
+ appendFlag(flags, "nearRes");
+
+ return S.toString(GridDhtTxPrepareResponse.class, this,
+ "flags", flags.toString(),
+ "super", super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 652eec2..cd0e7fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -195,7 +195,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
}
/** {@inheritDoc} */
- @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
+ @Override public void onPrimaryResponse(UUID nodeId, GridNearTxPrepareResponse res) {
if (!isDone()) {
MiniFuture mini = miniFuture(res.miniId());
@@ -885,7 +885,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
onDone(res);
}
else {
- parent.onPrimaryPrepareResponse(m, res);
+ parent.processPrimaryPrepareResponse(m, res);
// Finish this mini future (need result only on client node).
onDone(parent.cctx.kernalContext().clientNode() ? res : null);
@@ -899,8 +899,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
*/
private void remap(final GridNearTxPrepareResponse res) {
parent.prepareOnTopology(true, new Runnable() {
- @Override
- public void run() {
+ @Override public void run() {
onDone(res);
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 5cfb1a2..d321cb9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -174,7 +174,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
}
/** {@inheritDoc} */
- @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
+ @Override public void onPrimaryResponse(UUID nodeId, GridNearTxPrepareResponse res) {
if (!isDone()) {
MiniFuture mini = miniFuture(res.miniId());
@@ -918,7 +918,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
remap();
}
else {
- parent.onPrimaryPrepareResponse(m, res);
+ parent.processPrimaryPrepareResponse(m, res);
// Proceed prepare before finishing mini future.
if (mappings != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index b476336..03ee7af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -95,7 +95,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
}
/** {@inheritDoc} */
- @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
+ @Override public void onPrimaryResponse(UUID nodeId, GridNearTxPrepareResponse res) {
if (!isDone()) {
assert res.clientRemapVersion() == null : res;
@@ -205,7 +205,6 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
*/
private void preparePessimistic() {
// TODO IGNITE-4768: need detect on lock step?
-
boolean dhtReplyNear = true;
Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>();
@@ -473,7 +472,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
else {
assert dhtNodes == null;
- onPrimaryPrepareResponse(m, res);
+ processPrimaryPrepareResponse(m, res);
onDone();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 06cf878..1d52b6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -148,7 +148,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (isMini(fut)) {
MinFuture f = (MinFuture)fut;
- if (f.onNodeLeft(nodeId, true)) {
+ if (f.onNodeLeft(nodeId)) {
// Remove previous mapping.
mappings.remove(nodeId);
@@ -184,7 +184,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
* @param res Result.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- public void onResult(UUID nodeId, GridNearTxFinishResponse res) {
+ public void onPrimaryResponse(UUID nodeId, GridNearTxFinishResponse res) {
if (!isDone()) {
FinishMiniFuture finishFut = null;
@@ -209,7 +209,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
if (finishFut != null)
- finishFut.onNearFinishResponse(res);
+ finishFut.onPrimaryResponse(res);
else {
if (msgLog.isDebugEnabled()) {
msgLog.debug("Near finish fut, failed to find mini future [txId=" + tx.nearXidVersion() +
@@ -233,34 +233,64 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
* @param nodeId Sender.
* @param res Result.
*/
- public void onResult(UUID nodeId, GridDhtTxFinishResponse res) {
+ public void onDhtResponse(UUID nodeId, GridDhtTxFinishResponse res) {
+ assert res.checkCommitted() ^ res.nearNodeResponse() : res;
+
if (!isDone()) {
- boolean found = false;
+ MinFuture foundFut = null;
+
+ synchronized (sync) {
+ int size = futuresCountNoLock();
- for (IgniteInternalFuture<IgniteInternalTx> fut : futures()) {
- if (fut.getClass() == CheckBackupMiniFuture.class) {
- CheckBackupMiniFuture f = (CheckBackupMiniFuture)fut;
+ for (int i = 0; i < size; i++) {
+ IgniteInternalFuture<IgniteInternalTx> fut = future(i);
- if (f.futureId().equals(res.miniId())) {
- found = true;
+ if (res.nearNodeResponse()) {
+ assert tx.dhtReplyNear() && tx.syncMode() == FULL_SYNC;
- assert f.node().id().equals(nodeId);
+ if (fut.getClass() == FinishMiniFuture.class) {
+ FinishMiniFuture f = (FinishMiniFuture)fut;
- if (res.returnValue() != null)
- tx.implicitSingleResult(res.returnValue());
+ if (f.futureId().equals(res.miniId())) {
+ foundFut = (MinFuture)fut;
- f.onDhtFinishResponse(res);
+ break;
+ }
+ }
}
- }
- else if (fut.getClass() == CheckRemoteTxMiniFuture.class) {
- CheckRemoteTxMiniFuture f = (CheckRemoteTxMiniFuture)fut;
+ else {
+ assert res.checkCommitted();
+
+ if (fut.getClass() == CheckBackupMiniFuture.class) {
+ CheckBackupMiniFuture f = (CheckBackupMiniFuture)fut;
+
+ if (f.futureId().equals(res.miniId())) {
+ assert f.node().id().equals(nodeId);
- if (f.futureId().equals(res.miniId()))
- f.onDhtFinishResponse(nodeId);
+ foundFut = f;
+
+ if (res.returnValue() != null)
+ tx.implicitSingleResult(res.returnValue());
+
+ break;
+ }
+ }
+ else if (fut.getClass() == CheckRemoteTxMiniFuture.class) {
+ CheckRemoteTxMiniFuture f = (CheckRemoteTxMiniFuture)fut;
+
+ if (f.futureId().equals(res.miniId())) {
+ foundFut = f;
+
+ break;
+ }
+ }
+ }
}
}
- if (!found && msgLog.isDebugEnabled()) {
+ if (foundFut != null)
+ foundFut.onDhtFinishResponse(nodeId, res);
+ else if (msgLog.isDebugEnabled()) {
msgLog.debug("Near finish fut, failed to find mini future [txId=" + tx.nearXidVersion() +
", node=" + nodeId +
", res=" + res +
@@ -571,7 +601,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
}
catch (ClusterTopologyCheckedException ignored) {
- mini.onNodeLeft(backupId, false);
+ mini.onNodeLeft(backupId);
}
catch (IgniteCheckedException e) {
if (msgLog.isDebugEnabled()) {
@@ -731,7 +761,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
// Remove previous mapping.
mappings.remove(m.primary().id());
- fut.onNodeLeft(n.id(), false);
+ fut.onNodeLeft(n.id());
}
catch (IgniteCheckedException e) {
if (msgLog.isDebugEnabled()) {
@@ -813,8 +843,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
tx.system(),
tx.ioPolicy(),
false,
- tx.syncMode() == FULL_SYNC,
- tx.syncMode() == FULL_SYNC,
+ tx.syncMode(),
null,
null,
null,
@@ -840,10 +869,15 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
/**
* @param nodeId Node ID.
- * @param discoThread {@code True} if executed from discovery thread.
* @return {@code True} if future processed node failure.
*/
- abstract boolean onNodeLeft(UUID nodeId, boolean discoThread);
+ abstract boolean onNodeLeft(UUID nodeId);
+
+ /**
+ * @param nodeId Node ID.
+ * @param res Response.
+ */
+ abstract void onDhtFinishResponse(UUID nodeId, GridDhtTxFinishResponse res);
/**
* @return Future ID.
@@ -890,7 +924,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
/** {@inheritDoc} */
- boolean onNodeLeft(UUID nodeId, boolean discoThread) {
+ boolean onNodeLeft(UUID nodeId) {
if (nodeId.equals(m.primary().id())) {
if (msgLog.isDebugEnabled()) {
msgLog.debug("Near finish fut, mini future node left [txId=" + tx.nearXidVersion() +
@@ -919,7 +953,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
fut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
- mini.onDhtFinishResponse(cctx.localNodeId());
+ mini.onDhtFinishResponse(cctx.localNodeId(), null);
}
});
}
@@ -928,7 +962,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
cctx.io().send(backup, req, tx.ioPolicy());
}
catch (ClusterTopologyCheckedException ignored) {
- mini.onNodeLeft(backupId, discoThread);
+ mini.onNodeLeft(backupId);
}
catch (IgniteCheckedException e) {
mini.onDone(e);
@@ -936,7 +970,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
}
else
- mini.onDhtFinishResponse(backupId);
+ mini.onDhtFinishResponse(backupId, null);
}
}
}
@@ -950,10 +984,24 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
return false;
}
+ /** {@inheritDoc} */
+ @Override void onDhtFinishResponse(UUID nodeId, GridDhtTxFinishResponse res) {
+ assert dhtNodes != null;
+
+ boolean done;
+
+ synchronized (dhtNodes) {
+ done = dhtNodes.remove(nodeId) && dhtNodes.isEmpty();
+ }
+
+ if (done)
+ onDone(tx);
+ }
+
/**
- * @param res Result callback.
+ * @param res Response.
*/
- void onNearFinishResponse(GridNearTxFinishResponse res) {
+ void onPrimaryResponse(GridNearTxFinishResponse res) {
assert dhtNodes == null || res.error() != null;
if (res.error() != null)
@@ -999,7 +1047,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
/** {@inheritDoc} */
- @Override boolean onNodeLeft(UUID nodeId, boolean discoThread) {
+ @Override boolean onNodeLeft(UUID nodeId) {
if (nodeId.equals(backup.id())) {
readyNearMappingFromBackup(m);
@@ -1011,10 +1059,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
return false;
}
- /**
- * @param res Response.
- */
- void onDhtFinishResponse(GridDhtTxFinishResponse res) {
+ /** {@inheritDoc} */
+ @Override void onDhtFinishResponse(UUID nodeId, GridDhtTxFinishResponse res) {
readyNearMappingFromBackup(m);
Throwable err = res.checkCommittedError();
@@ -1060,14 +1106,12 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
/** {@inheritDoc} */
- @Override boolean onNodeLeft(UUID nodeId, boolean discoThread) {
+ @Override boolean onNodeLeft(UUID nodeId) {
return onResponse(nodeId);
}
- /**
- * @param nodeId Node ID.
- */
- void onDhtFinishResponse(UUID nodeId) {
+ /** {@inheritDoc} */
+ @Override void onDhtFinishResponse(UUID nodeId, GridDhtTxFinishResponse res) {
onResponse(nodeId);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
index dfbbe18..8be4304 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
@@ -42,24 +42,6 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
/** Mini future ID. */
private IgniteUuid miniId;
- /** Explicit lock flag. */
- private boolean explicitLock;
-
- /** Store enabled flag. */
- private boolean storeEnabled;
-
- /** Topology version. */
- private AffinityTopologyVersion topVer;
-
- /** Subject ID. */
- private UUID subjId;
-
- /** Task name hash. */
- private int taskNameHash;
-
- /** Write synchronization mode. */
- private CacheWriteSynchronizationMode syncMode;
-
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -109,48 +91,53 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
super(
xidVer,
futId,
+ topVer,
null,
threadId,
commit,
invalidate,
sys,
plc,
- syncMode == CacheWriteSynchronizationMode.FULL_SYNC,
- syncMode == CacheWriteSynchronizationMode.FULL_SYNC,
+ syncMode,
baseVer,
committedVers,
rolledbackVers,
+ subjId,
+ taskNameHash,
txSize,
addDepInfo
);
- this.syncMode = syncMode;
- this.explicitLock = explicitLock;
- this.storeEnabled = storeEnabled;
- this.topVer = topVer;
- this.subjId = subjId;
- this.taskNameHash = taskNameHash;
+ explicitLock(explicitLock);
+ storeEnabled(storeEnabled);
}
/**
- * @return Transaction write synchronization mode (can be null is message sent from old nodes).
+ * @return Explicit lock flag.
*/
- @Nullable public CacheWriteSynchronizationMode syncMode() {
- return syncMode;
+ public boolean explicitLock() {
+ return isFlag(EXPLICIT_LOCK_FLAG_MASK);
}
/**
- * @return Explicit lock flag.
+ * @param explicitLock Explicit lock flag.
*/
- public boolean explicitLock() {
- return explicitLock;
+ private void explicitLock(boolean explicitLock) {
+ setFlag(explicitLock, EXPLICIT_LOCK_FLAG_MASK);
}
/**
* @return Store enabled flag.
*/
public boolean storeEnabled() {
- return storeEnabled;
+ return isFlag(STORE_ENABLED_FLAG_MASK);
+ }
+
+ /**
+ * @param storeEnabled Store enabled flag.
+ */
+ private void storeEnabled(boolean storeEnabled) {
+ setFlag(storeEnabled, STORE_ENABLED_FLAG_MASK);
}
/**
@@ -167,27 +154,6 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
this.miniId = miniId;
}
- /**
- * @return Subject ID.
- */
- @Nullable public UUID subjectId() {
- return subjId;
- }
-
- /**
- * @return Task name hash.
- */
- public int taskNameHash() {
- return taskNameHash;
- }
-
- /**
- * @return Topology version.
- */
- @Override public AffinityTopologyVersion topologyVersion() {
- return topVer;
- }
-
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
@@ -203,44 +169,8 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
}
switch (writer.state()) {
- case 18:
- if (!writer.writeBoolean("explicitLock", explicitLock))
- return false;
-
- writer.incrementState();
-
- case 19:
- if (!writer.writeIgniteUuid("miniId", miniId))
- return false;
-
- writer.incrementState();
-
- case 20:
- if (!writer.writeBoolean("storeEnabled", storeEnabled))
- return false;
-
- writer.incrementState();
-
case 21:
- if (!writer.writeUuid("subjId", subjId))
- return false;
-
- writer.incrementState();
-
- case 22:
- if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 23:
- if (!writer.writeInt("taskNameHash", taskNameHash))
- return false;
-
- writer.incrementState();
-
- case 24:
- if (!writer.writeMessage("topVer", topVer))
+ if (!writer.writeIgniteUuid("miniId", miniId))
return false;
writer.incrementState();
@@ -261,60 +191,8 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
return false;
switch (reader.state()) {
- case 18:
- explicitLock = reader.readBoolean("explicitLock");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 19:
- miniId = reader.readIgniteUuid("miniId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 20:
- storeEnabled = reader.readBoolean("storeEnabled");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
case 21:
- subjId = reader.readUuid("subjId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 22:
- byte syncModeOrd;
-
- syncModeOrd = reader.readByte("syncMode");
-
- if (!reader.isLastRead())
- return false;
-
- syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
-
- reader.incrementState();
-
- case 23:
- taskNameHash = reader.readInt("taskNameHash");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 24:
- topVer = reader.readMessage("topVer");
+ miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
return false;
@@ -333,7 +211,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 25;
+ return 22;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
index 37fbb36..cf1ba46 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
@@ -59,15 +59,16 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
}
/**
+ * @param part Partition.
* @param xid Xid version.
* @param nearThreadId Near tx thread ID.
* @param futId Future ID.
* @param miniId Mini future Id.
* @param err Error.
*/
- public GridNearTxFinishResponse(GridCacheVersion xid, long nearThreadId, IgniteUuid futId, IgniteUuid miniId,
+ public GridNearTxFinishResponse(int part, GridCacheVersion xid, long nearThreadId, IgniteUuid futId, IgniteUuid miniId,
@Nullable Throwable err) {
- super(xid, futId);
+ super(part, xid, futId);
assert miniId != null;
@@ -127,19 +128,19 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
}
switch (writer.state()) {
- case 5:
+ case 7:
if (!writer.writeByteArray("errBytes", errBytes))
return false;
writer.incrementState();
- case 6:
+ case 8:
if (!writer.writeIgniteUuid("miniId", miniId))
return false;
writer.incrementState();
- case 7:
+ case 9:
if (!writer.writeLong("nearThreadId", nearThreadId))
return false;
@@ -161,7 +162,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
return false;
switch (reader.state()) {
- case 5:
+ case 7:
errBytes = reader.readByteArray("errBytes");
if (!reader.isLastRead())
@@ -169,7 +170,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
reader.incrementState();
- case 6:
+ case 8:
miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
@@ -177,7 +178,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
reader.incrementState();
- case 7:
+ case 9:
nearThreadId = reader.readLong("nearThreadId");
if (!reader.isLastRead())
@@ -197,7 +198,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 8;
+ return 10;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index 71d2353..8a8dc7e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -152,7 +152,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends
* @param nodeId Sender.
* @param res Result.
*/
- public abstract void onResult(UUID nodeId, GridNearTxPrepareResponse res);
+ public abstract void onPrimaryResponse(UUID nodeId, GridNearTxPrepareResponse res);
/**
* @param nodeId Sender.
@@ -197,7 +197,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends
* @param res Response.
*/
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- final void onPrimaryPrepareResponse(GridDistributedTxMapping m, GridNearTxPrepareResponse res) {
+ final void processPrimaryPrepareResponse(GridDistributedTxMapping m, GridNearTxPrepareResponse res) {
if (res == null)
return;
[6/8] ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-4768' into ignite-4768-1
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-4768' into ignite-4768-1
# Conflicts:
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7cf4b986
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7cf4b986
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7cf4b986
Branch: refs/heads/ignite-4768-1
Commit: 7cf4b9860a6b64d3cb63b347e166988b568ec5b6
Parents: e1961ff
Author: sboikov <sb...@gridgain.com>
Authored: Fri Mar 10 13:09:44 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Mar 10 13:09:44 2017 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoMessageFactory.java | 1 -
.../processors/cache/distributed/dht/GridDhtLockFuture.java | 6 ++++++
.../cache/distributed/dht/GridDhtTxPrepareFuture.java | 1 +
.../cache/distributed/dht/GridDhtTxPrepareResponse.java | 2 +-
.../dht/colocated/GridDhtColocatedLockFuture.java | 6 ++++++
.../cache/distributed/near/GridNearLockFuture.java | 8 +++++++-
.../near/GridNearOptimisticSerializableTxPrepareFuture.java | 1 +
.../distributed/near/GridNearOptimisticTxPrepareFuture.java | 1 +
.../internal/processors/cache/local/GridLocalLockFuture.java | 6 ++++++
.../processors/cache/transactions/IgniteTxHandler.java | 4 +++-
10 files changed, 32 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf4b986/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index ce2c72c..6f95400 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -62,7 +62,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRe
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxNearPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf4b986/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index a0270b0..cda4641 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -660,6 +661,11 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
return false;
}
+ /** {@inheritDoc} */
+ @Override public IgniteInternalTx tx() {
+ return tx;
+ }
+
/**
* @return {@code True} if locks have been acquired.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf4b986/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index a787f5f..7645d58 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1418,6 +1418,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
nearMapping.writes(),
tx.transactionNodes(),
tx.nearXidVersion(),
+ false,
true,
tx.onePhaseCommit(),
tx.subjectId(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf4b986/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
index cb10374..e7153da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
@@ -134,7 +134,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
/**
* @param val {@code True} if this is reply for near node.
*/
- void nearNodeResponse(boolean val) {
+ public void nearNodeResponse(boolean val) {
setFlag(val, NEAR_RES_FLAG_MASK);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf4b986/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 0ce380d..3a84535 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLock
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock;
@@ -235,6 +236,11 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
return false;
}
+ /** {@inheritDoc} */
+ @Override public IgniteInternalTx tx() {
+ return tx;
+ }
+
/**
* @return Future ID.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf4b986/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index ffc84d8..6b07857 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCa
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock;
@@ -236,10 +237,15 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
return lockVer;
}
+ /** {@inheritDoc} */
+ @Override public IgniteInternalTx tx() {
+ return tx;
+ }
+
/**
* @return Entries.
*/
- public synchronized List<GridDistributedCacheEntry> entriesCopy() {
+ synchronized List<GridDistributedCacheEntry> entriesCopy() {
return new ArrayList<>(entries);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf4b986/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index aef40ce..652eec2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -449,6 +449,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
m.writes(),
m.near(),
txMapping.transactionNodes(),
+ false,
m.last(),
tx.onePhaseCommit(),
tx.needReturnValue() && tx.implicit(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf4b986/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 0e840fe..5cfb1a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -505,6 +505,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
m.writes(),
m.near(),
txMapping.transactionNodes(),
+ false,
m.last(),
tx.onePhaseCommit(),
tx.needReturnValue() && tx.implicit(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf4b986/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
index 8e224c8..e954821 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock;
@@ -378,6 +379,11 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean>
}
/** {@inheritDoc} */
+ @Override public IgniteInternalTx tx() {
+ return tx;
+ }
+
+ /** {@inheritDoc} */
@SuppressWarnings({"ThrowableInstanceNeverThrown"})
@Override public boolean cancel() {
if (onCancelled()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf4b986/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 33c8a18..aaba610 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -969,8 +969,10 @@ public class IgniteTxHandler {
req.partition(),
req.nearXidVersion(),
req.nearFutureId(),
- null, //req.nearMiniId(),
+ req.nearMiniId(),
req.deployInfo() != null);
+
+ nearRes.nearNodeResponse(true);
}
else {
res = new GridDhtTxPrepareResponse(
[2/8] ignite git commit: ignite-4768
Posted by sb...@apache.org.
ignite-4768
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d3d4a36b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d3d4a36b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d3d4a36b
Branch: refs/heads/ignite-4768-1
Commit: d3d4a36b4f0c71b5635dadd9211f73e728e29483
Parents: 784b171
Author: sboikov <sb...@gridgain.com>
Authored: Thu Mar 9 16:38:54 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Mar 9 16:38:54 2017 +0300
----------------------------------------------------------------------
.../distributed/dht/GridDhtTxPrepareFuture.java | 85 +++++++++++---------
.../dht/GridDhtTxPrepareRequest.java | 12 +--
.../dht/GridDhtTxPrepareResponse.java | 20 ++---
3 files changed, 64 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d3d4a36b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index d093b4a..735653d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -562,7 +562,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
* @return Mini future.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- private MiniFuture miniFuture(IgniteUuid miniId) {
+ private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
synchronized (sync) {
int size = futuresCountNoLock();
@@ -576,7 +576,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
MiniFuture mini = (MiniFuture)fut;
- if (mini.futureId().equals(miniId)) {
+ if (mini.futureId() == miniId) {
if (!mini.isDone())
return mini;
else
@@ -1233,6 +1233,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
return;
if (last) {
+ int miniId = 0;
+
assert tx.transactionNodes() != null;
final long timeout = timeoutObj != null ? timeoutObj.timeout : 0;
@@ -1257,7 +1259,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (tx.remainingTime() == -1)
return;
- MiniFuture fut = new MiniFuture(n.id(), dhtMapping, nearMapping);
+ MiniFuture fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping);
add(fut); // Append new future.
@@ -1371,7 +1373,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (tx.remainingTime() == -1)
return;
- MiniFuture fut = new MiniFuture(nearMapping.primary().id(), null, nearMapping);
+ MiniFuture fut = new MiniFuture(nearMapping.primary().id(), ++miniId, null, nearMapping);
add(fut); // Append new future.
@@ -1481,25 +1483,35 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
try {
List<ClusterNode> dhtNodes = dht.topology().nodes(cached.partition(), tx.topologyVersion());
+ assert dhtNodes.size() > 0 && dhtNodes.get(0).id().equals(cctx.localNodeId()) : dhtNodes;
+
if (log.isDebugEnabled())
log.debug("Mapping entry to DHT nodes [nodes=" + U.toShortString(dhtNodes) +
", entry=" + entry + ']');
- // Exclude local node.
- map(entry, F.view(dhtNodes, F.remoteNodes(cctx.localNodeId())), dhtMap);
+ for (int i = 1; i < dhtNodes.size(); i++) {
+ ClusterNode node = dhtNodes.get(i);
+
+ addMapping(entry, node, dhtMap);
+ }
Collection<UUID> readers = cached.readers();
if (!F.isEmpty(readers)) {
- Collection<ClusterNode> nearNodes =
- cctx.discovery().nodes(readers, F0.not(F.idForNodeId(tx.nearNodeId())));
+ for (UUID readerId : readers) {
+ if (readerId.equals(tx.nearNodeId()))
+ continue;
- if (log.isDebugEnabled())
- log.debug("Mapping entry to near nodes [nodes=" + U.toShortString(nearNodes) +
- ", entry=" + entry + ']');
+ ClusterNode readerNode = cctx.discovery().node(readerId);
+
+ if (readerNode == null || dhtNodes.contains(readerNode))
+ continue;
+
+ if (log.isDebugEnabled())
+ log.debug("Mapping entry to near node [node=" + readerNode + ", entry=" + entry + ']');
- // Exclude DHT nodes.
- map(entry, F.view(nearNodes, F0.notIn(dhtNodes)), nearMap);
+ addMapping(entry, readerNode, nearMap);
+ }
}
else if (log.isDebugEnabled())
log.debug("Entry has no near readers: " + entry);
@@ -1516,39 +1528,35 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
/**
* @param entry Entry.
- * @param nodes Nodes.
+ * @param n Node.
* @param globalMap Map.
*/
- private void map(
+ private void addMapping(
IgniteTxEntry entry,
- Iterable<ClusterNode> nodes,
+ ClusterNode n,
Map<UUID, GridDistributedTxMapping> globalMap
) {
- if (nodes != null) {
- for (ClusterNode n : nodes) {
- GridDistributedTxMapping global = globalMap.get(n.id());
-
- if (!F.isEmpty(entry.entryProcessors())) {
- GridDhtPartitionState state = entry.context().topology().partitionState(n.id(),
- entry.cached().partition());
+ GridDistributedTxMapping global = globalMap.get(n.id());
- if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) {
- T2<GridCacheOperation, CacheObject> procVal = entry.entryProcessorCalculatedValue();
+ if (!F.isEmpty(entry.entryProcessors())) {
+ GridDhtPartitionState state = entry.context().topology().partitionState(n.id(),
+ entry.cached().partition());
- assert procVal != null : entry;
+ if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) {
+ T2<GridCacheOperation, CacheObject> procVal = entry.entryProcessorCalculatedValue();
- entry.op(procVal.get1());
- entry.value(procVal.get2(), true, false);
- entry.entryProcessors(null);
- }
- }
+ assert procVal != null : entry;
- if (global == null)
- globalMap.put(n.id(), global = new GridDistributedTxMapping(n));
-
- global.add(entry);
+ entry.op(procVal.get1());
+ entry.value(procVal.get2(), true, false);
+ entry.entryProcessors(null);
}
}
+
+ if (global == null)
+ globalMap.put(n.id(), global = new GridDistributedTxMapping(n));
+
+ global.add(entry);
}
/**
@@ -1602,7 +1610,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
private static final long serialVersionUID = 0L;
/** */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
+ private final int futId;
/** Node ID. */
private UUID nodeId;
@@ -1617,17 +1625,20 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
/**
* @param nodeId Node ID.
+ * @param futId Future ID.
* @param dhtMapping Mapping.
* @param nearMapping nearMapping.
*/
MiniFuture(
UUID nodeId,
+ int futId,
GridDistributedTxMapping dhtMapping,
GridDistributedTxMapping nearMapping
) {
assert dhtMapping == null || nearMapping == null || dhtMapping.primary().equals(nearMapping.primary());
this.nodeId = nodeId;
+ this.futId = futId;
this.dhtMapping = dhtMapping;
this.nearMapping = nearMapping;
}
@@ -1635,7 +1646,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
/**
* @return Future ID.
*/
- IgniteUuid futureId() {
+ int futureId() {
return futId;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d3d4a36b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index 8c01302..8898803 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -59,7 +59,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
private IgniteUuid futId;
/** Mini future ID. */
- private IgniteUuid miniId;
+ private int miniId;
/** Topology version. */
private AffinityTopologyVersion topVer;
@@ -120,7 +120,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
*/
public GridDhtTxPrepareRequest(
IgniteUuid futId,
- IgniteUuid miniId,
+ int miniId,
AffinityTopologyVersion topVer,
GridDhtTxLocalAdapter tx,
long timeout,
@@ -145,7 +145,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
addDepInfo);
assert futId != null;
- assert miniId != null;
+ assert miniId != 0;
this.topVer = topVer;
this.futId = futId;
@@ -245,7 +245,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
/**
* @return Mini future ID.
*/
- public IgniteUuid miniId() {
+ public int miniId() {
return miniId;
}
@@ -361,7 +361,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
writer.incrementState();
case 22:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
@@ -453,7 +453,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 22:
- miniId = reader.readIgniteUuid("miniId");
+ miniId = reader.readInt("miniId");
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d3d4a36b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
index fdead95..416540a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
@@ -58,7 +58,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
private IgniteUuid futId;
/** Mini future ID. */
- private IgniteUuid miniId;
+ private int miniId;
/** Invalid partitions by cache ID. */
@GridDirectMap(keyType = Integer.class, valueType = int[].class)
@@ -81,11 +81,11 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
* @param miniId Mini future ID.
* @param addDepInfo Deployment info flag.
*/
- public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId, boolean addDepInfo) {
+ public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, int miniId, boolean addDepInfo) {
super(xid, addDepInfo);
assert futId != null;
- assert miniId != null;
+ assert miniId != 0;
this.futId = futId;
this.miniId = miniId;
@@ -98,12 +98,12 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
* @param err Error.
* @param addDepInfo Deployment enabled.
*/
- public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId, Throwable err,
+ public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, int miniId, Throwable err,
boolean addDepInfo) {
super(xid, err, addDepInfo);
assert futId != null;
- assert miniId != null;
+ assert miniId != 0;
this.futId = futId;
this.miniId = miniId;
@@ -112,7 +112,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
/**
* @return Evicted readers.
*/
- public Collection<IgniteTxKey> nearEvicted() {
+ Collection<IgniteTxKey> nearEvicted() {
return nearEvicted;
}
@@ -133,14 +133,14 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
/**
* @return Mini future ID.
*/
- public IgniteUuid miniId() {
+ public int miniId() {
return miniId;
}
/**
* @return Map from cacheId to an array of invalid partitions.
*/
- public Map<Integer, int[]> invalidPartitionsByCacheId() {
+ Map<Integer, int[]> invalidPartitionsByCacheId() {
return invalidParts;
}
@@ -250,7 +250,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
writer.incrementState();
case 10:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
@@ -300,7 +300,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
reader.incrementState();
case 10:
- miniId = reader.readIgniteUuid("miniId");
+ miniId = reader.readInt("miniId");
if (!reader.isLastRead())
return false;
[7/8] ignite git commit: ignite-4768
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index d9b648c..66fe902 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -344,61 +344,61 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
}
switch (writer.state()) {
- case 8:
+ case 10:
if (!writer.writeMessage("clientRemapVer", clientRemapVer))
return false;
writer.incrementState();
- case 9:
+ case 11:
if (!writer.writeMessage("dhtVer", dhtVer))
return false;
writer.incrementState();
- case 10:
+ case 12:
if (!writer.writeCollection("filterFailedKeys", filterFailedKeys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 11:
+ case 13:
if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
- case 12:
+ case 14:
if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
- case 13:
+ case 15:
if (!writer.writeCollection("ownedValKeys", ownedValKeys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 14:
+ case 16:
if (!writer.writeCollection("ownedValVals", ownedValVals, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 15:
+ case 17:
if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 16:
+ case 18:
if (!writer.writeMessage("retVal", retVal))
return false;
writer.incrementState();
- case 17:
+ case 19:
if (!writer.writeMessage("writeVer", writeVer))
return false;
@@ -420,7 +420,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
return false;
switch (reader.state()) {
- case 8:
+ case 10:
clientRemapVer = reader.readMessage("clientRemapVer");
if (!reader.isLastRead())
@@ -428,7 +428,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 9:
+ case 11:
dhtVer = reader.readMessage("dhtVer");
if (!reader.isLastRead())
@@ -436,7 +436,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 10:
+ case 12:
filterFailedKeys = reader.readCollection("filterFailedKeys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -444,7 +444,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 11:
+ case 13:
futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
@@ -452,7 +452,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 12:
+ case 14:
miniId = reader.readInt("miniId");
if (!reader.isLastRead())
@@ -460,7 +460,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 13:
+ case 15:
ownedValKeys = reader.readCollection("ownedValKeys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -468,7 +468,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 14:
+ case 16:
ownedValVals = reader.readCollection("ownedValVals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -476,7 +476,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 15:
+ case 17:
pending = reader.readCollection("pending", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -484,7 +484,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 16:
+ case 18:
retVal = reader.readMessage("retVal");
if (!reader.isLastRead())
@@ -492,7 +492,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 17:
+ case 19:
writeVer = reader.readMessage("writeVer");
if (!reader.isLastRead())
@@ -512,7 +512,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 18;
+ return 20;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index aaba610..22e6b69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -582,7 +582,7 @@ public class IgniteTxHandler {
res.txState(tx.txState());
- fut.onResult(nodeId, res);
+ fut.onPrimaryResponse(nodeId, res);
}
/**
@@ -607,7 +607,7 @@ public class IgniteTxHandler {
return;
}
- fut.onResult(nodeId, res);
+ fut.onPrimaryResponse(nodeId, res);
}
/**
@@ -650,7 +650,7 @@ public class IgniteTxHandler {
assert nodeId != null;
assert res != null;
- if (res.checkCommitted()) {
+ if (res.nearNodeResponse() || res.checkCommitted()) {
GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(res.futureId());
if (fut == null) {
@@ -669,7 +669,7 @@ public class IgniteTxHandler {
", node=" + nodeId + ']');
}
- fut.onResult(nodeId, res);
+ fut.onDhtResponse(nodeId, res);
}
else {
GridDhtTxFinishFuture fut = (GridDhtTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(res.futureId());
@@ -803,8 +803,13 @@ public class IgniteTxHandler {
", commit=" + req.commit() + ']');
// Always send finish response.
- GridCacheMessage res = new GridNearTxFinishResponse(req.version(), req.threadId(), req.futureId(),
- req.miniId(), new IgniteCheckedException("Transaction has been already completed."));
+ GridCacheMessage res = new GridNearTxFinishResponse(
+ req.partition(),
+ req.version(),
+ req.threadId(),
+ req.futureId(),
+ req.miniId(),
+ new IgniteCheckedException("Transaction has been already completed."));
try {
ctx.io().send(nodeId, res, req.policy());
@@ -844,14 +849,9 @@ public class IgniteTxHandler {
try {
assert tx != null : "Transaction is null for near finish request [nodeId=" +
nodeId + ", req=" + req + "]";
+ assert req.syncMode() != null : req;
- if (req.syncMode() == null) {
- boolean sync = req.commit() ? req.syncCommit() : req.syncRollback();
-
- tx.syncMode(sync ? FULL_SYNC : FULL_ASYNC);
- }
- else
- tx.syncMode(req.syncMode());
+ tx.syncMode(req.syncMode());
if (req.commit()) {
tx.storeEnabled(req.storeEnabled());
@@ -1121,7 +1121,7 @@ public class IgniteTxHandler {
if (req.checkCommitted()) {
boolean committed = req.waitRemoteTransactions() || !ctx.tm().addRolledbackTx(null, req.version());
- if (!committed || !req.syncCommit())
+ if (!committed || req.syncMode() != FULL_SYNC)
sendReply(nodeId, req, committed, null);
else {
IgniteInternalFuture<?> fut = ctx.tm().remoteTxFinishFuture(req.version());
@@ -1359,7 +1359,11 @@ public class IgniteTxHandler {
*/
private void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed, GridCacheVersion nearTxId) {
if (req.replyRequired() || req.checkCommitted()) {
- GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(req.version(), req.futureId(), req.miniId());
+ GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(
+ req.partition(),
+ req.version(),
+ req.futureId(),
+ req.miniId());
if (req.checkCommitted()) {
res.checkCommitted(true);
[5/8] ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-4768' into ignite-4768-1
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-4768' into ignite-4768-1
# Conflicts:
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e1961ff6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e1961ff6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e1961ff6
Branch: refs/heads/ignite-4768-1
Commit: e1961ff620d8f73469101da96eccda9ed2eddc99
Parents: 5596e69 eeb10b8
Author: sboikov <sb...@gridgain.com>
Authored: Fri Mar 10 13:09:28 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Mar 10 13:09:28 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMessage.java | 11 +++++++++++
.../processors/cache/KeyCacheObjectImpl.java | 9 +++++++++
.../GridDistributedTxPrepareRequest.java | 16 ++++++++++++++++
.../distributed/dht/GridDhtTxPrepareFuture.java | 20 +++++++++++++-------
.../dht/GridDhtTxPrepareRequest.java | 12 ++++++------
.../dht/GridDhtTxPrepareResponse.java | 16 ++++++++--------
.../distributed/near/GridNearLockFuture.java | 2 ++
...arOptimisticSerializableTxPrepareFuture.java | 8 ++++----
.../near/GridNearOptimisticTxPrepareFuture.java | 1 -
.../near/GridNearTxPrepareRequest.java | 17 ++++++++++++++++-
10 files changed, 85 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1961ff6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1961ff6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 454c8fb,735653d..a787f5f
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@@ -1272,13 -1259,9 +1276,13 @@@ public final class GridDhtTxPrepareFutu
if (tx.remainingTime() == -1)
return;
- MiniFuture fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping);
+ MiniFuture fut = null;
- add(fut); // Append new future.
+ if (!tx.dhtReplyNear()) {
- fut = new MiniFuture(n.id(), dhtMapping, nearMapping);
++ fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping);
+
+ add(fut); // Append new future.
+ }
assert txNodes != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1961ff6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index fe2d293,8898803..85a65a8
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@@ -59,14 -59,8 +59,14 @@@ public class GridDhtTxPrepareRequest ex
private IgniteUuid futId;
/** Mini future ID. */
- private IgniteUuid miniId;
+ private int miniId;
+ /** Future ID. */
+ private IgniteUuid nearFutId;
+
+ /** Mini future ID. */
+ private int nearMiniId;
+
/** Topology version. */
private AffinityTopologyVersion topVer;
@@@ -128,9 -120,7 +128,9 @@@
*/
public GridDhtTxPrepareRequest(
IgniteUuid futId,
- IgniteUuid miniId,
+ int miniId,
+ IgniteUuid nearFutId,
+ int nearMiniId,
AffinityTopologyVersion topVer,
GridDhtTxLocalAdapter tx,
long timeout,
@@@ -156,8 -144,8 +156,8 @@@
onePhaseCommit,
addDepInfo);
- assert dhtNearReply || (futId != null && miniId != null);
- assert futId != null;
- assert miniId != 0;
++ assert dhtNearReply || (futId != null && miniId != 0);
+ assert !dhtNearReply || (nearFutId != null && nearMiniId != 0);
this.topVer = topVer;
this.futId = futId;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1961ff6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
index c37ac80,416540a..cb10374
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
@@@ -85,16 -81,11 +85,16 @@@ public class GridDhtTxPrepareResponse e
* @param miniId Mini future ID.
* @param addDepInfo Deployment info flag.
*/
- public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, int miniId, boolean addDepInfo) {
- super(xid, addDepInfo);
+ public GridDhtTxPrepareResponse(
+ int part,
+ GridCacheVersion xid,
+ IgniteUuid futId,
- IgniteUuid miniId,
++ int miniId,
+ boolean addDepInfo) {
+ super(part, xid, addDepInfo);
assert futId != null;
- assert miniId != null;
+ assert miniId != 0;
this.futId = futId;
this.miniId = miniId;
@@@ -108,17 -98,12 +108,17 @@@
* @param err Error.
* @param addDepInfo Deployment enabled.
*/
- public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, int miniId, Throwable err,
+ public GridDhtTxPrepareResponse(
+ int part,
+ GridCacheVersion xid,
+ IgniteUuid futId,
- IgniteUuid miniId,
++ int miniId,
+ Throwable err,
boolean addDepInfo) {
- super(xid, err, addDepInfo);
+ super(part, xid, err, addDepInfo);
assert futId != null;
- assert miniId != null;
+ assert miniId != 0;
this.futId = futId;
this.miniId = miniId;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1961ff6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1961ff6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1961ff6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
[3/8] ignite git commit: ignite-4768
Posted by sb...@apache.org.
ignite-4768
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eeb10b85
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eeb10b85
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eeb10b85
Branch: refs/heads/ignite-4768-1
Commit: eeb10b85ba41b14d0beb590a32bff06225c7de57
Parents: d3d4a36
Author: sboikov <sb...@gridgain.com>
Authored: Fri Mar 10 11:00:04 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Mar 10 11:04:42 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheIoManager.java | 3 +++
.../processors/cache/GridCacheMessage.java | 11 +++++++++++
.../processors/cache/KeyCacheObjectImpl.java | 9 +++++++++
.../GridDistributedTxPrepareRequest.java | 16 ++++++++++++++++
...dNearOptimisticSerializableTxPrepareFuture.java | 7 ++++---
.../distributed/near/GridNearTxPrepareRequest.java | 17 ++++++++++++++++-
6 files changed, 59 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb10b85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 50f58cc..7cac367 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -367,6 +367,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
unmarshall(nodeId, cacheMsg);
+// if (!cacheMsg.partitionExchangeMessage())
+// log.info("Message [from=" + nodeId + ", msg=" + cacheMsg + ']');
+
if (cacheMsg.classError() != null)
processFailedMessage(nodeId, cacheMsg, c);
else
http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb10b85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index 0646d5a..023d12c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -701,6 +701,17 @@ public abstract class GridCacheMessage implements Message {
return reader.afterMessageRead(GridCacheMessage.class);
}
+ /**
+ * @param str Builder.
+ * @param name Flag name.
+ */
+ protected final void appendFlag(StringBuilder str, String name) {
+ if (str.length() > 0)
+ str.append('|');
+
+ str.append(name);
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridCacheMessage.class, this, "cacheId", cacheId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb10b85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
index 4f8570c..48797b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
@@ -200,4 +201,12 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb
return val.equals(other.val);
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(S.INCLUDE_SENSITIVE ? getClass().getSimpleName() : "KeyCacheObject",
+ "part", part, true,
+ "val", val, true,
+ "hasValBytes", valBytes != null, false);
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb10b85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index 329dc8b..b5848a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAwa
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.UUIDCollectionMessage;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
@@ -150,6 +151,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
private IgniteTxState txState;
/** */
+ @GridToStringExclude
private byte flags;
/**
@@ -682,7 +684,21 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
/** {@inheritDoc} */
@Override public String toString() {
+ StringBuilder flags = new StringBuilder();
+
+ if (needReturnValue())
+ appendFlag(flags, "retVal");
+ if (isInvalidate())
+ appendFlag(flags, "invalidate");
+ if (onePhaseCommit())
+ appendFlag(flags, "onePhase");
+ if (last())
+ appendFlag(flags, "last");
+ if (system())
+ appendFlag(flags, "sys");
+
return GridToStringBuilder.toString(GridDistributedTxPrepareRequest.class, this,
+ "flags", flags.toString(),
"super", super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb10b85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index f8e0584..80508dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -75,6 +75,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
@GridToStringExclude
private ClientRemapFuture remapFut;
+ /** */
+ private int miniId;
+
/**
* @param cctx Context.
* @param tx Transaction.
@@ -222,7 +225,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
* @return Mini future.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- private MiniFuture miniFuture(long miniId) {
+ private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
synchronized (sync) {
int size = futuresCountNoLock();
@@ -362,8 +365,6 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
checkOnePhase(txMapping);
- int miniId = 0;
-
for (GridDistributedTxMapping m : mappings.values()) {
assert !m.empty();
http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb10b85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index cccc7b4..ffeeb51 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
@@ -70,6 +71,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
private int taskNameHash;
/** */
+ @GridToStringExclude
private byte flags;
/**
@@ -402,6 +404,19 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridNearTxPrepareRequest.class, this, super.toString());
+ StringBuilder flags = new StringBuilder();
+
+ if (near())
+ appendFlag(flags, "near");
+ if (firstClientRequest())
+ appendFlag(flags, "clientReq");
+ if (implicitSingle())
+ appendFlag(flags, "single");
+ if (explicitLock())
+ appendFlag(flags, "explicitLock");
+
+ return S.toString(GridNearTxPrepareRequest.class, this,
+ "flags", flags.toString(),
+ "super", super.toString());
}
}
[4/8] ignite git commit: ignite-4768
Posted by sb...@apache.org.
ignite-4768
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5596e69f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5596e69f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5596e69f
Branch: refs/heads/ignite-4768-1
Commit: 5596e69f021a8e3da21479a11ee6079314e33144
Parents: 4b44091
Author: sboikov <sb...@gridgain.com>
Authored: Fri Mar 10 12:58:34 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Mar 10 12:58:34 2017 +0300
----------------------------------------------------------------------
.../communication/GridIoMessageFactory.java | 5 -
.../processors/cache/GridCacheIoManager.java | 2 +
.../processors/cache/GridCacheMvccFuture.java | 3 +
.../GridDistributedTxPrepareResponse.java | 44 +++-
.../dht/GridDhtTxNearPrepareResponse.java | 232 -------------------
.../distributed/dht/GridDhtTxPrepareFuture.java | 1 +
.../dht/GridDhtTxPrepareResponse.java | 43 +++-
...arOptimisticSerializableTxPrepareFuture.java | 4 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 4 +-
.../GridNearPessimisticTxPrepareFuture.java | 9 +-
.../near/GridNearTxFinishFuture.java | 40 ++--
.../near/GridNearTxPrepareFutureAdapter.java | 9 +-
.../near/GridNearTxPrepareResponse.java | 4 +-
.../cache/transactions/IgniteTxHandler.java | 62 ++---
modules/yardstick/config/ignite-base-config.xml | 171 +-------------
.../yardstick/cache/IgnitePutTxBenchmark.java | 5 +-
16 files changed, 150 insertions(+), 488 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index df3d7e8..ce2c72c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -174,11 +174,6 @@ public class GridIoMessageFactory implements MessageFactory {
Message msg = null;
switch (type) {
- case -50:
- msg = new GridDhtTxNearPrepareResponse();
-
- break;
-
case -44:
msg = new TcpCommunicationSpi.HandshakeMessage2();
http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 7cac367..71f4e1c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -547,6 +547,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridDhtTxPrepareRequest req = (GridDhtTxPrepareRequest)msg;
GridDhtTxPrepareResponse res = new GridDhtTxPrepareResponse(
+ req.partition(),
req.version(),
req.futureId(),
req.miniId(),
@@ -672,6 +673,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridNearTxPrepareRequest req = (GridNearTxPrepareRequest)msg;
GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
+ req.partition(),
req.version(),
req.futureId(),
req.miniId(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFuture.java
index 080a6f1..a0acf18 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFuture.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
/**
@@ -34,4 +35,6 @@ public interface GridCacheMvccFuture<T> extends GridCacheFuture<T> {
* @return {@code True} if future cares about this entry.
*/
public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner);
+
+ public IgniteInternalTx tx();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
index 99f36c2..c19b8c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
@@ -51,6 +51,12 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
@GridDirectTransient
private IgniteTxState txState;
+ /** */
+ private int part;
+
+ /** */
+ private byte flags;
+
/**
* Empty constructor (required by {@link Externalizable}).
*/
@@ -59,24 +65,54 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
}
/**
- * @param xid Transaction ID.
+ * @param part Partition.
+ * @param xid Lock or transaction ID.
* @param addDepInfo Deployment info flag.
*/
- public GridDistributedTxPrepareResponse(GridCacheVersion xid, boolean addDepInfo) {
+ public GridDistributedTxPrepareResponse(int part, GridCacheVersion xid, boolean addDepInfo) {
super(xid, 0, addDepInfo);
+
+ this.part = part;
}
/**
- * @param xid Lock ID.
+ * @param part Partition.
+ * @param xid Lock or transaction ID.
* @param err Error.
* @param addDepInfo Deployment info flag.
*/
- public GridDistributedTxPrepareResponse(GridCacheVersion xid, Throwable err, boolean addDepInfo) {
+ public GridDistributedTxPrepareResponse(int part, GridCacheVersion xid, Throwable err, boolean addDepInfo) {
super(xid, 0, addDepInfo);
+ this.part = part;
this.err = err;
}
+ /**
+ * Sets flag mask.
+ *
+ * @param flag Set or clear.
+ * @param mask Mask.
+ */
+ protected final void setFlag(boolean flag, int mask) {
+ flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+ }
+
+ /**
+ * Reads flag mask.
+ *
+ * @param mask Mask to read.
+ * @return Flag value.
+ */
+ protected final boolean isFlag(int mask) {
+ return (flags & mask) != 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partition() {
+ return part;
+ }
+
/** {@inheritDoc} */
@Override public Throwable error() {
return err;
http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxNearPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxNearPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxNearPrepareResponse.java
deleted file mode 100644
index e582bd2..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxNearPrepareResponse.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAware;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- *
- */
-public class GridDhtTxNearPrepareResponse extends GridCacheMessage implements IgniteTxStateAware {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private int partId;
-
- /** */
- private GridCacheVersion nearTxId;
-
- /** Future ID. */
- private IgniteUuid futId;
-
- /** Mini future ID. */
- private int miniId;
-
- /** Transient TX state. */
- @GridDirectTransient
- private IgniteTxState txState;
-
- /**
- *
- */
- public GridDhtTxNearPrepareResponse() {
- // No-op.
- }
-
- /**
- * @param partId Partition ID.
- * @param nearTxId Near transaction ID.
- * @param futId Future ID.
- * @param miniId Mini future ID.
- */
- public GridDhtTxNearPrepareResponse(int partId, GridCacheVersion nearTxId, IgniteUuid futId, int miniId) {
- assert nearTxId != null;
- assert futId != null;
- assert miniId > 0;
-
- this.partId = partId;
- this.nearTxId = nearTxId;
- this.futId = futId;
- this.miniId = miniId;
- }
-
- /**
- * @return Near transaction ID.
- */
- public GridCacheVersion nearTxId() {
- return nearTxId;
- }
-
- /**
- * @return Future ID.
- */
- public IgniteUuid futureId() {
- return futId;
- }
-
- /**
- * @return Mini future ID.
- */
- public int miniId() {
- return miniId;
- }
-
- /** {@inheritDoc} */
- @Override public int partition() {
- return partId;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteTxState txState() {
- return txState;
- }
-
- /** {@inheritDoc} */
- @Override public void txState(IgniteTxState txState) {
- this.txState = txState;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) {
- return ctx.txPrepareMessageLogger();
- }
-
- /** {@inheritDoc} */
- @Override public byte directType() {
- return -50;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 7;
- }
-
- /** {@inheritDoc} */
- @Override public boolean addDeploymentInfo() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 3:
- if (!writer.writeIgniteUuid("futId", futId))
- return false;
-
- writer.incrementState();
-
- case 4:
- if (!writer.writeInt("miniId", miniId))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeMessage("nearTxId", nearTxId))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeInt("partId", partId))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 3:
- futId = reader.readIgniteUuid("futId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 4:
- miniId = reader.readInt("miniId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- nearTxId = reader.readMessage("nearTxId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- partId = reader.readInt("partId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(GridDhtTxNearPrepareResponse.class);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDhtTxNearPrepareResponse.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 7f0cddd..454c8fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -858,6 +858,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
assert F.isEmpty(tx.invalidPartitions());
GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
+ -1,
tx.nearXidVersion(),
tx.colocated() ? tx.xid() : tx.nearFutureId(),
nearMiniId,
http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
index fdead95..c37ac80 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
@@ -49,6 +49,9 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
/** */
private static final long serialVersionUID = 0L;
+ /** */
+ private static final int NEAR_RES_FLAG_MASK = 0x01;
+
/** Evicted readers. */
@GridToStringInclude
@GridDirectCollection(IgniteTxKey.class)
@@ -76,13 +79,19 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
}
/**
+ * @param part Partition.
* @param xid Xid version.
* @param futId Future ID.
* @param miniId Mini future ID.
* @param addDepInfo Deployment info flag.
*/
- public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId, boolean addDepInfo) {
- super(xid, addDepInfo);
+ public GridDhtTxPrepareResponse(
+ int part,
+ GridCacheVersion xid,
+ IgniteUuid futId,
+ IgniteUuid miniId,
+ boolean addDepInfo) {
+ super(part, xid, addDepInfo);
assert futId != null;
assert miniId != null;
@@ -92,15 +101,21 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
}
/**
+ * @param part Partition.
* @param xid Xid version.
* @param futId Future ID.
* @param miniId Mini future ID.
* @param err Error.
* @param addDepInfo Deployment enabled.
*/
- public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId, Throwable err,
+ public GridDhtTxPrepareResponse(
+ int part,
+ GridCacheVersion xid,
+ IgniteUuid futId,
+ IgniteUuid miniId,
+ Throwable err,
boolean addDepInfo) {
- super(xid, err, addDepInfo);
+ super(part, xid, err, addDepInfo);
assert futId != null;
assert miniId != null;
@@ -110,9 +125,23 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
}
/**
+ * @return {@code True} if this is reply for near node.
+ */
+ public boolean nearNodeResponse() {
+ return isFlag(NEAR_RES_FLAG_MASK);
+ }
+
+ /**
+ * @param val {@code True} if this is reply for near node.
+ */
+ void nearNodeResponse(boolean val) {
+ setFlag(val, NEAR_RES_FLAG_MASK);
+ }
+
+ /**
* @return Evicted readers.
*/
- public Collection<IgniteTxKey> nearEvicted() {
+ Collection<IgniteTxKey> nearEvicted() {
return nearEvicted;
}
@@ -140,7 +169,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
/**
* @return Map from cacheId to an array of invalid partitions.
*/
- public Map<Integer, int[]> invalidPartitionsByCacheId() {
+ Map<Integer, int[]> invalidPartitionsByCacheId() {
return invalidParts;
}
@@ -156,7 +185,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
*
* @return Collection of entry infos need to be preloaded.
*/
- public Collection<GridCacheEntryInfo> preloadEntries() {
+ Collection<GridCacheEntryInfo> preloadEntries() {
return preloadEntries == null ? Collections.<GridCacheEntryInfo>emptyList() : preloadEntries;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index fcd714b..3d919f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -37,7 +37,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxNearPrepareResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -202,7 +202,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
}
/** {@inheritDoc} */
- @Override public void onDhtResponse(UUID nodeId, GridDhtTxNearPrepareResponse res) {
+ @Override public void onDhtResponse(UUID nodeId, GridDhtTxPrepareResponse res) {
assert false; // TODO IGNITE-4768.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 9a7f500..5cfb1a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -40,7 +40,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxNearPrepareResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -203,7 +203,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
}
/** {@inheritDoc} */
- @Override public void onDhtResponse(UUID nodeId, GridDhtTxNearPrepareResponse res) {
+ @Override public void onDhtResponse(UUID nodeId, GridDhtTxPrepareResponse res) {
assert false; // TODO IGNITE-4768.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index ed3f2f1..b476336 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -37,7 +37,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxNearPrepareResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
@@ -126,7 +126,9 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
}
/** {@inheritDoc} */
- @Override public void onDhtResponse(UUID nodeId, GridDhtTxNearPrepareResponse res) {
+ @Override public void onDhtResponse(UUID nodeId, GridDhtTxPrepareResponse res) {
+ assert res.nearNodeResponse() : res;
+
MiniFuture f = miniFuture(res.miniId());
if (f != null)
@@ -438,6 +440,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
this.m = m;
this.futId = futId;
+ // TODO: IGNITE-4768, check nodes alive.
if (req.dhtReplyNear()) {
dhtNodes = new HashSet<>(req.transactionNodes().get(m.primary().id()));
@@ -510,7 +513,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
* @param nodeId Node ID.
* @param res Response.
*/
- void onDhtResponse(UUID nodeId, GridDhtTxNearPrepareResponse res) {
+ void onDhtResponse(UUID nodeId, GridDhtTxPrepareResponse res) {
assert dhtNodes != null;
boolean done;
http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index c13650e..06cf878 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -256,7 +256,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
CheckRemoteTxMiniFuture f = (CheckRemoteTxMiniFuture)fut;
if (f.futureId().equals(res.miniId()))
- f.onDhtFinishResponse(nodeId, false);
+ f.onDhtFinishResponse(nodeId);
}
}
@@ -690,6 +690,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
tx.activeCachesDeploymentEnabled()
);
+ boolean dhtReplyNear = tx.dhtReplyNear() && syncMode == FULL_SYNC;
+
// If this is the primary node for the keys.
if (n.isLocal()) {
req.miniId(IgniteUuid.randomUuid());
@@ -701,7 +703,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
add(fut);
}
else {
- FinishMiniFuture fut = new FinishMiniFuture(m);
+ FinishMiniFuture fut = new FinishMiniFuture(m, dhtReplyNear);
req.miniId(fut.futureId());
@@ -862,11 +864,22 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
@GridToStringInclude
private GridDistributedTxMapping m;
+ /** */
+ private final Set<UUID> dhtNodes;
+
/**
* @param m Mapping.
*/
- FinishMiniFuture(GridDistributedTxMapping m) {
+ FinishMiniFuture(GridDistributedTxMapping m, boolean dhtReplyNear) {
this.m = m;
+
+ if (dhtReplyNear) {
+ dhtNodes = new HashSet<>(tx.transactionNodes().get(m.primary().id()));
+
+ assert !dhtNodes.isEmpty();
+ }
+ else
+ dhtNodes = null;
}
/**
@@ -876,13 +889,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
return m.primary();
}
- /**
- * @return Keys.
- */
- public GridDistributedTxMapping mapping() {
- return m;
- }
-
/** {@inheritDoc} */
boolean onNodeLeft(UUID nodeId, boolean discoThread) {
if (nodeId.equals(m.primary().id())) {
@@ -913,7 +919,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
fut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
- mini.onDhtFinishResponse(cctx.localNodeId(), true);
+ mini.onDhtFinishResponse(cctx.localNodeId());
}
});
}
@@ -930,7 +936,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
}
else
- mini.onDhtFinishResponse(backupId, true);
+ mini.onDhtFinishResponse(backupId);
}
}
}
@@ -948,6 +954,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
* @param res Result callback.
*/
void onNearFinishResponse(GridNearTxFinishResponse res) {
+ assert dhtNodes == null || res.error() != null;
+
if (res.error() != null)
onDone(res.error());
else
@@ -956,7 +964,10 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(FinishMiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
+ return S.toString(FinishMiniFuture.class, this,
+ "done", isDone(),
+ "cancelled", isCancelled(),
+ "err", error());
}
}
@@ -1055,9 +1066,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
/**
* @param nodeId Node ID.
- * @param discoThread {@code True} if executed from discovery thread.
*/
- void onDhtFinishResponse(UUID nodeId, boolean discoThread) {
+ void onDhtFinishResponse(UUID nodeId) {
onResponse(nodeId);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index 7f94e9f..71d2353 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -30,7 +30,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxNearPrepareResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -39,7 +39,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteReducer;
@@ -159,7 +158,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends
* @param nodeId Sender.
* @param res Response.
*/
- public abstract void onDhtResponse(UUID nodeId, GridDhtTxNearPrepareResponse res);
+ public abstract void onDhtResponse(UUID nodeId, GridDhtTxPrepareResponse res);
/**
* Checks if mapped transaction can be committed on one phase.
@@ -189,8 +188,8 @@ public abstract class GridNearTxPrepareFutureAdapter extends
* @param m Mapping.
* @param res Response.
*/
- final void onDhtPrepareResponse(GridDistributedTxMapping m, GridDhtTxNearPrepareResponse res) {
-
+ final void processDhtPrepareResponse(GridDistributedTxMapping m, GridDhtTxPrepareResponse res) {
+ // TODO IGNITE-4768.
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index 08b071d..d9b648c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -102,6 +102,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
}
/**
+ * @param part Partition.
* @param xid Xid version.
* @param futId Future ID.
* @param miniId Mini future ID.
@@ -113,6 +114,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
* @param addDepInfo Deployment info flag.
*/
public GridNearTxPrepareResponse(
+ int part,
GridCacheVersion xid,
IgniteUuid futId,
int miniId,
@@ -123,7 +125,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
AffinityTopologyVersion clientRemapVer,
boolean addDepInfo
) {
- super(xid, err, addDepInfo);
+ super(part, xid, err, addDepInfo);
assert futId != null;
assert dhtVer != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 07777e2..33c8a18 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -45,7 +46,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFini
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxNearPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
@@ -152,12 +152,6 @@ public class IgniteTxHandler {
}
});
- ctx.io().addHandler(0, GridDhtTxNearPrepareResponse.class, new CI2<UUID, GridCacheMessage>() {
- @Override public void apply(UUID nodeId, GridCacheMessage msg) {
- processDhtTxNearPrepareResponse(nodeId, (GridDhtTxNearPrepareResponse)msg);
- }
- });
-
ctx.io().addHandler(0, GridNearTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() {
@Override public void apply(UUID nodeId, GridCacheMessage msg) {
processNearTxFinishRequest(nodeId, (GridNearTxFinishRequest)msg);
@@ -278,6 +272,7 @@ public class IgniteTxHandler {
U.error(log, "Failed to prepare DHT transaction: " + locTx, e);
return new GridNearTxPrepareResponse(
+ req.partition(),
req.version(),
req.futureId(),
req.miniId(),
@@ -389,6 +384,7 @@ public class IgniteTxHandler {
}
GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
+ req.partition(),
req.version(),
req.futureId(),
req.miniId(),
@@ -565,34 +561,6 @@ public class IgniteTxHandler {
* @param nodeId Node ID.
* @param res Response.
*/
- private void processDhtTxNearPrepareResponse(UUID nodeId, GridDhtTxNearPrepareResponse res) {
- if (txPrepareMsgLog.isDebugEnabled())
- txPrepareMsgLog.debug("Received dht near prepare response [txId=" + res.nearTxId() + ", node=" + nodeId + ']');
-
- GridNearTxPrepareFutureAdapter fut = (GridNearTxPrepareFutureAdapter)ctx.mvcc()
- .<IgniteInternalTx>mvccFuture(res.nearTxId(), res.futureId());
-
- if (fut == null) {
- U.warn(log, "Failed to find future for dht near prepare response [txId=" + res.nearTxId() +
- ", node=" + nodeId +
- ", res=" + res + ']');
-
- return;
- }
-
- IgniteInternalTx tx = fut.tx();
-
- assert tx != null;
-
- res.txState(tx.txState());
-
- fut.onDhtResponse(nodeId, res);
- }
-
- /**
- * @param nodeId Node ID.
- * @param res Response.
- */
private void processNearTxPrepareResponse(UUID nodeId, GridNearTxPrepareResponse res) {
if (txPrepareMsgLog.isDebugEnabled())
txPrepareMsgLog.debug("Received near prepare response [txId=" + res.version() + ", node=" + nodeId + ']');
@@ -647,7 +615,7 @@ public class IgniteTxHandler {
* @param res Response.
*/
private void processDhtTxPrepareResponse(UUID nodeId, GridDhtTxPrepareResponse res) {
- GridDhtTxPrepareFuture fut = (GridDhtTxPrepareFuture)ctx.mvcc().mvccFuture(res.version(), res.futureId());
+ GridCacheMvccFuture<?> fut = ctx.mvcc().mvccFuture(res.version(), res.futureId());
if (fut == null) {
if (txPrepareMsgLog.isDebugEnabled()) {
@@ -668,7 +636,10 @@ public class IgniteTxHandler {
res.txState(tx.txState());
- fut.onResult(nodeId, res);
+ if (res.nearNodeResponse())
+ ((GridNearTxPrepareFutureAdapter)fut).onDhtResponse(nodeId, res);
+ else
+ ((GridDhtTxPrepareFuture)fut).onResult(nodeId, res);
}
/**
@@ -990,18 +961,21 @@ public class IgniteTxHandler {
GridNearTxRemote nearTx = null;
GridDhtTxPrepareResponse res = null;
- GridDhtTxNearPrepareResponse nearRes = null;
+ GridDhtTxPrepareResponse nearRes = null;
try {
if (req.dhtReplyNear()) {
- nearRes = new GridDhtTxNearPrepareResponse(
+ nearRes = new GridDhtTxPrepareResponse(
req.partition(),
req.nearXidVersion(),
req.nearFutureId(),
- req.nearMiniId());
+ null, //req.nearMiniId(),
+ req.deployInfo() != null);
}
else {
- res = new GridDhtTxPrepareResponse(req.version(),
+ res = new GridDhtTxPrepareResponse(
+ req.partition(),
+ req.version(),
req.futureId(),
req.miniId(),
req.deployInfo() != null);
@@ -1061,7 +1035,11 @@ public class IgniteTxHandler {
if (nearTx != null)
nearTx.rollback();
- res = new GridDhtTxPrepareResponse(req.version(), req.futureId(), req.miniId(), e,
+ res = new GridDhtTxPrepareResponse(req.partition(),
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ e,
req.deployInfo() != null);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/yardstick/config/ignite-base-config.xml
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/ignite-base-config.xml b/modules/yardstick/config/ignite-base-config.xml
index 615cb42..5ee42a7 100644
--- a/modules/yardstick/config/ignite-base-config.xml
+++ b/modules/yardstick/config/ignite-base-config.xml
@@ -32,41 +32,6 @@
<property name="cacheConfiguration">
<list>
<bean class="org.apache.ignite.configuration.CacheConfiguration">
- <property name="name" value="atomic"/>
-
- <property name="cacheMode" value="PARTITIONED"/>
-
- <property name="atomicityMode" value="ATOMIC"/>
-
- <property name="swapEnabled" value="false"/>
- </bean>
-
- <bean class="org.apache.ignite.configuration.CacheConfiguration">
- <property name="name" value="atomic-offheap"/>
-
- <property name="cacheMode" value="PARTITIONED"/>
-
- <property name="atomicityMode" value="ATOMIC"/>
-
- <property name="swapEnabled" value="false"/>
-
- <property name="memoryMode" value="OFFHEAP_TIERED"/>
- </bean>
-
- <bean class="org.apache.ignite.configuration.CacheConfiguration">
- <property name="name" value="atomic-offheap-values"/>
-
- <property name="cacheMode" value="PARTITIONED"/>
-
- <property name="atomicityMode" value="ATOMIC"/>
-
- <property name="swapEnabled" value="false"/>
-
- <property name="memoryMode" value="OFFHEAP_VALUES"/>
-
- </bean>
-
- <bean class="org.apache.ignite.configuration.CacheConfiguration">
<property name="name" value="tx"/>
<property name="cacheMode" value="PARTITIONED"/>
@@ -75,141 +40,9 @@
<property name="swapEnabled" value="false"/>
- </bean>
-
- <bean class="org.apache.ignite.configuration.CacheConfiguration">
- <property name="name" value="tx-offheap"/>
-
- <property name="cacheMode" value="PARTITIONED"/>
-
- <property name="atomicityMode" value="TRANSACTIONAL"/>
-
- <property name="swapEnabled" value="false"/>
-
- <property name="memoryMode" value="OFFHEAP_TIERED"/>
-
- </bean>
+ <property name="writeSynchronizationMode" value="FULL_SYNC"/>
- <bean class="org.apache.ignite.configuration.CacheConfiguration">
- <property name="name" value="tx-offheap-values"/>
-
- <property name="cacheMode" value="PARTITIONED"/>
-
- <property name="atomicityMode" value="TRANSACTIONAL"/>
-
- <property name="swapEnabled" value="false"/>
-
- <property name="memoryMode" value="OFFHEAP_VALUES"/>
-
- </bean>
-
- <bean class="org.apache.ignite.configuration.CacheConfiguration">
- <property name="name" value="atomic-index"/>
-
- <property name="cacheMode" value="PARTITIONED"/>
-
- <property name="atomicityMode" value="ATOMIC"/>
-
- <property name="swapEnabled" value="false"/>
-
- <property name="indexedTypes">
- <list>
- <value>java.lang.Integer</value>
- <value>org.apache.ignite.yardstick.cache.model.Person1</value>
-
- <value>java.lang.Integer</value>
- <value>org.apache.ignite.yardstick.cache.model.Person2</value>
-
- <value>java.lang.Integer</value>
- <value>org.apache.ignite.yardstick.cache.model.Person8</value>
- </list>
- </property>
- </bean>
-
- <bean class="org.apache.ignite.configuration.CacheConfiguration">
- <property name="name" value="atomic-index-with-eviction"/>
-
- <property name="evictionPolicy">
- <bean class="org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicy">
- <!-- default range (1000000) x (1 + default backups number (1)) / default nodes number (4) -->
- <constructor-arg value="500000"/>
- </bean>
- </property>
-
- <property name="cacheMode" value="PARTITIONED"/>
-
- <property name="atomicityMode" value="ATOMIC"/>
-
- <property name="swapEnabled" value="false"/>
-
- <property name="indexedTypes">
- <list>
- <value>java.lang.Integer</value>
- <value>org.apache.ignite.yardstick.cache.model.Person1</value>
-
- <value>java.lang.Integer</value>
- <value>org.apache.ignite.yardstick.cache.model.Person2</value>
-
- <value>java.lang.Integer</value>
- <value>org.apache.ignite.yardstick.cache.model.Person8</value>
- </list>
- </property>
- </bean>
-
- <bean class="org.apache.ignite.configuration.CacheConfiguration">
- <property name="name" value="query"/>
-
- <property name="cacheMode" value="PARTITIONED"/>
-
- <property name="atomicityMode" value="ATOMIC"/>
-
- <property name="indexedTypes">
- <list>
- <value>java.lang.Integer</value>
- <value>org.apache.ignite.yardstick.cache.model.Organization</value>
-
- <value>java.lang.Integer</value>
- <value>org.apache.ignite.yardstick.cache.model.Person</value>
-
- <value>java.lang.Integer</value>
- <value>org.apache.ignite.yardstick.cache.model.SampleValue</value>
-
- <value>java.lang.Integer</value>
- <value>java.lang.Integer</value>
- </list>
- </property>
- </bean>
-
- <bean class="org.apache.ignite.configuration.CacheConfiguration">
- <property name="name" value="query-offheap"/>
-
- <property name="cacheMode" value="PARTITIONED"/>
-
- <property name="atomicityMode" value="ATOMIC"/>
-
- <property name="swapEnabled" value="false"/>
-
- <property name="memoryMode" value="OFFHEAP_TIERED"/>
-
- <property name="indexedTypes">
- <list>
- <value>java.lang.Integer</value>
- <value>org.apache.ignite.yardstick.cache.model.Organization</value>
-
- <value>java.lang.Integer</value>
- <value>org.apache.ignite.yardstick.cache.model.Person</value>
- </list>
- </property>
- </bean>
-
- <bean class="org.apache.ignite.configuration.CacheConfiguration">
- <property name="name" value="compute"/>
-
- <property name="cacheMode" value="PARTITIONED"/>
-
- <property name="atomicityMode" value="TRANSACTIONAL"/>
-
- <property name="swapEnabled" value="false"/>
+ <property name="backups" value="2"/>
</bean>
</list>
</property>
http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutTxBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutTxBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutTxBenchmark.java
index 15b7cd6..3ad5a9d 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutTxBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutTxBenchmark.java
@@ -22,9 +22,12 @@ import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.yardstick.IgniteBenchmarkUtils;
import org.apache.ignite.yardstick.cache.model.SampleValue;
import org.yardstickframework.BenchmarkConfiguration;
+import org.yardstickframework.BenchmarkUtils;
/**
* Ignite benchmark that performs transactional put operations.
@@ -58,7 +61,7 @@ public class IgnitePutTxBenchmark extends IgniteCacheAbstractBenchmark<Integer,
/** {@inheritDoc} */
@Override public boolean test(Map<Object, Object> ctx) throws Exception {
- IgniteBenchmarkUtils.doInTransaction(txs, args.txConcurrency(), args.txIsolation(), clo);
+ IgniteBenchmarkUtils.doInTransaction(txs, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ, clo);
return true;
}