You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/02/25 18:29:52 UTC

ignite git commit: IGNITE-1186 Fixed tests.

Repository: ignite
Updated Branches:
  refs/heads/ignite-1186 4604f1e58 -> c2b381ffe


IGNITE-1186 Fixed tests.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c2b381ff
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c2b381ff
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c2b381ff

Branch: refs/heads/ignite-1186
Commit: c2b381ffea6c2f73457e3c08c4adf29d9423377a
Parents: 4604f1e
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Thu Feb 25 17:43:33 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Thu Feb 25 17:43:33 2016 +0300

----------------------------------------------------------------------
 .../ignite/events/CacheQueryExecutedEvent.java  |  8 +--
 .../continuous/CacheContinuousQueryHandler.java | 26 +++++---
 .../CacheContinuousQueryHandlerV2.java          | 16 ++++-
 .../continuous/CacheContinuousQueryManager.java | 63 ++++++++++++++------
 .../IgniteCacheEntryListenerAbstractTest.java   | 16 +++--
 .../cache/IgniteCacheEntryListenerTxTest.java   |  5 --
 ...ContinuousQueryFailoverAbstractSelfTest.java |  2 +-
 7 files changed, 93 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c2b381ff/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java b/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java
index cc031b5..ed33ccc 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.events;
 
 import java.util.UUID;
-import javax.cache.event.CacheEntryEventFilter;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -83,7 +83,7 @@ public class CacheQueryExecutedEvent<K, V> extends EventAdapter {
 
     /** Continuous query filter. */
     @GridToStringInclude
-    private final CacheEntryEventFilter<K, V> contQryFilter;
+    private final CacheEntryEventSerializableFilter<K, V> contQryFilter;
 
     /** Query arguments. */
     @GridToStringInclude
@@ -116,7 +116,7 @@ public class CacheQueryExecutedEvent<K, V> extends EventAdapter {
         @Nullable String clsName,
         @Nullable String clause,
         @Nullable IgniteBiPredicate<K, V> scanQryFilter,
-        @Nullable CacheEntryEventFilter<K, V> contQryFilter,
+        @Nullable CacheEntryEventSerializableFilter<K, V> contQryFilter,
         @Nullable Object[] args,
         @Nullable UUID subjId,
         @Nullable String taskName) {
@@ -194,7 +194,7 @@ public class CacheQueryExecutedEvent<K, V> extends EventAdapter {
      *
      * @return Continuous query filter.
      */
-    @Nullable public CacheEntryEventFilter<K, V> continuousQueryFilter() {
+    @Nullable public CacheEntryEventSerializableFilter<K, V> continuousQueryFilter() {
         return contQryFilter;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c2b381ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index a69d14e..2815dc2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryUpdatedListener;
 import javax.cache.event.EventType;
 import org.apache.ignite.IgniteCache;
@@ -262,8 +263,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         if (locLsnr != null)
             ctx.resource().injectGeneric(locLsnr);
 
-        if (rmtFilter != null)
-            ctx.resource().injectGeneric(rmtFilter);
+        if (getRemoteFilter() != null)
+            ctx.resource().injectGeneric(getRemoteFilter());
 
         entryBufs = new ConcurrentHashMap<>();
 
@@ -303,7 +304,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                         null,
                         null,
                         null,
-                        rmtFilter,
+                        getRemoteFilter() instanceof CacheEntryEventSerializableFilter ?
+                            (CacheEntryEventSerializableFilter)getRemoteFilter() : null,
                         null,
                         nodeId,
                         taskName()
@@ -332,9 +334,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
                 boolean notify = !evt.entry().isFiltered();
 
-                if (notify && rmtFilter != null) {
+                if (notify && getRemoteFilter() != null) {
                     try {
-                        notify = rmtFilter.evaluate(evt);
+                        notify = getRemoteFilter().evaluate(evt);
                     }
                     catch (Exception e) {
                         U.error(cctx.logger(CacheContinuousQueryHandler.class), "CacheEntryEventFilter failed: " + e);
@@ -422,7 +424,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                         null,
                         null,
                         null,
-                        rmtFilter,
+                        getRemoteFilter() instanceof CacheEntryEventSerializableFilter ?
+                            (CacheEntryEventSerializableFilter)getRemoteFilter() : null,
                         null,
                         nodeId,
                         taskName(),
@@ -435,8 +438,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             }
 
             @Override public void onUnregister() {
-                if (rmtFilter instanceof PlatformContinuousQueryFilter)
-                    ((PlatformContinuousQueryFilter)rmtFilter).onQueryUnregister();
+                if (getRemoteFilter() instanceof PlatformContinuousQueryFilter)
+                    ((PlatformContinuousQueryFilter)getRemoteFilter()).onQueryUnregister();
             }
 
             @Override public void cleanupBackupQueue(Map<Integer, Long> updateCntrs) {
@@ -517,6 +520,13 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     }
 
     /**
+     * @return Remote filter held in {@code rmtFilter}; subclasses may override to return a different filter.
+     */
+    protected CacheEntryEventFilter<K, V> getRemoteFilter() {
+        return rmtFilter;
+    }
+
+    /**
      * @param cctx Context.
      * @param nodeId ID of the node that started routine.
      * @param entry Entry.

http://git-wip-us.apache.org/repos/asf/ignite/blob/c2b381ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
index 6cdc5af..7f00fc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
@@ -23,6 +23,7 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.UUID;
 import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryUpdatedListener;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
@@ -45,6 +46,9 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
     /** Deployable object for filter factory. */
     private DeployableObject rmtFilterFactoryDep;
 
+    /** Filter instance created by {@code rmtFilterFactory}. */
+    protected transient CacheEntryEventFilter<K, V> rmtNonSerFilter;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -88,12 +92,18 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
         super(cacheName, topic, locLsnr, rmtFilter, internal, notifyExisting, oldValRequired, sync, ignoreExpired,
             taskHash, skipPrimaryCheck, locCache, keepBinary, ignoreClassNotFound);
 
-        assert rmtFilter != null ^ rmtFilterFactory != null || rmtFilter == null && rmtFilterFactory == null;
+        assert rmtFilter != null ^ rmtFilterFactory != null || rmtFilter == null && rmtFilterFactory == null :
+            "Remote Filter and Remote Filter Factory both are not null. Should be set only one.";
 
         this.rmtFilterFactory = rmtFilterFactory;
 
         if (rmtFilterFactory != null)
-            this.rmtFilter = rmtFilterFactory.create();
+            this.rmtNonSerFilter = rmtFilterFactory.create();
+    }
+
+    /** {@inheritDoc} Returns {@code rmtNonSerFilter} when it is set, otherwise {@code rmtFilter}. */
+    @Override protected CacheEntryEventFilter<K, V> getRemoteFilter() {
+        return rmtNonSerFilter != null ? rmtNonSerFilter : rmtFilter;
     }
 
     /** {@inheritDoc} */
@@ -193,6 +203,6 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
             rmtFilterFactory = (Factory<CacheEntryEventSerializableFilter<K, V>>)in.readObject();
 
         if (rmtFilter == null && rmtFilterFactory != null)
-            rmtFilter = rmtFilterFactory.create();
+            rmtNonSerFilter = rmtFilterFactory.create();
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c2b381ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 118a342..4469cf0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -559,7 +559,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      */
     private UUID executeQuery0(CacheEntryUpdatedListener locLsnr,
         final CacheEntryEventSerializableFilter rmtFilter,
-        final Factory<? extends CacheEntryEventSerializableFilter> rmtFilterFactory,
+        final Factory<? extends CacheEntryEventFilter> rmtFilterFactory,
         int bufSize,
         long timeInterval,
         boolean autoUnsubscribe,
@@ -600,12 +600,22 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 cctx.isLocal(),
                 keepBinary,
                 ignoreClassNotFound);
-        else
+        else {
+            CacheEntryEventFilter fltr = null;
+
+            if (rmtFilterFactory != null) {
+                fltr = rmtFilterFactory.create();
+
+                if (!(fltr instanceof CacheEntryEventSerializableFilter))
+                    throw new IgniteCheckedException("Cache entry event filter must implement " +
+                        "org.apache.ignite.cache.CacheEntryEventSerializableFilter: " + fltr);
+            }
+
             hnd = new CacheContinuousQueryHandler(
                 cctx.name(),
                 TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
                 locLsnr,
-                rmtFilter,
+                (CacheEntryEventSerializableFilter)fltr,
                 internal,
                 notifyExisting,
                 oldValRequired,
@@ -616,6 +626,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 cctx.isLocal(),
                 keepBinary,
                 ignoreClassNotFound);
+        }
 
         IgnitePredicate<ClusterNode> pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ?
             F.nodeForNodeId(cctx.localNodeId()) : F.<ClusterNode>alwaysTrue();
@@ -812,22 +823,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 locLsnrImpl,
                 log);
 
-            CacheEntryEventFilter fltr = null;
-
-            if (cfg.getCacheEntryEventFilterFactory() != null) {
-                fltr = (CacheEntryEventFilter)cfg.getCacheEntryEventFilterFactory().create();
-
-                if (!(fltr instanceof Serializable))
-                    throw new IgniteCheckedException("Cache entry event filter must implement java.io.Serializable: "
-                        + fltr);
-            }
-
-            CacheEntryEventSerializableFilter rmtFilter = new JCacheQueryRemoteFilter(fltr, types);
-
             routineId = executeQuery0(
                 locLsnr,
-                rmtFilter,
-                cfg.getCacheEntryEventFilterFactory(),
+                null,
+                new JCacheRemoteQueryFactory(cfg.getCacheEntryEventFilterFactory(), types),
                 ContinuousQuery.DFLT_PAGE_SIZE,
                 ContinuousQuery.DFLT_TIME_INTERVAL,
                 ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE,
@@ -856,6 +855,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     }
 
     /**
+     *
      */
     private static class JCacheQueryLocalListener<K, V> implements CacheEntryUpdatedListener<K, V> {
         /** */
@@ -938,6 +938,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * For handler version 2.0 this filter should not be serialized.
      */
     private static class JCacheQueryRemoteFilter implements CacheEntryEventSerializableFilter, Externalizable {
         /** */
@@ -1021,6 +1022,34 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * Factory producing {@link JCacheQueryRemoteFilter} instances that wrap the delegate factory's filter, if any.
+     */
+    private static class JCacheRemoteQueryFactory implements Factory<CacheEntryEventFilter> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Delegate filter factory, may be {@code null}. */
+        private Factory<CacheEntryEventFilter> impl;
+
+        /** Types value passed to every created {@link JCacheQueryRemoteFilter}. */
+        private byte types;
+
+        /**
+         * @param impl Delegate filter factory, may be {@code null}.
+         * @param types Types value handed to each created filter.
+         */
+        public JCacheRemoteQueryFactory(@Nullable Factory<CacheEntryEventFilter> impl, byte types) {
+            this.impl = impl;
+            this.types = types;
+        }
+
+        /** {@inheritDoc} */
+        @Override public CacheEntryEventFilter create() {
+            return new JCacheQueryRemoteFilter(impl != null ? impl.create() : null, types);
+        }
+    }
+
+    /**
      * Task flash backup queue.
      */
     private static final class BackupCleaner implements Runnable {

http://git-wip-us.apache.org/repos/asf/ignite/blob/c2b381ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index e6bfd87..62a4153 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -178,11 +178,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
                     return new CreateUpdateRemoveExpireListener();
                 }
             },
-            new Factory<CacheEntryEventSerializableFilter<Object, Object>>() {
-                @Override public CacheEntryEventSerializableFilter<Object, Object> create() {
-                    return new ExceptionFilter();
-                }
-            },
+            new ExceptionFilterFactory(),
             false,
             false
         );
@@ -1467,4 +1463,14 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
             return S.toString(ListenerTestValue.class, this);
         }
     }
+
+    /**
+     * Factory creating {@link ExceptionFilter} instances.
+     */
+    static class ExceptionFilterFactory implements Factory<CacheEntryEventSerializableFilter<Object, Object>> {
+        /** {@inheritDoc} */
+        @Override public CacheEntryEventSerializableFilter<Object, Object> create() {
+            return new ExceptionFilter();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c2b381ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
index 41725e7..cad57f0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
@@ -47,9 +47,4 @@ public class IgniteCacheEntryListenerTxTest extends IgniteCacheEntryListenerAbst
     @Override protected NearCacheConfiguration nearConfiguration() {
         return null;
     }
-
-    /** {@inheritDoc} */
-    @Override public void testEvents(){
-        fail("https://issues.apache.org/jira/browse/IGNITE-1600");
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/c2b381ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index a42f056..f104f21 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -1432,7 +1432,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
             GridContinuousHandler hnd = GridTestUtils.getFieldValue(info, "hnd");
 
             if (hnd.isQuery() && hnd.cacheName() == null) {
-                backupQueue = GridTestUtils.getFieldValue(hnd, "backupQueue");
+                backupQueue = GridTestUtils.getFieldValue(hnd, CacheContinuousQueryHandler.class, "backupQueue");
 
                 break;
             }