You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by mm...@apache.org on 2021/10/20 13:11:38 UTC
[ignite] branch master updated: IGNITE-14703 Add MergeSort
distributed cache query reducer. (#9081)
This is an automated email from the ASF dual-hosted git repository.
mmuzaf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new ea52fa4 IGNITE-14703 Add MergeSort distributed cache query reducer. (#9081)
ea52fa4 is described below
commit ea52fa4719ffff0f330c98c83347aa6e5547ac9b
Author: Maksim Timonin <ti...@gmail.com>
AuthorDate: Wed Oct 20 16:11:07 2021 +0300
IGNITE-14703 Add MergeSort distributed cache query reducer. (#9081)
---
.../processors/cache/CacheObjectUtils.java | 2 +-
.../processors/cache/IgniteCacheProxyImpl.java | 2 +-
.../GridCacheDistributedFieldsQueryFuture.java | 10 +-
.../query/GridCacheDistributedQueryFuture.java | 337 ++++++++++---------
.../query/GridCacheDistributedQueryManager.java | 230 +++----------
.../query/GridCacheLocalFieldsQueryFuture.java | 2 +-
.../cache/query/GridCacheLocalQueryFuture.java | 35 +-
.../cache/query/GridCacheLocalQueryManager.java | 5 -
.../cache/query/GridCacheQueryAdapter.java | 2 +-
.../cache/query/GridCacheQueryFutureAdapter.java | 358 ++++++---------------
.../cache/query/GridCacheQueryManager.java | 40 +--
.../cache/query/GridCacheQueryRequest.java | 85 ++++-
.../processors/cache/query/ScoredCacheEntry.java | 68 ++++
.../cache/query/reducer/CacheQueryReducer.java | 70 ++++
.../query/reducer/MergeSortCacheQueryReducer.java | 104 ++++++
.../processors/cache/query/reducer/NodePage.java | 80 +++++
.../cache/query/reducer/NodePageStream.java | 127 ++++++++
.../query/reducer/UnsortedCacheQueryReducer.java | 83 +++++
.../processors/query/h2/opt/GridLuceneIndex.java | 11 +-
.../cache/GridCacheFullTextQueryAbstractTest.java | 69 ++++
.../cache/GridCacheFullTextQueryFailoverTest.java | 77 +++++
.../cache/GridCacheFullTextQueryLimitTest.java | 94 ++++++
...ridCacheFullTextQueryMultithreadedSelfTest.java | 3 +-
.../cache/GridCacheFullTextQueryPagesTest.java | 182 +++++++++++
.../IgniteBinaryCacheQueryTestSuite.java | 6 +
.../org/apache/ignite/util/KillCommandsTests.java | 13 +-
26 files changed, 1409 insertions(+), 686 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java
index ed278b1..cf36ce1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java
@@ -177,7 +177,7 @@ public class CacheObjectUtils {
* @param ldr Class loader, used for deserialization from binary representation.
* @return Unwrapped object.
*/
- private static Object unwrapBinary(
+ public static Object unwrapBinary(
CacheObjectValueContext ctx,
Object o,
boolean keepBinary,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
index eada11c..81d3247 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
@@ -545,7 +545,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
if (query instanceof TextQuery) {
TextQuery q = (TextQuery)query;
- qry = ctx.queries().createFullTextQuery(q.getType(), q.getText(), q.getLimit(), isKeepBinary);
+ qry = ctx.queries().createFullTextQuery(q.getType(), q.getText(), q.getLimit(), q.getPageSize(), isKeepBinary);
if (grp != null)
qry.projection(grp);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedFieldsQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedFieldsQueryFuture.java
index 743dc42..a730347 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedFieldsQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedFieldsQueryFuture.java
@@ -34,9 +34,6 @@ import org.jetbrains.annotations.Nullable;
public class GridCacheDistributedFieldsQueryFuture
extends GridCacheDistributedQueryFuture<Object, Object, List<Object>>
implements GridCacheQueryMetadataAware {
- /** */
- private static final long serialVersionUID = 0L;
-
/** Meta data future. */
private final GridFutureAdapter<List<GridQueryFieldMetadata>> metaFut;
@@ -44,10 +41,9 @@ public class GridCacheDistributedFieldsQueryFuture
* @param ctx Cache context.
* @param reqId Request ID.
* @param qry Query.
- * @param nodes Nodes.
*/
- public GridCacheDistributedFieldsQueryFuture(GridCacheContext<?, ?> ctx, long reqId,
- GridCacheQueryBean qry, Iterable<ClusterNode> nodes) {
+ public GridCacheDistributedFieldsQueryFuture(GridCacheContext<?, ?> ctx, long reqId, GridCacheQueryBean qry,
+ Collection<ClusterNode> nodes) {
super((GridCacheContext<Object, Object>)ctx, reqId, qry, nodes);
metaFut = new GridFutureAdapter<>();
@@ -93,7 +89,7 @@ public class GridCacheDistributedFieldsQueryFuture
}
/** {@inheritDoc} */
- @Override boolean fields() {
+ @Override public boolean fields() {
return true;
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
index 8f08dea..1efc895 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
@@ -17,257 +17,168 @@
package org.apache.ignite.internal.processors.cache.query;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteIllegalStateException;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.query.reducer.MergeSortCacheQueryReducer;
+import org.apache.ignite.internal.processors.cache.query.reducer.NodePageStream;
+import org.apache.ignite.internal.processors.cache.query.reducer.UnsortedCacheQueryReducer;
import org.apache.ignite.internal.util.lang.GridPlainCallable;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.internal.U;
+import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT;
+
/**
* Distributed query future.
*/
public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutureAdapter<K, V, R> {
/** */
- private static final long serialVersionUID = 0L;
+ private final long reqId;
- /** */
- private long reqId;
+ /** Helps to send cache query requests to other nodes. */
+ private final GridCacheDistributedQueryManager<K, V> qryMgr;
- /** */
- private final Collection<UUID> subgrid = new HashSet<>();
+ /** Collection of streams. */
+ private final Map<UUID, NodePageStream<R>> streams;
- /** */
- private final Collection<UUID> rcvd = new HashSet<>();
+ /** Count of streams that finish receiving remote pages. */
+ private final AtomicInteger noRemotePagesStreamsCnt = new AtomicInteger();
- /** */
- private CountDownLatch firstPageLatch = new CountDownLatch(1);
+ /** Count down this latch when every node responds to the initial cache query request. */
+ private final CountDownLatch firstPageLatch = new CountDownLatch(1);
+
+ /** Set of nodes that deliver their first page. */
+ private Set<UUID> rcvdFirstPage = ConcurrentHashMap.newKeySet();
/**
* @param ctx Cache context.
* @param reqId Request ID.
* @param qry Query.
- * @param nodes Nodes.
*/
- protected GridCacheDistributedQueryFuture(GridCacheContext<K, V> ctx, long reqId, GridCacheQueryBean qry,
- Iterable<ClusterNode> nodes) {
+ protected GridCacheDistributedQueryFuture(
+ GridCacheContext<K, V> ctx,
+ long reqId,
+ GridCacheQueryBean qry,
+ Collection<ClusterNode> nodes
+ ) {
super(ctx, qry, false);
assert reqId > 0;
this.reqId = reqId;
- GridCacheQueryManager<K, V> mgr = ctx.queries();
+ qryMgr = (GridCacheDistributedQueryManager<K, V>) ctx.queries();
- assert mgr != null;
+ streams = new ConcurrentHashMap<>(nodes.size());
- synchronized (this) {
- for (ClusterNode node : nodes)
- subgrid.add(node.id());
+ for (ClusterNode node : nodes) {
+ streams.computeIfAbsent(node.id(), nodeId ->
+ new NodePageStream<>(nodeId, () -> requestPages(nodeId), () -> cancelPages(nodeId)));
}
- }
- /** {@inheritDoc} */
- @Override protected void cancelQuery() throws IgniteCheckedException {
- final GridCacheQueryManager<K, V> qryMgr = cctx.queries();
-
- assert qryMgr != null;
-
- try {
- Collection<ClusterNode> allNodes = cctx.discovery().allNodes();
- Collection<ClusterNode> nodes;
-
- synchronized (this) {
- nodes = F.retain(allNodes, true,
- new P1<ClusterNode>() {
- @Override public boolean apply(ClusterNode node) {
- return !cctx.localNodeId().equals(node.id()) && subgrid.contains(node.id());
- }
- }
- );
-
- subgrid.clear();
- }
+ Map<UUID, NodePageStream<R>> streamsMap = Collections.unmodifiableMap(streams);
- final GridCacheQueryRequest req = new GridCacheQueryRequest(cctx.cacheId(),
- reqId,
- fields(),
- qryMgr.queryTopologyVersion(),
- cctx.deploymentEnabled());
+ reducer = qry.query().type() == TEXT ?
+ new MergeSortCacheQueryReducer<>(streamsMap)
+ : new UnsortedCacheQueryReducer<>(streamsMap);
+ }
- // Process cancel query directly (without sending) for local node,
- cctx.closures().callLocalSafe(new GridPlainCallable<Object>() {
- @Override public Object call() {
- qryMgr.processQueryRequest(cctx.localNodeId(), req);
+ /** {@inheritDoc} */
+ @Override protected void cancelQuery(Throwable err) {
+ firstPageLatch.countDown();
- return null;
- }
- });
+ for (NodePageStream<R> s : streams.values())
+ s.cancel(err);
- if (!nodes.isEmpty()) {
- for (ClusterNode node : nodes) {
- try {
- cctx.io().send(node, req, cctx.ioPolicy());
- }
- catch (IgniteCheckedException e) {
- if (cctx.io().checkNodeLeft(node.id(), e, false)) {
- if (log.isDebugEnabled())
- log.debug("Failed to send cancel request, node failed: " + node);
- }
- else
- U.error(log, "Failed to send cancel request [node=" + node + ']', e);
- }
- }
- }
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send cancel request (will cancel query in any case).", e);
- }
-
- qryMgr.onQueryFutureCanceled(reqId);
+ cctx.queries().onQueryFutureCanceled(reqId);
clear();
}
/** {@inheritDoc} */
- @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
@Override protected void onNodeLeft(UUID nodeId) {
- boolean callOnPage;
+ boolean hasRemotePages = streams.get(nodeId).hasRemotePages();
- synchronized (this) {
- callOnPage = !loc && subgrid.contains(nodeId);
- }
-
- if (callOnPage)
- onPage(nodeId, Collections.emptyList(),
- new ClusterTopologyCheckedException("Remote node has left topology: " + nodeId), true);
+ if (hasRemotePages)
+ onError(new ClusterTopologyCheckedException("Remote node has left topology: " + nodeId));
}
/** {@inheritDoc} */
- @Override public void awaitFirstPage() throws IgniteCheckedException {
- try {
- firstPageLatch.await();
+ @Override protected void onPage(UUID nodeId, Collection<R> data, boolean last) {
+ synchronized (firstPageLatch) {
+ if (rcvdFirstPage != null) {
+ rcvdFirstPage.add(nodeId);
- if (isDone() && error() != null)
- // Throw the exception if future failed.
- get();
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ if (rcvdFirstPage.size() == streams.size()) {
+ firstPageLatch.countDown();
- throw new IgniteInterruptedCheckedException(e);
+ rcvdFirstPage.clear();
+ rcvdFirstPage = null;
+ }
+ }
}
- }
- /** {@inheritDoc} */
- @Override protected boolean onPage(UUID nodeId, boolean last) {
- assert Thread.holdsLock(this);
+ NodePageStream<R> stream = streams.get(nodeId);
- if (!loc) {
- rcvd.add(nodeId);
+ if (stream == null)
+ return;
- if (rcvd.containsAll(subgrid))
- firstPageLatch.countDown();
- }
-
- boolean futFinish;
+ stream.addPage(data, last);
if (last) {
- futFinish = loc || (subgrid.remove(nodeId) && subgrid.isEmpty());
-
- if (futFinish)
- firstPageLatch.countDown();
- }
- else
- futFinish = false;
-
- return futFinish;
- }
+ int cnt;
- /** {@inheritDoc} */
- @Override protected void loadPage() {
- assert !Thread.holdsLock(this);
-
- Collection<ClusterNode> nodes = null;
+ do {
+ cnt = noRemotePagesStreamsCnt.get();
- synchronized (this) {
- if (!isDone() && rcvd.containsAll(subgrid)) {
- rcvd.clear();
+ } while (!noRemotePagesStreamsCnt.compareAndSet(cnt, cnt + 1));
- nodes = nodes();
- }
+ if (cnt + 1 >= streams.size())
+ onDone();
}
-
- if (nodes != null)
- cctx.queries().loadPage(reqId, qry.query(), nodes, false);
}
/** {@inheritDoc} */
- @Override protected void loadAllPages() throws IgniteInterruptedCheckedException {
- assert !Thread.holdsLock(this);
-
+ @Override public void awaitFirstItemAvailable() throws IgniteCheckedException {
U.await(firstPageLatch);
- Collection<ClusterNode> nodes = null;
-
- synchronized (this) {
- if (!isDone() && !subgrid.isEmpty())
- nodes = nodes();
- }
-
- if (nodes != null)
- cctx.queries().loadPage(reqId, qry.query(), nodes, true);
- }
-
- /**
- * @return Nodes to send requests to.
- */
- private Collection<ClusterNode> nodes() {
- assert Thread.holdsLock(this);
-
- Collection<ClusterNode> nodes = new ArrayList<>(subgrid.size());
-
- for (UUID nodeId : subgrid) {
- ClusterNode node = cctx.discovery().node(nodeId);
-
- if (node != null)
- nodes.add(node);
- }
-
- return nodes;
+ if (isDone() && error() != null)
+ // Throw the exception if future failed.
+ super.get();
}
/** {@inheritDoc} */
- @Override public boolean onDone(Collection<R> res, Throwable err) {
- boolean done = super.onDone(res, err);
-
- // Must release the lath after onDone() in order for a waiting thread to see an exception, if any.
- firstPageLatch.countDown();
-
- return done;
+ @Override public Collection<R> get() throws IgniteCheckedException {
+ return get0();
}
/** {@inheritDoc} */
- @Override public boolean onCancelled() {
- firstPageLatch.countDown();
-
- return super.onCancelled();
+ @Override public Collection<R> get(long timeout, TimeUnit unit) throws IgniteCheckedException {
+ return get0();
}
/** {@inheritDoc} */
- @Override public void onTimeout() {
- firstPageLatch.countDown();
+ @Override public Collection<R> getUninterruptibly() throws IgniteCheckedException {
+ return get0();
+ }
- super.onTimeout();
+ /**
+ * Completion of distributed query future depends on user that iterates over query result with lazy page loading.
+ * Thus {@link #get()} can block for an unpredictably long period of time, so we should avoid calling it.
+ */
+ private Collection<R> get0() {
+ throw new IgniteIllegalStateException("Unexpected lock on iterator over distributed cache query result.");
}
/** {@inheritDoc} */
@@ -284,4 +195,82 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
long requestId() {
return reqId;
}
+
+ /**
+ * Send initial query request to query nodes.
+ */
+ public void startQuery() {
+ try {
+ GridCacheQueryRequest req = GridCacheQueryRequest.startQueryRequest(cctx, reqId, this);
+
+ qryMgr.sendRequest(this, req, streams.keySet());
+ }
+ catch (IgniteCheckedException e) {
+ onError(e);
+ }
+ }
+
+ /**
+ * Send request to fetch new pages.
+ *
+ * @param nodeId Node to send request.
+ */
+ private void requestPages(UUID nodeId) {
+ try {
+ GridCacheQueryRequest req = GridCacheQueryRequest.pageRequest(cctx, reqId, query().query(), fields());
+
+ qryMgr.sendRequest(this, req, Collections.singletonList(nodeId));
+ }
+ catch (IgniteCheckedException e) {
+ onError(e);
+ }
+ }
+
+ /**
+ * Cancels remote pages.
+ *
+ * @param nodeId Node to send request.
+ */
+ // TODO IGNITE-15731: Refactor how CacheQueryReducer handles remote nodes.
+ private void cancelPages(UUID nodeId) {
+ try {
+ GridCacheQueryRequest req = GridCacheQueryRequest.cancelRequest(cctx, reqId, fields());
+
+ if (nodeId.equals(cctx.localNodeId())) {
+ // Process cancel query directly (without sending) for local node.
+ cctx.closures().callLocalSafe(new GridPlainCallable<Object>() {
+ @Override public Object call() {
+ qryMgr.processQueryRequest(cctx.localNodeId(), req);
+
+ return null;
+ }
+ });
+ }
+ else {
+ try {
+ cctx.io().send(nodeId, req, cctx.ioPolicy());
+ }
+ catch (IgniteCheckedException e) {
+ if (cctx.io().checkNodeLeft(nodeId, e, false)) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send cancel request, node failed: " + nodeId);
+ }
+ else
+ U.error(log, "Failed to send cancel request [node=" + nodeId + ']', e);
+ }
+ }
+ }
+ catch (IgniteCheckedException e) {
+ U.error(logger(), "Failed to send cancel request (will cancel query in any case).", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void onError(Throwable err) {
+ if (onDone(err)) {
+ streams.values().forEach(s -> s.cancel(err));
+
+ firstPageLatch.countDown();
+ }
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index 770542b..6fb0907 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -37,7 +37,6 @@ import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.metric.IoStatisticsHolder;
import org.apache.ignite.internal.metric.IoStatisticsQueryHelper;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedSet;
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
@@ -81,7 +80,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
private ConcurrentMap<Long, Thread> threads = new ConcurrentHashMap<>();
/** {request ID -> future} */
- private ConcurrentMap<Long, GridCacheDistributedQueryFuture<?, ?, ?>> futs =
+ private final ConcurrentMap<Long, GridCacheDistributedQueryFuture<?, ?, ?>> futs =
new ConcurrentHashMap<>();
/** Received requests to cancel. */
@@ -158,24 +157,6 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
* Removes query future from futures map.
*
* @param reqId Request id.
- * @param fut Query future.
- */
- protected void addQueryFuture(long reqId, GridCacheDistributedQueryFuture<?, ?, ?> fut) {
- futs.put(reqId, fut);
-
- if (cctx.kernalContext().clientDisconnected()) {
- IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(
- cctx.kernalContext().cluster().clientReconnectFuture(),
- "Query was cancelled, client node disconnected.");
-
- fut.onDone(err);
- }
- }
-
- /**
- * Removes query future from futures map.
- *
- * @param reqId Request id.
*/
protected void removeQueryFuture(long reqId) {
futs.remove(reqId);
@@ -197,7 +178,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
* @param sndId Sender node id.
* @param req Query request.
*/
- @Override void processQueryRequest(UUID sndId, GridCacheQueryRequest req) {
+ @Override public void processQueryRequest(UUID sndId, GridCacheQueryRequest req) {
assert req.mvccSnapshot() != null || !cctx.mvccEnabled() || req.cancel() ||
(req.type() == null && !req.fields()) : req; // Last assertion means next page request.
@@ -430,22 +411,6 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
}
/** {@inheritDoc} */
- @Override void onWaitAtStop() {
- super.onWaitAtStop();
-
- // Wait till all requests will be finished.
- for (GridCacheQueryFutureAdapter fut : futs.values())
- try {
- fut.get();
- }
- catch (IgniteCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Received query error while waiting for query to finish [queryFuture= " + fut +
- ", error= " + e + ']');
- }
- }
-
- /** {@inheritDoc} */
@Override protected boolean onPageReady(boolean loc, GridCacheQueryInfo qryInfo,
Collection<?> data, boolean finished, Throwable e) {
GridCacheLocalQueryFuture<?, ?, ?> fut = qryInfo.localQueryFuture();
@@ -523,8 +488,12 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
@Override public CacheQueryFuture<?> queryDistributed(GridCacheQueryBean qry, final Collection<ClusterNode> nodes) {
+ return queryDistributed(qry, nodes, false);
+ }
+
+ /** */
+ private CacheQueryFuture<?> queryDistributed(GridCacheQueryBean qry, final Collection<ClusterNode> nodes, boolean fields) {
assert cctx.config().getCacheMode() != LOCAL;
if (log.isDebugEnabled())
@@ -532,48 +501,14 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
long reqId = cctx.io().nextIoId();
- final GridCacheDistributedQueryFuture<K, V, ?> fut =
- new GridCacheDistributedQueryFuture<>(cctx, reqId, qry, nodes);
+ final GridCacheDistributedQueryFuture fut = fields ?
+ new GridCacheDistributedFieldsQueryFuture(cctx, reqId, qry, nodes) :
+ new GridCacheDistributedQueryFuture(cctx, reqId, qry, nodes);
try {
- qry.query().validate();
-
- String clsName = qry.query().queryClassName();
- Boolean dataPageScanEnabled = qry.query().isDataPageScanEnabled();
- MvccSnapshot mvccSnapshot = qry.query().mvccSnapshot();
-
- boolean deployFilterOrTransformer = (qry.query().scanFilter() != null || qry.query().transform() != null
- || qry.query().idxQryDesc() != null) && cctx.gridDeploy().enabled();
-
- final GridCacheQueryRequest req = new GridCacheQueryRequest(
- cctx.cacheId(),
- reqId,
- cctx.name(),
- qry.query().type(),
- false,
- qry.query().clause(),
- qry.query().idxQryDesc(),
- qry.query().limit(),
- clsName,
- qry.query().scanFilter(),
- qry.query().partition(),
- qry.reducer(),
- qry.transform(),
- qry.query().pageSize(),
- qry.query().includeBackups(),
- qry.arguments(),
- false,
- qry.query().keepBinary(),
- qry.query().taskHash(),
- queryTopologyVersion(),
- mvccSnapshot,
- // Force deployment anyway if scan query is used.
- cctx.deploymentEnabled() || deployFilterOrTransformer,
- dataPageScanEnabled);
-
- addQueryFuture(req.id(), fut);
+ fut.qry.query().validate();
- final Object topic = topic(cctx.nodeId(), req.id());
+ final Object topic = topic(cctx.nodeId(), reqId);
cctx.io().addOrderedCacheHandler(cctx.shared(), topic, resHnd);
@@ -583,17 +518,27 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
}
});
- sendRequest(fut, req, nodes);
+ futs.put(reqId, fut);
+
+ if (cctx.kernalContext().clientDisconnected()) {
+ IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(
+ cctx.kernalContext().cluster().clientReconnectFuture(),
+ "Query was cancelled, client node disconnected.");
+
+ fut.onError(err);
+ }
+
+ fut.startQuery();
}
catch (IgniteCheckedException e) {
- fut.onDone(e);
+ fut.onError(e);
}
return fut;
}
/** {@inheritDoc} */
- @SuppressWarnings({"unchecked", "serial"})
+ @SuppressWarnings({"unchecked"})
@Override public GridCloseableIterator scanQueryDistributed(final GridCacheQueryAdapter qry,
Collection<ClusterNode> nodes) throws IgniteCheckedException {
assert !cctx.isLocal() : cctx.name();
@@ -722,39 +667,6 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
}
/** {@inheritDoc} */
- @Override public void loadPage(long id, GridCacheQueryAdapter<?> qry, Collection<ClusterNode> nodes, boolean all) {
- assert cctx.config().getCacheMode() != LOCAL;
- assert qry != null;
- assert nodes != null;
-
- GridCacheDistributedQueryFuture<?, ?, ?> fut = futs.get(id);
-
- assert fut != null;
-
- try {
- GridCacheQueryRequest req = new GridCacheQueryRequest(
- cctx.cacheId(),
- id,
- cctx.name(),
- qry.pageSize(),
- qry.includeBackups(),
- fut.fields(),
- all,
- qry.keepBinary(),
- qry.taskHash(),
- queryTopologyVersion(),
- // Force deployment anyway if scan query is used.
- cctx.deploymentEnabled() || (qry.scanFilter() != null && cctx.gridDeploy().enabled()),
- qry.isDataPageScanEnabled());
-
- sendRequest(fut, req, nodes);
- }
- catch (IgniteCheckedException e) {
- fut.onDone(e);
- }
- }
-
- /** {@inheritDoc} */
@Override public CacheQueryFuture<?> queryFieldsLocal(GridCacheQueryBean qry) {
assert cctx.config().getCacheMode() != LOCAL;
@@ -776,65 +688,8 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
}
/** {@inheritDoc} */
- @Override public CacheQueryFuture<?> queryFieldsDistributed(GridCacheQueryBean qry,
- Collection<ClusterNode> nodes) {
- assert cctx.config().getCacheMode() != LOCAL;
-
- if (log.isDebugEnabled())
- log.debug("Executing distributed query: " + qry);
-
- long reqId = cctx.io().nextIoId();
-
- final GridCacheDistributedFieldsQueryFuture fut =
- new GridCacheDistributedFieldsQueryFuture(cctx, reqId, qry, nodes);
-
- try {
- qry.query().validate();
-
- GridCacheQueryRequest req = new GridCacheQueryRequest(
- cctx.cacheId(),
- reqId,
- cctx.name(),
- qry.query().type(),
- true,
- qry.query().clause(),
- null,
- qry.query().limit(),
- null,
- null,
- null,
- qry.reducer(),
- qry.transform(),
- qry.query().pageSize(),
- qry.query().includeBackups(),
- qry.arguments(),
- qry.query().includeMetadata(),
- qry.query().keepBinary(),
- qry.query().taskHash(),
- queryTopologyVersion(),
- null,
- cctx.deploymentEnabled(),
- qry.query().isDataPageScanEnabled());
-
- addQueryFuture(req.id(), fut);
-
- final Object topic = topic(cctx.nodeId(), req.id());
-
- cctx.io().addOrderedCacheHandler(cctx.shared(), topic, resHnd);
-
- fut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> fut) {
- cctx.io().removeOrderedHandler(false, topic);
- }
- });
-
- sendRequest(fut, req, nodes);
- }
- catch (IgniteCheckedException e) {
- fut.onDone(e);
- }
-
- return fut;
+ @Override public CacheQueryFuture<?> queryFieldsDistributed(GridCacheQueryBean qry, Collection<ClusterNode> nodes) {
+ return queryDistributed(qry, nodes, true);
}
/**
@@ -842,32 +697,33 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
*
* @param fut Distributed future.
* @param req Request.
- * @param nodes Nodes.
+ * @param nodeIds Nodes.
* @throws IgniteCheckedException In case of error.
*/
- private void sendRequest(
+ // TODO IGNITE-15731: Refactor how CacheQueryReducer handles remote nodes.
+ public void sendRequest(
final GridCacheDistributedQueryFuture<?, ?, ?> fut,
final GridCacheQueryRequest req,
- Collection<ClusterNode> nodes
+ Collection<UUID> nodeIds
) throws IgniteCheckedException {
assert fut != null;
assert req != null;
- assert nodes != null;
+ assert nodeIds != null;
final UUID locNodeId = cctx.localNodeId();
- ClusterNode locNode = null;
+ boolean locNode = false;
- Collection<ClusterNode> rmtNodes = null;
+ Collection<UUID> rmtNodes = null;
- for (ClusterNode n : nodes) {
- if (n.id().equals(locNodeId))
- locNode = n;
+ for (UUID nodeId: nodeIds) {
+ if (nodeId.equals(locNodeId))
+ locNode = true;
else {
if (rmtNodes == null)
- rmtNodes = new ArrayList<>(nodes.size());
+ rmtNodes = new ArrayList<>(nodeIds.size());
- rmtNodes.add(n);
+ rmtNodes.add(nodeId);
}
}
@@ -875,13 +731,13 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
// For example, a remote reducer has a state, we should not serialize and then send
// the reducer changed by the local node.
if (!F.isEmpty(rmtNodes)) {
- for (ClusterNode node : rmtNodes) {
+ for (UUID node : rmtNodes) {
try {
cctx.io().send(node, req, GridIoPolicy.QUERY_POOL);
}
catch (IgniteCheckedException e) {
- if (cctx.io().checkNodeLeft(node.id(), e, true)) {
- fut.onNodeLeft(node.id());
+ if (cctx.io().checkNodeLeft(node, e, true)) {
+ fut.onNodeLeft(node);
if (fut.isDone())
return;
@@ -892,7 +748,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
}
}
- if (locNode != null) {
+ if (locNode) {
cctx.closures().callLocalSafe(new GridPlainCallable<Object>() {
@Override public Object call() throws Exception {
req.beforeLocalExecution(cctx);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalFieldsQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalFieldsQueryFuture.java
index e83c35f..20e4066 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalFieldsQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalFieldsQueryFuture.java
@@ -72,7 +72,7 @@ public class GridCacheLocalFieldsQueryFuture
}
/** {@inheritDoc} */
- @Override boolean fields() {
+ @Override public boolean fields() {
return true;
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
index bd60ed3..d8fd4f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
@@ -17,12 +17,17 @@
package org.apache.ignite.internal.processors.cache.query;
+import java.util.Collection;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.query.reducer.CacheQueryReducer;
+import org.apache.ignite.internal.processors.cache.query.reducer.NodePageStream;
+import org.apache.ignite.internal.processors.cache.query.reducer.UnsortedCacheQueryReducer;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteReducer;
@@ -33,14 +38,14 @@ import org.apache.ignite.marshaller.Marshaller;
*/
public class GridCacheLocalQueryFuture<K, V, R> extends GridCacheQueryFutureAdapter<K, V, R> {
/** */
- private static final long serialVersionUID = 0L;
-
- /** */
private Runnable run;
/** */
private IgniteInternalFuture<?> fut;
+ /** Local node page stream with single page. */
+ private final NodePageStream<R> stream;
+
/**
* @param ctx Context.
* @param qry Query.
@@ -49,6 +54,10 @@ public class GridCacheLocalQueryFuture<K, V, R> extends GridCacheQueryFutureAdap
super(ctx, qry, true);
run = new LocalQueryRunnable();
+
+ stream = new NodePageStream<>(ctx.localNodeId(), () -> {}, () -> {});
+
+ reducer = new UnsortedCacheQueryReducer<>(F.asMap(stream.nodeId(), stream));
}
/**
@@ -59,29 +68,27 @@ public class GridCacheLocalQueryFuture<K, V, R> extends GridCacheQueryFutureAdap
}
/** {@inheritDoc} */
- @Override protected void cancelQuery() throws IgniteCheckedException {
+ @Override protected void cancelQuery(Throwable err) throws IgniteCheckedException {
if (fut != null)
fut.cancel();
- }
- /** {@inheritDoc} */
- @Override protected boolean onPage(UUID nodeId, boolean last) {
- return last;
+ stream.cancel(err);
}
/** {@inheritDoc} */
- @Override protected void loadPage() {
- // No-op.
+ @Override public void awaitFirstItemAvailable() throws IgniteCheckedException {
+ CacheQueryReducer.get(stream.headPage());
}
/** {@inheritDoc} */
- @Override protected void loadAllPages() {
- // No-op.
+ @Override protected void onError(Throwable err) {
+ if (onDone(err))
+ stream.cancel(err);
}
/** {@inheritDoc} */
- @Override public void awaitFirstPage() throws IgniteCheckedException {
- get();
+ @Override protected void onPage(UUID nodeId, Collection<R> data, boolean lastPage) {
+ stream.addPage(data, lastPage);
}
/** */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java
index 147725b..4d1e20e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java
@@ -98,11 +98,6 @@ public class GridCacheLocalQueryManager<K, V> extends GridCacheQueryManager<K, V
}
/** {@inheritDoc} */
- @Override public void loadPage(long id, GridCacheQueryAdapter<?> qry, Collection<ClusterNode> nodes, boolean all) {
- // No-op.
- }
-
- /** {@inheritDoc} */
@Override public CacheQueryFuture<?> queryFieldsLocal(GridCacheQueryBean qry) {
assert cctx.config().getCacheMode() == LOCAL;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 68c5b86..8a60d54 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -907,7 +907,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
return (cur = convert(fut.next())) != null;
try {
- fut.awaitFirstPage();
+ fut.awaitFirstItemAvailable();
firstItemReturned = true;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
index ee91c8d..691ea1c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
@@ -20,26 +20,18 @@ package org.apache.ignite.internal.processors.cache.query;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Queue;
import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.CacheObjectUtils;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.query.reducer.CacheQueryReducer;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -47,7 +39,10 @@ import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
/**
- * Query future adapter.
+ * Query future adapter. Future is marked as done after all result pages are ready. Note that completeness of this
+ * future doesn't depend on whether all data are delivered to user or not. This class provides {@code Iterator}
+ * interface to access result data. Handling of received pages is triggered by {@code Iterator} methods.
+ * Reducing of result data (order, limit) is controlled by {@link CacheQueryReducer}.
*
* @param <R> Result type.
*/
@@ -74,23 +69,11 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
/** */
private boolean limitDisabled;
- /** Set of received keys used to deduplicate query result set. */
- private final Collection<K> keys;
-
- /** */
- private final Queue<Collection<R>> queue = new LinkedList<>();
-
- /** */
- private final AtomicInteger cnt = new AtomicInteger();
-
- /** */
- private Iterator<R> iter;
-
/** */
- private IgniteUuid timeoutId = IgniteUuid.randomUuid();
+ private int cnt;
/** */
- private long startTime;
+ private final IgniteUuid timeoutId = IgniteUuid.randomUuid();
/** */
private long endTime;
@@ -98,10 +81,12 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
/** */
protected boolean loc;
+ /** Reducer of cache query results. */
+ protected CacheQueryReducer<R> reducer;
+
/** */
protected GridCacheQueryFutureAdapter() {
qry = null;
- keys = null;
}
/**
@@ -117,7 +102,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
if (log == null)
log = U.logger(cctx.kernalContext(), logRef, GridCacheQueryFutureAdapter.class);
- startTime = U.currentTimeMillis();
+ long startTime = U.currentTimeMillis();
long timeout = qry.query().timeout();
capacity = query().query().limit();
@@ -132,8 +117,6 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
cctx.time().addTimeoutObject(this);
}
-
- keys = qry.query().enableDedup() ? new HashSet<K>() : null;
}
/**
@@ -143,43 +126,62 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
return qry;
}
+ /** {@inheritDoc} */
+ @Override public IgniteLogger logger() {
+ return log;
+ }
+
/**
* @return If fields query.
*/
- boolean fields() {
+ public boolean fields() {
return false;
}
/** {@inheritDoc} */
- @Override public boolean onDone(Collection<R> res, Throwable err) {
- cctx.time().removeTimeoutObject(this);
-
- return super.onDone(res, err);
- }
-
- /** {@inheritDoc} */
@Override public R next() {
try {
- R next = unmaskNull(internalIterator().next());
+ if (!limitDisabled && cnt == capacity)
+ return null;
+
+ checkError();
+
+ R next = null;
- cnt.decrementAndGet();
+ if (reducer.hasNextX()) {
+ next = unmaskNull(reducer.nextX());
+
+ if (!limitDisabled) {
+ cnt++;
+
+ // Exceed limit, stop page loading and cancel queries.
+ if (cnt == capacity)
+ cancel();
+ }
+ }
return next;
}
- catch (NoSuchElementException ignored) {
- return null;
- }
catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
}
}
/**
- * Waits for the first page to be received from remote node(s), if any.
+ * @return Cache context.
+ */
+ public GridCacheContext<K, V> cacheContext() {
+ return cctx;
+ }
+
+ /**
+ * TODO: IGNITE-15728 Provide custom reducer for ScanQueryFallbackClosableIterator.
+ *
+ * Waits until the first item is available to return.
*
* @throws IgniteCheckedException If query execution failed with an error.
*/
- public abstract void awaitFirstPage() throws IgniteCheckedException;
+ public abstract void awaitFirstItemAvailable() throws IgniteCheckedException;
/**
* @throws IgniteCheckedException If future is done with an error.
@@ -193,65 +195,8 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
}
/**
- * @return Iterator.
- * @throws IgniteCheckedException In case of error.
- */
- private Iterator<R> internalIterator() throws IgniteCheckedException {
- checkError();
-
- Iterator<R> it = null;
-
- while (it == null || !it.hasNext()) {
- Collection<R> c;
-
- synchronized (this) {
- it = iter;
-
- if (it != null && it.hasNext())
- break;
-
- c = queue.poll();
-
- if (c != null)
- it = iter = c.iterator();
-
- if (isDone() && queue.peek() == null)
- break;
- }
-
- if (c == null && !isDone()) {
- loadPage();
-
- long timeout = qry.query().timeout();
-
- long waitTime = timeout == 0 ? Long.MAX_VALUE : timeout - (U.currentTimeMillis() - startTime);
-
- if (waitTime <= 0) {
- it = Collections.<R>emptyList().iterator();
-
- break;
- }
-
- synchronized (this) {
- try {
- if (queue.isEmpty() && !isDone())
- wait(waitTime);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new IgniteCheckedException("Query was interrupted: " + qry, e);
- }
- }
- }
- }
-
- checkError();
-
- return it;
- }
-
- /**
+ * Callback that is invoked when a node has left the cluster.
+ *
* @param evtNodeId Removed or failed node Id.
*/
protected void onNodeLeft(UUID evtNodeId) {
@@ -259,59 +204,14 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
}
/**
- * @param col Collection.
- */
- protected void enqueue(Collection<?> col) {
- assert Thread.holdsLock(this);
-
- if (limitDisabled) {
- queue.add((Collection<R>)col);
-
- cnt.addAndGet(col.size());
- }
- else {
- if (capacity >= col.size()) {
- queue.add((Collection<R>)col);
- capacity -= col.size();
-
- cnt.addAndGet(col.size());
- }
- else if (capacity > 0) {
- queue.add(new ArrayList<>((Collection<R>)col).subList(0, capacity));
- capacity = 0;
-
- cnt.addAndGet(capacity);
- }
- }
- }
-
- /**
- * @param col Query data collection.
- * @return If dedup flag is {@code true} deduplicated collection (considering keys), otherwise passed in collection
- * without any modifications.
- */
- private Collection<?> dedupIfRequired(Collection<?> col) {
- if (!qry.query().enableDedup())
- return col;
-
- Collection<Object> dedupCol = new ArrayList<>(col.size());
-
- synchronized (this) {
- for (Object o : col)
- if (!(o instanceof Map.Entry) || keys.add(((Map.Entry<K, V>)o).getKey()))
- dedupCol.add(o);
- }
-
- return dedupCol;
- }
-
- /**
+ * Entrypoint for handling query result page from remote node.
+ *
* @param nodeId Sender node.
* @param data Page data.
* @param err Error (if was).
- * @param finished Finished or not.
+ * @param lastPage Whether it is the last page for sender node.
*/
- public void onPage(@Nullable UUID nodeId, @Nullable Collection<?> data, @Nullable Throwable err, boolean finished) {
+ public void onPage(@Nullable UUID nodeId, @Nullable Collection<?> data, @Nullable Throwable err, boolean lastPage) {
if (isCancelled())
return;
@@ -320,71 +220,76 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
"nodeId", nodeId, false,
"data", data, true,
"err", err, false,
- "finished", finished, false));
+ "finished", lastPage, false));
try {
- if (err != null)
- synchronized (this) {
- enqueue(Collections.emptyList());
-
- if (err instanceof IgniteCheckedException)
- onDone(err);
- else
- onDone(new IgniteCheckedException(nodeId != null ?
- S.toString("Failed to execute query on node",
- "query", qry, true,
- "nodeId", nodeId, false) :
- S.toString("Failed to execute query locally",
- "query", qry, true),
- err));
-
- onPage(nodeId, true);
-
- notifyAll();
+ if (err != null) {
+ if (!(err instanceof IgniteCheckedException)) {
+ err = new IgniteCheckedException(nodeId != null ?
+ S.toString("Failed to execute query on node",
+ "query", qry, true,
+ "nodeId", nodeId, false) :
+ S.toString("Failed to execute query locally",
+ "query", qry, true),
+ err);
}
+
+ onError(err);
+ }
else {
if (data == null)
data = Collections.emptyList();
- data = dedupIfRequired((Collection<Object>)data);
+ if (qry.query().type() == GridCacheQueryType.TEXT) {
+ ArrayList unwrapped = new ArrayList();
- data = cctx.unwrapBinariesIfNeeded((Collection<Object>)data, qry.query().keepBinary());
+ for (Object o: data) {
+ ScoredCacheEntry e = (ScoredCacheEntry) o;
- synchronized (this) {
- enqueue(data);
+ Object uKey = CacheObjectUtils.unwrapBinary(
+ cctx.cacheObjectContext(), e.getKey(), qry.query().keepBinary(), true, null);
- if (onPage(nodeId, finished)) {
- onDone(/* data */);
+ Object uVal = CacheObjectUtils.unwrapBinary(
+ cctx.cacheObjectContext(), e.getValue(), qry.query().keepBinary(), true, null);
- clear();
+ if (uKey != e.getKey() || uVal != e.getValue())
+ unwrapped.add(new ScoredCacheEntry<>(uKey, uVal, e.score()));
+ else
+ unwrapped.add(o);
}
- notifyAll();
- }
+ data = unwrapped;
+
+ } else
+ data = cctx.unwrapBinariesIfNeeded((Collection<Object>)data, qry.query().keepBinary());
+
+ onPage(nodeId, (Collection<R>) data, lastPage);
+
+ if (isDone())
+ clear();
}
}
catch (Throwable e) {
- onPageError(nodeId, e);
+ onError(e);
if (e instanceof Error)
throw (Error)e;
}
}
- /**
- * @param nodeId Sender node id.
- * @param e Error.
- */
- private void onPageError(@Nullable UUID nodeId, Throwable e) {
- synchronized (this) {
- enqueue(Collections.emptyList());
+ /** Invoked in case of an error of this future. */
+ protected abstract void onError(Throwable err);
- onPage(nodeId, true);
+ /** Handles new data page from query node. */
+ protected abstract void onPage(UUID nodeId, Collection<R> data, boolean lastPage);
- onDone(e);
+ /** {@inheritDoc} */
+ @Override public boolean onDone(Collection<R> res, Throwable err) {
+ boolean done = super.onDone(res, err);
- notifyAll();
- }
+ cctx.time().removeTimeoutObject(this);
+
+ return done;
}
/**
@@ -423,49 +328,6 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
return obj != NULL ? obj : null;
}
- /** {@inheritDoc} */
- @Override public Collection<R> get() throws IgniteCheckedException {
- if (!isDone())
- loadAllPages();
-
- return super.get();
- }
-
- /** {@inheritDoc} */
- @Override public Collection<R> get(long timeout, TimeUnit unit) throws IgniteCheckedException {
- if (!isDone())
- loadAllPages();
-
- return super.get(timeout, unit);
- }
-
- /** {@inheritDoc} */
- @Override public Collection<R> getUninterruptibly() throws IgniteCheckedException {
- if (!isDone())
- loadAllPages();
-
- return super.getUninterruptibly();
- }
-
- /**
- * @param nodeId Sender node id.
- * @param last Whether page is last.
- * @return Is query finished or not.
- */
- protected abstract boolean onPage(UUID nodeId, boolean last);
-
- /**
- * Loads next page.
- */
- protected abstract void loadPage();
-
- /**
- * Loads all left pages.
- *
- * @throws IgniteInterruptedCheckedException If thread is interrupted.
- */
- protected abstract void loadAllPages() throws IgniteInterruptedCheckedException;
-
/**
* Clears future.
*/
@@ -476,7 +338,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
/** {@inheritDoc} */
@Override public boolean cancel() throws IgniteCheckedException {
if (onCancelled()) {
- cancelQuery();
+ cancelQuery(new IgniteCheckedException("Query was cancelled."));
return true;
}
@@ -485,9 +347,12 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
}
/**
+ * Cancels query on remote nodes and cleans up owned resources.
+ *
+ * @param err If query was cancelled with error.
* @throws IgniteCheckedException In case of error.
*/
- protected abstract void cancelQuery() throws IgniteCheckedException;
+ protected abstract void cancelQuery(Throwable err) throws IgniteCheckedException;
/** {@inheritDoc} */
@Override public IgniteUuid timeoutId() {
@@ -501,14 +366,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
/** {@inheritDoc} */
@Override public void onTimeout() {
- try {
- cancelQuery();
-
- onDone(new IgniteFutureTimeoutCheckedException("Query timed out."));
- }
- catch (IgniteCheckedException e) {
- onDone(e);
- }
+ onError(new IgniteFutureTimeoutCheckedException("Query timed out."));
}
/** {@inheritDoc} */
@@ -520,14 +378,4 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
@Override public String toString() {
return S.toString(GridCacheQueryFutureAdapter.class, this);
}
-
- /**
- *
- */
- public void printMemoryStats() {
- X.println(">>> Query future memory statistics.");
- X.println(">>> queueSize: " + queue.size());
- X.println(">>> keysSize: " + keys.size());
- X.println(">>> cnt: " + cnt);
- }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 3190fd0..d02316b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -319,10 +319,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
cctx.events().removeListener(lsnr);
- if (cancel)
- onCancelAtStop();
- else
- onWaitAtStop();
+ onCancelAtStop();
}
/**
@@ -367,13 +364,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
/**
- * Wait flag handler at stop.
- */
- void onWaitAtStop() {
- // No-op.
- }
-
- /**
* Processes cache query request.
*
* @param sndId Sender node id.
@@ -515,16 +505,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
Collection<ClusterNode> nodes) throws IgniteCheckedException;
/**
- * Loads page.
- *
- * @param id Query ID.
- * @param qry Query.
- * @param nodes Nodes.
- * @param all Whether to load all pages.
- */
- public abstract void loadPage(long id, GridCacheQueryAdapter<?> qry, Collection<ClusterNode> nodes, boolean all);
-
- /**
* Executes distributed fields query.
*
* @param qry Query.
@@ -1224,7 +1204,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
final K key = row.getKey();
- V val = row.getValue();
+ final V val = row.getValue();
if (log.isDebugEnabled()) {
ClusterNode primaryNode = cctx.affinity().primaryByKey(key,
@@ -1324,8 +1304,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
else
continue;
}
- else
- data.add(new T2<>(key, val));
+ else {
+ if (type == TEXT)
+ // (K, V, score). Value transfers as BinaryObject.
+ data.add(row0);
+ else
+ data.add(new T2<>(key, val));
+ }
}
if (!loc) {
@@ -2851,11 +2836,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @param clsName Query class name.
* @param search Search clause.
* @param limit Limits response records count. If 0 or less, considered to be no limit.
+ * @param pageSize Query page size.
* @param keepBinary Keep binary flag.
* @return Created query.
*/
public CacheQuery<Map.Entry<K, V>> createFullTextQuery(String clsName,
- String search, int limit, boolean keepBinary) {
+ String search, int limit, int pageSize, boolean keepBinary) {
A.notNull("clsName", clsName);
A.notNull("search", search);
@@ -2867,7 +2853,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
null,
false,
keepBinary,
- null).limit(limit);
+ null)
+ .limit(limit)
+ .pageSize(pageSize);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 3d779f4..0912db3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -162,6 +162,85 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
}
/**
+ * Creates initial query request to be sent to query nodes.
+ *
+ * @param reqId Request (cache query) ID.
+ * @param fut Cache query future, contains query info.
+ */
+ public static GridCacheQueryRequest startQueryRequest(GridCacheContext<?, ?> cctx, long reqId,
+ GridCacheDistributedQueryFuture<?, ?, ?> fut) {
+ GridCacheQueryBean bean = fut.query();
+ GridCacheQueryAdapter<?> qry = bean.query();
+
+ boolean deployFilterOrTransformer = (qry.scanFilter() != null || qry.transform() != null)
+ && cctx.gridDeploy().enabled();
+
+ return new GridCacheQueryRequest(
+ cctx.cacheId(),
+ reqId,
+ cctx.name(),
+ qry.type(),
+ fut.fields(),
+ qry.clause(),
+ qry.idxQryDesc(),
+ qry.limit(),
+ qry.queryClassName(),
+ qry.scanFilter(),
+ qry.partition(),
+ bean.reducer(),
+ qry.transform(),
+ qry.pageSize(),
+ qry.includeBackups(),
+ bean.arguments(),
+ qry.includeMetadata(),
+ qry.keepBinary(),
+ qry.taskHash(),
+ cctx.startTopologyVersion(),
+ qry.mvccSnapshot(),
+ // Force deployment anyway if scan query is used.
+ cctx.deploymentEnabled() || deployFilterOrTransformer,
+ qry.isDataPageScanEnabled());
+ }
+
+ /**
+ * Creates request for fetching query result pages from query nodes.
+ *
+ * @param reqId Request (cache query) ID.
+ */
+ public static GridCacheQueryRequest pageRequest(GridCacheContext<?, ?> cctx, long reqId,
+ GridCacheQueryAdapter<?> qry, boolean fields) {
+
+ return new GridCacheQueryRequest(
+ cctx.cacheId(),
+ reqId,
+ cctx.name(),
+ qry.pageSize(),
+ qry.includeBackups(),
+ fields,
+ false,
+ qry.keepBinary(),
+ qry.taskHash(),
+ cctx.startTopologyVersion(),
+ // Force deployment anyway if scan query is used.
+ cctx.deploymentEnabled() || (qry.scanFilter() != null && cctx.gridDeploy().enabled()),
+ qry.isDataPageScanEnabled());
+ }
+
+ /**
+ * Creates cancel query request, after which no new pages will be sent.
+ *
+ * @param reqId Query request ID.
+ * @param fieldsQry Whether query is a fields query.
+ */
+ public static GridCacheQueryRequest cancelRequest(GridCacheContext<?, ?> cctx, long reqId, boolean fieldsQry) {
+ return new GridCacheQueryRequest(cctx.cacheId(),
+ reqId,
+ fieldsQry,
+ cctx.startTopologyVersion(),
+ cctx.deploymentEnabled());
+ }
+
+ /**
* Creates cancel query request.
*
* @param cacheId Cache ID.
@@ -170,7 +249,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
* @param topVer Topology version.
* @param addDepInfo Deployment info flag.
*/
- public GridCacheQueryRequest(
+ private GridCacheQueryRequest(
int cacheId,
long id,
boolean fields,
@@ -202,7 +281,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
* @param addDepInfo Deployment info flag.
* @param dataPageScanEnabled Flag to enable data page scan.
*/
- public GridCacheQueryRequest(
+ private GridCacheQueryRequest(
int cacheId,
long id,
String cacheName,
@@ -256,7 +335,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
* @param mvccSnapshot Mvcc snapshot.
* @param addDepInfo Deployment info flag.
*/
- public GridCacheQueryRequest(
+ private GridCacheQueryRequest(
int cacheId,
long id,
String cacheName,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/ScoredCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/ScoredCacheEntry.java
new file mode 100644
index 0000000..e498c92
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/ScoredCacheEntry.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+/** Represents cache key-value pair and score to compare cache entry by custom rule. */
+public class ScoredCacheEntry<K, V> extends IgniteBiTuple<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private float score;
+
+ /** */
+ public ScoredCacheEntry() {}
+
+ /** */
+ public ScoredCacheEntry(K key, V val, float score) {
+ super(key, val);
+
+ this.score = score;
+ }
+
+ /** */
+ public float score() {
+ return score;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ super.writeExternal(out);
+
+ out.writeObject(score);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ super.readExternal(in);
+
+ score = (float)in.readObject();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ScoredCacheEntry.class, this);
+ }
+}
+
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/CacheQueryReducer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/CacheQueryReducer.java
new file mode 100644
index 0000000..abbe949
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/CacheQueryReducer.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.reducer;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.util.lang.GridIteratorAdapter;
+
+/**
+ * This abstract class is base class for cache query reducers. They are responsible for reducing results of cache query.
+ *
+ * <T> is a type of cache query result item.
+ */
+public abstract class CacheQueryReducer<T> extends GridIteratorAdapter<T> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Page streams collection. */
+ protected final Map<UUID, NodePageStream<T>> pageStreams;
+
+ /** */
+ protected CacheQueryReducer(final Map<UUID, NodePageStream<T>> pageStreams) {
+ this.pageStreams = pageStreams;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeX() throws IgniteCheckedException {
+ throw new UnsupportedOperationException("CacheQueryReducer doesn't support removing items.");
+ }
+
+ /**
+ * @return Page with query results data from specified stream.
+ */
+ public static <T> NodePage<T> get(CompletableFuture<?> pageFut) throws IgniteCheckedException {
+ try {
+ return (NodePage<T>) pageFut.get();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new IgniteCheckedException("Query was interrupted.", e);
+ }
+ catch (ExecutionException e) {
+ Throwable t = e.getCause();
+
+ if (t instanceof IgniteCheckedException)
+ throw (IgniteCheckedException) t;
+
+ throw new IgniteCheckedException("Page future was completed with unexpected error.", e);
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/MergeSortCacheQueryReducer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/MergeSortCacheQueryReducer.java
new file mode 100644
index 0000000..d2b3a3b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/MergeSortCacheQueryReducer.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.reducer;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.PriorityQueue;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.query.ScoredCacheEntry;
+
+/**
+ * Reducer of cache query results that sorts results across all nodes. Note that it's assumed that every node
+ * returns a pre-sorted collection of data.
+ */
+public class MergeSortCacheQueryReducer<R> extends CacheQueryReducer<R> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Queue of pages from all nodes. Order of pages is controlled by the order of their head items. */
+ private PriorityQueue<NodePage<R>> nodePages;
+
+ /**
+ * Pages are iterated within the {@link #nextX()} method. In case a page is done, the corresponding stream is
+ * asked for a new page. We have to wait for it in {@link #hasNextX()}.
+ */
+ private UUID pendingNodeId;
+
+ /** */
+ public MergeSortCacheQueryReducer(final Map<UUID, NodePageStream<R>> pageStreams) {
+ super(pageStreams);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNextX() throws IgniteCheckedException {
+ // Initial sort.
+ if (nodePages == null) {
+ // Compares head items of pages from all nodes: the page whose head has the highest score goes first.
+ Comparator<NodePage<R>> pageCmp = (o1, o2) -> textResultComparator.compare(
+ (ScoredCacheEntry<?, ?>)o1.head(), (ScoredCacheEntry<?, ?>)o2.head());
+
+ nodePages = new PriorityQueue<>(pageStreams.size(), pageCmp);
+
+ for (NodePageStream<R> s : pageStreams.values()) {
+ NodePage<R> p = get(s.headPage());
+
+ if (p == null || !p.hasNext())
+ continue;
+
+ nodePages.add(p);
+ }
+ }
+
+ if (pendingNodeId != null) {
+ NodePageStream<R> stream = pageStreams.get(pendingNodeId);
+
+ if (!stream.closed()) {
+ NodePage<R> p = get(stream.headPage());
+
+ if (p != null && p.hasNext())
+ nodePages.add(p);
+ }
+ }
+
+ return !nodePages.isEmpty();
+ }
+
+ /** {@inheritDoc} */
+ @Override public R nextX() throws IgniteCheckedException {
+ if (nodePages.isEmpty())
+ throw new NoSuchElementException("No next element. Please, be sure to invoke hasNext() before next().");
+
+ NodePage<R> page = nodePages.poll();
+
+ R o = page.next();
+
+ if (page.hasNext())
+ nodePages.offer(page);
+ else if (!pageStreams.get(page.nodeId()).closed())
+ pendingNodeId = page.nodeId();
+
+ return o;
+ }
+
+ /** Compares rows for {@code TextQuery} results for ordering results in MergeSort reducer. */
+ private static final Comparator<ScoredCacheEntry<?, ?>> textResultComparator = (c1, c2) ->
+ Float.compare(c2.score(), c1.score());
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/NodePage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/NodePage.java
new file mode 100644
index 0000000..8336fd9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/NodePage.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.reducer;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+
+/**
+ * One page of cache query results received from a single node, iterated with a one-item look-ahead.
+ */
+public class NodePage<R> implements Iterator<R> {
+ /** Iterator over data in this page. */
+ private final Iterator<R> pageIter;
+
+ /** Node ID given to the constructor. */
+ private final UUID nodeId;
+
+ /** Item loaded by {@link #hasNext()} and not yet returned by {@link #next()}; {@code null} otherwise. */
+ private R head;
+
+ /**
+ * @param nodeId Node ID.
+ * @param data Page data.
+ */
+ public NodePage(UUID nodeId, Collection<R> data) {
+ pageIter = data.iterator();
+ this.nodeId = nodeId;
+ }
+
+ /** @return Item that {@link #next()} would return, or {@code null} if {@link #hasNext()} has not loaded one. */
+ public R head() {
+ return head;
+ }
+
+ /** @return Node ID given to the constructor. */
+ public UUID nodeId() {
+ return nodeId;
+ }
+
+ /** {@inheritDoc} Loads the next item into the head if none is loaded; a {@code null} item yields {@code false}. */
+ @Override public boolean hasNext() {
+ if (head != null)
+ return true;
+ else {
+ if (!pageIter.hasNext())
+ return false;
+ else
+ return (head = pageIter.next()) != null;
+ }
+ }
+
+ /** {@inheritDoc} Returns the loaded head and clears it; throws if {@link #hasNext()} has not loaded an item. */
+ @Override public R next() {
+ if (head == null)
+ throw new NoSuchElementException("Node page is empty.");
+
+ R o = head;
+
+ head = null;
+
+ return o;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/NodePageStream.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/NodePageStream.java
new file mode 100644
index 0000000..b6cfb08
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/NodePageStream.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.reducer;
+
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Stream of query result pages from a single node. {@link #headPage()} returns a future that is completed with the
+ * next {@link NodePage}; the following page is requested when the consumer first calls {@code hasNext()} on a page.
+ */
+public class NodePageStream<R> {
+ /** Node ID to stream pages. */
+ private final UUID nodeId;
+
+ /** Request pages action. */
+ private final Runnable reqPages;
+
+ /** Cancel remote pages action. */
+ private final Runnable cancelPages;
+
+ /** Flag showing whether more pages are expected from the node; cleared by the last page and by cancel. */
+ private boolean hasRemotePages = true;
+
+ /** Promise to notify the stream consumer about delivering new page; {@code null} after the last page is read. */
+ private CompletableFuture<NodePage<R>> head = new CompletableFuture<>();
+
+ /** Creates a stream for the given node with the actions used to request and to cancel its pages. */
+ public NodePageStream(UUID nodeId, Runnable reqPages, Runnable cancelPages) {
+ this.nodeId = nodeId;
+ this.reqPages = reqPages;
+ this.cancelPages = cancelPages;
+ }
+
+ /** @return Node ID given to the constructor. */
+ public UUID nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * Returns the current head future of this stream.
+ *
+ * @return Future completed by {@link #addPage} with a page; {@code null} once the last page started to be read.
+ */
+ public synchronized CompletableFuture<NodePage<R>> headPage() {
+ return head;
+ }
+
+ /**
+ * Completes the current head future with a page wrapping the given data.
+ *
+ * @param data Collection of query result items.
+ * @param last Whether it is the last page from this node.
+ */
+ public synchronized void addPage(Collection<R> data, boolean last) {
+ head.complete(new NodePage<R>(nodeId, data) {
+ /** Whether the first {@code hasNext()} call has already replaced the stream head. */
+ private boolean reqNext;
+
+ /** {@inheritDoc} The first call also installs a new head and requests a page, or clears the head. */
+ @Override public boolean hasNext() {
+ if (!reqNext) {
+ synchronized (NodePageStream.this) {
+ if (hasRemotePages) {
+ head = new CompletableFuture<>();
+
+ reqPages.run();
+ }
+ else
+ head = null;
+ }
+ // Done once per page, on the first hasNext() call.
+ reqNext = true;
+ }
+
+ return super.hasNext();
+ }
+ });
+ // Read by hasNext() of the page above, which then clears the head instead of requesting a page.
+ if (last)
+ hasRemotePages = false;
+ }
+
+ /**
+ * Fails the head future with {@code err} and runs the cancel action; does nothing if the stream is closed.
+ */
+ public synchronized void cancel(Throwable err) {
+ if (closed())
+ return;
+
+ head.completeExceptionally(err);
+
+ cancelPages.run();
+
+ hasRemotePages = false;
+ }
+
+ /**
+ * @return {@code true} if there are some undelivered pages from the node, otherwise {@code false}.
+ */
+ public synchronized boolean hasRemotePages() {
+ return hasRemotePages;
+ }
+
+ /**
+ * @return {@code true} if no remote pages remain and the head was cleared, otherwise {@code false}.
+ */
+ public synchronized boolean closed() {
+ return !hasRemotePages && (head == null);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/UnsortedCacheQueryReducer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/UnsortedCacheQueryReducer.java
new file mode 100644
index 0000000..9c50b2c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/UnsortedCacheQueryReducer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.reducer;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * Reducer of cache query results that returns pages in the order they become available; results are not sorted.
+ */
+public class UnsortedCacheQueryReducer<R> extends CacheQueryReducer<R> {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Current page to return data to user. */
+ private NodePage<R> page;
+
+ /** Scratch array for head futures that are not done yet; sized to the number of streams. */
+ private final CompletableFuture<NodePage<R>>[] futs;
+
+ /** @param pageStreams Page streams by node ID, passed to the base reducer. */
+ public UnsortedCacheQueryReducer(Map<UUID, NodePageStream<R>> pageStreams) {
+ super(pageStreams);
+
+ futs = new CompletableFuture[pageStreams.size()];
+ }
+
+ /** {@inheritDoc} Loops until the current page has an item or no open stream has an outstanding page. */
+ @Override public boolean hasNextX() throws IgniteCheckedException {
+ while (page == null || !page.hasNext()) {
+ int pendingNodesCnt = 0;
+ // Use a page that is already delivered; otherwise collect the head futures that are still pending.
+ for (NodePageStream<R> s: pageStreams.values()) {
+ if (s.closed())
+ continue;
+
+ CompletableFuture<NodePage<R>> f = s.headPage();
+
+ if (f.isDone()) {
+ page = get(f);
+
+ if (page.hasNext())
+ return true;
+ } else
+ futs[pendingNodesCnt++] = f;
+ }
+ // No pending future was collected, so there is nothing to wait for.
+ if (pendingNodesCnt == 0)
+ return false;
+
+ CompletableFuture[] pendingFuts = Arrays.copyOf(futs, pendingNodesCnt);
+ // Reset the scratch slots used in this pass.
+ Arrays.fill(futs, 0, pendingNodesCnt, null);
+ // anyOf() completes with the first finished future; get() is declared outside this view.
+ page = get(CompletableFuture.anyOf(pendingFuts));
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} Returns the next item of the page selected by {@link #hasNextX()}. */
+ @Override public R nextX() throws IgniteCheckedException {
+ return page.next();
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
index a173785..4a183d5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
@@ -24,6 +24,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.query.ScoredCacheEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
@@ -358,7 +359,6 @@ public class GridLuceneIndex implements AutoCloseable {
* @return Object.
* @throws IgniteCheckedException If failed.
*/
- @SuppressWarnings("unchecked")
private <Z> Z unmarshall(byte[] bytes, ClassLoader ldr) throws IgniteCheckedException {
if (coctx == null) // For tests.
return (Z)JdbcUtils.deserialize(bytes, null);
@@ -371,15 +371,18 @@ public class GridLuceneIndex implements AutoCloseable {
*
* @throws IgniteCheckedException If failed.
*/
- @SuppressWarnings("unchecked")
private void findNext() throws IgniteCheckedException {
curr = null;
while (idx < docs.length) {
Document doc;
+ float score;
try {
- doc = searcher.doc(docs[idx++].doc);
+ doc = searcher.doc(docs[idx].doc);
+ score = docs[idx].score;
+
+ idx++;
}
catch (IOException e) {
throw new IgniteCheckedException(e);
@@ -401,7 +404,7 @@ public class GridLuceneIndex implements AutoCloseable {
assert v != null;
- curr = new IgniteBiTuple<>(k, v);
+ curr = new ScoredCacheEntry(k, v, score);
break;
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryAbstractTest.java
new file mode 100644
index 0000000..3faba1b
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryAbstractTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.annotations.QueryTextField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/** Base class for full-text query tests: configures the {@code Person} cache with a text-indexed value type. */
+public class GridCacheFullTextQueryAbstractTest extends GridCommonAbstractTest {
+ /** Name of the cache configured for every started node. */
+ protected static final String PERSON_CACHE = "Person";
+
+ /** {@inheritDoc} Adds the {@link #PERSON_CACHE} cache with {@code Integer} keys and {@link Person} values. */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ CacheConfiguration<Integer, Person> cacheCfg = defaultCacheConfiguration();
+
+ cacheCfg.setName(PERSON_CACHE)
+ .setIndexedTypes(Integer.class, Person.class);
+
+ cfg.setCacheConfiguration(cacheCfg);
+
+ return cfg;
+ }
+
+ /** @return {@link #PERSON_CACHE} cache obtained from grid 0. */
+ protected IgniteCache<Integer, Person> cache() {
+ return grid(0).cache(PERSON_CACHE);
+ }
+
+ /** Cache value with a single text-indexed field. */
+ protected static class Person {
+ /** Name, annotated for full-text indexing. */
+ @QueryTextField
+ @GridToStringInclude
+ final String name;
+
+ /** @param name Name. */
+ public Person(String name) {
+ this.name = name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(Person.class, this);
+ }
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryFailoverTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryFailoverTest.java
new file mode 100644
index 0000000..ca8e366
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryFailoverTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import javax.cache.Cache;
+import javax.cache.CacheException;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.TextQuery;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+/** Checks text query iteration when a node is stopped or the cursor is closed after iteration has started. */
+public class GridCacheFullTextQueryFailoverTest extends GridCacheFullTextQueryAbstractTest {
+ /** {@inheritDoc} Starts two grids and puts 100 persons named {@code "str" + i}. */
+ @Override protected void beforeTestsStarted() throws Exception {
+ IgniteCache<Integer, Person> cache = startGrids(2).cache(PERSON_CACHE);
+
+ for (int i = 0; i < 100; i++)
+ cache.put(i, new Person("str" + i));
+ }
+
+ /** Stops grid 1 after the first row is read and expects {@code hasNext()} to fail with a cache exception. */
+ @Test
+ public void testStopNodeDuringQuery() throws Exception {
+ TextQuery<Integer, Person> qry = new TextQuery<Integer, Person>(Person.class, "str~")
+ .setPageSize(10);
+
+ Iterator<Cache.Entry<Integer, Person>> iter = cache().query(qry).iterator();
+
+ // Initialize internal structures.
+ iter.next();
+
+ stopGrid(1);
+
+ awaitPartitionMapExchange();
+
+ GridTestUtils.assertThrows(log, iter::hasNext, CacheException.class, "Remote node has left topology");
+ }
+
+ /** Closes the cursor after the first row is read and expects the iterator to report no more rows. */
+ @Test
+ public void testCancelQuery() {
+ TextQuery<Integer, Person> qry = new TextQuery<Integer, Person>(Person.class, "str~")
+ .setPageSize(10);
+
+ QueryCursor<Cache.Entry<Integer, Person>> cursor = cache().query(qry);
+
+ Iterator<Cache.Entry<Integer, Person>> iter = cursor.iterator();
+
+ // Initialize internal structures.
+ iter.next();
+
+ cursor.close();
+
+ assertFalse(iter.hasNext());
+
+ GridTestUtils.assertThrows(log, iter::next, NoSuchElementException.class, null);
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryLimitTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryLimitTest.java
new file mode 100644
index 0000000..8e30364
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryLimitTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import javax.cache.Cache;
+import org.apache.ignite.cache.query.TextQuery;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/** Checks, for 1 to 8 nodes and every limit, that a fuzzy text query returns the exact match first. */
+@RunWith(Parameterized.class)
+public class GridCacheFullTextQueryLimitTest extends GridCacheFullTextQueryAbstractTest {
+ /** Number of items put into the cache per started node. */
+ private static final int MAX_ITEM_PER_NODE_COUNT = 100;
+
+ /** Number of nodes. */
+ @Parameterized.Parameter
+ public int nodesCnt;
+
+ /** @return Node counts from 1 to 8, one parameter set each. */
+ @Parameterized.Parameters(name = "nodesCnt={0}")
+ public static Iterable<Object[]> params() {
+ List<Object[]> params = new ArrayList<>();
+
+ for (int i = 1; i <= 8; i++) {
+ Object[] p = new Object[1];
+ p[0] = i;
+
+ params.add(p);
+ }
+
+ return params;
+ }
+
+ /** {@inheritDoc} Stops all grids, since every test run starts its own number of nodes. */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /** Puts one {@code "hello"} among {@code "helloNN"} names and checks that it is the first result for every limit. */
+ @Test
+ public void testResultOrderedByScore() throws Exception {
+ startGrids(nodesCnt);
+
+ int items = MAX_ITEM_PER_NODE_COUNT * nodesCnt;
+
+ int helloPivot = new Random().nextInt(items);
+
+ for (int i = 0; i < items; i++) {
+ String name = i == helloPivot ? "hello" : String.format("hello%02d", i);
+
+ cache().put(i, new Person(name));
+ }
+
+ // Run the fuzzy query with every limit from 1 up to the total number of items.
+ for (int limit = 1; limit <= items; limit++) {
+ TextQuery<Integer, Person> qry = new TextQuery<Integer, Person>(Person.class, "hello~")
+ .setLimit(limit);
+
+ List<Cache.Entry<Integer, Person>> result = cache().query(qry).getAll();
+
+ // Lucene returns top 50 results by query from single node even if limit is higher.
+ // Number of docs depends on amount of data on every node.
+ if (limit <= 50)
+ assertEquals(limit, result.size());
+ else
+ assertTrue(limit >= result.size());
+
+ // hello has to be on the top.
+ assertEquals("Limit=" + limit, "hello", result.get(0).getValue().name);
+ }
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryMultithreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryMultithreadedSelfTest.java
index 862a87e..0b83ef2 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryMultithreadedSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryMultithreadedSelfTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.annotations.QueryTextField;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -96,7 +97,7 @@ public class GridCacheFullTextQueryMultithreadedSelfTest extends GridCacheAbstra
// Create query.
final CacheQuery<Map.Entry<Integer, H2TextValue>> qry = c.context().queries().createFullTextQuery(
- H2TextValue.class.getSimpleName(), txt, limit, false);
+ H2TextValue.class.getSimpleName(), txt, limit, Query.DFLT_PAGE_SIZE, false);
qry.enableDedup(false);
qry.includeBackups(false);
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryPagesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryPagesTest.java
new file mode 100644
index 0000000..ad0354d
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryPagesTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import javax.cache.Cache;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.TextQuery;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.junit.Test;
+
+/**
+ * Tests how many query requests a client sends while loading pages of text query results.
+ */
+public class GridCacheFullTextQueryPagesTest extends GridCacheFullTextQueryAbstractTest {
+ /** Number of entries put into the cache. */
+ private static final int MAX_ITEM_COUNT = 10_000;
+
+ /** Nodes count. */
+ private static final int NODES = 4;
+
+ /** Query page size. */
+ private static final int PAGE_SIZE = 100;
+
+ /** Limit on the query response size. */
+ private static final int QUERY_LIMIT = 100;
+
+ /** Client node to start query. */
+ private static IgniteEx client;
+
+ /** Communication SPI of the client, set up in {@link #beforeTest()} to record query requests. */
+ private static TestRecordingCommunicationSpi spi;
+
+ /** Number of keys that passed the filter in {@link #populateCache}. */
+ private static int dataCnt;
+
+ /** {@inheritDoc} Installs a {@link TestRecordingCommunicationSpi} on every node. */
+ @Override protected IgniteConfiguration getConfiguration(String instanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(instanceName);
+
+ cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} Starts the server nodes and a client, then fills the cache through the client. */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGrids(NODES);
+
+ client = startClientGrid();
+
+ populateCache(client, MAX_ITEM_COUNT, (IgnitePredicate<Integer>)x -> String.valueOf(x).startsWith("1"));
+ }
+
+ /** {@inheritDoc} Starts recording {@link GridCacheQueryRequest} messages sent by the client. */
+ @Override protected void beforeTest() throws Exception {
+ spi = TestRecordingCommunicationSpi.spi(client);
+
+ spi.record((n, m) -> m instanceof GridCacheQueryRequest);
+ }
+
+ /** Without a limit: expects 4 initial requests, 8 page load requests and no cancel requests. */
+ @Test
+ public void testTextQueryMultiplePagesNoLimit() {
+ checkTextQuery("1*", 0, PAGE_SIZE);
+
+ checkPages(4, 8, 0);
+ }
+
+ /** Tests that no page requests are sent after the limit is reached: expects 7 page loads and 3 cancels. */
+ @Test
+ public void testTextQueryLimitedMultiplePages() {
+ checkTextQuery("1*", QUERY_LIMIT, 30);
+
+ checkPages(4, 7, 3);
+ }
+
+ /** Tests that some pages are requested again and a cancel is sent once the limit is reached. */
+ @Test
+ public void testTextQueryHighLimitedMultiplePages() {
+ checkTextQuery("1*", QUERY_LIMIT, 20);
+
+ checkPages(4, 8, 3);
+ }
+
+ /** Asserts the recorded request counts: initial queries (clause is set), page loads and cancels. */
+ private void checkPages(int expInitCnt, int expLoadCnt, int expCancelCnt) {
+ List<Object> messages = spi.recordedMessages(true);
+
+ for (Object msg: messages) {
+ GridCacheQueryRequest req = (GridCacheQueryRequest) msg;
+
+ if (req.cancel())
+ expCancelCnt--;
+ else if (req.clause() != null)
+ expInitCnt--;
+ else
+ expLoadCnt--;
+ }
+
+ assertEquals(0, expInitCnt);
+ assertEquals(0, expLoadCnt);
+ assertEquals(0, expCancelCnt);
+ }
+
+ /**
+ * Puts {@code cnt} persons keyed from 0 and stores in {@link #dataCnt} how many keys pass the filter.
+ *
+ * @throws IgniteCheckedException if failed.
+ */
+ private void populateCache(IgniteEx ignite, int cnt, IgnitePredicate<Integer> expectedEntryFilter) throws IgniteCheckedException {
+ IgniteInternalCache<Integer, Person> cache = ignite.cachex(PERSON_CACHE);
+
+ Affinity<Integer> aff = cache.affinity();
+
+ Map<UUID, Integer> nodeToCnt = new HashMap<>();
+
+ Set<String> vals = new HashSet<>();
+
+ for (int i = 0; i < cnt; i++) {
+ cache.put(i, new Person(String.valueOf(i)));
+
+ if (expectedEntryFilter.apply(i)) {
+ vals.add(String.valueOf(i));
+
+ UUID nodeId = aff.mapKeyToNode(i).id();
+
+ if (nodeId.equals(ignite.localNode().id()))
+ continue;
+ // NOTE(review): nodeToCnt is filled here but never read in this method.
+ nodeToCnt.putIfAbsent(nodeId, 0);
+
+ nodeToCnt.compute(nodeId, (k, v) -> v + 1);
+ }
+ }
+
+ dataCnt = vals.size();
+ }
+
+ /**
+ * @param clause Query clause.
+ * @param limit Limits response size; with {@code 0} the result size is compared with {@link #dataCnt}.
+ */
+ private void checkTextQuery(String clause, int limit, int pageSize) {
+ TextQuery<Integer, Person> qry = new TextQuery<Integer, Person>(Person.class, clause)
+ .setLimit(limit).setPageSize(pageSize);
+
+ IgniteCache<Integer, Person> cache = client.cache(PERSON_CACHE);
+
+ List<Cache.Entry<Integer, Person>> result = cache.query(qry).getAll();
+
+ int expRes = qry.getLimit() == 0 ? dataCnt : qry.getLimit();
+
+ assertEquals(expRes, result.size());
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index 3ee65f3..2518d8a 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -48,7 +48,10 @@ import org.apache.ignite.internal.processors.cache.DdlTransactionSelfTest;
import org.apache.ignite.internal.processors.cache.GridCacheCrossCacheQuerySelfTest;
import org.apache.ignite.internal.processors.cache.GridCacheDynamicLoadOnClientPersistentTest;
import org.apache.ignite.internal.processors.cache.GridCacheDynamicLoadOnClientTest;
+import org.apache.ignite.internal.processors.cache.GridCacheFullTextQueryFailoverTest;
+import org.apache.ignite.internal.processors.cache.GridCacheFullTextQueryLimitTest;
import org.apache.ignite.internal.processors.cache.GridCacheFullTextQueryMultithreadedSelfTest;
+import org.apache.ignite.internal.processors.cache.GridCacheFullTextQueryPagesTest;
import org.apache.ignite.internal.processors.cache.GridCacheFullTextQuerySelfTest;
import org.apache.ignite.internal.processors.cache.GridCacheLazyQueryPartitionsReleaseTest;
import org.apache.ignite.internal.processors.cache.GridCacheQueryIndexDisabledSelfTest;
@@ -495,8 +498,11 @@ import org.junit.runners.Suite;
CacheQueryEvictDataLostTest.class,
// Full text queries.
+ GridCacheFullTextQueryFailoverTest.class,
GridCacheFullTextQuerySelfTest.class,
GridCacheFullTextQueryMultithreadedSelfTest.class,
+ GridCacheFullTextQueryPagesTest.class,
+ GridCacheFullTextQueryLimitTest.class,
IgniteCacheFullTextQueryNodeJoiningSelfTest.class,
// Ignite cache and H2 comparison.
diff --git a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsTests.java b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsTests.java
index 7e5450d..ccaf3f8 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsTests.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsTests.java
@@ -143,15 +143,16 @@ class KillCommandsTests {
// Cancel first query.
qryCanceler.accept(qryInfo);
- // Fetch all cached entries. It's size equal to the {@code PAGE_SZ * NODES_CNT}.
- for (int i = 0; i < PAGE_SZ * srvs.size() - 1; i++)
- assertNotNull(iter1.next());
+ // Fetch of the next page should throw the exception. New page is delivered in parallel to iterating.
+ assertThrowsWithCause(() -> {
+ for (int i = 0; i < PAGE_SZ * PAGES_CNT - 1; i++)
+ assertNotNull(iter1.next());
- // Fetch of the next page should throw the exception.
- assertThrowsWithCause(iter1::next, IgniteCheckedException.class);
+ return null;
+ }, IgniteCheckedException.class);
// Checking that second query works fine after canceling first.
- for (int i = 0; i < PAGE_SZ * PAGE_SZ - 1; i++)
+ for (int i = 0; i < PAGE_SZ * PAGES_CNT - 1; i++)
assertNotNull(iter2.next());
checkScanQueryResources(cli, srvs, qryInfo.get3());