You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ti...@apache.org on 2022/08/12 16:22:29 UTC
[ignite] branch master updated: IGNITE-17511 Java ThinClient supports IndexQuery (#10188)
This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new a9d49f2ad76 IGNITE-17511 Java ThinClient supports IndexQuery (#10188)
a9d49f2ad76 is described below
commit a9d49f2ad761cb014c0bb27f9a806004dbf014a2
Author: Maksim Timonin <ti...@gmail.com>
AuthorDate: Fri Aug 12 19:22:20 2022 +0300
IGNITE-17511 Java ThinClient supports IndexQuery (#10188)
---
.../apache/ignite/client/ClientOperationType.java | 5 +
.../internal/client/thin/ClientOperation.java | 9 +
.../internal/client/thin/TcpClientCache.java | 73 +++++
.../platform/client/ClientMessageParser.java | 12 +
.../client/cache/ClientCacheIndexQueryRequest.java | 145 +++++++++
.../org/apache/ignite/client/ReliabilityTest.java | 4 +-
.../ignite/cache/query/IndexQueryTestSuite.java | 3 +-
.../cache/query/ThinClientIndexQueryTest.java | 359 +++++++++++++++++++++
8 files changed, 607 insertions(+), 3 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/client/ClientOperationType.java b/modules/core/src/main/java/org/apache/ignite/client/ClientOperationType.java
index b8fabd8e213..17a64b27c26 100644
--- a/modules/core/src/main/java/org/apache/ignite/client/ClientOperationType.java
+++ b/modules/core/src/main/java/org/apache/ignite/client/ClientOperationType.java
@@ -166,6 +166,11 @@ public enum ClientOperationType {
*/
QUERY_CONTINUOUS,
+ /**
+ * Index query ({@link ClientCache#query(Query)}).
+ */
+ QUERY_INDEX,
+
/**
* Start transaction ({@link ClientTransactions#txStart}).
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
index c3c6491d3fc..ddd12c1bd47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
@@ -150,6 +150,12 @@ public enum ClientOperation {
/** Continuous query event. */
QUERY_CONTINUOUS_EVENT(2007, ClientNotificationType.CONTINUOUS_QUERY_EVENT),
+ /** Index query. */
+ QUERY_INDEX(2008),
+
+ /** Query index cursor get page. */
+ QUERY_INDEX_CURSOR_GET_PAGE(2009),
+
/** Get binary type name. */
GET_BINARY_TYPE_NAME(3000),
@@ -395,6 +401,9 @@ public enum ClientOperation {
case QUERY_CONTINUOUS:
return ClientOperationType.QUERY_CONTINUOUS;
+ case QUERY_INDEX:
+ return ClientOperationType.QUERY_INDEX;
+
case TX_START:
return ClientOperationType.TRANSACTION_START;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
index 9b86e1c4efb..69af7682ce6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
@@ -33,6 +33,8 @@ import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.IndexQuery;
+import org.apache.ignite.cache.query.IndexQueryCriterion;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
@@ -44,9 +46,12 @@ import org.apache.ignite.client.ClientDisconnectListener;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
import org.apache.ignite.client.IgniteClientFuture;
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
import org.apache.ignite.internal.binary.streams.BinaryInputStream;
import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
+import org.apache.ignite.internal.cache.query.RangeIndexQueryCriterion;
import org.apache.ignite.internal.client.thin.TcpClientTransactions.TcpClientTransaction;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.T2;
@@ -54,6 +59,7 @@ import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.binary.GridBinaryMarshaller.ARR_LIST;
import static org.apache.ignite.internal.client.thin.ProtocolVersionFeature.EXPIRY_POLICY;
import static org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy.convertDuration;
@@ -747,6 +753,8 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
res = (QueryCursor<R>)query((SqlFieldsQuery)qry);
else if (qry instanceof ContinuousQuery)
res = query((ContinuousQuery<K, V>)qry, null);
+ else if (qry instanceof IndexQuery)
+ res = indexQuery((IndexQuery)qry);
else
throw new IllegalArgumentException(
String.format("Query of type [%s] is not supported", qry.getClass().getSimpleName())
@@ -947,6 +955,71 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
));
}
+ /**
+ * Handle index query: builds the writer for the request payload and wraps the paged result in a cursor.
+ *
+ * Payload layout written after the cache info, in this order: page size, local flag, partition
+ * ({@code -1} when not set), value type, index name, criteria (list marker + size + items, or a NULL marker),
+ * filter (object followed by the platform byte, or a NULL marker).
+ *
+ * @param qry Index query.
+ * @return Cursor over the cache entries of the query result.
+ * @throws IllegalArgumentException From the payload writer if a criterion is not a {@link RangeIndexQueryCriterion}.
+ */
+ private QueryCursor<Cache.Entry<K, V>> indexQuery(IndexQuery<K, V> qry) {
+ Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
+ writeCacheInfo(payloadCh);
+
+ BinaryOutputStream out = payloadCh.out();
+
+ // The raw writer wraps the same output stream, so writes through 'w' and 'out' interleave in call order.
+ try (BinaryRawWriterEx w = new BinaryWriterExImpl(marsh.context(), out, null, null)) {
+ w.writeInt(qry.getPageSize());
+ w.writeBoolean(qry.isLocal());
+ // -1 marks "partition not specified".
+ w.writeInt(qry.getPartition() == null ? -1 : qry.getPartition());
+
+ w.writeString(qry.getValueType());
+ w.writeString(qry.getIndexName());
+
+ if (qry.getCriteria() != null) {
+ // Criteria are sent as a list marker, the element count and then each criterion.
+ out.writeByte(ARR_LIST);
+ out.writeInt(qry.getCriteria().size());
+
+ for (IndexQueryCriterion c: qry.getCriteria()) {
+ if (c instanceof RangeIndexQueryCriterion) {
+ out.writeByte((byte)0); // Criterion type.
+
+ RangeIndexQueryCriterion range = (RangeIndexQueryCriterion)c;
+
+ w.writeString(range.field());
+ w.writeBoolean(range.lowerIncl());
+ w.writeBoolean(range.upperIncl());
+ w.writeBoolean(range.lowerNull());
+ w.writeBoolean(range.upperNull());
+
+ serDes.writeObject(out, range.lower());
+ serDes.writeObject(out, range.upper());
+ }
+ else {
+ // Only range criteria have a wire representation here.
+ throw new IllegalArgumentException(
+ String.format("Unknown IndexQuery criterion type [%s]", c.getClass().getSimpleName())
+ );
+ }
+ }
+ }
+ else
+ out.writeByte(GridBinaryMarshaller.NULL);
+ }
+
+ if (qry.getFilter() == null)
+ out.writeByte(GridBinaryMarshaller.NULL);
+ else {
+ serDes.writeObject(out, qry.getFilter());
+ // NOTE(review): the request parser added in this commit (ClientCacheIndexQueryRequest) reads the filter
+ // object but does not appear to read this trailing platform byte - confirm this is intentional.
+ out.writeByte(JAVA_PLATFORM);
+ }
+ };
+
+ return new ClientQueryCursor<>(new ClientQueryPager<>(
+ ch,
+ ClientOperation.QUERY_INDEX,
+ ClientOperation.QUERY_INDEX_CURSOR_GET_PAGE,
+ qryWriter,
+ keepBinary,
+ marsh,
+ cacheId,
+ qry.getPartition() == null ? -1 : qry.getPartition()
+ ));
+ }
+
/** Handle SQL query. */
private QueryCursor<Cache.Entry<K, V>> sqlQuery(SqlQuery qry) {
Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
index 04843bca0f6..1ac13755489 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheGe
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheGetOrCreateWithNameRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheGetRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheGetSizeRequest;
+import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheIndexQueryRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheLocalPeekRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheNodePartitionsRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCachePartitionsRequest;
@@ -252,6 +253,12 @@ public class ClientMessageParser implements ClientListenerMessageParser {
/** */
public static final short OP_QUERY_CONTINUOUS_EVENT_NOTIFICATION = 2007;
+ /** */
+ private static final short OP_QUERY_INDEX = 2008;
+
+ /** */
+ private static final short OP_QUERY_INDEX_CURSOR_GET_PAGE = 2009;
+
/* Binary metadata operations. */
/** */
private static final short OP_BINARY_TYPE_NAME_GET = 3000;
@@ -459,6 +466,8 @@ public class ClientMessageParser implements ClientListenerMessageParser {
case OP_QUERY_SCAN_CURSOR_GET_PAGE:
case OP_QUERY_SQL_CURSOR_GET_PAGE:
+
+ case OP_QUERY_INDEX_CURSOR_GET_PAGE:
return new ClientCacheQueryNextPageRequest(reader);
case OP_RESOURCE_CLOSE:
@@ -575,6 +584,9 @@ public class ClientMessageParser implements ClientListenerMessageParser {
case OP_QUERY_CONTINUOUS:
return new ClientCacheQueryContinuousRequest(reader);
+ case OP_QUERY_INDEX:
+ return new ClientCacheIndexQueryRequest(reader);
+
case OP_TX_START:
return new ClientTxStartRequest(reader);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheIndexQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheIndexQueryRequest.java
new file mode 100644
index 00000000000..a5928cea542
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheIndexQueryRequest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.cache;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.query.IndexQuery;
+import org.apache.ignite.cache.query.IndexQueryCriterion;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.internal.binary.BinaryRawReaderEx;
+import org.apache.ignite.internal.cache.query.RangeIndexQueryCriterion;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+
+import static org.apache.ignite.internal.binary.GridBinaryMarshaller.ARR_LIST;
+
+/**
+ * IndexQuery request: parses an {@link IndexQuery} from the request payload and executes it on the cache.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ClientCacheIndexQueryRequest extends ClientCacheRequest {
+ /** IndexQuery. */
+ private final IndexQuery qry;
+
+ /** Page size. */
+ private final int pageSize;
+
+ /**
+ * Reads the query from the stream. Fields are read in a fixed order: page size, local flag, partition,
+ * value type, index name, criteria (optional list) and filter (optional object).
+ *
+ * @param reader Reader.
+ */
+ public ClientCacheIndexQueryRequest(BinaryRawReaderEx reader) {
+ super(reader);
+
+ pageSize = reader.readInt();
+
+ boolean loc = reader.readBoolean();
+
+ int part = reader.readInt();
+
+ String valType = reader.readString();
+
+ String idxName = reader.readString();
+
+ // Marker byte: criteria follow only when it equals ARR_LIST; any other value leaves criteria unset.
+ byte arrMark = reader.readByte();
+
+ List<IndexQueryCriterion> criteria = null;
+
+ if (arrMark == ARR_LIST) {
+ int critSize = reader.readInt();
+
+ criteria = new ArrayList<>(critSize);
+
+ for (int i = 0; i < critSize; i++)
+ criteria.add(readCriterion(reader));
+ }
+
+ Object filterObj = reader.readObjectDetached();
+
+ qry = new IndexQuery(valType, idxName);
+
+ qry.setPageSize(pageSize);
+ qry.setLocal(loc);
+
+ // Negative partition means "not specified"; the query partition is left unset in that case.
+ if (part >= 0)
+ qry.setPartition(part);
+
+ if (criteria != null)
+ qry.setCriteria(Arrays.asList(criteria.toArray()));
+
+ // The filter is expected to arrive as a binary object and is deserialized before being set on the query.
+ if (filterObj != null)
+ qry.setFilter(((BinaryObject)filterObj).deserialize());
+ }
+
+ /**
+ * Reads a single criterion: a type byte followed by the criterion body. Only type {@code 0}
+ * (range criterion) is recognized.
+ *
+ * @param reader Reader.
+ * @return Range criterion built from the field name, inclusion/null flags and the lower and upper bounds.
+ * @throws IgniteException If the criterion type byte is not {@code 0}.
+ */
+ private IndexQueryCriterion readCriterion(BinaryRawReaderEx reader) {
+ byte type = reader.readByte();
+
+ if (type == 0) {
+ String field = reader.readString();
+
+ boolean lowerIncl = reader.readBoolean();
+ boolean upperIncl = reader.readBoolean();
+ boolean lowerNull = reader.readBoolean();
+ boolean upperNull = reader.readBoolean();
+
+ Object lower = reader.readObjectDetached();
+ Object upper = reader.readObjectDetached();
+
+ RangeIndexQueryCriterion r = new RangeIndexQueryCriterion(field, lower, upper);
+ r.lowerIncl(lowerIncl);
+ r.upperIncl(upperIncl);
+ r.lowerNull(lowerNull);
+ r.upperNull(upperNull);
+
+ return r;
+ }
+
+ throw new IgniteException("Unknown IndexQuery criterion type: " + type);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override public ClientResponse process(ClientConnectionContext ctx) {
+ // Cache view is selected by the keep-binary flag of the request.
+ IgniteCache cache = !isKeepBinary() ? rawCache(ctx) : cache(ctx);
+
+ // The cursor is counted before the query is run; the catch block below undoes this on failure.
+ ctx.incrementCursors();
+
+ try {
+ QueryCursor cur = cache.query(qry);
+
+ ClientCacheEntryQueryCursor cliCur = new ClientCacheEntryQueryCursor(cur, pageSize, ctx);
+
+ // Register the cursor as a connection resource and tell it the id it was registered under.
+ long cursorId = ctx.resources().put(cliCur);
+
+ cliCur.id(cursorId);
+
+ return new ClientCacheQueryResponse(requestId(), cliCur);
+ }
+ catch (Exception e) {
+ ctx.decrementCursors();
+
+ throw e;
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
index 7e25d7b67e4..1cf34e84e11 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
@@ -316,13 +316,13 @@ public class ReliabilityTest extends AbstractThinClientTest {
String nullOpsNames = nullOps.stream().map(Enum::name).collect(Collectors.joining(", "));
- long expectedNullCount = 18;
+ long expectedNullCnt = 19;
String msg = nullOps.size()
+ " operation codes do not have public equivalent. When adding new codes, update ClientOperationType too. Missing ops: "
+ nullOpsNames;
- assertEquals(msg, expectedNullCount, nullOps.size());
+ assertEquals(msg, expectedNullCnt, nullOps.size());
}
/**
diff --git a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java
index c9b0d905bc9..0aeec5d059f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java
@@ -41,7 +41,8 @@ import org.junit.runners.Suite;
IndexQueryWrongIndexTest.class,
MultifieldIndexQueryTest.class,
MultiTableIndexQuery.class,
- RepeatedFieldIndexQueryTest.class
+ RepeatedFieldIndexQueryTest.class,
+ ThinClientIndexQueryTest.class
})
public class IndexQueryTestSuite {
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/cache/query/ThinClientIndexQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/cache/query/ThinClientIndexQueryTest.java
new file mode 100644
index 00000000000..ee0f3e87f2f
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/cache/query/ThinClientIndexQueryTest.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.query;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.function.Consumer;
+import javax.cache.Cache;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.between;
+import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.eq;
+import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.gt;
+import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.gte;
+import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.lt;
+import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.lte;
+
+/** Tests {@link IndexQuery} execution through the Java thin client, with and without keep-binary mode. */
+@RunWith(Parameterized.class)
+public class ThinClientIndexQueryTest extends GridCommonAbstractTest {
+ /** Number of entries loaded into the cache; keys and both fields take values {@code 0..CNT-1}. */
+ private static final int CNT = 10_000;
+
+ /** Number of server nodes. */
+ private static final int NODES = 2;
+
+ /** Name of the index over {@code fld1}. */
+ private static final String IDX_FLD1 = "IDX_FLD1";
+
+ /** Name of the index over {@code fld1, fld2}. */
+ private static final String IDX_FLD1_FLD2 = "IDX_FLD1_FLD2";
+
+ /** Whether the client cache is used in keep-binary mode. */
+ @Parameterized.Parameter
+ public boolean keepBinary;
+
+ /** @return Test parameters. */
+ @Parameterized.Parameters(name = "keepBinary={0}")
+ public static Object[] params() {
+ return new Object[] { false, true };
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String instanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(instanceName);
+
+ cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ Ignite crd = startGrids(NODES);
+
+ crd.getOrCreateCache(new CacheConfiguration<Integer, Person>()
+ .setName("CACHE")
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+ .setIndexedTypes(Integer.class, Person.class));
+
+ try (IgniteDataStreamer<Integer, Person> stream = grid(0).dataStreamer("CACHE")) {
+ for (int i = 0; i < CNT; i++)
+ stream.addData(i, new Person(i, i));
+ }
+ }
+
+ /** Checks range criteria on every index, including the case when the index name is not specified. */
+ @Test
+ public void testValidRanges() {
+ Random rnd = new Random();
+
+ int left = rnd.nextInt(CNT / 2);
+ int right = CNT / 2 + rnd.nextInt(CNT / 4) + 1;
+
+ for (String idxName: F.asList(IDX_FLD1, IDX_FLD1_FLD2, null)) {
+ withClientCache((cache) -> {
+ // No criteria.
+ assertClientQuery(cache, 0, CNT, idxName);
+
+ // Single field, single criterion.
+ assertClientQuery(cache, left + 1, CNT, idxName, gt("fld1", left));
+ assertClientQuery(cache, left, CNT, idxName, gte("fld1", left));
+ assertClientQuery(cache, 0, left, idxName, lt("fld1", left));
+ assertClientQuery(cache, 0, left + 1, idxName, lte("fld1", left));
+ assertClientQuery(cache, left, left + 1, idxName, eq("fld1", left));
+ assertClientQuery(cache, left, right, idxName, between("fld1", left, right));
+
+ // Single field, multiple criteria.
+ assertClientQuery(cache, left, right + 1, idxName, gte("fld1", left), lte("fld1", right));
+ });
+ }
+
+ for (String idxName: F.asList(IDX_FLD1_FLD2, null)) {
+ withClientCache((cache) -> {
+ // Multiple field, multiple criteria.
+ assertClientQuery(cache, left + 1, right, idxName, gt("fld1", left), lt("fld2", right));
+ });
+ }
+ }
+
+ /** Checks that criteria on fields that do not match the explicitly named index are rejected. */
+ @Test
+ public void testIndexNameMismatchCriteria() {
+ withClientCache((cache) -> {
+ for (IndexQueryCriterion[] criteria: F.asList(
+ new IndexQueryCriterion[] { lt("fld1", 100), lt("fld2", 100) },
+ new IndexQueryCriterion[] { lt("fld2", 100) }
+ )) {
+ IndexQuery<Integer, Person> idxQry = new IndexQuery<Integer, Person>(Person.class, IDX_FLD1)
+ .setCriteria(criteria);
+
+ GridTestUtils.assertThrows(
+ log,
+ () -> cache.query(idxQry).getAll(),
+ ClientException.class,
+ "Failed to parse IndexQuery. Index doesn't match criteria");
+ }
+ });
+ }
+
+ /** Checks that the query runs with the configured page size and that non-positive sizes are rejected. */
+ @Test
+ public void testPageSize() {
+ IndexQuery<Integer, Person> idxQry = new IndexQuery<>(Person.class);
+
+ withClientCache(cache -> {
+ for (int pageSize: F.asList(1, 10, 100, 1000, 10_000)) {
+ idxQry.setPageSize(pageSize);
+
+ TestRecordingCommunicationSpi.spi(grid(0)).record(GridQueryNextPageRequest.class);
+
+ // Run the query that actually carries the page size under test.
+ assertClientQuery(cache, 0, CNT, idxQry);
+
+ List<Object> reqs = TestRecordingCommunicationSpi.spi(grid(0)).recordedMessages(true);
+
+ // NOTE(review): confirm GridQueryNextPageRequest is the message type index queries send between
+ // nodes; if it is not, this loop never runs and the page size is effectively not verified.
+ for (Object r: reqs)
+ assertEquals(pageSize, ((GridQueryNextPageRequest)r).pageSize());
+ }
+
+ for (int pageSize: F.asList(-10, -1, 0)) {
+ GridTestUtils.assertThrowsAnyCause(
+ log,
+ () -> idxQry.setPageSize(pageSize),
+ IllegalArgumentException.class,
+ "Page size must be above zero");
+ }
+ });
+ }
+
+ /** Checks that a local query returns only a part of the data and records no next page requests. */
+ @Test
+ public void testLocal() {
+ withClientCache(cache -> {
+ IndexQuery<Integer, Person> idxQry = new IndexQuery<>(Person.class);
+
+ idxQry.setLocal(true);
+
+ TestRecordingCommunicationSpi.spi(grid(0)).record(GridQueryNextPageRequest.class);
+
+ assertTrue(cache.query(idxQry).getAll().size() < CNT);
+
+ List<Object> reqs = TestRecordingCommunicationSpi.spi(grid(0)).recordedMessages(true);
+
+ assertTrue(reqs.isEmpty());
+ });
+ }
+
+ /** Checks that a filter set on the query is applied to the result. */
+ @Test
+ public void testFilter() {
+ IndexQuery idxQry = new IndexQuery(Person.class);
+ idxQry.setFilter((k, v) -> (int)k < 1000);
+
+ withClientCache((cache) -> assertClientQuery(cache, 0, 1000, idxQry));
+ }
+
+ /** Checks that a query limited to a partition returns only keys of that partition and validates the partition. */
+ @Test
+ public void testPartition() {
+ withClientCache(cache -> {
+ IndexQuery<Integer, Person> idxQry = new IndexQuery<>(Person.class);
+
+ for (int p = 0; p < 1024; p++) {
+ idxQry.setPartition(p);
+
+ for (int i = 0; i < NODES; i++)
+ TestRecordingCommunicationSpi.spi(grid(i)).record(GridQueryNextPageRequest.class);
+
+ List<Cache.Entry<Integer, Person>> result = cache.query(idxQry).getAll();
+
+ assertTrue(result.size() < CNT);
+
+ for (Cache.Entry<Integer, Person> e: result)
+ assertEquals(p, grid(0).affinity("CACHE").partition(e.getKey()));
+
+ // Check the recorder of every node, not only the first one.
+ for (int i = 0; i < NODES; i++) {
+ List<Object> reqs = TestRecordingCommunicationSpi.spi(grid(i)).recordedMessages(true);
+
+ assertTrue(reqs.isEmpty());
+ }
+ }
+
+ for (int part: F.asList(-10, -1)) {
+ GridTestUtils.assertThrows(
+ log,
+ () -> idxQry.setPartition(part),
+ IllegalArgumentException.class,
+ "Specified partition must be in the range");
+ }
+
+ GridTestUtils.assertThrows(
+ log,
+ () -> {
+ idxQry.setPartition(5000);
+
+ return cache.query(idxQry).getAll();
+ },
+ ClientException.class,
+ "Specified partition must be in the range");
+ });
+ }
+
+ /** Checks that a criterion implementation unknown to the thin client is rejected on the client side. */
+ @Test
+ public void testWrongIndexQueryCriterion() {
+ withClientCache(cache -> {
+ IndexQuery<Integer, Person> idxQry = new IndexQuery<>(Person.class);
+ idxQry.setCriteria(new IndexQueryCriterion() {
+ @Override public String field() {
+ return null;
+ }
+ });
+
+ GridTestUtils.assertThrowsAnyCause(
+ log,
+ () -> cache.query(idxQry).getAll(),
+ IllegalArgumentException.class,
+ "Unknown IndexQuery criterion type");
+ });
+ }
+
+ /**
+ * Builds an index query for the given index name and criteria and checks its result.
+ *
+ * @param cache Client cache.
+ * @param left First expected key, inclusive.
+ * @param right Last expected key, exclusive.
+ * @param idxName Index name, or {@code null} if not specified.
+ * @param crit Criteria.
+ */
+ private void assertClientQuery(
+ ClientCache<Integer, Person> cache,
+ int left,
+ int right,
+ @Nullable String idxName,
+ IndexQueryCriterion... crit
+ ) {
+ IndexQuery<Integer, Person> idxQry = new IndexQuery<Integer, Person>(Person.class, idxName)
+ .setCriteria(crit);
+
+ assertClientQuery(cache, left, right, idxQry);
+ }
+
+ /**
+ * Runs the query and checks that the first {@code right - left} entries are the keys {@code left..right-1}
+ * in ascending order, with both fields equal to the key.
+ *
+ * @param cache Client cache.
+ * @param left First expected key, inclusive.
+ * @param right Last expected key, exclusive.
+ * @param idxQry Query to run.
+ */
+ private void assertClientQuery(ClientCache<Integer, Person> cache, int left, int right, IndexQuery idxQry) {
+ Iterator<Cache.Entry<Integer, Person>> cursor = cache.query(idxQry).iterator();
+
+ for (int i = left; i < right; i++) {
+ Cache.Entry<Integer, Person> e = cursor.next();
+
+ assertEquals(i, e.getKey().intValue());
+
+ if (keepBinary) {
+ assertEquals(i, (int)((BinaryObject)e.getValue()).field("fld1"));
+ assertEquals(i, (int)((BinaryObject)e.getValue()).field("fld2"));
+ }
+ else {
+ assertEquals(i, e.getValue().fld1);
+ assertEquals(i, e.getValue().fld2);
+ }
+ }
+ }
+
+ /**
+ * Starts a thin client, runs the given action on the test cache and closes the client.
+ *
+ * @param consumer Action to run on the client cache.
+ */
+ private void withClientCache(Consumer<ClientCache<Integer, Person>> consumer) {
+ ClientConfiguration clnCfg = new ClientConfiguration()
+ .setAddresses("127.0.0.1:10800");
+
+ try (IgniteClient cln = Ignition.startClient(clnCfg)) {
+ ClientCache<Integer, Person> cache = cln.cache("CACHE");
+
+ if (keepBinary)
+ cache = cache.withKeepBinary();
+
+ consumer.accept(cache);
+ }
+ }
+
+ /** Test value type with two indexed integer fields. */
+ private static class Person {
+ /** First field: leading column of both indexes. */
+ @GridToStringInclude
+ @QuerySqlField(orderedGroups = {
+ @QuerySqlField.Group(name = IDX_FLD1, order = 0),
+ @QuerySqlField.Group(name = IDX_FLD1_FLD2, order = 0)
+ })
+ final int fld1;
+
+ /** Second field: second column of the two-field index. */
+ @GridToStringInclude
+ @QuerySqlField(orderedGroups = {
+ @QuerySqlField.Group(name = IDX_FLD1_FLD2, order = 1)
+ })
+ final int fld2;
+
+ /**
+ * @param fld1 First field.
+ * @param fld2 Second field.
+ */
+ Person(int fld1, int fld2) {
+ this.fld1 = fld1;
+ this.fld2 = fld2;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(Person.class, this);
+ }
+ }
+}