You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/08/19 11:29:28 UTC
ignite git commit: ignite-900 Added test.
Repository: ignite
Updated Branches:
refs/heads/master 612eb3daf -> 0c860ec72
ignite-900 Added test.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0c860ec7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0c860ec7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0c860ec7
Branch: refs/heads/master
Commit: 0c860ec7244d7f531c8908b1574ca3169f4ddfed
Parents: 612eb3d
Author: sboikov <sb...@gridgain.com>
Authored: Fri Aug 19 14:28:56 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Aug 19 14:29:18 2016 +0300
----------------------------------------------------------------------
...teCacheContinuousQueryNoUnsubscribeTest.java | 153 +++++++++++++++++++
.../IgniteCacheQuerySelfTestSuite3.java | 2 +
2 files changed, 155 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c860ec7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryNoUnsubscribeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryNoUnsubscribeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryNoUnsubscribeTest.java
new file mode 100644
index 0000000..d7beb02
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryNoUnsubscribeTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class IgniteCacheContinuousQueryNoUnsubscribeTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static AtomicInteger cntr = new AtomicInteger();
+
+ /** */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ cfg.setPeerClassLoadingEnabled(false);
+ cfg.setClientMode(client);
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ cfg.setCacheConfiguration(ccfg);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ startGridsMultiThreaded(3);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNoUnsubscribe() throws Exception {
+ checkNoUnsubscribe(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNoUnsubscribeClient() throws Exception {
+ checkNoUnsubscribe(true);
+ }
+
+ /**
+ * @param client Client node flag.
+ * @throws Exception If failed.
+ */
+ private void checkNoUnsubscribe(boolean client) throws Exception {
+ cntr.set(0);
+
+ this.client = client;
+
+ try (Ignite ignite = startGrid(3)) {
+ ContinuousQuery qry = new ContinuousQuery();
+
+ qry.setLocalListener(new CacheEntryUpdatedListener() {
+ @Override public void onUpdated(Iterable evts) {
+ // No-op.
+ }
+ });
+
+ qry.setRemoteFilterFactory(FactoryBuilder.factoryOf(CacheTestRemoteFilter.class));
+
+ qry.setAutoUnsubscribe(false);
+
+ ignite.cache(null).query(qry);
+
+ ignite.cache(null).put(1, 1);
+
+ assertEquals(1, cntr.get());
+ }
+
+ this.client = false;
+
+ try (Ignite newSrv = startGrid(3)) {
+ Integer key = primaryKey(newSrv.cache(null));
+
+ newSrv.cache(null).put(key, 1);
+
+ assertEquals(2, cntr.get());
+
+ for (int i = 0; i < 10; i++)
+ ignite(0).cache(null).put(i, 1);
+
+ assertEquals(12, cntr.get());
+ }
+
+ for (int i = 10; i < 20; i++)
+ ignite(0).cache(null).put(i, 1);
+
+ assertEquals(22, cntr.get());
+ }
+
+ /**
+ *
+ */
+ public static class CacheTestRemoteFilter implements CacheEntryEventSerializableFilter<Object, Object> {
+ /** {@inheritDoc} */
+ @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException {
+ cntr.incrementAndGet();
+
+ return true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c860ec7/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
index 032dd3b..a865788 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
@@ -63,6 +63,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheC
import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest;
import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTxReconnectTest;
import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryImmutableEntryTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryNoUnsubscribeTest;
/**
* Test suite for cache queries.
@@ -121,6 +122,7 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite {
suite.addTestSuite(CacheKeepBinaryIterationSwapEnabledTest.class);
suite.addTestSuite(CacheKeepBinaryIterationNearEnabledTest.class);
suite.addTestSuite(IgniteCacheContinuousQueryBackupQueueTest.class);
+ suite.addTestSuite(IgniteCacheContinuousQueryNoUnsubscribeTest.class);
return suite;
}