You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ti...@apache.org on 2022/08/10 14:39:16 UTC

[ignite] branch master updated: IGNITE-17455 Add setPartition() for IndexQuery (#10182)

This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new a6e597351b7 IGNITE-17455 Add setPartition() for IndexQuery (#10182)
a6e597351b7 is described below

commit a6e597351b7c9481260985b9c4656c5550525824
Author: Maksim Timonin <ti...@gmail.com>
AuthorDate: Wed Aug 10 17:39:09 2022 +0300

    IGNITE-17455 Add setPartition() for IndexQuery (#10182)
---
 .../org/apache/ignite/cache/query/IndexQuery.java  |  29 +++
 .../query/GridCacheDistributedQueryFuture.java     |  19 ++
 .../cache/query/GridCacheQueryAdapter.java         |   3 +
 .../cache/query/GridCacheQueryManager.java         |  30 ++-
 .../cache/query/IndexQueryPartitionTest.java       | 275 +++++++++++++++++++++
 .../ignite/cache/query/IndexQueryTestSuite.java    |   1 +
 6 files changed, 354 insertions(+), 3 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/IndexQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/IndexQuery.java
index 1c059eb7480..2820b8449a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/IndexQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/IndexQuery.java
@@ -53,6 +53,9 @@ public final class IndexQuery<K, V> extends Query<Cache.Entry<K, V>> {
     /** Index query criteria. */
     private @Nullable List<IndexQueryCriterion> criteria;
 
+    /** Partition to run IndexQuery over. */
+    private @Nullable Integer part;
+
     /**
      * Specify index with cache value class.
      *
@@ -172,6 +175,32 @@ public final class IndexQuery<K, V> extends Query<Cache.Entry<K, V>> {
         return filter;
     }
 
+    /**
+     * Sets partition number over which this query should iterate. If {@code null}, query will iterate over
+     * all partitions in the cache. Must be in the range [0, N) where N is partition number in the cache.
+     *
+     * @param part Partition number over which this query should iterate.
+     * @return {@code this} for chaining.
+     */
+    public IndexQuery<K, V> setPartition(@Nullable Integer part) {
+        A.ensure(part == null || part >= 0,
+            "Specified partition must be in the range [0, N) where N is partition number in the cache.");
+
+        this.part = part;
+
+        return this;
+    }
+
+    /**
+     * Gets partition number over which this query should iterate. Will return {@code null} if partition was not
+     * set. In this case query will iterate over all partitions in the cache.
+     *
+     * @return Partition number or {@code null}.
+     */
+    @Nullable public Integer getPartition() {
+        return part;
+    }
+
     /** */
     private void validateAndSetCriteria(List<IndexQueryCriterion> criteria) {
         if (F.isEmpty(criteria))
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
index 37c2e1370e7..4a1e48c4321 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
@@ -87,6 +87,9 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
 
         qryMgr = (GridCacheDistributedQueryManager<K, V>)ctx.queries();
 
+        if (qry.query().partition() != null)
+            nodes = Collections.singletonList(node(nodes));
+
         streams = new ConcurrentHashMap<>(nodes.size());
 
         for (ClusterNode node : nodes) {
@@ -108,6 +111,22 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
         }
     }
 
+    /**
+     * @return Nodes for query execution.
+     */
+    private ClusterNode node(Collection<ClusterNode> nodes) {
+        ClusterNode rmtNode = null;
+
+        for (ClusterNode node : nodes) {
+            if (node.isLocal())
+                return node;
+
+            rmtNode = node;
+        }
+
+        return rmtNode;
+    }
+
     /** {@inheritDoc} */
     @Override protected void cancelQuery(Throwable err) {
         firstPageLatch.countDown();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index bf6a83582b7..6071b8e33a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -302,6 +302,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
      * @param cctx Context.
      * @param type Query type.
      * @param idxQryDesc Index query descriptor.
+     * @param part Partition number to iterate over.
      * @param clsName Class name.
      * @param filter Index query remote filter.
      */
@@ -309,6 +310,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         GridCacheContext<?, ?> cctx,
         GridCacheQueryType type,
         IndexQueryDesc idxQryDesc,
+        @Nullable Integer part,
         @Nullable String clsName,
         @Nullable IgniteBiPredicate<Object, Object> filter
     ) {
@@ -317,6 +319,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         this.clsName = clsName;
         this.idxQryDesc = idxQryDesc;
         this.filter = filter;
+        this.part = part;
 
         log = cctx.logger(getClass());
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 0dbcac6d72e..a4bf962ec9a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -629,8 +629,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                     break;
 
                 case INDEX:
+                    int[] parts = null;
+
+                    if (qry.partition() != null)
+                        parts = new int[]{qry.partition()};
+
                     IndexQueryResult<K, V> idxQryRes = qryProc.queryIndex(cacheName, qry.queryClassName(), qry.idxQryDesc(),
-                        qry.scanFilter(), filter(qry), qry.keepBinary());
+                        qry.scanFilter(), filter(qry, parts, parts != null), qry.keepBinary());
 
                     iter = idxQryRes.iter();
                     res.metadata(idxQryRes.metadata());
@@ -2020,10 +2025,20 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * @return Filter.
      */
     private IndexingQueryFilter filter(GridCacheQueryAdapter<?> qry) {
+        return filter(qry, null, false);
+    }
+
+    /**
+     * @param qry Query.
+     * @param partsArr Array of partitions to apply specified query.
+     * @param treatReplicatedAsPartitioned If true, only primary partitions of replicated caches will be used.
+     * @return Filter.
+     */
+    private IndexingQueryFilter filter(GridCacheQueryAdapter<?> qry, @Nullable int[] partsArr, boolean treatReplicatedAsPartitioned) {
         if (qry.includeBackups())
             return null;
 
-        return new IndexingQueryFilterImpl(cctx.kernalContext(), AffinityTopologyVersion.NONE, null);
+        return new IndexingQueryFilterImpl(cctx.kernalContext(), AffinityTopologyVersion.NONE, partsArr, treatReplicatedAsPartitioned);
     }
 
     /**
@@ -2912,9 +2927,18 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * @return Created query.
      */
     public <R> CacheQuery<R> createIndexQuery(IndexQuery qry, boolean keepBinary) {
+        if (qry.getPartition() != null) {
+            int part = qry.getPartition();
+
+            A.ensure(part >= 0 && part < cctx.affinity().partitions(),
+                "Specified partition must be in the range [0, N) where N is partition number in the cache.");
+        }
+
         IndexQueryDesc desc = new IndexQueryDesc(qry.getCriteria(), qry.getIndexName(), qry.getValueType());
 
-        GridCacheQueryAdapter q = new GridCacheQueryAdapter<>(cctx, INDEX, desc, qry.getValueType(), qry.getFilter());
+        GridCacheQueryAdapter q = new GridCacheQueryAdapter<>(
+            cctx, INDEX, desc, qry.getPartition(), qry.getValueType(), qry.getFilter());
+
         q.keepBinary(keepBinary);
 
         return q;
diff --git a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryPartitionTest.java b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryPartitionTest.java
new file mode 100644
index 00000000000..df91fea6e0d
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryPartitionTest.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.query;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import javax.cache.Cache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/** */
+@RunWith(Parameterized.class)
+public class IndexQueryPartitionTest extends GridCommonAbstractTest {
+    /** */
+    @Parameterized.Parameter
+    public CacheMode cacheMode;
+
+    /** */
+    @Parameterized.Parameter(1)
+    public boolean client;
+
+    /** */
+    private static Map<Integer, Person> data;
+
+    /** */
+    @Parameterized.Parameters(name = "mode={0}, client={1}")
+    public static List<Object[]> params() {
+        return F.asList(
+            new Object[]{ CacheMode.PARTITIONED, false },
+            new Object[]{ CacheMode.PARTITIONED, true },
+            new Object[]{ CacheMode.REPLICATED, false },
+            new Object[]{ CacheMode.REPLICATED, true }
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        CacheConfiguration<Integer, Person> ccfg = new CacheConfiguration<Integer, Person>()
+            .setName("CACHE")
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+            .setCacheMode(cacheMode)
+            .setIndexedTypes(Integer.class, Person.class)
+            .setAffinity(new RendezvousAffinityFunction().setPartitions(100));
+
+        cfg.setCacheConfiguration(ccfg);
+
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrids(3);
+
+        startClientGrid(3);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /** */
+    @Test
+    public void testSinglePartition() {
+        load();
+
+        for (int part = 0; part < 100; part++) {
+            Map<Integer, Person> expRes = expect(part);
+
+            IndexQuery<Integer, Person> qry = new IndexQuery<Integer, Person>(Person.class)
+                .setPartition(part);
+
+            TestRecordingCommunicationSpi.spi(grid()).record(GridCacheQueryRequest.class);
+
+            QueryCursor<Cache.Entry<Integer, Person>> cursor = grid().cache("CACHE").query(qry);
+
+            for (Cache.Entry<Integer, Person> e: cursor) {
+                Person p = expRes.remove(e.getKey());
+
+                assertEquals(e.getKey().toString(), p, e.getValue());
+            }
+
+            assertTrue(expRes.isEmpty());
+
+            // Send request to single node only.
+            int sendReq = 1;
+
+            if (!client) {
+                if (cacheMode == CacheMode.REPLICATED)
+                    sendReq = 0;
+                else {
+                    ClusterNode primNode = grid().affinity("CACHE").mapPartitionToNode(part);
+
+                    if (grid().localNode().equals(primNode))
+                        sendReq = 0;
+                }
+            }
+
+            assertEquals(sendReq, TestRecordingCommunicationSpi.spi(grid()).recordedMessages(true).size());
+        }
+    }
+
+    /** */
+    @Test
+    public void testSetNullNotAffect() {
+        try (IgniteDataStreamer<Integer, Person> dataStreamer = grid().dataStreamer("CACHE")) {
+            Random rnd = new Random();
+
+            for (int i = 0; i < 10_000; i++)
+                dataStreamer.addData(i, new Person(rnd.nextInt()));
+        }
+
+        IndexQuery<Integer, Person> qry = new IndexQuery<Integer, Person>(Person.class)
+            .setPartition(0);
+
+        assertTrue(grid().cache("CACHE").query(qry).getAll().size() < 10_000);
+
+        qry = new IndexQuery<Integer, Person>(Person.class)
+            .setPartition(null);
+
+        assertTrue(grid().cache("CACHE").query(qry).getAll().size() == 10_000);
+    }
+
+    /** */
+    @Test
+    public void testLocalWithPartition() {
+        load();
+
+        for (int part = 0; part < 100; part++) {
+            IndexQuery<Integer, Person> qry = new IndexQuery<Integer, Person>(Person.class)
+                .setPartition(part);
+
+            qry.setLocal(true);
+
+            boolean fail = client || (
+                    cacheMode == CacheMode.PARTITIONED
+                    && !grid().affinity("CACHE").mapPartitionToNode(part).equals(grid().localNode())
+                );
+
+            if (fail) {
+                GridTestUtils.assertThrows(null, () -> grid().cache("CACHE").query(qry).getAll(),
+                    IgniteException.class,
+                    "Cluster group is empty.");
+            }
+            else
+                assertTrue(!grid().cache("CACHE").query(qry).getAll().isEmpty());
+        }
+    }
+
+    /** */
+    @Test
+    public void testNegativePartitionFails() {
+        GridTestUtils.assertThrows(null, () -> new IndexQuery<Integer, Person>(Person.class).setPartition(-1),
+            IllegalArgumentException.class,
+            "Specified partition must be in the range [0, N) where N is partition number in the cache.");
+
+        GridTestUtils.assertThrows(null, () -> new IndexQuery<Integer, Person>(Person.class).setPartition(-23),
+            IllegalArgumentException.class,
+            "Specified partition must be in the range [0, N) where N is partition number in the cache.");
+
+        GridTestUtils.assertThrows(null, () -> {
+            IndexQuery qry = new IndexQuery<Integer, Person>(Person.class).setPartition(1000);
+
+            grid().cache("CACHE").query(qry);
+        },
+            IgniteException.class,
+            "Specified partition must be in the range [0, N) where N is partition number in the cache.");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteEx grid() {
+        IgniteEx grid = client ? grid(3) : grid(0);
+
+        assert (client && grid(0).localNode().isClient()) || !grid(0).localNode().isClient();
+
+        return grid;
+    }
+
+    /** */
+    private void load() {
+        data = new HashMap<>();
+
+        try (IgniteDataStreamer<Integer, Person> dataStreamer = grid(0).dataStreamer("CACHE")) {
+            Random rnd = new Random();
+
+            for (int i = 0; i < 10_000; i++) {
+                Person p = new Person(rnd.nextInt());
+
+                data.put(i, p);
+                dataStreamer.addData(i, p);
+            }
+        }
+    }
+
+    /** */
+    private Map<Integer, Person> expect(int part) {
+        Map<Integer, Person> exp = new HashMap<>();
+
+        for (Integer key: data.keySet()) {
+            int p = grid(0).affinity("CACHE").partition(key);
+
+            if (p == part)
+                exp.put(key, data.get(key));
+        }
+
+        return exp;
+    }
+
+    /** */
+    private static class Person {
+        /** */
+        @GridToStringInclude
+        @QuerySqlField(index = true)
+        private final int fld;
+
+        /** */
+        Person(int fld) {
+            this.fld = fld;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Person.class, this);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            return fld == ((Person)o).fld;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return fld;
+        }
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java
index c8982e0a6f7..c9b0d905bc9 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java
@@ -35,6 +35,7 @@ import org.junit.runners.Suite;
     IndexQueryAliasTest.class,
     IndexQuerySqlIndexTest.class,
     IndexQueryRangeTest.class,
+    IndexQueryPartitionTest.class,
     IndexQueryCacheKeyValueFieldsTest.class,
     IndexQueryCacheKeyValueEscapedFieldsTest.class,
     IndexQueryWrongIndexTest.class,