You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/14 08:00:39 UTC
[20/38] ignite git commit: ignite-4768
ignite-4768
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/26316bb0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/26316bb0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/26316bb0
Branch: refs/heads/ignite-4768
Commit: 26316bb06d18569676af91c3d41671ebe3346533
Parents: 7cf4b98
Author: sboikov <sb...@gridgain.com>
Authored: Fri Mar 10 13:14:39 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Mar 10 15:10:12 2017 +0300
----------------------------------------------------------------------
.../GridDistributedTxFinishRequest.java | 217 +++++++++++++----
.../GridDistributedTxFinishResponse.java | 74 +++++-
.../GridDistributedTxPrepareResponse.java | 30 ++-
.../distributed/dht/GridDhtTxFinishFuture.java | 9 +-
.../distributed/dht/GridDhtTxFinishRequest.java | 241 +++++--------------
.../dht/GridDhtTxFinishResponse.java | 79 +++---
.../cache/distributed/dht/GridDhtTxLocal.java | 9 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 8 +-
.../dht/GridDhtTxPrepareResponse.java | 38 +--
...arOptimisticSerializableTxPrepareFuture.java | 7 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 4 +-
.../GridNearPessimisticTxPrepareFuture.java | 5 +-
.../near/GridNearTxFinishFuture.java | 128 ++++++----
.../near/GridNearTxFinishRequest.java | 168 ++-----------
.../near/GridNearTxFinishResponse.java | 19 +-
.../near/GridNearTxPrepareFutureAdapter.java | 4 +-
.../near/GridNearTxPrepareResponse.java | 42 ++--
.../cache/transactions/IgniteTxHandler.java | 34 +--
18 files changed, 576 insertions(+), 540 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
index 3e47cc9..ab9f0ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
@@ -20,18 +20,24 @@ package org.apache.ignite.internal.processors.cache.distributed;
import java.io.Externalizable;
import java.nio.ByteBuffer;
import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAware;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
/**
* Transaction completion message.
*/
@@ -39,6 +45,27 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
/** */
private static final long serialVersionUID = 0L;
+ /** */
+ protected static final int WAIT_REMOTE_TX_FLAG_MASK = 0x01;
+
+ /** */
+ protected static final int CHECK_COMMITTED_FLAG_MASK = 0x02;
+
+ /** */
+ protected static final int NEED_RETURN_VALUE_FLAG_MASK = 0x04;
+
+ /** */
+ protected static final int SYS_INVALIDATE_FLAG_MASK = 0x08;
+
+ /** */
+ protected static final int EXPLICIT_LOCK_FLAG_MASK = 0x10;
+
+ /** */
+ protected static final int STORE_ENABLED_FLAG_MASK = 0x20;
+
+ /** Topology version. */
+ private AffinityTopologyVersion topVer;
+
/** Future ID. */
private IgniteUuid futId;
@@ -54,14 +81,6 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
/** Commit flag. */
private boolean commit;
- /** Sync commit flag. */
- @Deprecated
- private boolean syncCommit;
-
- /** Sync commit flag. */
- @Deprecated
- private boolean syncRollback;
-
/** Min version used as base for completed versions. */
private GridCacheVersion baseVer;
@@ -74,6 +93,18 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
/** IO policy. */
private byte plc;
+ /** Subject ID. */
+ private UUID subjId;
+
+ /** Task name hash. */
+ private int taskNameHash;
+
+ /** */
+ private byte flags;
+
+ /** Write synchronization mode. */
+ private CacheWriteSynchronizationMode syncMode;
+
/** Transient TX state. */
@GridDirectTransient
private IgniteTxState txState;
@@ -94,8 +125,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
* @param invalidate Invalidate flag.
* @param sys System transaction flag.
* @param plc IO policy.
- * @param syncCommit Sync commit flag.
- * @param syncRollback Sync rollback flag.
+ * @param syncMode Write synchronization mode.
* @param baseVer Base version.
* @param committedVers Committed versions.
* @param rolledbackVers Rolled back versions.
@@ -105,39 +135,93 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
public GridDistributedTxFinishRequest(
GridCacheVersion xidVer,
IgniteUuid futId,
+ @NotNull AffinityTopologyVersion topVer,
@Nullable GridCacheVersion commitVer,
long threadId,
boolean commit,
boolean invalidate,
boolean sys,
byte plc,
- boolean syncCommit,
- boolean syncRollback,
+ CacheWriteSynchronizationMode syncMode,
GridCacheVersion baseVer,
Collection<GridCacheVersion> committedVers,
Collection<GridCacheVersion> rolledbackVers,
+ @Nullable UUID subjId,
+ int taskNameHash,
int txSize,
boolean addDepInfo
) {
super(xidVer, 0, addDepInfo);
+
assert xidVer != null;
+ assert syncMode != null;
this.futId = futId;
+ this.topVer = topVer;
this.commitVer = commitVer;
this.threadId = threadId;
this.commit = commit;
this.invalidate = invalidate;
this.sys = sys;
this.plc = plc;
- this.syncCommit = syncCommit;
- this.syncRollback = syncRollback;
+ this.syncMode = syncMode;
this.baseVer = baseVer;
+ this.subjId = subjId;
+ this.taskNameHash = taskNameHash;
this.txSize = txSize;
completedVersions(committedVers, rolledbackVers);
}
/**
+ * @return Transaction write synchronization mode (can be null if message is sent from old nodes).
+ */
+ public final CacheWriteSynchronizationMode syncMode() {
+ return syncMode;
+ }
+
+ /**
+ * Sets flag mask.
+ *
+ * @param flag Set or clear.
+ * @param mask Mask.
+ */
+ protected final void setFlag(boolean flag, int mask) {
+ flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+ }
+
+ /**
+ * Reads flag mask.
+ *
+ * @param mask Mask to read.
+ * @return Flag value.
+ */
+ protected final boolean isFlag(int mask) {
+ return (flags & mask) != 0;
+ }
+
+ /**
+ * @return Subject ID.
+ */
+ @Nullable public final UUID subjectId() {
+ return subjId;
+ }
+
+ /**
+ * @return Task name hash.
+ */
+ public final int taskNameHash() {
+ return taskNameHash;
+ }
+
+ /**
+ * @return Topology version.
+ */
+ @Override public final AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /**
* @return System transaction flag.
*/
public boolean system() {
@@ -188,27 +272,6 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
}
/**
- * @return Sync commit flag.
- */
- public boolean syncCommit() {
- return syncCommit;
- }
-
- /**
- * @param syncCommit Sync commit flag.
- */
- public void syncCommit(boolean syncCommit) {
- this.syncCommit = syncCommit;
- }
-
- /**
- * @return Sync rollback flag.
- */
- public boolean syncRollback() {
- return syncRollback;
- }
-
- /**
* @return Base version.
*/
public GridCacheVersion baseVersion() {
@@ -227,7 +290,9 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
* @return {@code True} if reply is required.
*/
public boolean replyRequired() {
- return commit ? syncCommit : syncRollback;
+ assert syncMode != null;
+
+ return syncMode == FULL_SYNC;
}
/** {@inheritDoc} */
@@ -279,48 +344,66 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
writer.incrementState();
case 10:
- if (!writer.writeIgniteUuid("futId", futId))
+ if (!writer.writeByte("flags", flags))
return false;
writer.incrementState();
case 11:
- if (!writer.writeBoolean("invalidate", invalidate))
+ if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
case 12:
- if (!writer.writeByte("plc", plc))
+ if (!writer.writeBoolean("invalidate", invalidate))
return false;
writer.incrementState();
case 13:
- if (!writer.writeBoolean("syncCommit", syncCommit))
+ if (!writer.writeByte("plc", plc))
return false;
writer.incrementState();
case 14:
- if (!writer.writeBoolean("syncRollback", syncRollback))
+ if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
case 15:
- if (!writer.writeBoolean("sys", sys))
+ if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
return false;
writer.incrementState();
case 16:
- if (!writer.writeLong("threadId", threadId))
+ if (!writer.writeBoolean("sys", sys))
return false;
writer.incrementState();
case 17:
+ if (!writer.writeInt("taskNameHash", taskNameHash))
+ return false;
+
+ writer.incrementState();
+
+ case 18:
+ if (!writer.writeLong("threadId", threadId))
+ return false;
+
+ writer.incrementState();
+
+ case 19:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ case 20:
if (!writer.writeInt("txSize", txSize))
return false;
@@ -367,7 +450,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
reader.incrementState();
case 10:
- futId = reader.readIgniteUuid("futId");
+ flags = reader.readByte("flags");
if (!reader.isLastRead())
return false;
@@ -375,7 +458,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
reader.incrementState();
case 11:
- invalidate = reader.readBoolean("invalidate");
+ futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
return false;
@@ -383,7 +466,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
reader.incrementState();
case 12:
- plc = reader.readByte("plc");
+ invalidate = reader.readBoolean("invalidate");
if (!reader.isLastRead())
return false;
@@ -391,7 +474,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
reader.incrementState();
case 13:
- syncCommit = reader.readBoolean("syncCommit");
+ plc = reader.readByte("plc");
if (!reader.isLastRead())
return false;
@@ -399,7 +482,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
reader.incrementState();
case 14:
- syncRollback = reader.readBoolean("syncRollback");
+ subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
return false;
@@ -407,15 +490,19 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
reader.incrementState();
case 15:
- sys = reader.readBoolean("sys");
+ byte syncModeOrd;
+
+ syncModeOrd = reader.readByte("syncMode");
if (!reader.isLastRead())
return false;
+ syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
+
reader.incrementState();
case 16:
- threadId = reader.readLong("threadId");
+ sys = reader.readBoolean("sys");
if (!reader.isLastRead())
return false;
@@ -423,6 +510,30 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
reader.incrementState();
case 17:
+ taskNameHash = reader.readInt("taskNameHash");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 18:
+ threadId = reader.readLong("threadId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 19:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 20:
txSize = reader.readInt("txSize");
if (!reader.isLastRead())
@@ -442,7 +553,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 18;
+ return 21;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
index 109d665..1f61033 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
@@ -20,10 +20,12 @@ package org.apache.ignite.internal.processors.cache.distributed;
import java.io.Externalizable;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -41,25 +43,59 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
/** Future ID. */
private IgniteUuid futId;
+ /** */
+ @GridToStringExclude
+ private byte flags;
+
+ /** */
+ private int part;
+
/**
- * Empty constructor required by {@link Externalizable}.
+ * Empty constructor required by {@link GridIoMessageFactory}.
*/
public GridDistributedTxFinishResponse() {
/* No-op. */
}
/**
+ * @param part Partition.
* @param txId Transaction id.
* @param futId Future ID.
*/
- public GridDistributedTxFinishResponse(GridCacheVersion txId, IgniteUuid futId) {
+ public GridDistributedTxFinishResponse(int part, GridCacheVersion txId, IgniteUuid futId) {
assert txId != null;
assert futId != null;
+ this.part = part;
this.txId = txId;
this.futId = futId;
}
+ /** {@inheritDoc} */
+ @Override public final int partition() {
+ return part;
+ }
+
+ /**
+ * Sets flag mask.
+ *
+ * @param flag Set or clear.
+ * @param mask Mask.
+ */
+ protected final void setFlag(boolean flag, int mask) {
+ flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+ }
+
+ /**
+ * Reads flag mask.
+ *
+ * @param mask Mask to read.
+ * @return Flag value.
+ */
+ protected final boolean isFlag(int mask) {
+ return (flags & mask) != 0;
+ }
+
/**
*
* @return Transaction id.
@@ -101,12 +137,24 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
switch (writer.state()) {
case 3:
- if (!writer.writeIgniteUuid("futId", futId))
+ if (!writer.writeByte("flags", flags))
return false;
writer.incrementState();
case 4:
+ if (!writer.writeIgniteUuid("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeInt("part", part))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
if (!writer.writeMessage("txId", txId))
return false;
@@ -129,7 +177,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
switch (reader.state()) {
case 3:
- futId = reader.readIgniteUuid("futId");
+ flags = reader.readByte("flags");
if (!reader.isLastRead())
return false;
@@ -137,6 +185,22 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
reader.incrementState();
case 4:
+ futId = reader.readIgniteUuid("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ part = reader.readInt("part");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
txId = reader.readMessage("txId");
if (!reader.isLastRead())
@@ -156,7 +220,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 5;
+ return 7;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
index c19b8c1..53a1391 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
@@ -184,6 +184,18 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
writer.incrementState();
+ case 8:
+ if (!writer.writeByte("flags", flags))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeInt("part", part))
+ return false;
+
+ writer.incrementState();
+
}
return true;
@@ -208,6 +220,22 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
reader.incrementState();
+ case 8:
+ flags = reader.readByte("flags");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ part = reader.readInt("part");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
}
return reader.afterMessageRead(GridDistributedTxPrepareResponse.class);
@@ -220,7 +248,7 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 8;
+ return 10;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 5d4e610..7011996 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -325,8 +325,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.system(),
tx.ioPolicy(),
tx.isSystemInvalidate(),
- sync,
- sync,
+ sync ? FULL_SYNC : tx.syncMode(),
tx.completedBase(),
tx.committedVersions(),
tx.rolledbackVersions(),
@@ -426,8 +425,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.system(),
tx.ioPolicy(),
tx.isSystemInvalidate(),
- sync,
- sync,
+ sync ? FULL_SYNC : tx.syncMode(),
tx.completedBase(),
tx.committedVersions(),
tx.rolledbackVersions(),
@@ -497,8 +495,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.system(),
tx.ioPolicy(),
tx.isSystemInvalidate(),
- sync,
- sync,
+ sync ? FULL_SYNC : tx.syncMode(),
tx.completedBase(),
tx.committedVersions(),
tx.rolledbackVersions(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index c618a18..7194848 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.UUID;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest;
@@ -43,12 +44,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** */
private static final long serialVersionUID = 0L;
- /** */
- public static final int WAIT_REMOTE_TX_FLAG_MASK = 0x01;
-
- /** */
- public static final int NEED_RETURN_VALUE_FLAG_MASK = 0x02;
-
/** Near node ID. */
private UUID nearNodeId;
@@ -58,20 +53,11 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** Mini future ID. */
private IgniteUuid miniId;
- /** System invalidation flag. */
- private boolean sysInvalidate;
-
- /** Topology version. */
- private AffinityTopologyVersion topVer;
-
/** Pending versions with order less than one for this message (needed for commit ordering). */
@GridToStringInclude
@GridDirectCollection(GridCacheVersion.class)
private Collection<GridCacheVersion> pendingVers;
- /** Check committed flag. */
- private boolean checkCommitted;
-
/** Partition update counter. */
@GridToStringInclude
@GridDirectCollection(Long.class)
@@ -80,15 +66,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** One phase commit write version. */
private GridCacheVersion writeVer;
- /** Subject ID. */
- private UUID subjId;
-
- /** Task name hash. */
- private int taskNameHash;
-
- /** */
- private byte flags;
-
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -110,8 +87,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
* @param sys System flag.
* @param plc IO policy.
* @param sysInvalidate System invalidation flag.
- * @param syncCommit Synchronous commit flag.
- * @param syncRollback Synchronous rollback flag.
+ * @param syncMode Write synchronization mode.
* @param baseVer Base version.
* @param committedVers Committed versions.
* @param rolledbackVers Rolled back versions.
@@ -135,8 +111,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
boolean sys,
byte plc,
boolean sysInvalidate,
- boolean syncCommit,
- boolean syncRollback,
+ CacheWriteSynchronizationMode syncMode,
GridCacheVersion baseVer,
Collection<GridCacheVersion> committedVers,
Collection<GridCacheVersion> rolledbackVers,
@@ -151,17 +126,19 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
super(
xidVer,
futId,
+ topVer,
commitVer,
threadId,
commit,
invalidate,
sys,
plc,
- syncCommit,
- syncRollback,
+ syncMode,
baseVer,
committedVers,
rolledbackVers,
+ subjId,
+ taskNameHash,
txSize,
addDepInfo);
@@ -170,16 +147,13 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
assert isolation != null;
this.pendingVers = pendingVers;
- this.topVer = topVer;
this.nearNodeId = nearNodeId;
this.isolation = isolation;
this.miniId = miniId;
- this.sysInvalidate = sysInvalidate;
- this.subjId = subjId;
- this.taskNameHash = taskNameHash;
needReturnValue(retVal);
waitRemoteTransactions(waitRemoteTxs);
+ systemInvalidate(sysInvalidate);
}
/**
@@ -196,8 +170,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
* @param sys System flag.
* @param plc IO policy.
* @param sysInvalidate System invalidation flag.
- * @param syncCommit Synchronous commit flag.
- * @param syncRollback Synchronous rollback flag.
+ * @param syncMode Write synchronization mode.
* @param baseVer Base version.
* @param committedVers Committed versions.
* @param rolledbackVers Rolled back versions.
@@ -222,8 +195,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
boolean sys,
byte plc,
boolean sysInvalidate,
- boolean syncCommit,
- boolean syncRollback,
+ CacheWriteSynchronizationMode syncMode,
GridCacheVersion baseVer,
Collection<GridCacheVersion> committedVers,
Collection<GridCacheVersion> rolledbackVers,
@@ -236,9 +208,30 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
boolean retVal,
boolean waitRemoteTxs
) {
- this(nearNodeId, futId, miniId, topVer, xidVer, commitVer, threadId, isolation, commit, invalidate, sys, plc,
- sysInvalidate, syncCommit, syncRollback, baseVer, committedVers, rolledbackVers, pendingVers, txSize,
- subjId, taskNameHash, addDepInfo, retVal, waitRemoteTxs);
+ this(nearNodeId,
+ futId,
+ miniId,
+ topVer,
+ xidVer,
+ commitVer,
+ threadId,
+ isolation,
+ commit,
+ invalidate,
+ sys,
+ plc,
+ sysInvalidate,
+ syncMode,
+ baseVer,
+ committedVers,
+ rolledbackVers,
+ pendingVers,
+ txSize,
+ subjId,
+ taskNameHash,
+ addDepInfo,
+ retVal,
+ waitRemoteTxs);
if (updateIdxs != null && !updateIdxs.isEmpty()) {
partUpdateCnt = new GridLongList(updateIdxs.size());
@@ -263,20 +256,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
}
/**
- * @return Subject ID.
- */
- @Nullable public UUID subjectId() {
- return subjId;
- }
-
- /**
- * @return Task name hash.
- */
- public int taskNameHash() {
- return taskNameHash;
- }
-
- /**
* @return Transaction isolation.
*/
public TransactionIsolation isolation() {
@@ -294,7 +273,14 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
* @return System invalidate flag.
*/
public boolean isSystemInvalidate() {
- return sysInvalidate;
+ return isFlag(SYS_INVALIDATE_FLAG_MASK);
+ }
+
+ /**
+ * @param sysInvalidate System invalidation flag.
+ */
+ private void systemInvalidate(boolean sysInvalidate) {
+ setFlag(sysInvalidate, SYS_INVALIDATE_FLAG_MASK);
}
/**
@@ -312,63 +298,45 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
}
/**
- * @return Topology version.
- */
- @Override public AffinityTopologyVersion topologyVersion() {
- return topVer;
- }
-
- /**
* @return Check committed flag.
*/
public boolean checkCommitted() {
- return checkCommitted;
+ return isFlag(CHECK_COMMITTED_FLAG_MASK);
}
/**
* @param checkCommitted Check committed flag.
*/
public void checkCommitted(boolean checkCommitted) {
- this.checkCommitted = checkCommitted;
+ setFlag(checkCommitted, CHECK_COMMITTED_FLAG_MASK);
}
/**
* @return {@code True}
*/
public boolean waitRemoteTransactions() {
- return (flags & WAIT_REMOTE_TX_FLAG_MASK) != 0;
+ return isFlag(WAIT_REMOTE_TX_FLAG_MASK);
}
/**
* @param waitRemoteTxs Wait remote transactions flag.
*/
- public void waitRemoteTransactions(boolean waitRemoteTxs) {
- if (waitRemoteTxs)
- flags = (byte)(flags | WAIT_REMOTE_TX_FLAG_MASK);
- else
- flags &= ~WAIT_REMOTE_TX_FLAG_MASK;
+ private void waitRemoteTransactions(boolean waitRemoteTxs) {
+ setFlag(waitRemoteTxs, WAIT_REMOTE_TX_FLAG_MASK);
}
/**
* @return Flag indicating whether transaction needs return value.
*/
public boolean needReturnValue() {
- return (flags & NEED_RETURN_VALUE_FLAG_MASK) != 0;
+ return isFlag(NEED_RETURN_VALUE_FLAG_MASK);
}
/**
* @param retVal Need return value.
*/
public void needReturnValue(boolean retVal) {
- if (retVal)
- flags = (byte)(flags | NEED_RETURN_VALUE_FLAG_MASK);
- else
- flags &= ~NEED_RETURN_VALUE_FLAG_MASK;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDhtTxFinishRequest.class, this, super.toString());
+ setFlag(retVal, NEED_RETURN_VALUE_FLAG_MASK);
}
/** {@inheritDoc} */
@@ -386,73 +354,37 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
}
switch (writer.state()) {
- case 18:
- if (!writer.writeBoolean("checkCommitted", checkCommitted))
- return false;
-
- writer.incrementState();
-
- case 19:
- if (!writer.writeByte("flags", flags))
- return false;
-
- writer.incrementState();
-
- case 20:
- if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
case 21:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1))
return false;
writer.incrementState();
case 22:
- if (!writer.writeUuid("nearNodeId", nearNodeId))
+ if (!writer.writeIgniteUuid("miniId", miniId))
return false;
writer.incrementState();
case 23:
- if (!writer.writeMessage("partUpdateCnt", partUpdateCnt))
+ if (!writer.writeUuid("nearNodeId", nearNodeId))
return false;
writer.incrementState();
case 24:
- if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("partUpdateCnt", partUpdateCnt))
return false;
writer.incrementState();
case 25:
- if (!writer.writeUuid("subjId", subjId))
+ if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 26:
- if (!writer.writeBoolean("sysInvalidate", sysInvalidate))
- return false;
-
- writer.incrementState();
-
- case 27:
- if (!writer.writeInt("taskNameHash", taskNameHash))
- return false;
-
- writer.incrementState();
-
- case 28:
- if (!writer.writeMessage("topVer", topVer))
- return false;
-
- writer.incrementState();
-
- case 29:
if (!writer.writeMessage("writeVer", writeVer))
return false;
@@ -474,23 +406,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
return false;
switch (reader.state()) {
- case 18:
- checkCommitted = reader.readBoolean("checkCommitted");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 19:
- flags = reader.readByte("flags");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 20:
+ case 21:
byte isolationOrd;
isolationOrd = reader.readByte("isolation");
@@ -502,16 +418,8 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 21:
- miniId = reader.readIgniteUuid("miniId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
case 22:
- nearNodeId = reader.readUuid("nearNodeId");
+ miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
return false;
@@ -519,7 +427,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
case 23:
- partUpdateCnt = reader.readMessage("partUpdateCnt");
+ nearNodeId = reader.readUuid("nearNodeId");
if (!reader.isLastRead())
return false;
@@ -527,7 +435,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
case 24:
- pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG);
+ partUpdateCnt = reader.readMessage("partUpdateCnt");
if (!reader.isLastRead())
return false;
@@ -535,7 +443,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
case 25:
- subjId = reader.readUuid("subjId");
+ pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -543,30 +451,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
case 26:
- sysInvalidate = reader.readBoolean("sysInvalidate");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 27:
- taskNameHash = reader.readInt("taskNameHash");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 28:
- topVer = reader.readMessage("topVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 29:
writeVer = reader.readMessage("writeVer");
if (!reader.isLastRead())
@@ -586,6 +470,11 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 30;
+ return 27;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtTxFinishRequest.class, this, super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
index 8fb1f4e..4808289 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
@@ -39,6 +39,12 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
/** */
private static final long serialVersionUID = 0L;
+ /** Flag indicating if this is a reply for the near node. */
+ private static final int NEAR_RES_FLAG_MASK = 0x01;
+
+ /** Flag indicating if this is a check-committed response. */
+ private static final int CHECK_COMMITTED_FLAG_MASK = 0x02;
+
/** Mini future ID. */
private IgniteUuid miniId;
@@ -49,9 +55,6 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
/** Serialized error. */
private byte[] checkCommittedErrBytes;
- /** Flag indicating if this is a check-committed response. */
- private boolean checkCommitted;
-
/** Cache return value. */
private GridCacheReturn retVal;
@@ -63,12 +66,13 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
}
/**
+ * @param part Partition.
* @param xid Xid version.
* @param futId Future ID.
* @param miniId Mini future ID.
*/
- public GridDhtTxFinishResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId) {
- super(xid, futId);
+ public GridDhtTxFinishResponse(int part, GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId) {
+ super(part, xid, futId);
assert miniId != null;
@@ -76,6 +80,20 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
}
/**
+ * @return {@code True} if this is reply for near node.
+ */
+ public boolean nearNodeResponse() {
+ return isFlag(NEAR_RES_FLAG_MASK);
+ }
+
+ /**
+ * @param val {@code True} if this is reply for near node.
+ */
+ public void nearNodeResponse(boolean val) {
+ setFlag(val, NEAR_RES_FLAG_MASK);
+ }
+
+ /**
* @return Mini future ID.
*/
public IgniteUuid miniId() {
@@ -100,14 +118,14 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
* @return Check committed flag.
*/
public boolean checkCommitted() {
- return checkCommitted;
+ return isFlag(CHECK_COMMITTED_FLAG_MASK);
}
/**
* @param checkCommitted Check committed flag.
*/
public void checkCommitted(boolean checkCommitted) {
- this.checkCommitted = checkCommitted;
+ setFlag(checkCommitted, CHECK_COMMITTED_FLAG_MASK);
}
/** {@inheritDoc} */
@@ -158,11 +176,6 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
}
/** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDhtTxFinishResponse.class, this, super.toString());
- }
-
- /** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
@@ -177,25 +190,19 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
}
switch (writer.state()) {
- case 5:
- if (!writer.writeBoolean("checkCommitted", checkCommitted))
- return false;
-
- writer.incrementState();
-
- case 6:
+ case 7:
if (!writer.writeByteArray("checkCommittedErrBytes", checkCommittedErrBytes))
return false;
writer.incrementState();
- case 7:
+ case 8:
if (!writer.writeIgniteUuid("miniId", miniId))
return false;
writer.incrementState();
- case 8:
+ case 9:
if (!writer.writeMessage("retVal", retVal))
return false;
@@ -217,15 +224,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
return false;
switch (reader.state()) {
- case 5:
- checkCommitted = reader.readBoolean("checkCommitted");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
+ case 7:
checkCommittedErrBytes = reader.readByteArray("checkCommittedErrBytes");
if (!reader.isLastRead())
@@ -233,7 +232,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
reader.incrementState();
- case 7:
+ case 8:
miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
@@ -241,7 +240,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
reader.incrementState();
- case 8:
+ case 9:
retVal = reader.readMessage("retVal");
if (!reader.isLastRead())
@@ -261,6 +260,20 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 9;
+ return 10;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ StringBuilder flags = new StringBuilder();
+
+ if (checkCommitted())
+ appendFlag(flags, "checkComm");
+ if (nearNodeResponse())
+ appendFlag(flags, "nearRes");
+
+ return S.toString(GridDhtTxFinishResponse.class, this,
+ "flags", flags.toString(),
+ "super", super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 79c371c..c2c2918 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -641,8 +641,13 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
return;
}
- GridNearTxFinishResponse res = new GridNearTxFinishResponse(nearXidVer, threadId, nearFinFutId,
- nearFinMiniId, err);
+ GridNearTxFinishResponse res = new GridNearTxFinishResponse(
+ -1,
+ nearXidVer,
+ threadId,
+ nearFinFutId,
+ nearFinMiniId,
+ err);
try {
cctx.io().send(nearNodeId, res, ioPolicy());
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 7645d58..ca028f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1252,8 +1252,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (last) {
int miniId = 0;
- assert tx.transactionNodes() != null;
-
final long timeout = timeoutObj != null ? timeoutObj.timeout : 0;
// Create mini futures.
@@ -1288,7 +1286,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
futId,
- fut != null ? fut.futureId() : null,
+ fut != null ? fut.futureId() : 0,
nearFutId,
nearMiniId,
tx.topologyVersion(),
@@ -1315,8 +1313,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
GridCacheContext<?, ?> cacheCtx = cached.context();
// Do not invalidate near entry on originating transaction node.
- req.invalidateNearEntry(idx, !tx.nearNodeId().equals(n.id()) &&
- cached.readerId(n.id()) != null);
+ req.invalidateNearEntry(idx,
+ !tx.nearNodeId().equals(n.id()) && cached.readerId(n.id()) != null);
if (cached.isNewLocked()) {
List<ClusterNode> owners = cacheCtx.topology().owners(cached.partition(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
index e7153da..2fd72f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
@@ -247,11 +247,6 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
}
/** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDhtTxPrepareResponse.class, this, "super", super.toString());
- }
-
- /** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
@@ -266,31 +261,31 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
}
switch (writer.state()) {
- case 8:
+ case 10:
if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
- case 9:
+ case 11:
if (!writer.writeMap("invalidParts", invalidParts, MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR))
return false;
writer.incrementState();
- case 10:
+ case 12:
if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
- case 11:
+ case 13:
if (!writer.writeCollection("nearEvicted", nearEvicted, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 12:
+ case 14:
if (!writer.writeCollection("preloadEntries", preloadEntries, MessageCollectionItemType.MSG))
return false;
@@ -312,7 +307,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
return false;
switch (reader.state()) {
- case 8:
+ case 10:
futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
@@ -320,7 +315,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
reader.incrementState();
- case 9:
+ case 11:
invalidParts = reader.readMap("invalidParts", MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR, false);
if (!reader.isLastRead())
@@ -328,7 +323,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
reader.incrementState();
- case 10:
+ case 12:
miniId = reader.readInt("miniId");
if (!reader.isLastRead())
@@ -336,7 +331,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
reader.incrementState();
- case 11:
+ case 13:
nearEvicted = reader.readCollection("nearEvicted", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -344,7 +339,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
reader.incrementState();
- case 12:
+ case 14:
preloadEntries = reader.readCollection("preloadEntries", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -364,6 +359,17 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 13;
+ return 15;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ StringBuilder flags = new StringBuilder();
+
+ if (nearNodeResponse())
+ appendFlag(flags, "nearRes");
+
+ return S.toString(GridDhtTxPrepareResponse.class, this,
+ "flags", flags.toString(), "super", super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 652eec2..cd0e7fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -195,7 +195,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
}
/** {@inheritDoc} */
- @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
+ @Override public void onPrimaryResponse(UUID nodeId, GridNearTxPrepareResponse res) {
if (!isDone()) {
MiniFuture mini = miniFuture(res.miniId());
@@ -885,7 +885,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
onDone(res);
}
else {
- parent.onPrimaryPrepareResponse(m, res);
+ parent.processPrimaryPrepareResponse(m, res);
// Finish this mini future (need result only on client node).
onDone(parent.cctx.kernalContext().clientNode() ? res : null);
@@ -899,8 +899,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
*/
private void remap(final GridNearTxPrepareResponse res) {
parent.prepareOnTopology(true, new Runnable() {
- @Override
- public void run() {
+ @Override public void run() {
onDone(res);
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 5cfb1a2..d321cb9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -174,7 +174,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
}
/** {@inheritDoc} */
- @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
+ @Override public void onPrimaryResponse(UUID nodeId, GridNearTxPrepareResponse res) {
if (!isDone()) {
MiniFuture mini = miniFuture(res.miniId());
@@ -918,7 +918,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
remap();
}
else {
- parent.onPrimaryPrepareResponse(m, res);
+ parent.processPrimaryPrepareResponse(m, res);
// Proceed prepare before finishing mini future.
if (mappings != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index b476336..03ee7af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -95,7 +95,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
}
/** {@inheritDoc} */
- @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
+ @Override public void onPrimaryResponse(UUID nodeId, GridNearTxPrepareResponse res) {
if (!isDone()) {
assert res.clientRemapVersion() == null : res;
@@ -205,7 +205,6 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
*/
private void preparePessimistic() {
// TODO IGNITE-4768: need detect on lock step?
-
boolean dhtReplyNear = true;
Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>();
@@ -473,7 +472,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
else {
assert dhtNodes == null;
- onPrimaryPrepareResponse(m, res);
+ processPrimaryPrepareResponse(m, res);
onDone();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 06cf878..1d52b6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -148,7 +148,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (isMini(fut)) {
MinFuture f = (MinFuture)fut;
- if (f.onNodeLeft(nodeId, true)) {
+ if (f.onNodeLeft(nodeId)) {
// Remove previous mapping.
mappings.remove(nodeId);
@@ -184,7 +184,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
* @param res Result.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- public void onResult(UUID nodeId, GridNearTxFinishResponse res) {
+ public void onPrimaryResponse(UUID nodeId, GridNearTxFinishResponse res) {
if (!isDone()) {
FinishMiniFuture finishFut = null;
@@ -209,7 +209,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
if (finishFut != null)
- finishFut.onNearFinishResponse(res);
+ finishFut.onPrimaryResponse(res);
else {
if (msgLog.isDebugEnabled()) {
msgLog.debug("Near finish fut, failed to find mini future [txId=" + tx.nearXidVersion() +
@@ -233,34 +233,64 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
* @param nodeId Sender.
* @param res Result.
*/
- public void onResult(UUID nodeId, GridDhtTxFinishResponse res) {
+ public void onDhtResponse(UUID nodeId, GridDhtTxFinishResponse res) {
+ assert res.checkCommitted() ^ res.nearNodeResponse() : res;
+
if (!isDone()) {
- boolean found = false;
+ MinFuture foundFut = null;
+
+ synchronized (sync) {
+ int size = futuresCountNoLock();
- for (IgniteInternalFuture<IgniteInternalTx> fut : futures()) {
- if (fut.getClass() == CheckBackupMiniFuture.class) {
- CheckBackupMiniFuture f = (CheckBackupMiniFuture)fut;
+ for (int i = 0; i < size; i++) {
+ IgniteInternalFuture<IgniteInternalTx> fut = future(i);
- if (f.futureId().equals(res.miniId())) {
- found = true;
+ if (res.nearNodeResponse()) {
+ assert tx.dhtReplyNear() && tx.syncMode() == FULL_SYNC;
- assert f.node().id().equals(nodeId);
+ if (fut.getClass() == FinishMiniFuture.class) {
+ FinishMiniFuture f = (FinishMiniFuture)fut;
- if (res.returnValue() != null)
- tx.implicitSingleResult(res.returnValue());
+ if (f.futureId().equals(res.miniId())) {
+ foundFut = (MinFuture)fut;
- f.onDhtFinishResponse(res);
+ break;
+ }
+ }
}
- }
- else if (fut.getClass() == CheckRemoteTxMiniFuture.class) {
- CheckRemoteTxMiniFuture f = (CheckRemoteTxMiniFuture)fut;
+ else {
+ assert res.checkCommitted();
+
+ if (fut.getClass() == CheckBackupMiniFuture.class) {
+ CheckBackupMiniFuture f = (CheckBackupMiniFuture)fut;
+
+ if (f.futureId().equals(res.miniId())) {
+ assert f.node().id().equals(nodeId);
- if (f.futureId().equals(res.miniId()))
- f.onDhtFinishResponse(nodeId);
+ foundFut = f;
+
+ if (res.returnValue() != null)
+ tx.implicitSingleResult(res.returnValue());
+
+ break;
+ }
+ }
+ else if (fut.getClass() == CheckRemoteTxMiniFuture.class) {
+ CheckRemoteTxMiniFuture f = (CheckRemoteTxMiniFuture)fut;
+
+ if (f.futureId().equals(res.miniId())) {
+ foundFut = f;
+
+ break;
+ }
+ }
+ }
}
}
- if (!found && msgLog.isDebugEnabled()) {
+ if (foundFut != null)
+ foundFut.onDhtFinishResponse(nodeId, res);
+ else if (msgLog.isDebugEnabled()) {
msgLog.debug("Near finish fut, failed to find mini future [txId=" + tx.nearXidVersion() +
", node=" + nodeId +
", res=" + res +
@@ -571,7 +601,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
}
catch (ClusterTopologyCheckedException ignored) {
- mini.onNodeLeft(backupId, false);
+ mini.onNodeLeft(backupId);
}
catch (IgniteCheckedException e) {
if (msgLog.isDebugEnabled()) {
@@ -731,7 +761,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
// Remove previous mapping.
mappings.remove(m.primary().id());
- fut.onNodeLeft(n.id(), false);
+ fut.onNodeLeft(n.id());
}
catch (IgniteCheckedException e) {
if (msgLog.isDebugEnabled()) {
@@ -813,8 +843,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
tx.system(),
tx.ioPolicy(),
false,
- tx.syncMode() == FULL_SYNC,
- tx.syncMode() == FULL_SYNC,
+ tx.syncMode(),
null,
null,
null,
@@ -840,10 +869,15 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
/**
* @param nodeId Node ID.
- * @param discoThread {@code True} if executed from discovery thread.
* @return {@code True} if future processed node failure.
*/
- abstract boolean onNodeLeft(UUID nodeId, boolean discoThread);
+ abstract boolean onNodeLeft(UUID nodeId);
+
+ /**
+ * @param nodeId Node ID.
+ * @param res Response.
+ */
+ abstract void onDhtFinishResponse(UUID nodeId, GridDhtTxFinishResponse res);
/**
* @return Future ID.
@@ -890,7 +924,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
/** {@inheritDoc} */
- boolean onNodeLeft(UUID nodeId, boolean discoThread) {
+ boolean onNodeLeft(UUID nodeId) {
if (nodeId.equals(m.primary().id())) {
if (msgLog.isDebugEnabled()) {
msgLog.debug("Near finish fut, mini future node left [txId=" + tx.nearXidVersion() +
@@ -919,7 +953,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
fut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
- mini.onDhtFinishResponse(cctx.localNodeId());
+ mini.onDhtFinishResponse(cctx.localNodeId(), null);
}
});
}
@@ -928,7 +962,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
cctx.io().send(backup, req, tx.ioPolicy());
}
catch (ClusterTopologyCheckedException ignored) {
- mini.onNodeLeft(backupId, discoThread);
+ mini.onNodeLeft(backupId);
}
catch (IgniteCheckedException e) {
mini.onDone(e);
@@ -936,7 +970,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
}
else
- mini.onDhtFinishResponse(backupId);
+ mini.onDhtFinishResponse(backupId, null);
}
}
}
@@ -950,10 +984,24 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
return false;
}
+ /** {@inheritDoc} */
+ @Override void onDhtFinishResponse(UUID nodeId, GridDhtTxFinishResponse res) {
+ assert dhtNodes != null;
+
+ boolean done;
+
+ synchronized (dhtNodes) {
+ done = dhtNodes.remove(nodeId) && dhtNodes.isEmpty();
+ }
+
+ if (done)
+ onDone(tx);
+ }
+
/**
- * @param res Result callback.
+ * @param res Response.
*/
- void onNearFinishResponse(GridNearTxFinishResponse res) {
+ void onPrimaryResponse(GridNearTxFinishResponse res) {
assert dhtNodes == null || res.error() != null;
if (res.error() != null)
@@ -999,7 +1047,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
/** {@inheritDoc} */
- @Override boolean onNodeLeft(UUID nodeId, boolean discoThread) {
+ @Override boolean onNodeLeft(UUID nodeId) {
if (nodeId.equals(backup.id())) {
readyNearMappingFromBackup(m);
@@ -1011,10 +1059,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
return false;
}
- /**
- * @param res Response.
- */
- void onDhtFinishResponse(GridDhtTxFinishResponse res) {
+ /** {@inheritDoc} */
+ @Override void onDhtFinishResponse(UUID nodeId, GridDhtTxFinishResponse res) {
readyNearMappingFromBackup(m);
Throwable err = res.checkCommittedError();
@@ -1060,14 +1106,12 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
/** {@inheritDoc} */
- @Override boolean onNodeLeft(UUID nodeId, boolean discoThread) {
+ @Override boolean onNodeLeft(UUID nodeId) {
return onResponse(nodeId);
}
- /**
- * @param nodeId Node ID.
- */
- void onDhtFinishResponse(UUID nodeId) {
+ /** {@inheritDoc} */
+ @Override void onDhtFinishResponse(UUID nodeId, GridDhtTxFinishResponse res) {
onResponse(nodeId);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
index dfbbe18..8be4304 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
@@ -42,24 +42,6 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
/** Mini future ID. */
private IgniteUuid miniId;
- /** Explicit lock flag. */
- private boolean explicitLock;
-
- /** Store enabled flag. */
- private boolean storeEnabled;
-
- /** Topology version. */
- private AffinityTopologyVersion topVer;
-
- /** Subject ID. */
- private UUID subjId;
-
- /** Task name hash. */
- private int taskNameHash;
-
- /** Write synchronization mode. */
- private CacheWriteSynchronizationMode syncMode;
-
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -109,48 +91,53 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
super(
xidVer,
futId,
+ topVer,
null,
threadId,
commit,
invalidate,
sys,
plc,
- syncMode == CacheWriteSynchronizationMode.FULL_SYNC,
- syncMode == CacheWriteSynchronizationMode.FULL_SYNC,
+ syncMode,
baseVer,
committedVers,
rolledbackVers,
+ subjId,
+ taskNameHash,
txSize,
addDepInfo
);
- this.syncMode = syncMode;
- this.explicitLock = explicitLock;
- this.storeEnabled = storeEnabled;
- this.topVer = topVer;
- this.subjId = subjId;
- this.taskNameHash = taskNameHash;
+ explicitLock(explicitLock);
+ storeEnabled(storeEnabled);
}
/**
- * @return Transaction write synchronization mode (can be null is message sent from old nodes).
+ * @return Explicit lock flag.
*/
- @Nullable public CacheWriteSynchronizationMode syncMode() {
- return syncMode;
+ public boolean explicitLock() {
+ return isFlag(EXPLICIT_LOCK_FLAG_MASK);
}
/**
- * @return Explicit lock flag.
+ * @param explicitLock Explicit lock flag.
*/
- public boolean explicitLock() {
- return explicitLock;
+ private void explicitLock(boolean explicitLock) {
+ setFlag(explicitLock, EXPLICIT_LOCK_FLAG_MASK);
}
/**
* @return Store enabled flag.
*/
public boolean storeEnabled() {
- return storeEnabled;
+ return isFlag(STORE_ENABLED_FLAG_MASK);
+ }
+
+ /**
+ * @param storeEnabled Store enabled flag.
+ */
+ private void storeEnabled(boolean storeEnabled) {
+ setFlag(storeEnabled, STORE_ENABLED_FLAG_MASK);
}
/**
@@ -167,27 +154,6 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
this.miniId = miniId;
}
- /**
- * @return Subject ID.
- */
- @Nullable public UUID subjectId() {
- return subjId;
- }
-
- /**
- * @return Task name hash.
- */
- public int taskNameHash() {
- return taskNameHash;
- }
-
- /**
- * @return Topology version.
- */
- @Override public AffinityTopologyVersion topologyVersion() {
- return topVer;
- }
-
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
@@ -203,44 +169,8 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
}
switch (writer.state()) {
- case 18:
- if (!writer.writeBoolean("explicitLock", explicitLock))
- return false;
-
- writer.incrementState();
-
- case 19:
- if (!writer.writeIgniteUuid("miniId", miniId))
- return false;
-
- writer.incrementState();
-
- case 20:
- if (!writer.writeBoolean("storeEnabled", storeEnabled))
- return false;
-
- writer.incrementState();
-
case 21:
- if (!writer.writeUuid("subjId", subjId))
- return false;
-
- writer.incrementState();
-
- case 22:
- if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 23:
- if (!writer.writeInt("taskNameHash", taskNameHash))
- return false;
-
- writer.incrementState();
-
- case 24:
- if (!writer.writeMessage("topVer", topVer))
+ if (!writer.writeIgniteUuid("miniId", miniId))
return false;
writer.incrementState();
@@ -261,60 +191,8 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
return false;
switch (reader.state()) {
- case 18:
- explicitLock = reader.readBoolean("explicitLock");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 19:
- miniId = reader.readIgniteUuid("miniId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 20:
- storeEnabled = reader.readBoolean("storeEnabled");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
case 21:
- subjId = reader.readUuid("subjId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 22:
- byte syncModeOrd;
-
- syncModeOrd = reader.readByte("syncMode");
-
- if (!reader.isLastRead())
- return false;
-
- syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
-
- reader.incrementState();
-
- case 23:
- taskNameHash = reader.readInt("taskNameHash");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 24:
- topVer = reader.readMessage("topVer");
+ miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
return false;
@@ -333,7 +211,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 25;
+ return 22;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
index 37fbb36..cf1ba46 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
@@ -59,15 +59,16 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
}
/**
+ * @param part Partition.
* @param xid Xid version.
* @param nearThreadId Near tx thread ID.
* @param futId Future ID.
* @param miniId Mini future Id.
* @param err Error.
*/
- public GridNearTxFinishResponse(GridCacheVersion xid, long nearThreadId, IgniteUuid futId, IgniteUuid miniId,
+ public GridNearTxFinishResponse(int part, GridCacheVersion xid, long nearThreadId, IgniteUuid futId, IgniteUuid miniId,
@Nullable Throwable err) {
- super(xid, futId);
+ super(part, xid, futId);
assert miniId != null;
@@ -127,19 +128,19 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
}
switch (writer.state()) {
- case 5:
+ case 7:
if (!writer.writeByteArray("errBytes", errBytes))
return false;
writer.incrementState();
- case 6:
+ case 8:
if (!writer.writeIgniteUuid("miniId", miniId))
return false;
writer.incrementState();
- case 7:
+ case 9:
if (!writer.writeLong("nearThreadId", nearThreadId))
return false;
@@ -161,7 +162,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
return false;
switch (reader.state()) {
- case 5:
+ case 7:
errBytes = reader.readByteArray("errBytes");
if (!reader.isLastRead())
@@ -169,7 +170,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
reader.incrementState();
- case 6:
+ case 8:
miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
@@ -177,7 +178,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
reader.incrementState();
- case 7:
+ case 9:
nearThreadId = reader.readLong("nearThreadId");
if (!reader.isLastRead())
@@ -197,7 +198,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 8;
+ return 10;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index 71d2353..8a8dc7e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -152,7 +152,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends
* @param nodeId Sender.
* @param res Result.
*/
- public abstract void onResult(UUID nodeId, GridNearTxPrepareResponse res);
+ public abstract void onPrimaryResponse(UUID nodeId, GridNearTxPrepareResponse res);
/**
* @param nodeId Sender.
@@ -197,7 +197,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends
* @param res Response.
*/
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- final void onPrimaryPrepareResponse(GridDistributedTxMapping m, GridNearTxPrepareResponse res) {
+ final void processPrimaryPrepareResponse(GridDistributedTxMapping m, GridNearTxPrepareResponse res) {
if (res == null)
return;