You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2017/02/13 10:32:10 UTC
[13/31] ignite git commit: IGNITE-3867: Fixed ScanQuery ignores
pageSize property. This closes #1406.
IGNITE-3867: Fixed ScanQuery ignores pageSize property. This closes #1406.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d0c0bcec
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d0c0bcec
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d0c0bcec
Branch: refs/heads/ignite-4436-2
Commit: d0c0bcece7d8e9d373aaf13a210f6d890e5ad48b
Parents: a922ac9
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Jan 17 16:19:02 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Jan 17 16:19:02 2017 +0300
----------------------------------------------------------------------
.../processors/cache/IgniteCacheProxy.java | 3 +
.../IgniteCachePartitionedQuerySelfTest.java | 87 ++++++++++++++++++++
2 files changed, 90 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d0c0bcec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index b9737c6..873c822 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -491,6 +491,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
qry = ctx.queries().createScanQuery(p, transformer, scanQry.getPartition(), isKeepBinary);
+ if (scanQry.getPageSize() > 0)
+ qry.pageSize(scanQry.getPageSize());
+
if (grp != null)
qry.projection(grp);
http://git-wip-us.apache.org/repos/asf/ignite/blob/d0c0bcec/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQuerySelfTest.java
index 78fd914..b9f21da 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQuerySelfTest.java
@@ -20,15 +20,28 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractQuerySelfTest;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CachePeekMode.ALL;
@@ -47,6 +60,11 @@ public class IgniteCachePartitionedQuerySelfTest extends IgniteCacheAbstractQuer
return PARTITIONED;
}
+ /** {@inheritDoc} Installs a {@link TestTcpCommunicationSpi} so that tests can hook outgoing messages through its filter. */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ return super.getConfiguration(gridName).setCommunicationSpi(new TestTcpCommunicationSpi());
+ }
+
/**
* @throws Exception If failed.
*/
@@ -135,4 +153,73 @@ public class IgniteCachePartitionedQuerySelfTest extends IgniteCacheAbstractQuer
assert F.asList(persons).contains(entry.getValue());
}
}
+
+ /** Checks that the page size set on a {@link ScanQuery} is carried by query requests and bounds every response page.
+ * @throws Exception If failed.
+ */
+ public void testScanQueryPagination() throws Exception {
+ final int pageSize = 5;
+
+ final AtomicInteger pages = new AtomicInteger(0); // Number of GridCacheQueryRequest messages seen by the filter.
+
+ IgniteCache<Integer, Integer> cache = ignite().cache(null);
+
+ for (int i = 0; i < 50; i++) // 50 entries, i.e. ten times the page size.
+ cache.put(i, i);
+
+ CommunicationSpi spi = ignite().configuration().getCommunicationSpi();
+
+ assert spi instanceof TestTcpCommunicationSpi;
+
+ TestTcpCommunicationSpi commSpi = (TestTcpCommunicationSpi)spi;
+
+ commSpi.filter = new IgniteInClosure<Message>() {
+ @Override public void apply(Message msg) { // NOTE(review): runs on whichever thread calls sendMessage - confirm a failed assertion here fails the test.
+ if (!(msg instanceof GridIoMessage)) // Only GridIoMessage envelopes are inspected.
+ return;
+
+ Message msg0 = ((GridIoMessage)msg).message();
+
+ if (msg0 instanceof GridCacheQueryRequest) {
+ assertEquals(pageSize, ((GridCacheQueryRequest)msg0).pageSize());
+
+ pages.incrementAndGet();
+ }
+ else if (msg0 instanceof GridCacheQueryResponse) {
+ assertTrue(((GridCacheQueryResponse)msg0).data().size() <= pageSize);
+ }
+ }
+ };
+
+ try {
+ ScanQuery<Integer, Integer> qry = new ScanQuery<Integer, Integer>();
+
+ qry.setPageSize(pageSize);
+
+ List<Cache.Entry<Integer, Integer>> all = cache.query(qry).getAll();
+
+ assertTrue(pages.get() > ignite().cluster().forDataNodes(null).nodes().size()); // Presumably: some node was asked for more than one page - confirm.
+
+ assertEquals(50, all.size());
+ }
+ finally {
+ commSpi.filter = null; // Detach the filter even when an assertion above has failed.
+ }
+ }
+
+ /** Communication SPI that hands every outgoing message to {@link #filter}, when one is set, before sending it. */
+ private static class TestTcpCommunicationSpi extends TcpCommunicationSpi {
+ volatile IgniteInClosure<Message> filter;
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+ throws IgniteSpiException {
+ IgniteInClosure<Message> filter0 = filter; // Single volatile read: a test may reset the field to null concurrently.
+
+ if (filter0 != null)
+ filter0.apply(msg);
+
+ super.sendMessage(node, msg, ackClosure);
+ }
+ }
}
\ No newline at end of file