You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/14 08:00:46 UTC
[27/38] ignite git commit: ignite-4768
ignite-4768
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0246b74e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0246b74e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0246b74e
Branch: refs/heads/ignite-4768
Commit: 0246b74e22c547769e99aa890448b017205b62e2
Parents: bc5dbb0
Author: sboikov <sb...@gridgain.com>
Authored: Mon Mar 13 11:59:08 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Mar 13 11:59:08 2017 +0300
----------------------------------------------------------------------
.../GridDistributedTxFinishResponse.java | 75 ++++++++++++++++++--
.../dht/GridDhtTxFinishResponse.java | 60 +++++++---------
.../cache/distributed/dht/GridDhtTxLocal.java | 4 +-
.../near/GridNearTxFinishResponse.java | 5 +-
.../cache/transactions/IgniteTxHandler.java | 17 +++--
5 files changed, 115 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0246b74e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
index 109d665..2c446c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
@@ -17,13 +17,14 @@
package org.apache.ignite.internal.processors.cache.distributed;
-import java.io.Externalizable;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -41,25 +42,59 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
/** Future ID. */
private IgniteUuid futId;
+ /** */
+ @GridToStringExclude
+ private byte flags;
+
+ /** */
+ private int part;
+
/**
- * Empty constructor required by {@link Externalizable}.
+ * Empty constructor required by {@link GridIoMessageFactory}.
*/
public GridDistributedTxFinishResponse() {
/* No-op. */
}
/**
+ * @param part Partition.
* @param txId Transaction id.
* @param futId Future ID.
*/
- public GridDistributedTxFinishResponse(GridCacheVersion txId, IgniteUuid futId) {
+ public GridDistributedTxFinishResponse(int part, GridCacheVersion txId, IgniteUuid futId) {
assert txId != null;
assert futId != null;
+ this.part = part;
this.txId = txId;
this.futId = futId;
}
+ /** {@inheritDoc} */
+ @Override public final int partition() {
+ return part;
+ }
+
+ /**
+ * Sets flag mask.
+ *
+ * @param flag Set or clear.
+ * @param mask Mask.
+ */
+ protected final void setFlag(boolean flag, int mask) {
+ flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+ }
+
+ /**
+ * Reads flag mask.
+ *
+ * @param mask Mask to read.
+ * @return Flag value.
+ */
+ protected final boolean isFlag(int mask) {
+ return (flags & mask) != 0;
+ }
+
/**
*
* @return Transaction id.
@@ -101,12 +136,24 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
switch (writer.state()) {
case 3:
- if (!writer.writeIgniteUuid("futId", futId))
+ if (!writer.writeByte("flags", flags))
return false;
writer.incrementState();
case 4:
+ if (!writer.writeIgniteUuid("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeInt("part", part))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
if (!writer.writeMessage("txId", txId))
return false;
@@ -129,7 +176,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
switch (reader.state()) {
case 3:
- futId = reader.readIgniteUuid("futId");
+ flags = reader.readByte("flags");
if (!reader.isLastRead())
return false;
@@ -137,6 +184,22 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
reader.incrementState();
case 4:
+ futId = reader.readIgniteUuid("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ part = reader.readInt("part");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
txId = reader.readMessage("txId");
if (!reader.isLastRead())
@@ -156,7 +219,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 5;
+ return 7;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/0246b74e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
index 7648178..bc9503f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
@@ -39,6 +39,9 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
/** */
private static final long serialVersionUID = 0L;
+ /** Flag indicating if this is a check-committed response. */
+ private static final int CHECK_COMMITTED_FLAG_MASK = 0x01;
+
/** Mini future ID. */
private int miniId;
@@ -49,9 +52,6 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
/** Serialized error. */
private byte[] checkCommittedErrBytes;
- /** Flag indicating if this is a check-committed response. */
- private boolean checkCommitted;
-
/** Cache return value. */
private GridCacheReturn retVal;
@@ -63,12 +63,13 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
}
/**
+ * @param part Partition.
* @param xid Xid version.
* @param futId Future ID.
* @param miniId Mini future ID.
*/
- public GridDhtTxFinishResponse(GridCacheVersion xid, IgniteUuid futId, int miniId) {
- super(xid, futId);
+ public GridDhtTxFinishResponse(int part, GridCacheVersion xid, IgniteUuid futId, int miniId) {
+ super(part, xid, futId);
assert miniId != 0;
@@ -100,14 +101,14 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
* @return Check committed flag.
*/
public boolean checkCommitted() {
- return checkCommitted;
+ return isFlag(CHECK_COMMITTED_FLAG_MASK);
}
/**
* @param checkCommitted Check committed flag.
*/
public void checkCommitted(boolean checkCommitted) {
- this.checkCommitted = checkCommitted;
+ setFlag(checkCommitted, CHECK_COMMITTED_FLAG_MASK);
}
/** {@inheritDoc} */
@@ -158,11 +159,6 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
}
/** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDhtTxFinishResponse.class, this, super.toString());
- }
-
- /** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
@@ -177,25 +173,19 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
}
switch (writer.state()) {
- case 5:
- if (!writer.writeBoolean("checkCommitted", checkCommitted))
- return false;
-
- writer.incrementState();
-
- case 6:
+ case 7:
if (!writer.writeByteArray("checkCommittedErrBytes", checkCommittedErrBytes))
return false;
writer.incrementState();
- case 7:
+ case 8:
if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
- case 8:
+ case 9:
if (!writer.writeMessage("retVal", retVal))
return false;
@@ -217,15 +207,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
return false;
switch (reader.state()) {
- case 5:
- checkCommitted = reader.readBoolean("checkCommitted");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
+ case 7:
checkCommittedErrBytes = reader.readByteArray("checkCommittedErrBytes");
if (!reader.isLastRead())
@@ -233,7 +215,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
reader.incrementState();
- case 7:
+ case 8:
miniId = reader.readInt("miniId");
if (!reader.isLastRead())
@@ -241,7 +223,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
reader.incrementState();
- case 8:
+ case 9:
retVal = reader.readMessage("retVal");
if (!reader.isLastRead())
@@ -261,6 +243,18 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 9;
+ return 10;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ StringBuilder flags = new StringBuilder();
+
+ if (checkCommitted())
+ appendFlag(flags, "checkComm");
+
+ return S.toString(GridDhtTxFinishResponse.class, this,
+ "flags", flags.toString(),
+ "super", super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0246b74e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 1b543bf..bff69bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -634,7 +634,9 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
return;
}
- GridNearTxFinishResponse res = new GridNearTxFinishResponse(nearXidVer,
+ GridNearTxFinishResponse res = new GridNearTxFinishResponse(
+ -1,
+ nearXidVer,
threadId,
nearFinFutId,
nearFinMiniId,
http://git-wip-us.apache.org/repos/asf/ignite/blob/0246b74e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
index e9043ed..efe96b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
@@ -59,15 +59,16 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
}
/**
+ * @param part Partition.
* @param xid Xid version.
* @param nearThreadId Near tx thread ID.
* @param futId Future ID.
* @param miniId Mini future Id.
* @param err Error.
*/
- public GridNearTxFinishResponse(GridCacheVersion xid, long nearThreadId, IgniteUuid futId, int miniId,
+ public GridNearTxFinishResponse(int part, GridCacheVersion xid, long nearThreadId, IgniteUuid futId, int miniId,
@Nullable Throwable err) {
- super(xid, futId);
+ super(part, xid, futId);
assert miniId != 0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/0246b74e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 6daec14..56a7fa2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -796,8 +796,13 @@ public class IgniteTxHandler {
", commit=" + req.commit() + ']');
// Always send finish response.
- GridCacheMessage res = new GridNearTxFinishResponse(req.version(), req.threadId(), req.futureId(),
- req.miniId(), new IgniteCheckedException("Transaction has been already completed."));
+ GridCacheMessage res = new GridNearTxFinishResponse(
+ req.partition(),
+ req.version(),
+ req.threadId(),
+ req.futureId(),
+ req.miniId(),
+ new IgniteCheckedException("Transaction has been already completed."));
try {
ctx.io().send(nodeId, res, req.policy());
@@ -1324,9 +1329,13 @@ public class IgniteTxHandler {
* @param committed {@code True} if transaction committed on this node.
* @param nearTxId Near tx version.
*/
- protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed, GridCacheVersion nearTxId) {
+ protected final void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed, GridCacheVersion nearTxId) {
if (req.replyRequired() || req.checkCommitted()) {
- GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(req.version(), req.futureId(), req.miniId());
+ GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(
+ req.partition(),
+ req.version(),
+ req.futureId(),
+ req.miniId());
if (req.checkCommitted()) {
res.checkCommitted(true);