You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/14 08:00:46 UTC

[27/38] ignite git commit: ignite-4768

ignite-4768


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0246b74e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0246b74e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0246b74e

Branch: refs/heads/ignite-4768
Commit: 0246b74e22c547769e99aa890448b017205b62e2
Parents: bc5dbb0
Author: sboikov <sb...@gridgain.com>
Authored: Mon Mar 13 11:59:08 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Mar 13 11:59:08 2017 +0300

----------------------------------------------------------------------
 .../GridDistributedTxFinishResponse.java        | 75 ++++++++++++++++++--
 .../dht/GridDhtTxFinishResponse.java            | 60 +++++++---------
 .../cache/distributed/dht/GridDhtTxLocal.java   |  4 +-
 .../near/GridNearTxFinishResponse.java          |  5 +-
 .../cache/transactions/IgniteTxHandler.java     | 17 +++--
 5 files changed, 115 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0246b74e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
index 109d665..2c446c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
@@ -17,13 +17,14 @@
 
 package org.apache.ignite.internal.processors.cache.distributed;
 
-import java.io.Externalizable;
 import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -41,25 +42,59 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
     /** Future ID. */
     private IgniteUuid futId;
 
+    /** */
+    @GridToStringExclude
+    private byte flags;
+
+    /** */
+    private int part;
+
     /**
-     * Empty constructor required by {@link Externalizable}.
+     * Empty constructor required by {@link GridIoMessageFactory}.
      */
     public GridDistributedTxFinishResponse() {
         /* No-op. */
     }
 
     /**
+     * @param part Partition.
      * @param txId Transaction id.
      * @param futId Future ID.
      */
-    public GridDistributedTxFinishResponse(GridCacheVersion txId, IgniteUuid futId) {
+    public GridDistributedTxFinishResponse(int part, GridCacheVersion txId, IgniteUuid futId) {
         assert txId != null;
         assert futId != null;
 
+        this.part = part;
         this.txId = txId;
         this.futId = futId;
     }
 
+    /** {@inheritDoc} */
+    @Override public final int partition() {
+        return part;
+    }
+
+    /**
+     * Sets flag mask.
+     *
+     * @param flag Set or clear.
+     * @param mask Mask.
+     */
+    protected final void setFlag(boolean flag, int mask) {
+        flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+    }
+
+    /**
+     * Reags flag mask.
+     *
+     * @param mask Mask to read.
+     * @return Flag value.
+     */
+    protected final boolean isFlag(int mask) {
+        return (flags & mask) != 0;
+    }
+
     /**
      *
      * @return Transaction id.
@@ -101,12 +136,24 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
 
         switch (writer.state()) {
             case 3:
-                if (!writer.writeIgniteUuid("futId", futId))
+                if (!writer.writeByte("flags", flags))
                     return false;
 
                 writer.incrementState();
 
             case 4:
+                if (!writer.writeIgniteUuid("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeInt("part", part))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
                 if (!writer.writeMessage("txId", txId))
                     return false;
 
@@ -129,7 +176,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
 
         switch (reader.state()) {
             case 3:
-                futId = reader.readIgniteUuid("futId");
+                flags = reader.readByte("flags");
 
                 if (!reader.isLastRead())
                     return false;
@@ -137,6 +184,22 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
                 reader.incrementState();
 
             case 4:
+                futId = reader.readIgniteUuid("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                part = reader.readInt("part");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
                 txId = reader.readMessage("txId");
 
                 if (!reader.isLastRead())
@@ -156,7 +219,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 5;
+        return 7;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/0246b74e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
index 7648178..bc9503f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
@@ -39,6 +39,9 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Flag indicating if this is a check-committed response. */
+    private static final int CHECK_COMMITTED_FLAG_MASK = 0x01;
+
     /** Mini future ID. */
     private int miniId;
 
@@ -49,9 +52,6 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
     /** Serialized error. */
     private byte[] checkCommittedErrBytes;
 
-    /** Flag indicating if this is a check-committed response. */
-    private boolean checkCommitted;
-
     /** Cache return value. */
     private GridCacheReturn retVal;
 
@@ -63,12 +63,13 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
     }
 
     /**
+     * @param part Partition.
      * @param xid Xid version.
      * @param futId Future ID.
      * @param miniId Mini future ID.
      */
-    public GridDhtTxFinishResponse(GridCacheVersion xid, IgniteUuid futId, int miniId) {
-        super(xid, futId);
+    public GridDhtTxFinishResponse(int part, GridCacheVersion xid, IgniteUuid futId, int miniId) {
+        super(part, xid, futId);
 
         assert miniId != 0;
 
@@ -100,14 +101,14 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
      * @return Check committed flag.
      */
     public boolean checkCommitted() {
-        return checkCommitted;
+        return isFlag(CHECK_COMMITTED_FLAG_MASK);
     }
 
     /**
      * @param checkCommitted Check committed flag.
      */
     public void checkCommitted(boolean checkCommitted) {
-        this.checkCommitted = checkCommitted;
+        setFlag(checkCommitted, CHECK_COMMITTED_FLAG_MASK);
     }
 
     /** {@inheritDoc} */
@@ -158,11 +159,6 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
     }
 
     /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridDhtTxFinishResponse.class, this, super.toString());
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
 
@@ -177,25 +173,19 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
         }
 
         switch (writer.state()) {
-            case 5:
-                if (!writer.writeBoolean("checkCommitted", checkCommitted))
-                    return false;
-
-                writer.incrementState();
-
-            case 6:
+            case 7:
                 if (!writer.writeByteArray("checkCommittedErrBytes", checkCommittedErrBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 7:
+            case 8:
                 if (!writer.writeInt("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
-            case 8:
+            case 9:
                 if (!writer.writeMessage("retVal", retVal))
                     return false;
 
@@ -217,15 +207,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
             return false;
 
         switch (reader.state()) {
-            case 5:
-                checkCommitted = reader.readBoolean("checkCommitted");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 6:
+            case 7:
                 checkCommittedErrBytes = reader.readByteArray("checkCommittedErrBytes");
 
                 if (!reader.isLastRead())
@@ -233,7 +215,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
 
                 reader.incrementState();
 
-            case 7:
+            case 8:
                 miniId = reader.readInt("miniId");
 
                 if (!reader.isLastRead())
@@ -241,7 +223,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
 
                 reader.incrementState();
 
-            case 8:
+            case 9:
                 retVal = reader.readMessage("retVal");
 
                 if (!reader.isLastRead())
@@ -261,6 +243,18 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 9;
+        return 10;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        StringBuilder flags = new StringBuilder();
+
+        if (checkCommitted())
+            appendFlag(flags, "checkComm");
+
+        return S.toString(GridDhtTxFinishResponse.class, this,
+            "flags", flags.toString(),
+            "super", super.toString());
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0246b74e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 1b543bf..bff69bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -634,7 +634,9 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
                 return;
             }
 
-            GridNearTxFinishResponse res = new GridNearTxFinishResponse(nearXidVer,
+            GridNearTxFinishResponse res = new GridNearTxFinishResponse(
+                -1,
+                nearXidVer,
                 threadId,
                 nearFinFutId,
                 nearFinMiniId,

http://git-wip-us.apache.org/repos/asf/ignite/blob/0246b74e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
index e9043ed..efe96b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
@@ -59,15 +59,16 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
     }
 
     /**
+     * @param part Partition.
      * @param xid Xid version.
      * @param nearThreadId Near tx thread ID.
      * @param futId Future ID.
      * @param miniId Mini future Id.
      * @param err Error.
      */
-    public GridNearTxFinishResponse(GridCacheVersion xid, long nearThreadId, IgniteUuid futId, int miniId,
+    public GridNearTxFinishResponse(int part, GridCacheVersion xid, long nearThreadId, IgniteUuid futId, int miniId,
         @Nullable Throwable err) {
-        super(xid, futId);
+        super(part, xid, futId);
 
         assert miniId != 0;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0246b74e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 6daec14..56a7fa2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -796,8 +796,13 @@ public class IgniteTxHandler {
                 ", commit=" + req.commit() + ']');
 
             // Always send finish response.
-            GridCacheMessage res = new GridNearTxFinishResponse(req.version(), req.threadId(), req.futureId(),
-                req.miniId(), new IgniteCheckedException("Transaction has been already completed."));
+            GridCacheMessage res = new GridNearTxFinishResponse(
+                req.partition(),
+                req.version(),
+                req.threadId(),
+                req.futureId(),
+                req.miniId(),
+                new IgniteCheckedException("Transaction has been already completed."));
 
             try {
                 ctx.io().send(nodeId, res, req.policy());
@@ -1324,9 +1329,13 @@ public class IgniteTxHandler {
      * @param committed {@code True} if transaction committed on this node.
      * @param nearTxId Near tx version.
      */
-    protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed, GridCacheVersion nearTxId) {
+    protected final void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed, GridCacheVersion nearTxId) {
         if (req.replyRequired() || req.checkCommitted()) {
-            GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(req.version(), req.futureId(), req.miniId());
+            GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(
+                req.partition(),
+                req.version(),
+                req.futureId(),
+                req.miniId());
 
             if (req.checkCommitted()) {
                 res.checkCommitted(true);