You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/08/24 14:04:46 UTC
[23/50] [abbrv] ignite git commit: IGNITE-3470 - Support EXPIRED events in continuous queries
IGNITE-3470 - Support EXPIRED events in continuous queries
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aedfde69
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aedfde69
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aedfde69
Branch: refs/heads/ignite-2649
Commit: aedfde69af6e91277616052ab60fa0037693c2c6
Parents: b81dbbf
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Tue Jul 19 16:01:32 2016 -0700
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Tue Jul 19 16:01:32 2016 -0700
----------------------------------------------------------------------
.../ignite/cache/query/ContinuousQuery.java | 47 +++++++++++++++----
.../processors/cache/IgniteCacheProxy.java | 3 +-
.../continuous/CacheContinuousQueryManager.java | 9 ++--
...ridCacheContinuousQueryAbstractSelfTest.java | 48 +++++++++++++++++++-
4 files changed, 91 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aedfde69/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
index bbfe8cc..49d471e 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
@@ -21,6 +21,7 @@ import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.event.EventType;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -142,6 +143,9 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
/** Automatic unsubscription flag. */
private boolean autoUnsubscribe = DFLT_AUTO_UNSUBSCRIBE;
+ /** Whether to notify about {@link EventType#EXPIRED} events. */
+ private boolean includeExpired;
+
/**
* Creates new continuous query.
*/
@@ -324,6 +328,38 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
return this;
}
+ /**
+ * Gets automatic unsubscription flag value.
+ *
+ * @return Automatic unsubscription flag.
+ */
+ public boolean isAutoUnsubscribe() {
+ return autoUnsubscribe;
+ }
+
+ /**
+ * Sets the flag value defining whether to notify about {@link EventType#EXPIRED} events.
+ * If {@code true}, then the remote listener will get notifications about entries
+ * expired in cache. Otherwise, only {@link EventType#CREATED}, {@link EventType#UPDATED}
+ * and {@link EventType#REMOVED} events will be fired in the remote listener.
+ * <p>
+ * This flag is {@code false} by default, so {@link EventType#EXPIRED} events are disabled.
+ *
+ * @param includeExpired Whether to notify about {@link EventType#EXPIRED} events.
+ */
+ public void setIncludeExpired(boolean includeExpired) {
+ this.includeExpired = includeExpired;
+ }
+
+ /**
+ * Gets the flag value defining whether to notify about {@link EventType#EXPIRED} events.
+ *
+ * @return Whether to notify about {@link EventType#EXPIRED} events.
+ */
+ public boolean isIncludeExpired() {
+ return includeExpired;
+ }
+
/** {@inheritDoc} */
@Override public ContinuousQuery<K, V> setPageSize(int pageSize) {
return (ContinuousQuery<K, V>)super.setPageSize(pageSize);
@@ -333,13 +369,4 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
@Override public ContinuousQuery<K, V> setLocal(boolean loc) {
return (ContinuousQuery<K, V>)super.setLocal(loc);
}
-
- /**
- * Gets automatic unsubscription flag value.
- *
- * @return Automatic unsubscription flag.
- */
- public boolean isAutoUnsubscribe() {
- return autoUnsubscribe;
- }
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aedfde69/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 92e59db..249cfae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -598,7 +598,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
qry.getTimeInterval(),
qry.isAutoUnsubscribe(),
loc,
- keepBinary);
+ keepBinary,
+ qry.isIncludeExpired());
final QueryCursor<Cache.Entry<K, V>> cur =
qry.getInitialQuery() != null ? query(qry.getInitialQuery()) : null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/aedfde69/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 195f3ae..a8e5a6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -421,7 +421,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
long timeInterval,
boolean autoUnsubscribe,
boolean loc,
- final boolean keepBinary) throws IgniteCheckedException
+ final boolean keepBinary,
+ final boolean includeExpired) throws IgniteCheckedException
{
IgniteClosure<Boolean, CacheContinuousQueryHandler> clsr;
@@ -438,7 +439,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
rmtFilterFactory,
true,
false,
- true,
+ !includeExpired,
false,
null);
else {
@@ -456,7 +457,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
(CacheEntryEventSerializableFilter)fltr,
true,
false,
- true,
+ !includeExpired,
false);
}
@@ -473,7 +474,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
rmtFilter,
true,
false,
- true,
+ !includeExpired,
false);
}
};
http://git-wip-us.apache.org/repos/asf/ignite/blob/aedfde69/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index 3d238af..08acc42 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -34,6 +34,9 @@ import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.event.EventType;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
@@ -71,7 +74,6 @@ import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -1094,6 +1096,50 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
}
/**
+ * @throws Exception If failed.
+ */
+ public void testExpired() throws Exception {
+ IgniteCache<Object, Object> cache = grid(0).cache(null).
+ withExpiryPolicy(new CreatedExpiryPolicy(new Duration(MILLISECONDS, 1000)));
+
+ final Map<Object, Object> map = new ConcurrentHashMap8<>();
+ final CountDownLatch latch = new CountDownLatch(2);
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setIncludeExpired(true);
+
+ qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+ for (CacheEntryEvent<?, ?> e : evts) {
+ if (e.getEventType() == EventType.EXPIRED) {
+ assertNull(e.getValue());
+
+ map.put(e.getKey(), e.getOldValue());
+
+ latch.countDown();
+ }
+ }
+ }
+ });
+
+ try (QueryCursor<Cache.Entry<Object, Object>> ignored = cache.query(qry)) {
+ cache.put(1, 1);
+ cache.put(2, 2);
+
+ // Wait for expiration.
+ Thread.sleep(2000);
+
+ assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
+
+ assertEquals(2, map.size());
+
+ assertEquals(1, (int)map.get(1));
+ assertEquals(2, (int)map.get(2));
+ }
+ }
+
+ /**
*
*/
private static class StoreFactory implements Factory<CacheStore> {