You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/26 14:06:22 UTC

[22/39] ignite git commit: cc

cc


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ab5aead4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ab5aead4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ab5aead4

Branch: refs/heads/ignite-5075-cc-debug
Commit: ab5aead4dcb651001c362326e6a0b50350b31c2e
Parents: ff0a2dd
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 25 17:33:46 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 25 17:33:46 2017 +0300

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryEntry.java   | 59 ++++++++++----------
 .../CacheContinuousQueryEventBuffer.java        |  7 +++
 2 files changed, 35 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ab5aead4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index 9db92b2..28fdee3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -51,6 +51,9 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
     private static final byte FILTERED_ENTRY = 0b0010;
 
     /** */
+    private static final byte KEEP_BINARY = 0b0100;
+
+    /** */
     private static final EventType[] EVT_TYPE_VALS = EventType.values();
 
     /**
@@ -105,9 +108,6 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
     @GridToStringInclude
     private AffinityTopologyVersion topVer;
 
-    /** Keep binary. */
-    private boolean keepBinary;
-
     /** */
     private long filteredCnt;
 
@@ -124,6 +124,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
      * @param key Key.
      * @param newVal New value.
      * @param oldVal Old value.
+     * @param keepBinary Keep binary flag.
      * @param part Partition.
      * @param updateCntr Update partition counter.
      * @param topVer Topology version if applicable.
@@ -146,7 +147,9 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
         this.part = part;
         this.updateCntr = updateCntr;
         this.topVer = topVer;
-        this.keepBinary = keepBinary;
+
+        if (keepBinary)
+            flags |= KEEP_BINARY;
     }
 
     /**
@@ -231,7 +234,15 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
             return this;
 
         CacheContinuousQueryEntry e = new CacheContinuousQueryEntry(
-                cacheId, null, null, null, null, keepBinary, part, updateCntr, topVer);
+            cacheId,
+            null,
+            null,
+            null,
+            null,
+            false,
+            part,
+            updateCntr,
+            topVer);
 
         e.flags = flags;
 
@@ -256,7 +267,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
      * @return Keep binary flag.
      */
     boolean isKeepBinary() {
-        return keepBinary;
+        return (flags & KEEP_BINARY) != 0;
     }
 
     /**
@@ -370,42 +381,36 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeBoolean("keepBinary", keepBinary))
-                    return false;
-
-                writer.incrementState();
-
-            case 5:
                 if (!writer.writeMessage("key", isFiltered() ? null : key))
                     return false;
 
                 writer.incrementState();
 
-            case 6:
+            case 5:
                 if (!writer.writeMessage("newVal", isFiltered() ? null : newVal))
                     return false;
 
                 writer.incrementState();
 
-            case 7:
+            case 6:
                 if (!writer.writeMessage("oldVal", isFiltered() ? null : oldVal))
                     return false;
 
                 writer.incrementState();
 
-            case 8:
+            case 7:
                 if (!writer.writeInt("part", part))
                     return false;
 
                 writer.incrementState();
 
-            case 9:
+            case 8:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
-            case 10:
+            case 9:
                 if (!writer.writeLong("updateCntr", updateCntr))
                     return false;
 
@@ -457,14 +462,6 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
                 reader.incrementState();
 
             case 4:
-                keepBinary = reader.readBoolean("keepBinary");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 5:
                 key = reader.readMessage("key");
 
                 if (!reader.isLastRead())
@@ -472,7 +469,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
 
                 reader.incrementState();
 
-            case 6:
+            case 5:
                 newVal = reader.readMessage("newVal");
 
                 if (!reader.isLastRead())
@@ -480,7 +477,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
 
                 reader.incrementState();
 
-            case 7:
+            case 6:
                 oldVal = reader.readMessage("oldVal");
 
                 if (!reader.isLastRead())
@@ -488,7 +485,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
 
                 reader.incrementState();
 
-            case 8:
+            case 7:
                 part = reader.readInt("part");
 
                 if (!reader.isLastRead())
@@ -496,7 +493,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
 
                 reader.incrementState();
 
-            case 9:
+            case 8:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -504,7 +501,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
 
                 reader.incrementState();
 
-            case 10:
+            case 9:
                 updateCntr = reader.readLong("updateCntr");
 
                 if (!reader.isLastRead())
@@ -519,7 +516,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 11;
+        return 10;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ab5aead4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
index f496c8c..fd4029c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
@@ -96,6 +96,13 @@ public class CacheContinuousQueryEventBuffer {
         else
             ret = entries;
 
+        if (!pending.isEmpty()) {
+            if (ret == null)
+                ret = new ArrayList<>();
+
+            ret.addAll(pending.values());
+        }
+
         return ret;
     }