You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/26 14:06:32 UTC
[32/39] ignite git commit: cc
cc
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/674e7dd2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/674e7dd2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/674e7dd2
Branch: refs/heads/ignite-5075-cc-debug
Commit: 674e7dd23edaaff66b84d27794a006896647c408
Parents: f651e87
Author: sboikov <sb...@gridgain.com>
Authored: Fri May 26 11:44:27 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri May 26 11:51:03 2017 +0300
----------------------------------------------------------------------
.../continuous/CacheContinuousQueryEntry.java | 23 +++++++++++++-------
.../CacheContinuousQueryEventBuffer.java | 18 ++++++++-------
.../continuous/CacheContinuousQueryManager.java | 12 ++++++----
.../CacheContinuousQueryPartitionRecovery.java | 2 +-
.../CacheContinuousQueryEventBufferTest.java | 7 +++---
...eCacheContinuousQueryImmutableEntryTest.java | 4 ++--
6 files changed, 40 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/674e7dd2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index 28fdee3..3f463a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -128,6 +128,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
* @param part Partition.
* @param updateCntr Update partition counter.
* @param topVer Topology version if applicable.
+ * @param flags Initial flags; {@code KEEP_BINARY} is additionally set when {@code keepBinary} is {@code true}.
*/
CacheContinuousQueryEntry(
int cacheId,
@@ -138,7 +139,8 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
boolean keepBinary,
int part,
long updateCntr,
- @Nullable AffinityTopologyVersion topVer) {
+ @Nullable AffinityTopologyVersion topVer,
+ byte flags) {
this.cacheId = cacheId;
this.evtType = evtType;
this.key = key;
@@ -147,9 +149,17 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
this.part = part;
this.updateCntr = updateCntr;
this.topVer = topVer;
+ this.flags = flags;
if (keepBinary)
- flags |= KEEP_BINARY;
+ this.flags |= KEEP_BINARY;
+ }
+
+ /**
+ * @return Flags.
+ */
+ public byte flags() {
+ return flags;
}
/**
@@ -233,7 +243,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
if (!isFiltered())
return this;
- CacheContinuousQueryEntry e = new CacheContinuousQueryEntry(
+ return new CacheContinuousQueryEntry(
cacheId,
null,
null,
@@ -242,11 +252,8 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
false,
part,
updateCntr,
- topVer);
-
- e.flags = flags;
-
- return e;
+ topVer,
+ flags);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/674e7dd2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
index a072240..afe34c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
@@ -363,7 +363,8 @@ public class CacheContinuousQueryEventBuffer {
e.isKeepBinary(),
e.partition(),
e.updateCounter(),
- e.topologyVersion());
+ e.topologyVersion(),
+ e.flags());
flushEntry.filteredCount(filtered);
@@ -399,13 +400,14 @@ public class CacheContinuousQueryEventBuffer {
private CacheContinuousQueryEntry filteredEntry(long cntr, long filtered) {
CacheContinuousQueryEntry e = new CacheContinuousQueryEntry(0,
null,
- null,
- null,
- null,
- false,
- part,
- cntr,
- topVer);
+ null,
+ null,
+ null,
+ false,
+ part,
+ cntr,
+ topVer,
+ (byte)0);
e.markFiltered();
http://git-wip-us.apache.org/repos/asf/ignite/blob/674e7dd2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 7cbb1e1..1a655e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -193,7 +193,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
lsnr.keepBinary(),
partId,
updCntr,
- topVer);
+ topVer,
+ (byte)0);
CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
@@ -339,7 +340,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
lsnr.keepBinary(),
partId,
updateCntr,
- topVer);
+ topVer,
+ (byte)0);
IgniteCacheProxy jcache = cctx.kernalContext().cache().jcacheProxy(cctx.name());
@@ -400,7 +402,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
lsnr.keepBinary(),
e.partition(),
-1,
- null);
+ null,
+ (byte)0);
CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent(
cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
@@ -703,7 +706,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
keepBinary,
0,
-1,
- null);
+ null,
+ (byte)0);
next = new CacheContinuousQueryEvent<>(
cctx.kernalContext().cache().jcache(cctx.name()),
http://git-wip-us.apache.org/repos/asf/ignite/blob/674e7dd2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
index 59252d2..e210c24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
@@ -236,7 +236,7 @@ class CacheContinuousQueryPartitionRecovery {
lastFiredEvt = e.getKey();
if (e.getValue() != HOLE && !e.getValue().isFiltered())
- entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue()));
+ entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, pending));
iter.remove();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/674e7dd2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java
index 4710593..382f166 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java
@@ -120,8 +120,8 @@ public class CacheContinuousQueryEventBufferTest extends GridCommonAbstractTest
false,
0,
cntr,
- null);
-
+ null,
+ (byte)0);
entries.add(entry);
@@ -140,7 +140,8 @@ public class CacheContinuousQueryEventBufferTest extends GridCommonAbstractTest
false,
0,
cntr,
- null);
+ null,
+ (byte)0);
expEntry.filteredCount(filtered);
http://git-wip-us.apache.org/repos/asf/ignite/blob/674e7dd2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
index d230320..81a7515 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
@@ -35,7 +35,6 @@ import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
-import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -138,7 +137,8 @@ public class IgniteCacheContinuousQueryImmutableEntryTest extends GridCommonAbst
true,
1,
1L,
- new AffinityTopologyVersion(1L));
+ new AffinityTopologyVersion(1L),
+ (byte)0);
e0.markFiltered();