You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/11/04 15:11:01 UTC
[18/36] ignite git commit: IGNITE-426 WIP.
IGNITE-426 WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7389c907
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7389c907
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7389c907
Branch: refs/heads/ignite-462-2
Commit: 7389c907000f6fb29e2f13b916191c30d61625af
Parents: a06995a
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Thu Oct 29 13:03:28 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Wed Nov 4 17:02:44 2015 +0300
----------------------------------------------------------------------
.../query/continuous/CacheContinuousQueryHandler.java | 10 +++-------
.../processors/continuous/GridContinuousProcessor.java | 3 ++-
.../IgniteCacheContinuousQueryClientReconnectTest.java | 2 +-
3 files changed, 6 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7389c907/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index e40b2d7..1240ad1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -637,7 +637,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
private long lastFiredEvt;
/** */
- private AffinityTopologyVersion curTop;
+ private AffinityTopologyVersion curTop = AffinityTopologyVersion.NONE;
/** */
private final Map<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>();
@@ -669,7 +669,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
synchronized (pendingEvts) {
// Received first event.
- if (curTop == null) {
+ if (curTop == AffinityTopologyVersion.NONE) {
lastFiredEvt = entry.updateIndex();
curTop = entry.topologyVersion();
@@ -678,11 +678,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
if (curTop.compareTo(entry.topologyVersion()) < 0) {
- GridCacheAffinityManager aff = cctx.affinity();
-
- if (cctx.affinity().backups(entry.partition(), entry.topologyVersion()).isEmpty() &&
- !aff.primary(entry.partition(), curTop).id().equals(aff.primary(entry.partition(),
- entry.topologyVersion()).id())) {
+ if (entry.updateIndex() == 1 && !entry.isBackup()) {
entries = new ArrayList<>(pendingEvts.size());
for (CacheContinuousQueryEntry evt : pendingEvts.values()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/7389c907/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 0804ffa..497c6e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -61,6 +61,7 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
@@ -216,7 +217,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
ctx.cache().internalCache(routine.handler().cacheName());
if (interCache != null && idxs != null && interCache.context() != null
- && !interCache.isLocal()) {
+ && !interCache.isLocal() && !CU.clientNode(ctx.grid().localNode())) {
Map<Integer, Long> map = interCache.context().topology().updateCounters();
for (Map.Entry<Integer, Long> e : map.entrySet()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/7389c907/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java
index 560f2e0..2e1d78d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java
@@ -94,7 +94,7 @@ public class IgniteCacheContinuousQueryClientReconnectTest extends IgniteClientR
int keyCnt = 100;
- for (int i = 0; i < 30; i++) {
+ for (int i = 0; i < 10; i++) {
lsnr.latch = new CountDownLatch(keyCnt);
for (int key = 0; key < keyCnt; key++)