You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/29 14:32:31 UTC

[22/22] ignite git commit: ignite-5075

ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9db0d486
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9db0d486
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9db0d486

Branch: refs/heads/ignite-5075
Commit: 9db0d4862070da6db7973e2b66067dd00da0ba8d
Parents: 681454c
Author: sboikov <sb...@gridgain.com>
Authored: Mon May 29 17:19:53 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon May 29 17:19:53 2017 +0300

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryHandler.java | 15 ++++++++
 .../CacheContinuousQueryListener.java           | 16 ++++++++
 .../continuous/CacheContinuousQueryManager.java |  9 +++--
 .../query/continuous/CounterSkipContext.java    | 40 ++++++++++++--------
 4 files changed, 61 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9db0d486/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 82b57b4..8d6aa2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -487,6 +487,21 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 onEntryUpdated(evt, primary, false, null);
             }
 
+            @Override public CounterSkipContext skipUpdateCounter(GridCacheContext cctx,
+                @Nullable CounterSkipContext ctx,
+                int part,
+                long cntr,
+                AffinityTopologyVersion topVer) {
+                CacheContinuousQueryEventBuffer buf = partitionBuffer(cctx, part);
+
+                if (ctx == null)
+                    ctx = new CounterSkipContext(part, cntr, topVer);
+
+                buf.processEntry(ctx.entry(), true);
+
+                return ctx;
+            }
+
             @Override public void onPartitionEvicted(int part) {
                 entryBufs.remove(part);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9db0d486/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 84b22f9..fe9c198 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.query.continuous;
 import java.util.Map;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
 import org.jetbrains.annotations.Nullable;
 
@@ -76,6 +77,21 @@ public interface CacheContinuousQueryListener<K, V> {
     public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer, boolean primary);
 
     /**
+     * @param cctx Cache context.
+     * @param skipCtx Context.
+     * @param part Partition.
+     * @param cntr Counter to skip.
+     * @param topVer Topology version.
+     * @return Context.
+     */
+    public CounterSkipContext skipUpdateCounter(
+        GridCacheContext cctx,
+        @Nullable CounterSkipContext skipCtx,
+        int part,
+        long cntr,
+        AffinityTopologyVersion topVer);
+
+    /**
      * @param part Partition.
      */
     public void onPartitionEvicted(int part);

http://git-wip-us.apache.org/repos/asf/ignite/blob/9db0d486/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 6a10ed5..9910955 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -204,17 +204,20 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     }
 
     /**
-     * @param ctx Context.
+     * @param skipCtx Context.
      * @param part Partition number.
      * @param cntr Update counter.
      * @param topVer Topology version.
      * @return Context.
      */
-    @Nullable public CounterSkipContext skipUpdateCounter(@Nullable CounterSkipContext ctx,
+    @Nullable public CounterSkipContext skipUpdateCounter(@Nullable CounterSkipContext skipCtx,
         int part,
         long cntr,
         AffinityTopologyVersion topVer) {
-        return null;
+        for (CacheContinuousQueryListener lsnr : lsnrs.values())
+            skipCtx = lsnr.skipUpdateCounter(cctx, skipCtx, part, cntr, topVer);
+
+        return skipCtx;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/9db0d486/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java
index 89ac6f9..41183c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java
@@ -26,27 +26,35 @@ import org.jetbrains.annotations.Nullable;
  */
 public class CounterSkipContext {
     /** */
-    private CacheContinuousQueryEntry entry;
+    private final CacheContinuousQueryEntry entry;
 
     /** */
     private List<Runnable> readySendC;
 
-    CacheContinuousQueryEntry entry(int part, long cntr, AffinityTopologyVersion topVer) {
-        if (entry == null) {
-            entry = new CacheContinuousQueryEntry(0,
-                null,
-                null,
-                null,
-                null,
-                false,
-                part,
-                cntr,
-                topVer,
-                (byte)0);
-
-            entry.markFiltered();
-        }
+    /**
+     * @param part Partition.
+     * @param cntr Filtered counter.
+     * @param topVer Topology version.
+     */
+    CounterSkipContext(int part, long cntr, AffinityTopologyVersion topVer) {
+        entry = new CacheContinuousQueryEntry(0,
+            null,
+            null,
+            null,
+            null,
+            false,
+            part,
+            cntr,
+            topVer,
+            (byte)0);
+
+        entry.markFiltered();
+    }
 
+    /**
+     * @return Entry for filtered counter.
+     */
+    CacheContinuousQueryEntry entry() {
         return entry;
     }