You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/02/11 19:06:59 UTC
ignite git commit: IGNITE-2604 Fixed. Added tests.
Repository: ignite
Updated Branches:
refs/heads/ignite-2604 [created] 91a5ad287
IGNITE-2604 Fixed. Added tests.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/91a5ad28
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/91a5ad28
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/91a5ad28
Branch: refs/heads/ignite-2604
Commit: 91a5ad287e6444b5797825dfc5494a3edec5a066
Parents: 35b0e6b
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Thu Feb 11 21:07:28 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Thu Feb 11 21:07:28 2016 +0300
----------------------------------------------------------------------
.../continuous/CacheContinuousQueryHandler.java | 4 +-
.../continuous/CacheContinuousQueryManager.java | 21 +-
.../continuous/CacheContinuousBatchAckTest.java | 305 +++++++++++++++++++
.../IgniteCacheQuerySelfTestSuite.java | 2 +
4 files changed, 321 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91a5ad28/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index cf9b439..686bc18 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -1034,10 +1034,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
Collection<ClusterNode> nodes = new HashSet<>();
for (AffinityTopologyVersion topVer : t.get2())
- nodes.addAll(ctx.discovery().cacheNodes(topVer));
+ nodes.addAll(ctx.discovery().remoteCacheNodes(cctx.name(), topVer));
for (ClusterNode node : nodes) {
- if (!node.id().equals(ctx.localNodeId())) {
+ if (!node.isClient()) {
try {
cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/91a5ad28/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index cc59989..2c45f40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -56,6 +56,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.security.SecurityPermission;
@@ -116,17 +117,19 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
// Append cache name to the topic.
topicPrefix = "CONTINUOUS_QUERY" + (cctx.name() == null ? "" : "_" + cctx.name());
- cctx.io().addHandler(cctx.cacheId(), CacheContinuousQueryBatchAck.class,
- new CI2<UUID, CacheContinuousQueryBatchAck>() {
- @Override public void apply(UUID uuid, CacheContinuousQueryBatchAck msg) {
- CacheContinuousQueryListener lsnr = lsnrs.get(msg.routineId());
+ if (!CU.clientNode(cctx.localNode())) {
+ cctx.io().addHandler(cctx.cacheId(), CacheContinuousQueryBatchAck.class,
+ new CI2<UUID, CacheContinuousQueryBatchAck>() {
+ @Override public void apply(UUID uuid, CacheContinuousQueryBatchAck msg) {
+ CacheContinuousQueryListener lsnr = lsnrs.get(msg.routineId());
- if (lsnr != null)
- lsnr.cleanupBackupQueue(msg.updateCntrs());
- }
- });
+ if (lsnr != null)
+ lsnr.cleanupBackupQueue(msg.updateCntrs());
+ }
+ });
- cctx.time().schedule(new BackupCleaner(lsnrs, cctx.kernalContext()), BACKUP_ACK_FREQ, BACKUP_ACK_FREQ);
+ cctx.time().schedule(new BackupCleaner(lsnrs, cctx.kernalContext()), BACKUP_ACK_FREQ, BACKUP_ACK_FREQ);
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/91a5ad28/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchAckTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchAckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchAckTest.java
new file mode 100644
index 0000000..1eafd67
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchAckTest.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Checks that continuous query batch acknowledgements are not delivered to client nodes.
+ */
+public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implements Serializable {
+    /** IP finder shared by the discovery SPI of every grid started by this test. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Grid name suffix; grids whose name ends with it are configured as client nodes. */
+    private static final String CLIENT = "_client";
+
+    /** Name of the grid started as the server node. */
+    private static final String SERVER = "server";
+
+    /** Raised when a message-checking node receives a {@code CacheContinuousQueryBatchAck}. */
+    private static final AtomicBoolean fail = new AtomicBoolean(false);
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ if (gridName.endsWith(CLIENT)) {
+ cfg.setClientMode(true);
+
+ cfg.setCommunicationSpi(new FailedTcpCommunicationSpi(true));
+ }
+ else
+ cfg.setCommunicationSpi(new FailedTcpCommunicationSpi(false));
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(disco);
+
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+ return cfg;
+ }
+
+    /** {@inheritDoc} Starts the shared topology: one server node followed by two client nodes. */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrid(SERVER);
+        for (int idx = 1; idx <= 2; idx++)
+            startGrid(idx + CLIENT);
+    }
+
+    /** {@inheritDoc} Stops all grids after delegating to the superclass. */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} Resets the shared {@code fail} flag to {@code false}. */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        fail.set(false);
+    }
+
+    /**
+     * Checks that no client node receives a batch ack while a partitioned atomic cache is updated.
+     * @throws Exception If failed.
+     */
+    public void testAtomicCache() throws Exception {
+        QueryCursor query = null;
+
+        try {
+            ContinuousQuery q = new ContinuousQuery();
+
+            q.setLocalListener(new CacheEntryUpdatedListener() {
+                @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
+                    // No-op: only the routing of ack messages matters here.
+                }
+            });
+
+            IgniteCache<Object, Object> cache =
+                grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 1, ATOMIC, ONHEAP_TIERED));
+
+            query = cache.query(q);
+
+            for (int i = 0; i < 10000; i++)
+                cache.put(i, i);
+
+            assertFalse("Batch ack was delivered to a client node", GridTestUtils.waitForCondition(new PA() {
+                @Override public boolean apply() {
+                    return fail.get();
+                }
+            }, 2000L));
+        }
+        finally {
+            if (query != null)
+                query.close();
+        }
+    }
+
+    /**
+     * Checks that no client node receives a batch ack while a partitioned transactional cache is updated.
+     * @throws Exception If failed.
+     */
+    public void testTxCache() throws Exception {
+        QueryCursor query = null;
+
+        try {
+            ContinuousQuery q = new ContinuousQuery();
+            q.setLocalListener(new CacheEntryUpdatedListener() {
+                @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
+                    // No-op: only the routing of ack messages matters here.
+                }
+            });
+
+            IgniteCache<Object, Object> cache =
+                grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, ONHEAP_TIERED));
+
+            query = cache.query(q);
+
+            for (int i = 0; i < 10000; i++)
+                cache.put(i, i);
+
+            assertFalse("Batch ack was delivered to a client node", GridTestUtils.waitForCondition(new PA() {
+                @Override public boolean apply() {
+                    return fail.get();
+                }
+            }, 2000L));
+        }
+        finally {
+            if (query != null)
+                query.close();
+        }
+    }
+
+    /**
+     * Checks that no client node receives a batch ack while a replicated atomic cache is updated.
+     * @throws Exception If failed.
+     */
+    public void testReplicatedCache() throws Exception {
+        QueryCursor query = null;
+
+        try {
+            ContinuousQuery q = new ContinuousQuery();
+            q.setLocalListener(new CacheEntryUpdatedListener() {
+                @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
+                    // No-op: only the routing of ack messages matters here.
+                }
+            });
+
+            IgniteCache<Object, Object> cache =
+                grid(SERVER).getOrCreateCache(cacheConfiguration(REPLICATED, 1, ATOMIC, ONHEAP_TIERED));
+
+            query = cache.query(q);
+
+            for (int i = 0; i < 10000; i++)
+                cache.put(i, i);
+
+            assertFalse("Batch ack was delivered to a client node", GridTestUtils.waitForCondition(new PA() {
+                @Override public boolean apply() {
+                    return fail.get();
+                }
+            }, 2000L));
+        }
+        finally {
+            if (query != null)
+                query.close();
+        }
+    }
+
+    /**
+     * Checks that no client node receives a batch ack while a replicated transactional cache is updated.
+     * @throws Exception If failed.
+     */
+    public void testReplicatedTxCache() throws Exception {
+        QueryCursor query = null;
+
+        try {
+            ContinuousQuery q = new ContinuousQuery();
+            q.setLocalListener(new CacheEntryUpdatedListener() {
+                @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
+                    // No-op: only the routing of ack messages matters here.
+                }
+            });
+
+            IgniteCache<Object, Object> cache =
+                grid(SERVER).getOrCreateCache(cacheConfiguration(REPLICATED, 1, TRANSACTIONAL, ONHEAP_TIERED));
+
+            query = cache.query(q);
+
+            for (int i = 0; i < 10000; i++)
+                cache.put(i, i);
+
+            assertFalse("Batch ack was delivered to a client node", GridTestUtils.waitForCondition(new PA() {
+                @Override public boolean apply() {
+                    return fail.get();
+                }
+            }, 2000L));
+        }
+        finally {
+            if (query != null)
+                query.close();
+        }
+    }
+
+    /**
+     * Builds a FULL_SYNC cache configuration named after its parameters, so each combination maps to its own cache.
+     * @param cacheMode Cache mode.
+     * @param backups Number of backups.
+     * @param atomicityMode Cache atomicity mode.
+     * @param memoryMode Cache memory mode.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Object, Object> cacheConfiguration(
+        CacheMode cacheMode,
+        int backups,
+        CacheAtomicityMode atomicityMode,
+        CacheMemoryMode memoryMode) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+        ccfg.setName(cacheMode + "-" + atomicityMode + "-" + memoryMode + "-" + backups);
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setMemoryMode(memoryMode);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(backups);
+
+        return ccfg;
+    }
+
+ /**
+ *
+ */
+ protected static class FailedTcpCommunicationSpi extends TcpCommunicationSpi {
+ /** */
+ private boolean check;
+
+ /**
+ * @param check Check inbound message.
+ */
+ public FailedTcpCommunicationSpi(boolean check) {
+ this.check = check;
+ }
+
+ @Override protected void notifyListener(UUID sndId, Message msg, IgniteRunnable msgC) {
+ if (check) {
+ if (msg instanceof GridIoMessage &&
+ ((GridIoMessage)msg).message() instanceof CacheContinuousQueryBatchAck)
+ fail.set(true);
+ }
+
+ super.notifyListener(sndId, msg, msgC);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/91a5ad28/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 3cd4579..59e5c89 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -70,6 +70,7 @@ import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQ
import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalFieldsQuerySelfTest;
import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQuerySelfTest;
import org.apache.ignite.internal.processors.cache.query.GridCacheSwapScanQuerySelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchAckTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedSelfTest;
@@ -209,6 +210,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.class);
suite.addTestSuite(CacheContinuousQueryFailoverTxOffheapTieredTest.class);
suite.addTestSuite(CacheContinuousQueryRandomOperationsTest.class);
+ suite.addTestSuite(CacheContinuousBatchAckTest.class);
// Reduce fields queries.
suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class);