You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/31 09:23:15 UTC
[25/51] ignite git commit: ignite-5075
ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9db0d486
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9db0d486
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9db0d486
Branch: refs/heads/ignite-5075-pds
Commit: 9db0d4862070da6db7973e2b66067dd00da0ba8d
Parents: 681454c
Author: sboikov <sb...@gridgain.com>
Authored: Mon May 29 17:19:53 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon May 29 17:19:53 2017 +0300
----------------------------------------------------------------------
.../continuous/CacheContinuousQueryHandler.java | 15 ++++++++
.../CacheContinuousQueryListener.java | 16 ++++++++
.../continuous/CacheContinuousQueryManager.java | 9 +++--
.../query/continuous/CounterSkipContext.java | 40 ++++++++++++--------
4 files changed, 61 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/9db0d486/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 82b57b4..8d6aa2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -487,6 +487,21 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
onEntryUpdated(evt, primary, false, null);
}
+ @Override public CounterSkipContext skipUpdateCounter(GridCacheContext cctx,
+ @Nullable CounterSkipContext ctx,
+ int part,
+ long cntr,
+ AffinityTopologyVersion topVer) {
+ CacheContinuousQueryEventBuffer buf = partitionBuffer(cctx, part);
+
+ if (ctx == null)
+ ctx = new CounterSkipContext(part, cntr, topVer);
+
+ buf.processEntry(ctx.entry(), true);
+
+ return ctx;
+ }
+
@Override public void onPartitionEvicted(int part) {
entryBufs.remove(part);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9db0d486/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 84b22f9..fe9c198 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.query.continuous;
import java.util.Map;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
import org.jetbrains.annotations.Nullable;
@@ -76,6 +77,21 @@ public interface CacheContinuousQueryListener<K, V> {
public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer, boolean primary);
/**
+ * @param cctx Cache context.
+ * @param skipCtx Context.
+ * @param part Partition.
+ * @param cntr Counter to skip.
+ * @param topVer Topology version.
+ * @return Context.
+ */
+ public CounterSkipContext skipUpdateCounter(
+ GridCacheContext cctx,
+ @Nullable CounterSkipContext skipCtx,
+ int part,
+ long cntr,
+ AffinityTopologyVersion topVer);
+
+ /**
* @param part Partition.
*/
public void onPartitionEvicted(int part);
http://git-wip-us.apache.org/repos/asf/ignite/blob/9db0d486/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 6a10ed5..9910955 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -204,17 +204,20 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
/**
- * @param ctx Context.
+ * @param skipCtx Context.
* @param part Partition number.
* @param cntr Update counter.
* @param topVer Topology version.
* @return Context.
*/
- @Nullable public CounterSkipContext skipUpdateCounter(@Nullable CounterSkipContext ctx,
+ @Nullable public CounterSkipContext skipUpdateCounter(@Nullable CounterSkipContext skipCtx,
int part,
long cntr,
AffinityTopologyVersion topVer) {
- return null;
+ for (CacheContinuousQueryListener lsnr : lsnrs.values())
+ skipCtx = lsnr.skipUpdateCounter(cctx, skipCtx, part, cntr, topVer);
+
+ return skipCtx;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/9db0d486/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java
index 89ac6f9..41183c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java
@@ -26,27 +26,35 @@ import org.jetbrains.annotations.Nullable;
*/
public class CounterSkipContext {
/** */
- private CacheContinuousQueryEntry entry;
+ private final CacheContinuousQueryEntry entry;
/** */
private List<Runnable> readySendC;
- CacheContinuousQueryEntry entry(int part, long cntr, AffinityTopologyVersion topVer) {
- if (entry == null) {
- entry = new CacheContinuousQueryEntry(0,
- null,
- null,
- null,
- null,
- false,
- part,
- cntr,
- topVer,
- (byte)0);
-
- entry.markFiltered();
- }
+ /**
+ * @param part Partition.
+ * @param cntr Filtered counter.
+ * @param topVer Topology version.
+ */
+ CounterSkipContext(int part, long cntr, AffinityTopologyVersion topVer) {
+ entry = new CacheContinuousQueryEntry(0,
+ null,
+ null,
+ null,
+ null,
+ false,
+ part,
+ cntr,
+ topVer,
+ (byte)0);
+
+ entry.markFiltered();
+ }
+ /**
+ * @return Entry for filtered counter.
+ */
+ CacheContinuousQueryEntry entry() {
return entry;
}