You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/06/17 08:54:19 UTC
[10/41] ignite git commit: IGNITE-3272 Fixed "Memory consumption in
ContinuousQueryHandler".
IGNITE-3272 Fixed "Memory consumption in ContinuousQueryHandler".
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5f446720
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5f446720
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5f446720
Branch: refs/heads/ignite-3331
Commit: 5f44672053e548a6e2cff881ac57d064b60066c7
Parents: 98a0990
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Tue Jun 14 18:17:33 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Tue Jun 14 18:17:33 2016 +0300
----------------------------------------------------------------------
.../continuous/CacheContinuousQueryEntry.java | 16 +++
.../continuous/CacheContinuousQueryHandler.java | 2 +-
...niteCacheContinuousQueryBackupQueueTest.java | 135 +++++++++++++++++++
.../IgniteCacheQuerySelfTestSuite3.java | 2 +
4 files changed, 154 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f446720/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index 63dc4cb..74f930a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -200,6 +200,22 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
}
/**
+ * @return If entry filtered then will return light-weight <i><b>new entry</b></i> without values and key
+ * (avoid to huge memory consumption), otherwise {@code this}.
+ */
+ CacheContinuousQueryEntry forBackupQueue() {
+ if (!isFiltered())
+ return this;
+
+ CacheContinuousQueryEntry e =
+ new CacheContinuousQueryEntry(cacheId, evtType, null, null, null, keepBinary, part, updateCntr, topVer);
+
+ e.flags = flags;
+
+ return e;
+ }
+
+ /**
* @return {@code True} if entry sent by backup node.
*/
boolean isBackup() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f446720/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index d0a3722..fc38eba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -740,7 +740,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
if (!primary && !internal && entry.updateCounter() != -1L /* Skip init query and expire entries */) {
entry.markBackup();
- backupQueue.add(entry);
+ backupQueue.add(entry.forBackupQueue());
}
return notify;
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f446720/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
new file mode 100644
index 0000000..aea1954
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteCacheContinuousQueryBackupQueueTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** Keys count. */
+ private static final int KEYS_COUNT = 1024;
+
+ /** Grid count. */
+ private static final int GRID_COUNT = 2;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+ ccfg.setCacheMode(PARTITIONED);
+ ccfg.setAtomicityMode(ATOMIC);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setBackups(1);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return TimeUnit.MINUTES.toMillis(2);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testBackupQueue() throws Exception {
+ startGridsMultiThreaded(GRID_COUNT);
+
+ final CacheEventListener lsnr = new CacheEventListener();
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(lsnr);
+ qry.setRemoteFilterFactory(new FilterFactory());
+
+ try (QueryCursor<?> ignore = grid(0).cache(null).query(qry)) {
+ for (int i = 0; i < KEYS_COUNT; i++) {
+ log.info("Put key: " + i);
+
+ for (int j = 0; j < 100; j++)
+ grid(i % GRID_COUNT).cache(null).put(i, new byte[1024 * 50]);
+ }
+
+ log.info("Finish.");
+ }
+ }
+
+ /**
+ *
+ */
+ private static class FilterFactory implements Factory<CacheEntryEventFilter<Object, Object>> {
+ /** {@inheritDoc} */
+ @Override public CacheEntryEventFilter<Object, Object> create() {
+ return new CacheEventFilter();
+ }
+ }
+
+ /**
+ *
+ */
+ private static class CacheEventFilter implements CacheEntryEventFilter<Object, Object>, Serializable {
+ /** {@inheritDoc} */
+ @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) {
+ return false;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class CacheEventListener implements CacheEntryUpdatedListener<Object, Object> {
+ /** {@inheritDoc} */
+ @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+ fail();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f446720/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
index dbef1fb..5afce19 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryTxOffheapTieredTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryTxOffheapValuesTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryTxSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryBackupQueueTest;
import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientReconnectTest;
import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest;
import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTxReconnectTest;
@@ -117,6 +118,7 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite {
suite.addTestSuite(CacheKeepBinaryIterationStoreEnabledTest.class);
suite.addTestSuite(CacheKeepBinaryIterationSwapEnabledTest.class);
suite.addTestSuite(CacheKeepBinaryIterationNearEnabledTest.class);
+ suite.addTestSuite(IgniteCacheContinuousQueryBackupQueueTest.class);
return suite;
}