You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/02/13 11:29:53 UTC

[13/40] ignite git commit: IGNITE-3867: Fixed ScanQuery ignores pageSize property. This closes #1406.

IGNITE-3867: Fixed ScanQuery ignores pageSize property. This closes #1406.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d0c0bcec
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d0c0bcec
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d0c0bcec

Branch: refs/heads/ignite-2.0
Commit: d0c0bcece7d8e9d373aaf13a210f6d890e5ad48b
Parents: a922ac9
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Jan 17 16:19:02 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Jan 17 16:19:02 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/IgniteCacheProxy.java      |  3 +
 .../IgniteCachePartitionedQuerySelfTest.java    | 87 ++++++++++++++++++++
 2 files changed, 90 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d0c0bcec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index b9737c6..873c822 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -491,6 +491,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
 
         qry = ctx.queries().createScanQuery(p, transformer, scanQry.getPartition(), isKeepBinary);
 
+        if (scanQry.getPageSize() > 0)
+            qry.pageSize(scanQry.getPageSize());
+
         if (grp != null)
             qry.projection(grp);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d0c0bcec/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQuerySelfTest.java
index 78fd914..b9f21da 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQuerySelfTest.java
@@ -20,15 +20,28 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
 import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.cache.Cache;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractQuerySelfTest;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CachePeekMode.ALL;
@@ -47,6 +60,11 @@ public class IgniteCachePartitionedQuerySelfTest extends IgniteCacheAbstractQuer
         return PARTITIONED;
     }
 
+    /** {@inheritDoc} Sets the communication SPI to a new {@link TestTcpCommunicationSpi}. */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        return super.getConfiguration(gridName).setCommunicationSpi(new TestTcpCommunicationSpi());
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -135,4 +153,73 @@ public class IgniteCachePartitionedQuerySelfTest extends IgniteCacheAbstractQuer
             assert F.asList(persons).contains(entry.getValue());
         }
     }
+
+    /** Checks that the page size set on a {@link ScanQuery} shows up in the query requests that are sent.
+     * @throws Exception If failed.
+     */
+    public void testScanQueryPagination() throws Exception {
+        final int pageSize = 5;
+
+        final AtomicInteger pages = new AtomicInteger(0); // Counts GridCacheQueryRequest messages seen by the filter.
+
+        IgniteCache<Integer, Integer> cache = ignite().cache(null);
+
+        for (int i = 0; i < 50; i++)
+            cache.put(i, i);
+
+        CommunicationSpi spi = ignite().configuration().getCommunicationSpi();
+
+        assert spi instanceof TestTcpCommunicationSpi;
+
+        TestTcpCommunicationSpi commSpi = (TestTcpCommunicationSpi)spi;
+
+        commSpi.filter = new IgniteInClosure<Message>() { // Invoked by commSpi.sendMessage for each message it sends.
+            @Override public void apply(Message msg) {
+                if (!(msg instanceof GridIoMessage))
+                    return; // Only GridIoMessage wrappers are inspected.
+
+                Message msg0 = ((GridIoMessage)msg).message();
+
+                if (msg0 instanceof GridCacheQueryRequest) {
+                    assertEquals(pageSize, ((GridCacheQueryRequest)msg0).pageSize());
+
+                    pages.incrementAndGet();
+                }
+                else if (msg0 instanceof GridCacheQueryResponse) {
+                    assertTrue(((GridCacheQueryResponse)msg0).data().size() <= pageSize); // At most pageSize items.
+                }
+            }
+        };
+
+        try {
+            ScanQuery<Integer, Integer> qry = new ScanQuery<Integer, Integer>();
+
+            qry.setPageSize(pageSize);
+
+            List<Cache.Entry<Integer, Integer>> all = cache.query(qry).getAll();
+
+            assertTrue(pages.get() > ignite().cluster().forDataNodes(null).nodes().size()); // More requests than data nodes.
+
+            assertEquals(50, all.size());
+        }
+        finally {
+            commSpi.filter = null; // Stop inspecting messages once the query has been checked.
+        }
+    }
+
+    /**
+     * {@link TcpCommunicationSpi} that passes each message to an optional filter before sending it.
+     */
+    private static class TestTcpCommunicationSpi extends TcpCommunicationSpi {
+        volatile IgniteInClosure<Message> filter; // Applied in sendMessage when non-null; the test sets and clears it.
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+            throws IgniteSpiException {
+            if(filter != null) // NOTE(review): filter is read twice; a reset to null between the reads would NPE.
+                filter.apply(msg);
+
+            super.sendMessage(node, msg, ackClosure);
+        }
+    }
 }
\ No newline at end of file