You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/02/25 18:29:52 UTC
ignite git commit: IGNITE-1186 Fixed tests.
Repository: ignite
Updated Branches:
refs/heads/ignite-1186 4604f1e58 -> c2b381ffe
IGNITE-1186 Fixed tests.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c2b381ff
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c2b381ff
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c2b381ff
Branch: refs/heads/ignite-1186
Commit: c2b381ffea6c2f73457e3c08c4adf29d9423377a
Parents: 4604f1e
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Thu Feb 25 17:43:33 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Thu Feb 25 17:43:33 2016 +0300
----------------------------------------------------------------------
.../ignite/events/CacheQueryExecutedEvent.java | 8 +--
.../continuous/CacheContinuousQueryHandler.java | 26 +++++---
.../CacheContinuousQueryHandlerV2.java | 16 ++++-
.../continuous/CacheContinuousQueryManager.java | 63 ++++++++++++++------
.../IgniteCacheEntryListenerAbstractTest.java | 16 +++--
.../cache/IgniteCacheEntryListenerTxTest.java | 5 --
...ContinuousQueryFailoverAbstractSelfTest.java | 2 +-
7 files changed, 93 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c2b381ff/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java b/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java
index cc031b5..ed33ccc 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java
@@ -18,7 +18,7 @@
package org.apache.ignite.events;
import java.util.UUID;
-import javax.cache.event.CacheEntryEventFilter;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -83,7 +83,7 @@ public class CacheQueryExecutedEvent<K, V> extends EventAdapter {
/** Continuous query filter. */
@GridToStringInclude
- private final CacheEntryEventFilter<K, V> contQryFilter;
+ private final CacheEntryEventSerializableFilter<K, V> contQryFilter;
/** Query arguments. */
@GridToStringInclude
@@ -116,7 +116,7 @@ public class CacheQueryExecutedEvent<K, V> extends EventAdapter {
@Nullable String clsName,
@Nullable String clause,
@Nullable IgniteBiPredicate<K, V> scanQryFilter,
- @Nullable CacheEntryEventFilter<K, V> contQryFilter,
+ @Nullable CacheEntryEventSerializableFilter<K, V> contQryFilter,
@Nullable Object[] args,
@Nullable UUID subjId,
@Nullable String taskName) {
@@ -194,7 +194,7 @@ public class CacheQueryExecutedEvent<K, V> extends EventAdapter {
*
* @return Continuous query filter.
*/
- @Nullable public CacheEntryEventFilter<K, V> continuousQueryFilter() {
+ @Nullable public CacheEntryEventSerializableFilter<K, V> continuousQueryFilter() {
return contQryFilter;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c2b381ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index a69d14e..2815dc2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.event.EventType;
import org.apache.ignite.IgniteCache;
@@ -262,8 +263,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
if (locLsnr != null)
ctx.resource().injectGeneric(locLsnr);
- if (rmtFilter != null)
- ctx.resource().injectGeneric(rmtFilter);
+ if (getRemoteFilter() != null)
+ ctx.resource().injectGeneric(getRemoteFilter());
entryBufs = new ConcurrentHashMap<>();
@@ -303,7 +304,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
null,
null,
null,
- rmtFilter,
+ getRemoteFilter() instanceof CacheEntryEventSerializableFilter ?
+ (CacheEntryEventSerializableFilter)getRemoteFilter() : null,
null,
nodeId,
taskName()
@@ -332,9 +334,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
boolean notify = !evt.entry().isFiltered();
- if (notify && rmtFilter != null) {
+ if (notify && getRemoteFilter() != null) {
try {
- notify = rmtFilter.evaluate(evt);
+ notify = getRemoteFilter().evaluate(evt);
}
catch (Exception e) {
U.error(cctx.logger(CacheContinuousQueryHandler.class), "CacheEntryEventFilter failed: " + e);
@@ -422,7 +424,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
null,
null,
null,
- rmtFilter,
+ getRemoteFilter() instanceof CacheEntryEventSerializableFilter ?
+ (CacheEntryEventSerializableFilter)getRemoteFilter() : null,
null,
nodeId,
taskName(),
@@ -435,8 +438,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
@Override public void onUnregister() {
- if (rmtFilter instanceof PlatformContinuousQueryFilter)
- ((PlatformContinuousQueryFilter)rmtFilter).onQueryUnregister();
+ if (getRemoteFilter() instanceof PlatformContinuousQueryFilter)
+ ((PlatformContinuousQueryFilter)getRemoteFilter()).onQueryUnregister();
}
@Override public void cleanupBackupQueue(Map<Integer, Long> updateCntrs) {
@@ -517,6 +520,13 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
/**
+ * @return Remote filter.
+ */
+ protected CacheEntryEventFilter<K, V> getRemoteFilter() {
+ return rmtFilter;
+ }
+
+ /**
* @param cctx Context.
* @param nodeId ID of the node that started routine.
* @param entry Entry.
http://git-wip-us.apache.org/repos/asf/ignite/blob/c2b381ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
index 6cdc5af..7f00fc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
@@ -23,6 +23,7 @@ import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.UUID;
import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
@@ -45,6 +46,9 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
/** Deployable object for filter factory. */
private DeployableObject rmtFilterFactoryDep;
+ /** Filter instance created by the remote filter factory; held in a transient field. */
+ protected transient CacheEntryEventFilter<K, V> rmtNonSerFilter;
+
/**
* Required by {@link Externalizable}.
*/
@@ -88,12 +92,18 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
super(cacheName, topic, locLsnr, rmtFilter, internal, notifyExisting, oldValRequired, sync, ignoreExpired,
taskHash, skipPrimaryCheck, locCache, keepBinary, ignoreClassNotFound);
- assert rmtFilter != null ^ rmtFilterFactory != null || rmtFilter == null && rmtFilterFactory == null;
+ assert rmtFilter != null ^ rmtFilterFactory != null || rmtFilter == null && rmtFilterFactory == null :
+ "Remote Filter and Remote Filter Factory both are not null. Should be set only one.";
this.rmtFilterFactory = rmtFilterFactory;
if (rmtFilterFactory != null)
- this.rmtFilter = rmtFilterFactory.create();
+ this.rmtNonSerFilter = rmtFilterFactory.create();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheEntryEventFilter<K, V> getRemoteFilter() {
+ return rmtNonSerFilter != null ? rmtNonSerFilter : rmtFilter;
}
/** {@inheritDoc} */
@@ -193,6 +203,6 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
rmtFilterFactory = (Factory<CacheEntryEventSerializableFilter<K, V>>)in.readObject();
if (rmtFilter == null && rmtFilterFactory != null)
- rmtFilter = rmtFilterFactory.create();
+ rmtNonSerFilter = rmtFilterFactory.create();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c2b381ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 118a342..4469cf0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -559,7 +559,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
*/
private UUID executeQuery0(CacheEntryUpdatedListener locLsnr,
final CacheEntryEventSerializableFilter rmtFilter,
- final Factory<? extends CacheEntryEventSerializableFilter> rmtFilterFactory,
+ final Factory<? extends CacheEntryEventFilter> rmtFilterFactory,
int bufSize,
long timeInterval,
boolean autoUnsubscribe,
@@ -600,12 +600,22 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
cctx.isLocal(),
keepBinary,
ignoreClassNotFound);
- else
+ else {
+ CacheEntryEventFilter fltr = null;
+
+ if (rmtFilterFactory != null) {
+ fltr = rmtFilterFactory.create();
+
+ if (!(fltr instanceof CacheEntryEventSerializableFilter))
+ throw new IgniteCheckedException("Cache entry event filter must implement " +
+ "org.apache.ignite.cache.CacheEntryEventSerializableFilter: " + fltr);
+ }
+
hnd = new CacheContinuousQueryHandler(
cctx.name(),
TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
locLsnr,
- rmtFilter,
+ (CacheEntryEventSerializableFilter)fltr,
internal,
notifyExisting,
oldValRequired,
@@ -616,6 +626,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
cctx.isLocal(),
keepBinary,
ignoreClassNotFound);
+ }
IgnitePredicate<ClusterNode> pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ?
F.nodeForNodeId(cctx.localNodeId()) : F.<ClusterNode>alwaysTrue();
@@ -812,22 +823,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
locLsnrImpl,
log);
- CacheEntryEventFilter fltr = null;
-
- if (cfg.getCacheEntryEventFilterFactory() != null) {
- fltr = (CacheEntryEventFilter)cfg.getCacheEntryEventFilterFactory().create();
-
- if (!(fltr instanceof Serializable))
- throw new IgniteCheckedException("Cache entry event filter must implement java.io.Serializable: "
- + fltr);
- }
-
- CacheEntryEventSerializableFilter rmtFilter = new JCacheQueryRemoteFilter(fltr, types);
-
routineId = executeQuery0(
locLsnr,
- rmtFilter,
- cfg.getCacheEntryEventFilterFactory(),
+ null,
+ new JCacheRemoteQueryFactory(cfg.getCacheEntryEventFilterFactory(), types),
ContinuousQuery.DFLT_PAGE_SIZE,
ContinuousQuery.DFLT_TIME_INTERVAL,
ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE,
@@ -856,6 +855,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
/**
+ *
*/
private static class JCacheQueryLocalListener<K, V> implements CacheEntryUpdatedListener<K, V> {
/** */
@@ -938,6 +938,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
/**
+ * For handler version 2.0 this filter should not be serialized.
*/
private static class JCacheQueryRemoteFilter implements CacheEntryEventSerializableFilter, Externalizable {
/** */
@@ -1021,6 +1022,34 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
/**
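+ * Factory creating {@link JCacheQueryRemoteFilter} instances that wrap the filter produced by the given factory, if any.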
+ */
+ private static class JCacheRemoteQueryFactory implements Factory<CacheEntryEventFilter> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Factory. */
+ private Factory<CacheEntryEventFilter> impl;
+
+ /** Types. */
+ private byte types;
+
+ /**
+ * @param impl Factory.
+ * @param types Types.
+ */
+ public JCacheRemoteQueryFactory(@Nullable Factory<CacheEntryEventFilter> impl, byte types) {
+ this.impl = impl;
+ this.types = types;
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheEntryEventFilter create() {
+ return new JCacheQueryRemoteFilter(impl != null ? impl.create() : null, types);
+ }
+ }
+
+ /**
* Task flash backup queue.
*/
private static final class BackupCleaner implements Runnable {
http://git-wip-us.apache.org/repos/asf/ignite/blob/c2b381ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index e6bfd87..62a4153 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -178,11 +178,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
return new CreateUpdateRemoveExpireListener();
}
},
- new Factory<CacheEntryEventSerializableFilter<Object, Object>>() {
- @Override public CacheEntryEventSerializableFilter<Object, Object> create() {
- return new ExceptionFilter();
- }
- },
+ new ExceptionFilterFactory(),
false,
false
);
@@ -1467,4 +1463,14 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
return S.toString(ListenerTestValue.class, this);
}
}
+
+ /**
+ * Factory creating {@link ExceptionFilter} instances.
+ */
+ static class ExceptionFilterFactory implements Factory<CacheEntryEventSerializableFilter<Object, Object>> {
+ /** {@inheritDoc} */
+ @Override public CacheEntryEventSerializableFilter<Object, Object> create() {
+ return new ExceptionFilter();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c2b381ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
index 41725e7..cad57f0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
@@ -47,9 +47,4 @@ public class IgniteCacheEntryListenerTxTest extends IgniteCacheEntryListenerAbst
@Override protected NearCacheConfiguration nearConfiguration() {
return null;
}
-
- /** {@inheritDoc} */
- @Override public void testEvents(){
- fail("https://issues.apache.org/jira/browse/IGNITE-1600");
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/c2b381ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index a42f056..f104f21 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -1432,7 +1432,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
GridContinuousHandler hnd = GridTestUtils.getFieldValue(info, "hnd");
if (hnd.isQuery() && hnd.cacheName() == null) {
- backupQueue = GridTestUtils.getFieldValue(hnd, "backupQueue");
+ backupQueue = GridTestUtils.getFieldValue(hnd, CacheContinuousQueryHandler.class, "backupQueue");
break;
}