You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/25 15:37:07 UTC
[04/18] ignite git commit: cc
cc
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/31f32998
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/31f32998
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/31f32998
Branch: refs/heads/ignite-5075-cc-debug
Commit: 31f32998c8cbcd51797d34069711e27db65e48ab
Parents: 7bf63c0
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 25 11:47:09 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 25 11:47:09 2017 +0300
----------------------------------------------------------------------
.../continuous/CacheContinuousQueryEventBuffer.java | 14 +++++++-------
.../query/continuous/CacheContinuousQueryHandler.java | 6 ++++++
.../IgniteCacheContinuousQueryBackupQueueTest.java | 13 +++++++++----
3 files changed, 22 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/31f32998/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
index 949ea67..f0640b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
@@ -41,13 +41,6 @@ public class CacheContinuousQueryEventBuffer {
/** */
protected final int part;
- /**
- * @param part Partition number.
- */
- CacheContinuousQueryEventBuffer(int part) {
- this.part = part;
- }
-
/** */
private AtomicReference<Batch> curBatch = new AtomicReference<>();
@@ -58,6 +51,13 @@ public class CacheContinuousQueryEventBuffer {
private ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> pending = new ConcurrentSkipListMap<>();
/**
+ * @param part Partition number.
+ */
+ CacheContinuousQueryEventBuffer(int part) {
+ this.part = part;
+ }
+
+ /**
* @param updateCntr Acknowledged counter.
*/
void cleanupBackupQueue(Long updateCntr) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/31f32998/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 540f871..9866e7b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -954,6 +954,12 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
/** {@inheritDoc} */
@Override public void onNodeLeft() {
nodeLeft = true;
+
+ for (Map.Entry<Integer, CacheContinuousQueryEventBuffer> bufE : entryBufs.entrySet()) {
+ CacheContinuousQueryEventBuffer buf = bufE.getValue();
+
+ buf.resetBackupQueue();
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/31f32998/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
index 26c7d41..85d68d3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
@@ -262,11 +262,16 @@ public class IgniteCacheContinuousQueryBackupQueueTest extends GridCommonAbstrac
GridContinuousHandler hnd = GridTestUtils.getFieldValue(info, "hnd");
if (hnd.isQuery() && hnd.cacheName().equals(CACHE_NAME)) {
- Collection<Object> q = GridTestUtils.getFieldValue(hnd,
- CacheContinuousQueryHandler.class, "backupQueue");
+ Map<Integer, CacheContinuousQueryEventBuffer> map = GridTestUtils.getFieldValue(hnd,
+ CacheContinuousQueryHandler.class, "entryBufs");
- if (q != null)
- backupQueues.add(q);
+ for (CacheContinuousQueryEventBuffer buf : map.values()) {
+ Collection<Object> q = GridTestUtils.getFieldValue(buf,
+ CacheContinuousQueryEventBuffer.class, "backupQ");
+
+ if (q != null)
+ backupQueues.add(q);
+ }
}
}