You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/03/04 09:06:13 UTC

[07/10] ignite git commit: Fixed IGNITE-1186 "Filter is sent instead of factory when continuous query is created".

http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
index 62ed66f..cdf4ffd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
@@ -28,11 +28,14 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import javax.cache.Cache;
 import javax.cache.configuration.Factory;
 import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListenerException;
 import javax.cache.event.CacheEntryUpdatedListener;
 import javax.cache.integration.CacheLoaderException;
 import javax.cache.integration.CacheWriterException;
@@ -40,7 +43,9 @@ import javax.cache.processor.EntryProcessor;
 import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.affinity.Affinity;
@@ -55,6 +60,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
@@ -132,6 +138,51 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
     /**
      * @throws Exception If failed.
      */
+    public void testFilterAndFactoryProvided() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        final IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(ccfg);
+
+        try {
+            final ContinuousQuery qry = new ContinuousQuery();
+
+            qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter>() {
+                @Override public CacheEntryEventFilter create() {
+                    return null;
+                }
+            });
+
+            qry.setRemoteFilter(new CacheEntryEventSerializableFilter() {
+                @Override public boolean evaluate(CacheEntryEvent event) throws CacheEntryListenerException {
+                    return false;
+                }
+            });
+
+            qry.setLocalListener(new CacheEntryUpdatedListener() {
+                @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
+                    // No-op.
+                }
+            });
+
+            GridTestUtils.assertThrows(log, new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    return cache.query(qry);
+                }
+            }, IgniteException.class, null);
+
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testAtomicClient() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
             1,
@@ -576,7 +627,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
      * @param deploy The place where continuous query will be started.
      * @throws Exception If failed.
      */
-    private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy)
+    protected void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy)
         throws Exception {
         ignite(0).createCache(ccfg);
 
@@ -1124,7 +1175,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
      * @param store If {@code true} configures dummy cache store.
      * @return Cache configuration.
      */
-    private CacheConfiguration<Object, Object> cacheConfiguration(
+    protected CacheConfiguration<Object, Object> cacheConfiguration(
         CacheMode cacheMode,
         int backups,
         CacheAtomicityMode atomicityMode,
@@ -1176,7 +1227,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
     /**
      *
      */
-    static class QueryTestKey implements Serializable, Comparable {
+    public static class QueryTestKey implements Serializable, Comparable {
         /** */
         private final Integer key;
 
@@ -1219,12 +1270,12 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
     /**
      *
      */
-    static class QueryTestValue implements Serializable {
+    public static class QueryTestValue implements Serializable {
         /** */
-        private final Integer val1;
+        protected final Integer val1;
 
         /** */
-        private final String val2;
+        protected final String val2;
 
         /**
          * @param val Value.

http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilter.java
----------------------------------------------------------------------
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilter.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilter.java
new file mode 100644
index 0000000..359dd58
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilter.java
@@ -0,0 +1,33 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p;
+
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListenerException;
+
+/**
+ * Event filter for deployment.
+ */
+public class CacheDeploymentEntryEventFilter implements CacheEntryEventFilter<Integer, Integer> {
+    /** {@inheritDoc} */
+    @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt)
+        throws CacheEntryListenerException {
+        return evt.getValue() == null || evt.getValue() % 2 != 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilterFactory.java
----------------------------------------------------------------------
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilterFactory.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilterFactory.java
new file mode 100644
index 0000000..0d6eceb
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilterFactory.java
@@ -0,0 +1,31 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p;
+
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEventFilter;
+
+/**
+ * Event filter factory for deployment.
+ */
+public class CacheDeploymentEntryEventFilterFactory implements Factory<CacheEntryEventFilter<Integer, Integer>> {
+    /** {@inheritDoc} */
+    @Override public CacheEntryEventFilter<Integer, Integer> create() {
+        return new CacheDeploymentEntryEventFilter();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 968dbf6..083af1e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -79,12 +79,14 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinu
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterReplicatedTxTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchAckTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchForceServerModeAckTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxOffheapTieredTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxReplicatedSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOperationP2PTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapTieredTest;
@@ -225,6 +227,8 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.class);
         suite.addTestSuite(CacheContinuousQueryFailoverTxOffheapTieredTest.class);
         suite.addTestSuite(CacheContinuousQueryRandomOperationsTest.class);
+        suite.addTestSuite(CacheContinuousQueryFactoryFilterTest.class);
+        suite.addTestSuite(CacheContinuousQueryOperationP2PTest.class);
         suite.addTestSuite(CacheContinuousBatchAckTest.class);
         suite.addTestSuite(CacheContinuousBatchForceServerModeAckTest.class);