You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/26 14:06:22 UTC
[22/39] ignite git commit: cc
cc
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ab5aead4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ab5aead4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ab5aead4
Branch: refs/heads/ignite-5075-cc-debug
Commit: ab5aead4dcb651001c362326e6a0b50350b31c2e
Parents: ff0a2dd
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 25 17:33:46 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 25 17:33:46 2017 +0300
----------------------------------------------------------------------
.../continuous/CacheContinuousQueryEntry.java | 59 ++++++++++----------
.../CacheContinuousQueryEventBuffer.java | 7 +++
2 files changed, 35 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ab5aead4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index 9db92b2..28fdee3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -51,6 +51,9 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
private static final byte FILTERED_ENTRY = 0b0010;
/** */
+ private static final byte KEEP_BINARY = 0b0100;
+
+ /** */
private static final EventType[] EVT_TYPE_VALS = EventType.values();
/**
@@ -105,9 +108,6 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
@GridToStringInclude
private AffinityTopologyVersion topVer;
- /** Keep binary. */
- private boolean keepBinary;
-
/** */
private long filteredCnt;
@@ -124,6 +124,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
* @param key Key.
* @param newVal New value.
* @param oldVal Old value.
+ * @param keepBinary Keep binary flag.
* @param part Partition.
* @param updateCntr Update partition counter.
* @param topVer Topology version if applicable.
@@ -146,7 +147,9 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
this.part = part;
this.updateCntr = updateCntr;
this.topVer = topVer;
- this.keepBinary = keepBinary;
+
+ if (keepBinary)
+ flags |= KEEP_BINARY;
}
/**
@@ -231,7 +234,15 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
return this;
CacheContinuousQueryEntry e = new CacheContinuousQueryEntry(
- cacheId, null, null, null, null, keepBinary, part, updateCntr, topVer);
+ cacheId,
+ null,
+ null,
+ null,
+ null,
+ false,
+ part,
+ updateCntr,
+ topVer);
e.flags = flags;
@@ -256,7 +267,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
* @return Keep binary flag.
*/
boolean isKeepBinary() {
- return keepBinary;
+ return (flags & KEEP_BINARY) != 0;
}
/**
@@ -370,42 +381,36 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
writer.incrementState();
case 4:
- if (!writer.writeBoolean("keepBinary", keepBinary))
- return false;
-
- writer.incrementState();
-
- case 5:
if (!writer.writeMessage("key", isFiltered() ? null : key))
return false;
writer.incrementState();
- case 6:
+ case 5:
if (!writer.writeMessage("newVal", isFiltered() ? null : newVal))
return false;
writer.incrementState();
- case 7:
+ case 6:
if (!writer.writeMessage("oldVal", isFiltered() ? null : oldVal))
return false;
writer.incrementState();
- case 8:
+ case 7:
if (!writer.writeInt("part", part))
return false;
writer.incrementState();
- case 9:
+ case 8:
if (!writer.writeMessage("topVer", topVer))
return false;
writer.incrementState();
- case 10:
+ case 9:
if (!writer.writeLong("updateCntr", updateCntr))
return false;
@@ -457,14 +462,6 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
reader.incrementState();
case 4:
- keepBinary = reader.readBoolean("keepBinary");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
key = reader.readMessage("key");
if (!reader.isLastRead())
@@ -472,7 +469,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
reader.incrementState();
- case 6:
+ case 5:
newVal = reader.readMessage("newVal");
if (!reader.isLastRead())
@@ -480,7 +477,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
reader.incrementState();
- case 7:
+ case 6:
oldVal = reader.readMessage("oldVal");
if (!reader.isLastRead())
@@ -488,7 +485,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
reader.incrementState();
- case 8:
+ case 7:
part = reader.readInt("part");
if (!reader.isLastRead())
@@ -496,7 +493,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
reader.incrementState();
- case 9:
+ case 8:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -504,7 +501,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
reader.incrementState();
- case 10:
+ case 9:
updateCntr = reader.readLong("updateCntr");
if (!reader.isLastRead())
@@ -519,7 +516,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 11;
+ return 10;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/ab5aead4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
index f496c8c..fd4029c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
@@ -96,6 +96,13 @@ public class CacheContinuousQueryEventBuffer {
else
ret = entries;
+ if (!pending.isEmpty()) {
+ if (ret == null)
+ ret = new ArrayList<>();
+
+ ret.addAll(pending.values());
+ }
+
return ret;
}