You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/06/17 12:38:39 UTC

[10/43] ignite git commit: IGNITE-3272 Fixed "Memory consumption in ContinuousQueryHandler".

IGNITE-3272 Fixed "Memory consumption in ContinuousQueryHandler".


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5f446720
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5f446720
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5f446720

Branch: refs/heads/ignite-3335
Commit: 5f44672053e548a6e2cff881ac57d064b60066c7
Parents: 98a0990
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Tue Jun 14 18:17:33 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Tue Jun 14 18:17:33 2016 +0300

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryEntry.java   |  16 +++
 .../continuous/CacheContinuousQueryHandler.java |   2 +-
 ...niteCacheContinuousQueryBackupQueueTest.java | 135 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite3.java         |   2 +
 4 files changed, 154 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5f446720/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index 63dc4cb..74f930a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -200,6 +200,22 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
     }
 
     /**
+     * @return If entry filtered then will return light-weight <i><b>new entry</b></i> without values and key
+     * (avoid to huge memory consumption), otherwise {@code this}.
+     */
+    CacheContinuousQueryEntry forBackupQueue() {
+        if (!isFiltered())
+            return this;
+
+        CacheContinuousQueryEntry e =
+            new CacheContinuousQueryEntry(cacheId, evtType, null, null, null, keepBinary, part, updateCntr, topVer);
+
+        e.flags = flags;
+
+        return e;
+    }
+
+    /**
      * @return {@code True} if entry sent by backup node.
      */
     boolean isBackup() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f446720/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index d0a3722..fc38eba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -740,7 +740,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         if (!primary && !internal && entry.updateCounter() != -1L /* Skip init query and expire entries */) {
             entry.markBackup();
 
-            backupQueue.add(entry);
+            backupQueue.add(entry.forBackupQueue());
         }
 
         return notify;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f446720/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
new file mode 100644
index 0000000..aea1954
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteCacheContinuousQueryBackupQueueTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Keys count. */
+    private static final int KEYS_COUNT = 1024;
+
+    /** Grid count. */
+    private static final int GRID_COUNT = 2;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setAtomicityMode(ATOMIC);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setBackups(1);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return TimeUnit.MINUTES.toMillis(2);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testBackupQueue() throws Exception {
+        startGridsMultiThreaded(GRID_COUNT);
+
+        final CacheEventListener lsnr = new CacheEventListener();
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(lsnr);
+        qry.setRemoteFilterFactory(new FilterFactory());
+
+        try (QueryCursor<?> ignore = grid(0).cache(null).query(qry)) {
+            for (int i = 0; i < KEYS_COUNT; i++) {
+                log.info("Put key: " + i);
+
+                for (int j = 0; j < 100; j++)
+                    grid(i % GRID_COUNT).cache(null).put(i, new byte[1024 * 50]);
+            }
+
+            log.info("Finish.");
+        }
+    }
+
+    /**
+     *
+     */
+    private static class FilterFactory implements Factory<CacheEntryEventFilter<Object, Object>> {
+        /** {@inheritDoc} */
+        @Override public CacheEntryEventFilter<Object, Object> create() {
+            return new CacheEventFilter();
+        }
+    }
+
+    /**
+     *
+     */
+    private static class CacheEventFilter implements CacheEntryEventFilter<Object, Object>, Serializable {
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) {
+            return false;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class CacheEventListener implements CacheEntryUpdatedListener<Object, Object> {
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+            fail();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f446720/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
index dbef1fb..5afce19 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryTxOffheapTieredTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryTxOffheapValuesTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryTxSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryBackupQueueTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientReconnectTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTxReconnectTest;
@@ -117,6 +118,7 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite {
         suite.addTestSuite(CacheKeepBinaryIterationStoreEnabledTest.class);
         suite.addTestSuite(CacheKeepBinaryIterationSwapEnabledTest.class);
         suite.addTestSuite(CacheKeepBinaryIterationNearEnabledTest.class);
+        suite.addTestSuite(IgniteCacheContinuousQueryBackupQueueTest.class);
 
         return suite;
     }