You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/08/17 15:24:44 UTC
ignite git commit: IGNITE-5991: SQL: Lazy query execution. This
closes #2437.
Repository: ignite
Updated Branches:
refs/heads/master 15710a869 -> 136075ae0
IGNITE-5991: SQL: Lazy query execution. This closes #2437.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/136075ae
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/136075ae
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/136075ae
Branch: refs/heads/master
Commit: 136075ae0f7070999dec6913afc8cef1a26eb307
Parents: 15710a8
Author: devozerov <vo...@gridgain.com>
Authored: Thu Aug 17 18:24:34 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Thu Aug 17 18:24:34 2017 +0300
----------------------------------------------------------------------
.../ignite/cache/query/SqlFieldsQuery.java | 38 +-
.../processors/query/h2/IgniteH2Indexing.java | 57 ++-
.../query/h2/twostep/GridMapQueryExecutor.java | 237 ++++++++---
.../h2/twostep/GridReduceQueryExecutor.java | 7 +-
.../query/h2/twostep/MapNodeResults.java | 19 +-
.../query/h2/twostep/MapQueryLazyWorker.java | 176 +++++++++
.../query/h2/twostep/MapQueryLazyWorkerKey.java | 97 +++++
.../query/h2/twostep/MapQueryResult.java | 46 ++-
.../query/h2/twostep/MapQueryResults.java | 26 +-
.../query/h2/twostep/MapRequestKey.java | 23 +-
.../h2/twostep/msg/GridH2QueryRequest.java | 9 +
.../processors/query/LazyQuerySelfTest.java | 389 +++++++++++++++++++
.../IgniteCacheQuerySelfTestSuite.java | 2 +
13 files changed, 1041 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
index 2838fe3..54f8396 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
@@ -71,6 +71,9 @@ public class SqlFieldsQuery extends Query<List<?>> {
/** */
private boolean replicatedOnly;
+ /** */
+ private boolean lazy;
+
/** Partitions for query */
private int[] parts;
@@ -230,7 +233,7 @@ public class SqlFieldsQuery extends Query<List<?>> {
/**
* Check if distributed joins are enabled for this query.
*
- * @return {@code true} If distributed joind enabled.
+ * @return {@code true} If distributed joins enabled.
*/
public boolean isDistributedJoins() {
return distributedJoins;
@@ -269,6 +272,39 @@ public class SqlFieldsQuery extends Query<List<?>> {
}
/**
+ * Sets lazy query execution flag.
+ * <p>
+ * By default Ignite attempts to fetch the whole query result set to memory and send it to the client. For small
+ * and medium result sets this provides optimal performance and minimize duration of internal database locks, thus
+ * increasing concurrency.
+ * <p>
+ * If result set is too big to fit in available memory this could lead to excessive GC pauses and even
+ * OutOfMemoryError. Use this flag as a hint for Ignite to fetch result set lazily, thus minimizing memory
+ * consumption at the cost of moderate performance hit.
+ * <p>
+ * Defaults to {@code false}, meaning that the whole result set is fetched to memory eagerly.
+ *
+ * @param lazy Lazy query execution flag.
+ * @return {@code this} For chaining.
+ */
+ public SqlFieldsQuery setLazy(boolean lazy) {
+ this.lazy = lazy;
+
+ return this;
+ }
+
+ /**
+ * Gets lazy query execution flag.
+ * <p>
+ * See {@link #setLazy(boolean)} for more information.
+ *
+ * @return Lazy flag.
+ */
+ public boolean isLazy() {
+ return lazy;
+ }
+
+ /**
* Gets partitions for query, in ascending order.
*/
@Nullable public int[] getPartitions() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 007eeb1..6896f18 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -111,6 +111,7 @@ import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter;
import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor;
import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
+import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
@@ -139,6 +140,7 @@ import org.h2.api.ErrorCode;
import org.h2.api.JavaObjectSerializer;
import org.h2.command.Prepared;
import org.h2.command.dml.Insert;
+import org.h2.engine.Session;
import org.h2.engine.SysProperties;
import org.h2.index.Index;
import org.h2.jdbc.JdbcPreparedStatement;
@@ -905,24 +907,32 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @throws IgniteCheckedException If failed.
*/
private ResultSet executeSqlQuery(final Connection conn, final PreparedStatement stmt,
- int timeoutMillis, @Nullable GridQueryCancel cancel)
- throws IgniteCheckedException {
+ int timeoutMillis, @Nullable GridQueryCancel cancel) throws IgniteCheckedException {
+ final MapQueryLazyWorker lazyWorker = MapQueryLazyWorker.currentWorker();
if (cancel != null) {
cancel.set(new Runnable() {
@Override public void run() {
- try {
- stmt.cancel();
- }
- catch (SQLException ignored) {
- // No-op.
+ if (lazyWorker != null) {
+ lazyWorker.submit(new Runnable() {
+ @Override public void run() {
+ cancelStatement(stmt);
+ }
+ });
}
+ else
+ cancelStatement(stmt);
}
});
}
+ Session ses = H2Utils.session(conn);
+
if (timeoutMillis > 0)
- H2Utils.session(conn).setQueryTimeout(timeoutMillis);
+ ses.setQueryTimeout(timeoutMillis);
+
+ if (lazyWorker != null)
+ ses.setLazyQueryExecution(true);
try {
return stmt.executeQuery();
@@ -936,7 +946,24 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
finally {
if (timeoutMillis > 0)
- H2Utils.session(conn).setQueryTimeout(0);
+ ses.setQueryTimeout(0);
+
+ if (lazyWorker != null)
+ ses.setLazyQueryExecution(false);
+ }
+ }
+
+ /**
+ * Cancel prepared statement.
+ *
+ * @param stmt Statement.
+ */
+ private static void cancelStatement(PreparedStatement stmt) {
+ try {
+ stmt.cancel();
+ }
+ catch (SQLException ignored) {
+ // No-op.
}
}
@@ -1143,6 +1170,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @param keepCacheObj Flag to keep cache object.
* @param enforceJoinOrder Enforce join order of tables.
* @param parts Partitions.
+ * @param lazy Lazy query execution flag.
* @return Iterable result.
*/
private Iterable<List<?>> runQueryTwoStep(
@@ -1153,12 +1181,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
final int timeoutMillis,
final GridQueryCancel cancel,
final Object[] params,
- final int[] parts
+ final int[] parts,
+ final boolean lazy
) {
return new Iterable<List<?>>() {
@Override public Iterator<List<?>> iterator() {
return rdcQryExec.query(schemaName, qry, keepCacheObj, enforceJoinOrder, timeoutMillis, cancel, params,
- parts);
+ parts, lazy);
}
};
}
@@ -1402,7 +1431,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(
runQueryTwoStep(schemaName, twoStepQry, keepBinary, enforceJoinOrder, qry.getTimeout(), cancel,
- qry.getArgs(), partitions), cancel);
+ qry.getArgs(), partitions, qry.isLazy()), cancel);
cursor.fieldsMeta(meta);
@@ -2070,6 +2099,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (log.isDebugEnabled())
log.debug("Stopping cache query index...");
+ mapQryExec.cancelLazyWorkers();
+
// unregisterMBean(); TODO https://issues.apache.org/jira/browse/IGNITE-2139
if (ctx != null && !ctx.cache().context().database().persistenceEnabled()) {
for (H2Schema schema : schemas.values())
@@ -2355,6 +2386,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** {@inheritDoc} */
@Override public void cancelAllQueries() {
+ mapQryExec.cancelLazyWorkers();
+
for (Connection conn : conns)
U.close(conn, log);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index ca978e2..0cc4172 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -28,7 +28,9 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -52,7 +54,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservabl
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
-import org.apache.ignite.internal.processors.cache.query.QueryTable;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
@@ -69,6 +70,7 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.thread.IgniteThread;
import org.h2.jdbc.JdbcResultSet;
import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
@@ -107,6 +109,15 @@ public class GridMapQueryExecutor {
/** */
private final ConcurrentMap<MapReservationKey, GridReservable> reservations = new ConcurrentHashMap8<>();
+ /** Lazy workers. */
+ private final ConcurrentHashMap<MapQueryLazyWorkerKey, MapQueryLazyWorker> lazyWorkers = new ConcurrentHashMap<>();
+
+ /** Busy lock for lazy workers. */
+ private final GridSpinBusyLock lazyWorkerBusyLock = new GridSpinBusyLock();
+
+ /** Lazy worker stop guard. */
+ private final AtomicBoolean lazyWorkerStopGuard = new AtomicBoolean();
+
/**
* @param busyLock Busy lock.
*/
@@ -162,6 +173,21 @@ public class GridMapQueryExecutor {
}
/**
+ * Cancel active lazy queries and prevent submit of new queries.
+ */
+ public void cancelLazyWorkers() {
+ if (!lazyWorkerStopGuard.compareAndSet(false, true))
+ return;
+
+ lazyWorkerBusyLock.block();
+
+ for (MapQueryLazyWorker worker : lazyWorkers.values())
+ worker.stop();
+
+ lazyWorkers.clear();
+ }
+
+ /**
* @param nodeId Node ID.
* @param msg Message.
*/
@@ -221,7 +247,7 @@ public class GridMapQueryExecutor {
MapNodeResults nodeRess = qryRess.get(nodeId);
if (nodeRess == null) {
- nodeRess = new MapNodeResults();
+ nodeRess = new MapNodeResults(nodeId);
MapNodeResults old = qryRess.putIfAbsent(nodeId, nodeRess);
@@ -416,6 +442,7 @@ public class GridMapQueryExecutor {
final boolean enforceJoinOrder = req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER);
final boolean explain = req.isFlagSet(GridH2QueryRequest.FLAG_EXPLAIN);
final boolean replicated = req.isFlagSet(GridH2QueryRequest.FLAG_REPLICATED);
+ final boolean lazy = req.isFlagSet(GridH2QueryRequest.FLAG_LAZY);
final List<Integer> cacheIds = req.caches();
@@ -429,30 +456,51 @@ public class GridMapQueryExecutor {
final int segment = i;
- ctx.closure().callLocal(
- new Callable<Void>() {
- @Override public Void call() throws Exception {
- onQueryRequest0(node,
- req.requestId(),
- segment,
- req.schemaName(),
- req.queries(),
- cacheIds,
- req.topologyVersion(),
- partsMap,
- parts,
- req.tables(),
- req.pageSize(),
- joinMode,
- enforceJoinOrder,
- false,
- req.timeout(),
- params);
-
- return null;
+ if (lazy) {
+ onQueryRequest0(node,
+ req.requestId(),
+ segment,
+ req.schemaName(),
+ req.queries(),
+ cacheIds,
+ req.topologyVersion(),
+ partsMap,
+ parts,
+ req.pageSize(),
+ joinMode,
+ enforceJoinOrder,
+ false, // Replicated is always false here (see condition above).
+ req.timeout(),
+ params,
+ true); // Lazy = true.
+ }
+ else {
+ ctx.closure().callLocal(
+ new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ onQueryRequest0(node,
+ req.requestId(),
+ segment,
+ req.schemaName(),
+ req.queries(),
+ cacheIds,
+ req.topologyVersion(),
+ partsMap,
+ parts,
+ req.pageSize(),
+ joinMode,
+ enforceJoinOrder,
+ false,
+ req.timeout(),
+ params,
+ false); // Lazy = false.
+
+ return null;
+ }
}
- }
- , QUERY_POOL);
+ , QUERY_POOL);
+ }
}
onQueryRequest0(node,
@@ -464,13 +512,13 @@ public class GridMapQueryExecutor {
req.topologyVersion(),
partsMap,
parts,
- req.tables(),
req.pageSize(),
joinMode,
enforceJoinOrder,
replicated,
req.timeout(),
- params);
+ params,
+ lazy);
}
/**
@@ -483,28 +531,61 @@ public class GridMapQueryExecutor {
* @param topVer Topology version.
* @param partsMap Partitions map for unstable topology.
* @param parts Explicit partitions for current node.
- * @param tbls Tables.
* @param pageSize Page size.
* @param distributedJoinMode Query distributed join mode.
+ * @param lazy Streaming flag.
*/
private void onQueryRequest0(
- ClusterNode node,
- long reqId,
- int segmentId,
- String schemaName,
- Collection<GridCacheSqlQuery> qrys,
- List<Integer> cacheIds,
- AffinityTopologyVersion topVer,
- Map<UUID, int[]> partsMap,
- int[] parts,
- Collection<QueryTable> tbls,
- int pageSize,
- DistributedJoinMode distributedJoinMode,
- boolean enforceJoinOrder,
- boolean replicated,
- int timeout,
- Object[] params
+ final ClusterNode node,
+ final long reqId,
+ final int segmentId,
+ final String schemaName,
+ final Collection<GridCacheSqlQuery> qrys,
+ final List<Integer> cacheIds,
+ final AffinityTopologyVersion topVer,
+ final Map<UUID, int[]> partsMap,
+ final int[] parts,
+ final int pageSize,
+ final DistributedJoinMode distributedJoinMode,
+ final boolean enforceJoinOrder,
+ final boolean replicated,
+ final int timeout,
+ final Object[] params,
+ boolean lazy
) {
+ if (lazy && MapQueryLazyWorker.currentWorker() == null) {
+ // Lazy queries must be re-submitted to dedicated workers.
+ MapQueryLazyWorkerKey key = new MapQueryLazyWorkerKey(node.id(), reqId, segmentId);
+ MapQueryLazyWorker worker = new MapQueryLazyWorker(ctx.igniteInstanceName(), key, log, this);
+
+ worker.submit(new Runnable() {
+ @Override public void run() {
+ onQueryRequest0(node, reqId, segmentId, schemaName, qrys, cacheIds, topVer, partsMap, parts,
+ pageSize, distributedJoinMode, enforceJoinOrder, replicated, timeout, params, true);
+ }
+ });
+
+ if (lazyWorkerBusyLock.enterBusy()) {
+ try {
+ MapQueryLazyWorker oldWorker = lazyWorkers.put(key, worker);
+
+ if (oldWorker != null)
+ oldWorker.stop();
+
+ IgniteThread thread = new IgniteThread(worker);
+
+ thread.start();
+ }
+ finally {
+ lazyWorkerBusyLock.leaveBusy();
+ }
+ }
+ else
+ log.info("Ignored query request (node is stopping) [nodeId=" + node.id() + ", reqId=" + reqId + ']');
+
+ return;
+ }
+
// Prepare to run queries.
GridCacheContext<?, ?> mainCctx =
!F.isEmpty(cacheIds) ? ctx.cache().context().cacheContext(cacheIds.get(0)) : null;
@@ -519,13 +600,18 @@ public class GridMapQueryExecutor {
if (topVer != null) {
// Reserve primary for topology version or explicit partitions.
if (!reservePartitions(cacheIds, topVer, parts, reserved)) {
+ // Unregister lazy worker because re-try may never reach this node again.
+ if (lazy)
+ stopAndUnregisterCurrentLazyWorker();
+
sendRetry(node, reqId, segmentId);
return;
}
}
- qr = new MapQueryResults(h2, reqId, qrys.size(), mainCctx != null ? mainCctx.name() : null);
+ qr = new MapQueryResults(h2, reqId, qrys.size(), mainCctx != null ? mainCctx.name() : null,
+ MapQueryLazyWorker.currentWorker());
if (nodeRess.put(reqId, segmentId, qr) != null)
throw new IllegalStateException();
@@ -570,8 +656,7 @@ public class GridMapQueryExecutor {
ResultSet rs = null;
// If we are not the target node for this replicated query, just ignore it.
- if (qry.node() == null ||
- (segmentId == 0 && qry.node().equals(ctx.localNodeId()))) {
+ if (qry.node() == null || (segmentId == 0 && qry.node().equals(ctx.localNodeId()))) {
rs = h2.executeSqlQueryWithTimer(conn, qry.query(),
F.asList(qry.parameters(params)), true,
timeout,
@@ -624,6 +709,10 @@ public class GridMapQueryExecutor {
qr.cancel(false);
}
+ // Unregister worker after possible cancellation.
+ if (lazy)
+ stopAndUnregisterCurrentLazyWorker();
+
if (X.hasCause(e, GridH2RetryException.class))
sendRetry(node, reqId, segmentId);
else {
@@ -672,27 +761,39 @@ public class GridMapQueryExecutor {
* @param node Node.
* @param req Request.
*/
- private void onNextPageRequest(ClusterNode node, GridQueryNextPageRequest req) {
- MapNodeResults nodeRess = qryRess.get(node.id());
+ private void onNextPageRequest(final ClusterNode node, final GridQueryNextPageRequest req) {
+ final MapNodeResults nodeRess = qryRess.get(node.id());
if (nodeRess == null) {
sendError(node, req.queryRequestId(), new CacheException("No node result found for request: " + req));
return;
- } else if (nodeRess.cancelled(req.queryRequestId())) {
+ }
+ else if (nodeRess.cancelled(req.queryRequestId())) {
sendError(node, req.queryRequestId(), new QueryCancelledException());
return;
}
- MapQueryResults qr = nodeRess.get(req.queryRequestId(), req.segmentId());
+ final MapQueryResults qr = nodeRess.get(req.queryRequestId(), req.segmentId());
if (qr == null)
sendError(node, req.queryRequestId(), new CacheException("No query result found for request: " + req));
else if (qr.cancelled())
sendError(node, req.queryRequestId(), new QueryCancelledException());
- else
- sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize());
+ else {
+ MapQueryLazyWorker lazyWorker = qr.lazyWorker();
+
+ if (lazyWorker != null) {
+ lazyWorker.submit(new Runnable() {
+ @Override public void run() {
+ sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize());
+ }
+ });
+ }
+ else
+ sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize());
+ }
}
/**
@@ -784,4 +885,34 @@ public class GridMapQueryExecutor {
reservations.remove(grpKey);
}
}
+
+ /**
+ * Unregister lazy worker if needed (i.e. if we are currently in lazy worker thread).
+ */
+ public void stopAndUnregisterCurrentLazyWorker() {
+ MapQueryLazyWorker worker = MapQueryLazyWorker.currentWorker();
+
+ if (worker != null) {
+ worker.stop();
+
+ // Just stop is not enough as worker may be registered, but not started due to exception.
+ unregisterLazyWorker(worker);
+ }
+ }
+
+ /**
+ * Unregister lazy worker.
+ *
+ * @param worker Worker.
+ */
+ public void unregisterLazyWorker(MapQueryLazyWorker worker) {
+ lazyWorkers.remove(worker.key(), worker);
+ }
+
+ /**
+ * @return Number of registered lazy workers.
+ */
+ public int registeredLazyWorkers() {
+ return lazyWorkers.size();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 0e9d1a2..8638794 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -506,6 +506,7 @@ public class GridReduceQueryExecutor {
* @param cancel Query cancel.
* @param params Query parameters.
* @param parts Partitions.
+ * @param lazy Lazy execution flag.
* @return Rows iterator.
*/
public Iterator<List<?>> query(
@@ -516,7 +517,8 @@ public class GridReduceQueryExecutor {
int timeoutMillis,
GridQueryCancel cancel,
Object[] params,
- final int[] parts
+ final int[] parts,
+ boolean lazy
) {
if (F.isEmpty(params))
params = EMPTY_PARAMS;
@@ -712,6 +714,9 @@ public class GridReduceQueryExecutor {
if (isReplicatedOnly)
flags |= GridH2QueryRequest.FLAG_REPLICATED;
+ if (lazy && mapQrys.size() == 1)
+ flags |= GridH2QueryRequest.FLAG_LAZY;
+
GridH2QueryRequest req = new GridH2QueryRequest()
.requestId(qryReqId)
.topologyVersion(topVer)
http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
index d5ea357..2d20c8d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
import org.jsr166.ConcurrentHashMap8;
+import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
@@ -35,6 +36,18 @@ class MapNodeResults {
private final GridBoundedConcurrentLinkedHashMap<Long, Boolean> qryHist =
new GridBoundedConcurrentLinkedHashMap<>(1024, 1024, 0.75f, 64, PER_SEGMENT_Q);
+ /** Node ID. */
+ private final UUID nodeId;
+
+ /**
+ * Constructor.
+ *
+ * @param nodeId Node ID.
+ */
+ public MapNodeResults(UUID nodeId) {
+ this.nodeId = nodeId;
+ }
+
/**
* @param reqId Query Request ID.
* @return {@code False} if query was already cancelled.
@@ -59,7 +72,7 @@ class MapNodeResults {
* @return query partial results.
*/
public MapQueryResults get(long reqId, int segmentId) {
- return res.get(new MapRequestKey(reqId, segmentId));
+ return res.get(new MapRequestKey(nodeId, reqId, segmentId));
}
/**
@@ -84,7 +97,7 @@ class MapNodeResults {
* @return {@code True} if removed.
*/
public boolean remove(long reqId, int segmentId, MapQueryResults qr) {
- return res.remove(new MapRequestKey(reqId, segmentId), qr);
+ return res.remove(new MapRequestKey(nodeId, reqId, segmentId), qr);
}
/**
@@ -94,7 +107,7 @@ class MapNodeResults {
* @return previous value.
*/
public MapQueryResults put(long reqId, int segmentId, MapQueryResults qr) {
- return res.put(new MapRequestKey(reqId, segmentId), qr);
+ return res.put(new MapRequestKey(nodeId, reqId, segmentId), qr);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
new file mode 100644
index 0000000..5158035
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.LongAdder8;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+
+/**
+ * Worker for lazy query execution.
+ */
+public class MapQueryLazyWorker extends GridWorker {
+ /** Lazy thread flag. */
+ private static final ThreadLocal<MapQueryLazyWorker> LAZY_WORKER = new ThreadLocal<>();
+
+ /** Active lazy worker count (for testing purposes). */
+ private static final LongAdder8 ACTIVE_CNT = new LongAdder8();
+
+ /** Task to be executed. */
+ private final BlockingQueue<Runnable> tasks = new LinkedBlockingDeque<>();
+
+ /** Key. */
+ private final MapQueryLazyWorkerKey key;
+
+ /** Map query executor. */
+ private final GridMapQueryExecutor exec;
+
+ /** Latch decremented when worker finishes. */
+ private final CountDownLatch stopLatch = new CountDownLatch(1);
+
+ /** Map query result. */
+ private volatile MapQueryResult res;
+
+ /**
+ * Constructor.
+ *
+ * @param instanceName Instance name.
+ * @param key Lazy worker key.
+ * @param log Logger.
+ * @param exec Map query executor.
+ */
+ public MapQueryLazyWorker(@Nullable String instanceName, MapQueryLazyWorkerKey key, IgniteLogger log,
+ GridMapQueryExecutor exec) {
+ super(instanceName, workerName(instanceName, key), log);
+
+ this.key = key;
+ this.exec = exec;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+ LAZY_WORKER.set(this);
+
+ ACTIVE_CNT.increment();
+
+ try {
+ while (!isCancelled()) {
+ Runnable task = tasks.take();
+
+ if (task != null)
+ task.run();
+ }
+ }
+ finally {
+ if (res != null)
+ res.close();
+
+ LAZY_WORKER.set(null);
+
+ ACTIVE_CNT.decrement();
+
+ exec.unregisterLazyWorker(this);
+ }
+ }
+
+ /**
+ * Submit task to worker.
+ *
+ * @param task Task to be executed.
+ */
+ public void submit(Runnable task) {
+ tasks.add(task);
+ }
+
+ /**
+ * @return Worker key.
+ */
+ public MapQueryLazyWorkerKey key() {
+ return key;
+ }
+
+ /**
+ * Stop the worker.
+ */
+ public void stop() {
+ if (MapQueryLazyWorker.currentWorker() == null)
+ submit(new Runnable() {
+ @Override public void run() {
+ stop();
+ }
+ });
+ else {
+ isCancelled = true;
+
+ stopLatch.countDown();
+ }
+ }
+
+ /**
+ * Await worker stop.
+ */
+ public void awaitStop() {
+ try {
+ U.await(stopLatch);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ throw new IgniteException("Failed to wait for lazy worker stop (interrupted): " + name(), e);
+ }
+ }
+
+ /**
+ * @param res Map query result.
+ */
+ public void result(MapQueryResult res) {
+ this.res = res;
+ }
+
+ /**
+ * @return Current worker or {@code null} if call is performed not from lazy worker thread.
+ */
+ @Nullable public static MapQueryLazyWorker currentWorker() {
+ return LAZY_WORKER.get();
+ }
+
+ /**
+ * @return Active workers count.
+ */
+ public static int activeCount() {
+ return ACTIVE_CNT.intValue();
+ }
+
+ /**
+ * Construct worker name.
+ *
+ * @param instanceName Instance name.
+ * @param key Key.
+ * @return Name.
+ */
+ private static String workerName(String instanceName, MapQueryLazyWorkerKey key) {
+ return "query-lazy-worker_" + instanceName + "_" + key.nodeId() + "_" + key.queryRequestId() + "_" +
+ key.segment();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorkerKey.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorkerKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorkerKey.java
new file mode 100644
index 0000000..a0f5ebb
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorkerKey.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.util.UUID;
+
+/**
+ * Key to identify lazy worker.
+ */
+public class MapQueryLazyWorkerKey {
+ /** Client node ID. */
+ private final UUID nodeId;
+
+ /** Query request ID. */
+ private final long qryReqId;
+
+ /** Segment. */
+ private final int segment;
+
+ /**
+ * Constructor.
+ *
+ * @param nodeId Node ID.
+ * @param qryReqId Query request ID.
+ * @param segment Segment.
+ */
+ public MapQueryLazyWorkerKey(UUID nodeId, long qryReqId, int segment) {
+ this.nodeId = nodeId;
+ this.qryReqId = qryReqId;
+ this.segment = segment;
+ }
+
+ /**
+ * @return Node id.
+ */
+ public UUID nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * @return Query request ID.
+ */
+ public long queryRequestId() {
+ return qryReqId;
+ }
+
+ /**
+ * @return Segment.
+ */
+ public int segment() {
+ return segment;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = nodeId.hashCode();
+
+ res = 31 * res + (int)(qryReqId ^ (qryReqId >>> 32));
+ res = 31 * res + segment;
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ if (obj != null && obj instanceof MapQueryLazyWorkerKey) {
+ MapQueryLazyWorkerKey other = (MapQueryLazyWorkerKey)obj;
+
+ return F.eq(qryReqId, other.qryReqId) && F.eq(nodeId, other.nodeId) && F.eq(segment, other.segment);
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MapQueryLazyWorkerKey.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
index 4799e03..e54c784d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.h2.jdbc.JdbcResultSet;
+import org.h2.result.LazyResult;
import org.h2.result.ResultInterface;
import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
@@ -41,7 +42,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
/**
* Mapper result for a single part of the query.
*/
-class MapQueryResult implements AutoCloseable {
+class MapQueryResult {
/** */
private static final Field RESULT_FIELD;
@@ -95,24 +96,30 @@ class MapQueryResult implements AutoCloseable {
/** */
private final Object[] params;
+ /** Lazy worker. */
+ private final MapQueryLazyWorker lazyWorker;
+
/**
* @param rs Result set.
* @param cacheName Cache name.
* @param qrySrcNodeId Query source node.
* @param qry Query.
* @param params Query params.
+ * @param lazyWorker Lazy worker.
*/
MapQueryResult(IgniteH2Indexing h2, ResultSet rs, @Nullable String cacheName,
- UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params) {
+ UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params, @Nullable MapQueryLazyWorker lazyWorker) {
this.h2 = h2;
this.cacheName = cacheName;
this.qry = qry;
this.params = params;
this.qrySrcNodeId = qrySrcNodeId;
this.cpNeeded = F.eq(h2.kernalContext().localNodeId(), qrySrcNodeId);
+ this.lazyWorker = lazyWorker;
if (rs != null) {
this.rs = rs;
+
try {
res = (ResultInterface)RESULT_FIELD.get(rs);
}
@@ -120,7 +127,7 @@ class MapQueryResult implements AutoCloseable {
throw new IllegalStateException(e); // Must not happen.
}
- rowCnt = res.getRowCount();
+ rowCnt = (res instanceof LazyResult) ? -1 : res.getRowCount();
cols = res.getVisibleColumnCount();
}
else {
@@ -167,6 +174,8 @@ class MapQueryResult implements AutoCloseable {
* @return {@code true} If there are no more rows available.
*/
synchronized boolean fetchNextPage(List<Value[]> rows, int pageSize) {
+ assert lazyWorker == null || lazyWorker == MapQueryLazyWorker.currentWorker();
+
if (closed)
return true;
@@ -246,13 +255,34 @@ class MapQueryResult implements AutoCloseable {
return res;
}
- /** {@inheritDoc} */
- @Override public synchronized void close() {
- if (closed)
+ /**
+ * Close the result.
+ */
+ public void close() {
+ if (lazyWorker != null && MapQueryLazyWorker.currentWorker() == null) {
+ lazyWorker.submit(new Runnable() {
+ @Override public void run() {
+ close();
+ }
+ });
+
+ lazyWorker.awaitStop();
+
return;
+ }
- closed = true;
+ synchronized (this) {
+ assert lazyWorker == null || lazyWorker == MapQueryLazyWorker.currentWorker();
- U.closeQuiet(rs);
+ if (closed)
+ return;
+
+ closed = true;
+
+ U.closeQuiet(rs);
+
+ if (lazyWorker != null)
+ lazyWorker.stop();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
index 7ad1d14..99f1966 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
@@ -45,20 +45,27 @@ class MapQueryResults {
/** */
private final String cacheName;
+ /** Lazy worker. */
+ private final MapQueryLazyWorker lazyWorker;
+
/** */
private volatile boolean cancelled;
/**
+ * Constructor.
+ *
* @param qryReqId Query request ID.
* @param qrys Number of queries.
* @param cacheName Cache name.
+ * @param lazyWorker Lazy worker (if any).
*/
@SuppressWarnings("unchecked")
- MapQueryResults(IgniteH2Indexing h2, long qryReqId, int qrys,
- @Nullable String cacheName) {
+ MapQueryResults(IgniteH2Indexing h2, long qryReqId, int qrys, @Nullable String cacheName,
+ @Nullable MapQueryLazyWorker lazyWorker) {
this.h2 = h2;
this.qryReqId = qryReqId;
this.cacheName = cacheName;
+ this.lazyWorker = lazyWorker;
results = new AtomicReferenceArray<>(qrys);
cancels = new GridQueryCancel[qrys];
@@ -86,13 +93,25 @@ class MapQueryResults {
}
/**
+ * @return Lazy worker.
+ */
+ MapQueryLazyWorker lazyWorker() {
+ return lazyWorker;
+ }
+
+ /**
+ * Add result.
+ *
* @param qry Query result index.
* @param q Query object.
* @param qrySrcNodeId Query source node.
* @param rs Result set.
*/
void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs, Object[] params) {
- MapQueryResult res = new MapQueryResult(h2, rs, cacheName, qrySrcNodeId, q, params);
+ MapQueryResult res = new MapQueryResult(h2, rs, cacheName, qrySrcNodeId, q, params, lazyWorker);
+
+ if (lazyWorker != null)
+ lazyWorker.result(res);
if (!results.compareAndSet(qry, null, res))
throw new IllegalStateException();
@@ -130,6 +149,7 @@ class MapQueryResults {
continue;
}
+ // NB: Cancel is already safe even for lazy queries (see implementation of passed Runnable).
if (forceQryCancel) {
GridQueryCancel cancel = cancels[i];
http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapRequestKey.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapRequestKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapRequestKey.java
index 6feb8ea..9d987db 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapRequestKey.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapRequestKey.java
@@ -17,18 +17,32 @@
package org.apache.ignite.internal.processors.query.h2.twostep;
+import org.apache.ignite.internal.util.typedef.F;
+
+import java.util.UUID;
+
/**
* Mapper request key.
*/
class MapRequestKey {
+ /** Node ID. */
+ private UUID nodeId;
+
/** */
private long reqId;
/** */
private int segmentId;
- /** Constructor */
- MapRequestKey(long reqId, int segmentId) {
+ /**
+ * Constructor.
+ *
+ * @param nodeId Node ID.
+ * @param reqId Request ID.
+ * @param segmentId Segment ID.
+ */
+ MapRequestKey(UUID nodeId, long reqId, int segmentId) {
+ this.nodeId = nodeId;
this.reqId = reqId;
this.segmentId = segmentId;
}
@@ -50,14 +64,15 @@ class MapRequestKey {
MapRequestKey other = (MapRequestKey)o;
- return reqId == other.reqId && segmentId == other.segmentId;
+ return F.eq(nodeId, other.nodeId) && reqId == other.reqId && segmentId == other.segmentId;
}
/** {@inheritDoc} */
@Override public int hashCode() {
- int res = (int)(reqId ^ (reqId >>> 32));
+ int res = nodeId != null ? nodeId.hashCode() : 0;
+ res = 31 * res + (int)(reqId ^ (reqId >>> 32));
res = 31 * res + segmentId;
return res;
http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index 93a383c..4e1fadb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -78,6 +78,11 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
*/
public static final int FLAG_REPLICATED = 1 << 4;
+ /**
+ * If lazy execution is enabled.
+ */
+ public static final int FLAG_LAZY = 1 << 5;
+
/** */
private long reqId;
@@ -185,6 +190,10 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
}
/**
+ * Get tables.
+ * <p>
+ * N.B.: Was used in AI 1.9 for snapshots. Unused at the moment, but should be kept for compatibility reasons.
+ *
* @return Tables.
*/
public Collection<QueryTable> tables() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java
new file mode 100644
index 0000000..d5cc0eb
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Tests for lazy query execution.
+ */
+public class LazyQuerySelfTest extends GridCommonAbstractTest {
+ /** Keys ocunt. */
+ private static final int KEY_CNT = 200;
+
+ /** Base query argument. */
+ private static final int BASE_QRY_ARG = 50;
+
+ /** Size for small pages. */
+ private static final int PAGE_SIZE_SMALL = 12;
+
+ /** Cache name. */
+ private static final String CACHE_NAME = "cache";
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * Test local query execution.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSingleNode() throws Exception {
+ checkSingleNode(1);
+ }
+
+ /**
+ * Test local query execution.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSingleNodeWithParallelism() throws Exception {
+ checkSingleNode(4);
+ }
+
+ /**
+ * Test query execution with multiple topology nodes.
+ *
+ * @throws Exception If failed.
+ */
+ public void testMultipleNodes() throws Exception {
+ checkMultipleNodes(1);
+ }
+
+ /**
+ * Test query execution with multiple topology nodes with query parallelism.
+ *
+ * @throws Exception If failed.
+ */
+ public void testMultipleNodesWithParallelism() throws Exception {
+ checkMultipleNodes(4);
+ }
+
+ /**
+ * Check local query execution.
+ *
+ * @param parallelism Query parallelism.
+ * @throws Exception If failed.
+ */
+ public void checkSingleNode(int parallelism) throws Exception {
+ Ignite srv = startGrid();
+
+ srv.createCache(cacheConfiguration(parallelism));
+
+ populateBaseQueryData(srv);
+
+ checkBaseOperations(srv);
+ }
+
+ /**
+ * Check query execution with multiple topology nodes.
+ *
+ * @param parallelism Query parallelism.
+ * @throws Exception If failed.
+ */
+ public void checkMultipleNodes(int parallelism) throws Exception {
+ Ignite srv1 = startGrid(1);
+ Ignite srv2 = startGrid(2);
+
+ Ignite cli;
+
+ try {
+ Ignition.setClientMode(true);
+
+ cli = startGrid(3);
+ }
+ finally {
+ Ignition.setClientMode(false);
+ }
+
+ cli.createCache(cacheConfiguration(parallelism));
+
+ populateBaseQueryData(cli);
+
+ checkBaseOperations(srv1);
+ checkBaseOperations(srv2);
+ checkBaseOperations(cli);
+
+ // Test originating node leave.
+ FieldsQueryCursor<List<?>> cursor = execute(cli, baseQuery().setPageSize(PAGE_SIZE_SMALL));
+
+ Iterator<List<?>> iter = cursor.iterator();
+
+ for (int i = 0; i < 30; i++)
+ iter.next();
+
+ stopGrid(3);
+
+ assertNoWorkers();
+
+ // Test server node leave with active worker.
+ cursor = execute(srv1, baseQuery().setPageSize(PAGE_SIZE_SMALL));
+
+ try {
+ iter = cursor.iterator();
+
+ for (int i = 0; i < 30; i++)
+ iter.next();
+
+ stopGrid(2);
+ }
+ finally {
+ cursor.close();
+ }
+
+ assertNoWorkers();
+ }
+
+ /**
+ * Check base operations.
+ *
+ * @param node Node.
+ * @throws Exception If failed.
+ */
+ private void checkBaseOperations(Ignite node) throws Exception {
+ // Get full data.
+ List<List<?>> rows = execute(node, baseQuery()).getAll();
+
+ assertBaseQueryResults(rows);
+ assertNoWorkers();
+
+ // Get data in several pages.
+ rows = execute(node, baseQuery().setPageSize(PAGE_SIZE_SMALL)).getAll();
+
+ assertBaseQueryResults(rows);
+ assertNoWorkers();
+
+ // Test full iteration.
+ rows = new ArrayList<>();
+
+ FieldsQueryCursor<List<?>> cursor = execute(node, baseQuery().setPageSize(PAGE_SIZE_SMALL));
+
+ for (List<?> row : cursor)
+ rows.add(row);
+
+ assertBaseQueryResults(rows);
+ assertNoWorkers();
+
+ // Test partial iteration with cursor close.
+ try (FieldsQueryCursor<List<?>> partialCursor = execute(node, baseQuery().setPageSize(PAGE_SIZE_SMALL))) {
+ Iterator<List<?>> iter = partialCursor.iterator();
+
+ for (int i = 0; i < 30; i++)
+ iter.next();
+ }
+
+ assertNoWorkers();
+
+ // Test execution of multiple queries at a time.
+ List<Iterator<List<?>>> iters = new ArrayList<>();
+
+ for (int i = 0; i < 200; i++)
+ iters.add(execute(node, randomizedQuery().setPageSize(PAGE_SIZE_SMALL)).iterator());
+
+ while (!iters.isEmpty()) {
+ Iterator<Iterator<List<?>>> iterIter = iters.iterator();
+
+ while (iterIter.hasNext()) {
+ Iterator<List<?>> iter = iterIter.next();
+
+ int i = 0;
+
+ while (iter.hasNext() && i < 20) {
+ iter.next();
+
+ i++;
+ }
+
+ if (!iter.hasNext())
+ iterIter.remove();
+ }
+ }
+
+ assertNoWorkers();
+ }
+
+ /**
+ * Populate base query data.
+ *
+ * @param node Node.
+ */
+ private static void populateBaseQueryData(Ignite node) {
+ IgniteCache<Long, Person> cache = cache(node);
+
+ for (long i = 0; i < KEY_CNT; i++)
+ cache.put(i, new Person(i));
+ }
+
+ /**
+ * @return Query with randomized argument.
+ */
+ private static SqlFieldsQuery randomizedQuery() {
+ return query(ThreadLocalRandom.current().nextInt(KEY_CNT / 2));
+ }
+
+ /**
+ * @return Base query.
+ */
+ private static SqlFieldsQuery baseQuery() {
+ return query(BASE_QRY_ARG);
+ }
+
+ /**
+ * @param parallelism Query parallelism.
+ * @return Default cache configuration.
+ */
+ private static CacheConfiguration<Long, Person> cacheConfiguration(int parallelism) {
+ return new CacheConfiguration<Long, Person>().setName(CACHE_NAME).setIndexedTypes(Long.class, Person.class)
+ .setQueryParallelism(parallelism);
+ }
+
+ /**
+ * Default query.
+ *
+ * @param arg Argument.
+ * @return Query.
+ */
+ private static SqlFieldsQuery query(long arg) {
+ return new SqlFieldsQuery("SELECT id, name FROM Person WHERE id >= ?").setArgs(arg);
+ }
+
+ /**
+ * Assert base query results.
+ *
+ * @param rows Result rows.
+ */
+ private static void assertBaseQueryResults(List<List<?>> rows) {
+ assertEquals(KEY_CNT - BASE_QRY_ARG, rows.size());
+
+ for (List<?> row : rows) {
+ Long id = (Long)row.get(0);
+ String name = (String)row.get(1);
+
+ assertTrue(id >= BASE_QRY_ARG);
+ assertEquals(nameForId(id), name);
+ }
+ }
+
+ /**
+ * Get cache for node.
+ *
+ * @param node Node.
+ * @return Cache.
+ */
+ private static IgniteCache<Long, Person> cache(Ignite node) {
+ return node.cache(CACHE_NAME);
+ }
+
+ /**
+ * Execute query on the given cache.
+ *
+ * @param node Node.
+ * @param qry Query.
+ * @return Cursor.
+ */
+ @SuppressWarnings("unchecked")
+ private static FieldsQueryCursor<List<?>> execute(Ignite node, SqlFieldsQuery qry) {
+ return cache(node).query(qry.setLazy(true));
+ }
+
+ /**
+ * Make sure that are no active lazy workers.
+ *
+ * @throws Exception If failed.
+ */
+ private static void assertNoWorkers() throws Exception {
+ assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ for (Ignite node : Ignition.allGrids()) {
+ IgniteH2Indexing idx = (IgniteH2Indexing) ((IgniteKernal)node).context().query().getIndexing();
+
+ if (idx.mapQueryExecutor().registeredLazyWorkers() != 0)
+ return false;
+ }
+
+ return MapQueryLazyWorker.activeCount() == 0;
+ }
+ }, 1000L);
+ }
+
+ /**
+ * Get name for ID.
+ *
+ * @param id ID.
+ * @return Name.
+ */
+ private static String nameForId(long id) {
+ return "name-" + id;
+ }
+
+ /**
+ * Person class.
+ */
+ private static class Person {
+ /** ID. */
+ @QuerySqlField(index = true)
+ private long id;
+
+ /** Name. */
+ @QuerySqlField
+ private String name;
+
+ /**
+ * Constructor.
+ *
+ * @param id ID.
+ */
+ public Person(long id) {
+ this.id = id;
+ this.name = nameForId(id);
+ }
+
+ /**
+ * @return ID.
+ */
+ public long id() {
+ return id;
+ }
+
+ /**
+ * @return Name.
+ */
+ public String name() {
+ return name;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 99b0370..5ac0655f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -127,6 +127,7 @@ import org.apache.ignite.internal.processors.query.IgniteSqlSchemaIndexingTest;
import org.apache.ignite.internal.processors.query.IgniteSqlSegmentedIndexMultiNodeSelfTest;
import org.apache.ignite.internal.processors.query.IgniteSqlSegmentedIndexSelfTest;
import org.apache.ignite.internal.processors.query.IgniteSqlSplitterSelfTest;
+import org.apache.ignite.internal.processors.query.LazyQuerySelfTest;
import org.apache.ignite.internal.processors.query.SqlSchemaSelfTest;
import org.apache.ignite.internal.processors.query.h2.GridH2IndexingInMemSelfTest;
import org.apache.ignite.internal.processors.query.h2.GridH2IndexingOffheapSelfTest;
@@ -184,6 +185,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(IncorrectQueryEntityTest.class);
// Queries tests.
+ suite.addTestSuite(LazyQuerySelfTest.class);
suite.addTestSuite(IgniteSqlSplitterSelfTest.class);
suite.addTestSuite(IgniteSqlSegmentedIndexSelfTest.class);
suite.addTestSuite(IgniteSqlSegmentedIndexMultiNodeSelfTest.class);