You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/07 11:16:50 UTC

[05/50] [abbrv] ignite git commit: IGNITE-3470 - Support EXPIRED events in continuous queries

IGNITE-3470 - Support EXPIRED events in continuous queries


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aedfde69
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aedfde69
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aedfde69

Branch: refs/heads/ignite-2649
Commit: aedfde69af6e91277616052ab60fa0037693c2c6
Parents: b81dbbf
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Tue Jul 19 16:01:32 2016 -0700
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Tue Jul 19 16:01:32 2016 -0700

----------------------------------------------------------------------
 .../ignite/cache/query/ContinuousQuery.java     | 47 +++++++++++++++----
 .../processors/cache/IgniteCacheProxy.java      |  3 +-
 .../continuous/CacheContinuousQueryManager.java |  9 ++--
 ...ridCacheContinuousQueryAbstractSelfTest.java | 48 +++++++++++++++++++-
 4 files changed, 91 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/aedfde69/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
index bbfe8cc..49d471e 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
@@ -21,6 +21,7 @@ import javax.cache.Cache;
 import javax.cache.configuration.Factory;
 import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.event.EventType;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -142,6 +143,9 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
     /** Automatic unsubscription flag. */
     private boolean autoUnsubscribe = DFLT_AUTO_UNSUBSCRIBE;
 
+    /** Whether to notify about {@link EventType#EXPIRED} events. */
+    private boolean includeExpired;
+
     /**
      * Creates new continuous query.
      */
@@ -324,6 +328,38 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
         return this;
     }
 
+    /**
+     * Returns the current value of the automatic unsubscription flag.
+     *
+     * @return Value of the automatic unsubscription flag.
+     */
+    public boolean isAutoUnsubscribe() {
+        return this.autoUnsubscribe;
+    }
+
+    /**
+     * Sets the flag defining whether to notify about {@link EventType#EXPIRED} events. If {@code false}
+     * (the default), only {@link EventType#CREATED}, {@link EventType#UPDATED} and
+     * {@link EventType#REMOVED} events are fired in the query's listener.
+     *
+     * @param includeExpired Whether to notify about {@link EventType#EXPIRED} events.
+     * @return {@code this} for chaining.
+     */
+    public ContinuousQuery<K, V> setIncludeExpired(boolean includeExpired) {
+        this.includeExpired = includeExpired;
+
+        return this;
+    }
+
+    /**
+     * Tells whether notifications about {@link EventType#EXPIRED} events are enabled for this query.
+     *
+     * @return Value of the flag controlling {@link EventType#EXPIRED} notifications.
+     */
+    public boolean isIncludeExpired() {
+        return this.includeExpired;
+    }
+
     /** {@inheritDoc} */
     @Override public ContinuousQuery<K, V> setPageSize(int pageSize) {
         return (ContinuousQuery<K, V>)super.setPageSize(pageSize);
@@ -333,13 +369,4 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
     @Override public ContinuousQuery<K, V> setLocal(boolean loc) {
         return (ContinuousQuery<K, V>)super.setLocal(loc);
     }
-
-    /**
-     * Gets automatic unsubscription flag value.
-     *
-     * @return Automatic unsubscription flag.
-     */
-    public boolean isAutoUnsubscribe() {
-        return autoUnsubscribe;
-    }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/aedfde69/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 92e59db..249cfae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -598,7 +598,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
                 qry.getTimeInterval(),
                 qry.isAutoUnsubscribe(),
                 loc,
-                keepBinary);
+                keepBinary,
+                qry.isIncludeExpired());
 
             final QueryCursor<Cache.Entry<K, V>> cur =
                 qry.getInitialQuery() != null ? query(qry.getInitialQuery()) : null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/aedfde69/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 195f3ae..a8e5a6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -421,7 +421,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         long timeInterval,
         boolean autoUnsubscribe,
         boolean loc,
-        final boolean keepBinary) throws IgniteCheckedException
+        final boolean keepBinary,
+        final boolean includeExpired) throws IgniteCheckedException
     {
         IgniteClosure<Boolean, CacheContinuousQueryHandler> clsr;
 
@@ -438,7 +439,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                             rmtFilterFactory,
                             true,
                             false,
-                            true,
+                            !includeExpired,
                             false,
                             null);
                     else {
@@ -456,7 +457,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                             (CacheEntryEventSerializableFilter)fltr,
                             true,
                             false,
-                            true,
+                            !includeExpired,
                             false);
                     }
 
@@ -473,7 +474,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                         rmtFilter,
                         true,
                         false,
-                        true,
+                        !includeExpired,
                         false);
                 }
             };

http://git-wip-us.apache.org/repos/asf/ignite/blob/aedfde69/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index 3d238af..08acc42 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -34,6 +34,9 @@ import javax.cache.Cache;
 import javax.cache.configuration.Factory;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.event.EventType;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
 import javax.cache.integration.CacheWriterException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
@@ -71,7 +74,6 @@ import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -1094,6 +1096,50 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
     }
 
     /**
+     * Checks that {@link EventType#EXPIRED} events reach the local listener when enabled on the query.
+     *
+     * @throws Exception If failed.
+     */
+    public void testExpired() throws Exception {
+        // Entries put through this view expire 1000 ms after creation.
+        IgniteCache<Object, Object> cache = grid(0).cache(null).
+            withExpiryPolicy(new CreatedExpiryPolicy(new Duration(MILLISECONDS, 1000)));
+
+        // Expired key -> old value, filled by the listener.
+        final Map<Object, Object> map = new ConcurrentHashMap8<>();
+        final CountDownLatch latch = new CountDownLatch(2);
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+        qry.setIncludeExpired(true);
+
+        qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+                for (CacheEntryEvent<?, ?> e : evts) {
+                    if (e.getEventType() == EventType.EXPIRED) {
+                        assertNull(e.getValue());
+                        map.put(e.getKey(), e.getOldValue());
+                        latch.countDown();
+                    }
+                }
+            }
+        });
+
+        try (QueryCursor<Cache.Entry<Object, Object>> ignored = cache.query(qry)) {
+            cache.put(1, 1);
+            cache.put(2, 2);
+
+            // Wait for expiration.
+            Thread.sleep(2000);
+
+            // assertTrue, not the assert keyword: the check must run even when JVM assertions are disabled.
+            assertTrue(latch.await(LATCH_TIMEOUT, MILLISECONDS));
+            assertEquals(2, map.size());
+            assertEquals(1, (int)map.get(1));
+            assertEquals(2, (int)map.get(2));
+        }
+    }
+
+    /**
      *
      */
     private static class StoreFactory implements Factory<CacheStore> {