You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by an...@apache.org on 2022/09/05 09:18:56 UTC
[ignite] branch master updated: IGNITE-17594 Provide ability to register listeners for query start/finish events. (#10227)
This is an automated email from the ASF dual-hosted git repository.
anovikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new d9640648cdf IGNITE-17594 Provide ability to register listeners for query start/finish events. (#10227)
d9640648cdf is described below
commit d9640648cdfc87959d9fe80f67e22d2ba5438379
Author: Andrey Novikov <an...@gridgain.com>
AuthorDate: Mon Sep 5 16:18:40 2022 +0700
IGNITE-17594 Provide ability to register listeners for query start/finish events. (#10227)
---
.../query/calcite/QueryRegistryImpl.java | 2 +-
...ngQueryInfo.java => GridQueryFinishedInfo.java} | 151 +++----
...ingQueryInfo.java => GridQueryStartedInfo.java} | 134 ++----
.../processors/query/GridRunningQueryInfo.java | 48 ++
.../processors/query/RunningQueryManager.java | 133 +++++-
.../processors/query/h2/IgniteH2Indexing.java | 61 ++-
.../h2/IgniteSqlQueryStartFinishListenerTest.java | 495 +++++++++++++++++++++
.../IgniteBinaryCacheQueryTestSuite.java | 2 +
8 files changed, 854 insertions(+), 172 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
index 58c5df1bf25..d023a258e1a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
@@ -63,7 +63,7 @@ public class QueryRegistryImpl extends AbstractService implements QueryRegistry
String initiatorId = fieldsQry != null ? fieldsQry.getQueryInitiatorId() : null;
long locId = qryMgr.register(rootQry.sql(), GridCacheQueryType.SQL_FIELDS, rootQry.context().schemaName(),
- false, createCancelToken(qry), initiatorId);
+ false, createCancelToken(qry), initiatorId, false, false, false);
rootQry.localQueryId(locId);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFinishedInfo.java
similarity index 58%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFinishedInfo.java
index bdc29e9f6ae..8983f8230a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFinishedInfo.java
@@ -19,13 +19,13 @@ package org.apache.ignite.internal.processors.query;
import java.util.UUID;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
-import org.apache.ignite.internal.processors.tracing.MTC;
-import org.apache.ignite.internal.processors.tracing.Span;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
/**
- * Query descriptor.
+ * Info about finished query.
*/
-public class GridRunningQueryInfo {
+public class GridQueryFinishedInfo {
/** */
private final long id;
@@ -44,29 +44,30 @@ public class GridRunningQueryInfo {
/** */
private final long startTime;
- /** Query start time in nanoseconds to measure duration. */
- private final long startTimeNanos;
-
/** */
- private final GridQueryCancel cancel;
+ private final long finishTime;
/** */
private final boolean loc;
- /** */
- private final QueryRunningFuture fut = new QueryRunningFuture();
+ /** Enforce join order query flag. */
+ private boolean enforceJoinOrder;
- /** Span of the running query. */
- private final Span span;
+ /** Lazy query flag. */
+ private boolean lazy;
- /** Originator. */
- private final String qryInitiatorId;
+ /** Distributed joins query flag. */
+ private boolean distributedJoins;
- /** Request ID. */
- private long reqId;
+ /** Whether query is failed or not. */
+ private final boolean failed;
- /** Subject ID. */
- private final UUID subjId;
+ /** Exception that caused query execution fail. */
+ @Nullable
+ private final Throwable failReason;
+
+ /** Originator. */
+ private final String qryInitiatorId;
/**
* Constructor.
@@ -77,23 +78,30 @@ public class GridRunningQueryInfo {
* @param qryType Query type.
* @param schemaName Schema name.
* @param startTime Query start time.
- * @param startTimeNanos Query start time in nanoseconds.
- * @param cancel Query cancel.
+ * @param finishTime Query finish time.
* @param loc Local query flag.
- * @param subjId Subject ID.
- */
- public GridRunningQueryInfo(
- long id,
+ * @param enforceJoinOrder Local query flag.
+ * @param lazy Local query flag.
+ * @param distributedJoins Local query flag.
+ * @param failed Whether query is failed or not.
+ * @param failReason Exception that caused query execution fail.
+ * @param qryInitiatorId Query's initiator identifier.
+ */
+ public GridQueryFinishedInfo(
+ Long id,
UUID nodeId,
String qry,
GridCacheQueryType qryType,
String schemaName,
long startTime,
- long startTimeNanos,
- GridQueryCancel cancel,
+ long finishTime,
boolean loc,
- String qryInitiatorId,
- UUID subjId
+ boolean enforceJoinOrder,
+ boolean lazy,
+ boolean distributedJoins,
+ boolean failed,
+ @Nullable Throwable failReason,
+ String qryInitiatorId
) {
this.id = id;
this.nodeId = nodeId;
@@ -101,26 +109,28 @@ public class GridRunningQueryInfo {
this.qryType = qryType;
this.schemaName = schemaName;
this.startTime = startTime;
- this.startTimeNanos = startTimeNanos;
- this.cancel = cancel;
+ this.finishTime = finishTime;
this.loc = loc;
- this.span = MTC.span();
+ this.enforceJoinOrder = enforceJoinOrder;
+ this.lazy = lazy;
+ this.distributedJoins = distributedJoins;
+ this.failed = failed;
+ this.failReason = failReason;
this.qryInitiatorId = qryInitiatorId;
- this.subjId = subjId;
}
/**
* @return Query ID.
*/
- public long id() {
+ public Long id() {
return id;
}
/**
- * @return Global query ID.
+ * @return Node ID.
*/
- public String globalQueryId() {
- return QueryUtils.globalQueryId(nodeId, id);
+ public UUID nodeId() {
+ return nodeId;
}
/**
@@ -152,67 +162,53 @@ public class GridRunningQueryInfo {
}
/**
- * @return Query start time in nanoseconds.
- */
- public long startTimeNanos() {
- return startTimeNanos;
- }
-
- /**
- * @param curTime Current time.
- * @param duration Duration of long query.
- * @return {@code true} if this query should be considered as long running query.
+ * @return Query finish time.
*/
- public boolean longQuery(long curTime, long duration) {
- return curTime - startTime > duration;
+ public long finishTime() {
+ return finishTime;
}
/**
- * Cancel query.
+ * @return {@code true} if query is local.
*/
- public void cancel() {
- if (cancel != null)
- cancel.cancel();
+ public boolean local() {
+ return loc;
}
/**
- * @return Query running future.
+ * @return Enforce join order flag.
*/
- public QueryRunningFuture runningFuture() {
- return fut;
+ public boolean enforceJoinOrder() {
+ return enforceJoinOrder;
}
/**
- * @return {@code true} if query can be cancelled.
+ * @return Lazy flag.
*/
- public boolean cancelable() {
- return cancel != null;
+ public boolean lazy() {
+ return lazy;
}
/**
- * @return {@code true} if query is local.
+ * @return Distributed joins.
*/
- public boolean local() {
- return loc;
+ public boolean distributedJoins() {
+ return distributedJoins;
}
/**
- * @return Originating node ID.
+ * @return {@code true} if query is failed.
*/
- public UUID nodeId() {
- return nodeId;
+ public boolean failed() {
+ return failed;
}
/**
- * @return Span of the running query.
+ * @return Exception that caused query execution fail, or {@code null} if query succeded.
*/
- public Span span() {
- return span;
- }
-
- /** @return Request ID. */
- public long requestId() {
- return reqId;
+ @Nullable
+ public Throwable failReason() {
+ return failReason;
}
/**
@@ -223,13 +219,8 @@ public class GridRunningQueryInfo {
return qryInitiatorId;
}
- /** @param reqId Request ID. */
- public void requestId(long reqId) {
- this.reqId = reqId;
- }
-
- /** @return Subject ID. */
- public UUID subjectId() {
- return subjId;
+ /**{@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridQueryFinishedInfo.class, this);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryStartedInfo.java
similarity index 58%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryStartedInfo.java
index bdc29e9f6ae..f0e83794dbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryStartedInfo.java
@@ -19,13 +19,12 @@ package org.apache.ignite.internal.processors.query;
import java.util.UUID;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
-import org.apache.ignite.internal.processors.tracing.MTC;
-import org.apache.ignite.internal.processors.tracing.Span;
+import org.apache.ignite.internal.util.typedef.internal.S;
/**
- * Query descriptor.
+ * Info about new started query.
*/
-public class GridRunningQueryInfo {
+public class GridQueryStartedInfo {
/** */
private final long id;
@@ -44,30 +43,24 @@ public class GridRunningQueryInfo {
/** */
private final long startTime;
- /** Query start time in nanoseconds to measure duration. */
- private final long startTimeNanos;
-
- /** */
- private final GridQueryCancel cancel;
+ /** Query cancellable flag. */
+ private boolean cancellable;
/** */
private final boolean loc;
- /** */
- private final QueryRunningFuture fut = new QueryRunningFuture();
+ /** Enforce join order query flag. */
+ private boolean enforceJoinOrder;
+
+ /** Lazy query flag. */
+ private boolean lazy;
- /** Span of the running query. */
- private final Span span;
+ /** Distributed joins query flag. */
+ private boolean distributedJoins;
/** Originator. */
private final String qryInitiatorId;
- /** Request ID. */
- private long reqId;
-
- /** Subject ID. */
- private final UUID subjId;
-
/**
* Constructor.
*
@@ -77,23 +70,26 @@ public class GridRunningQueryInfo {
* @param qryType Query type.
* @param schemaName Schema name.
* @param startTime Query start time.
- * @param startTimeNanos Query start time in nanoseconds.
- * @param cancel Query cancel.
+ * @param cancellable Query cancellable flag.
* @param loc Local query flag.
- * @param subjId Subject ID.
+ * @param enforceJoinOrder Local query flag.
+ * @param lazy Local query flag.
+ * @param distributedJoins Local query flag.
+ * @param qryInitiatorId Query's initiator identifier.
*/
- public GridRunningQueryInfo(
- long id,
+ public GridQueryStartedInfo(
+ Long id,
UUID nodeId,
String qry,
GridCacheQueryType qryType,
String schemaName,
long startTime,
- long startTimeNanos,
- GridQueryCancel cancel,
+ boolean cancellable,
boolean loc,
- String qryInitiatorId,
- UUID subjId
+ boolean enforceJoinOrder,
+ boolean lazy,
+ boolean distributedJoins,
+ String qryInitiatorId
) {
this.id = id;
this.nodeId = nodeId;
@@ -101,26 +97,26 @@ public class GridRunningQueryInfo {
this.qryType = qryType;
this.schemaName = schemaName;
this.startTime = startTime;
- this.startTimeNanos = startTimeNanos;
- this.cancel = cancel;
+ this.cancellable = cancellable;
this.loc = loc;
- this.span = MTC.span();
+ this.enforceJoinOrder = enforceJoinOrder;
+ this.lazy = lazy;
+ this.distributedJoins = distributedJoins;
this.qryInitiatorId = qryInitiatorId;
- this.subjId = subjId;
}
/**
* @return Query ID.
*/
- public long id() {
+ public Long id() {
return id;
}
/**
- * @return Global query ID.
+ * @return Node ID.
*/
- public String globalQueryId() {
- return QueryUtils.globalQueryId(nodeId, id);
+ public UUID nodeId() {
+ return nodeId;
}
/**
@@ -151,42 +147,11 @@ public class GridRunningQueryInfo {
return startTime;
}
- /**
- * @return Query start time in nanoseconds.
- */
- public long startTimeNanos() {
- return startTimeNanos;
- }
-
- /**
- * @param curTime Current time.
- * @param duration Duration of long query.
- * @return {@code true} if this query should be considered as long running query.
- */
- public boolean longQuery(long curTime, long duration) {
- return curTime - startTime > duration;
- }
-
- /**
- * Cancel query.
- */
- public void cancel() {
- if (cancel != null)
- cancel.cancel();
- }
-
- /**
- * @return Query running future.
- */
- public QueryRunningFuture runningFuture() {
- return fut;
- }
-
/**
* @return {@code true} if query can be cancelled.
*/
- public boolean cancelable() {
- return cancel != null;
+ public boolean cancellable() {
+ return cancellable;
}
/**
@@ -197,22 +162,24 @@ public class GridRunningQueryInfo {
}
/**
- * @return Originating node ID.
+ * @return Enforce join order flag.
*/
- public UUID nodeId() {
- return nodeId;
+ public boolean enforceJoinOrder() {
+ return enforceJoinOrder;
}
/**
- * @return Span of the running query.
+ * @return Lazy flag.
*/
- public Span span() {
- return span;
+ public boolean lazy() {
+ return lazy;
}
- /** @return Request ID. */
- public long requestId() {
- return reqId;
+ /**
+ * @return Distributed joins.
+ */
+ public boolean distributedJoins() {
+ return distributedJoins;
}
/**
@@ -223,13 +190,8 @@ public class GridRunningQueryInfo {
return qryInitiatorId;
}
- /** @param reqId Request ID. */
- public void requestId(long reqId) {
- this.reqId = reqId;
- }
-
- /** @return Subject ID. */
- public UUID subjectId() {
- return subjId;
+ /**{@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridQueryStartedInfo.class, this);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
index bdc29e9f6ae..927239533cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
@@ -21,6 +21,8 @@ import java.util.UUID;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.Span;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
/**
* Query descriptor.
@@ -54,6 +56,7 @@ public class GridRunningQueryInfo {
private final boolean loc;
/** */
+ @GridToStringExclude
private final QueryRunningFuture fut = new QueryRunningFuture();
/** Span of the running query. */
@@ -62,6 +65,15 @@ public class GridRunningQueryInfo {
/** Originator. */
private final String qryInitiatorId;
+ /** Enforce join order flag. */
+ private final boolean enforceJoinOrder;
+
+ /** Lazy flag. */
+ private final boolean lazy;
+
+ /** Distributed joins flag. */
+ private final boolean distributedJoins;
+
/** Request ID. */
private long reqId;
@@ -80,6 +92,10 @@ public class GridRunningQueryInfo {
* @param startTimeNanos Query start time in nanoseconds.
* @param cancel Query cancel.
* @param loc Local query flag.
+ * @param qryInitiatorId Query's initiator identifier.
+ * @param enforceJoinOrder Enforce join order flag.
+ * @param lazy Lazy flag.
+ * @param distributedJoins Distributed joins flag.
* @param subjId Subject ID.
*/
public GridRunningQueryInfo(
@@ -93,6 +109,9 @@ public class GridRunningQueryInfo {
GridQueryCancel cancel,
boolean loc,
String qryInitiatorId,
+ boolean enforceJoinOrder,
+ boolean lazy,
+ boolean distributedJoins,
UUID subjId
) {
this.id = id;
@@ -106,6 +125,9 @@ public class GridRunningQueryInfo {
this.loc = loc;
this.span = MTC.span();
this.qryInitiatorId = qryInitiatorId;
+ this.enforceJoinOrder = enforceJoinOrder;
+ this.lazy = lazy;
+ this.distributedJoins = distributedJoins;
this.subjId = subjId;
}
@@ -223,6 +245,27 @@ public class GridRunningQueryInfo {
return qryInitiatorId;
}
+ /**
+ * @return Distributed joins.
+ */
+ public boolean distributedJoins() {
+ return distributedJoins;
+ }
+
+ /**
+ * @return Enforce join order flag.
+ */
+ public boolean enforceJoinOrder() {
+ return enforceJoinOrder;
+ }
+
+ /**
+ * @return Lazy flag.
+ */
+ public boolean lazy() {
+ return lazy;
+ }
+
/** @param reqId Request ID. */
public void requestId(long reqId) {
this.reqId = reqId;
@@ -232,4 +275,9 @@ public class GridRunningQueryInfo {
public UUID subjectId() {
return subjId;
}
+
+ /**{@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridRunningQueryInfo.class, this);
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
index 11f1c8709df..c98bf4cf87c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
@@ -25,10 +25,13 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cluster.ClusterNode;
@@ -43,6 +46,7 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.managers.systemview.walker.SqlQueryHistoryViewWalker;
import org.apache.ignite.internal.managers.systemview.walker.SqlQueryViewWalker;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
@@ -53,6 +57,7 @@ import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.CIX2;
+import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
@@ -89,6 +94,9 @@ public class RunningQueryManager {
/** Undefined query ID value. */
public static final long UNDEFINED_QUERY_ID = 0L;
+ /** */
+ private final GridClosureProcessor closure;
+
/** Keep registered user queries. */
private final ConcurrentMap<Long, GridRunningQueryInfo> runs = new ConcurrentHashMap<>();
@@ -147,6 +155,12 @@ public class RunningQueryManager {
}
};
+ /** */
+ private final List<Consumer<GridQueryStartedInfo>> qryStartedListeners = new CopyOnWriteArrayList<>();
+
+ /** */
+ private final List<Consumer<GridQueryFinishedInfo>> qryFinishedListeners = new CopyOnWriteArrayList<>();
+
/**
* Constructor.
*
@@ -160,6 +174,7 @@ public class RunningQueryManager {
localNodeId = ctx.localNodeId();
histSz = ctx.config().getSqlConfiguration().getSqlQueryHistorySize();
+ closure = ctx.closure();
qryHistTracker = new QueryHistoryTracker(histSz);
@@ -229,11 +244,14 @@ public class RunningQueryManager {
* @param schemaName Schema name.
* @param loc Local query flag.
* @param cancel Query cancel. Should be passed in case query is cancelable, or {@code null} otherwise.
+ * @param enforceJoinOrder Enforce join order flag.
+ * @param lazy Lazy flag.
+ * @param distributedJoins Distributed joins flag.
* @return Id of registered query. Id is a positive number.
*/
public long register(String qry, GridCacheQueryType qryType, String schemaName, boolean loc,
@Nullable GridQueryCancel cancel,
- String qryInitiatorId) {
+ String qryInitiatorId, boolean enforceJoinOrder, boolean lazy, boolean distributedJoins) {
long qryId = qryIdGen.incrementAndGet();
if (qryInitiatorId == null)
@@ -250,6 +268,9 @@ public class RunningQueryManager {
cancel,
loc,
qryInitiatorId,
+ enforceJoinOrder,
+ lazy,
+ distributedJoins,
securitySubjectId(ctx)
);
@@ -262,6 +283,41 @@ public class RunningQueryManager {
run.span().addTag(SQL_QRY_ID, run::globalQueryId);
+ if (!qryStartedListeners.isEmpty()) {
+ GridQueryStartedInfo info = new GridQueryStartedInfo(
+ run.id(),
+ localNodeId,
+ run.query(),
+ run.queryType(),
+ run.schemaName(),
+ run.startTime(),
+ run.cancelable(),
+ run.local(),
+ run.enforceJoinOrder(),
+ run.lazy(),
+ run.distributedJoins(),
+ run.queryInitiatorId()
+ );
+
+ try {
+ closure.runLocal(
+ () -> qryStartedListeners.forEach(lsnr -> {
+ try {
+ lsnr.accept(info);
+ }
+ catch (Exception ex) {
+ log.error("Listener fails during handling query started" +
+ " event [qryId=" + qryId + "]", ex);
+ }
+ }),
+ GridIoPolicy.PUBLIC_POOL
+ );
+ }
+ catch (IgniteCheckedException ex) {
+ throw new IgniteException(ex.getMessage(), ex);
+ }
+ }
+
return qryId;
}
@@ -289,6 +345,43 @@ public class RunningQueryManager {
if (failed)
qrySpan.addTag(ERROR, failReason::getMessage);
+ if (!qryFinishedListeners.isEmpty()) {
+ GridQueryFinishedInfo info = new GridQueryFinishedInfo(
+ qry.id(),
+ localNodeId,
+ qry.query(),
+ qry.queryType(),
+ qry.schemaName(),
+ qry.startTime(),
+ System.currentTimeMillis(),
+ qry.local(),
+ qry.enforceJoinOrder(),
+ qry.lazy(),
+ qry.distributedJoins(),
+ failed,
+ failReason,
+ qry.queryInitiatorId()
+ );
+
+ try {
+ closure.runLocal(
+ () -> qryFinishedListeners.forEach(lsnr -> {
+ try {
+ lsnr.accept(info);
+ }
+ catch (Exception ex) {
+ log.error("Listener fails during handling query finished" +
+ " event [qryId=" + qryId + "]", ex);
+ }
+ }),
+ GridIoPolicy.PUBLIC_POOL
+ );
+ }
+ catch (IgniteCheckedException ex) {
+ throw new IgniteException(ex.getMessage(), ex);
+ }
+ }
+
//We need to collect query history and metrics only for SQL queries.
if (isSqlQuery(qry)) {
qry.runningFuture().onDone();
@@ -349,6 +442,44 @@ public class RunningQueryManager {
return res;
}
+ /**
+ * @param lsnr Listener.
+ */
+ public void registerQueryStartedListener(Consumer<GridQueryStartedInfo> lsnr) {
+ A.notNull(lsnr, "lsnr");
+
+ qryStartedListeners.add(lsnr);
+ }
+
+ /**
+ * @param lsnr Listener.
+ */
+ @SuppressWarnings("SuspiciousMethodCalls")
+ public boolean unregisterQueryStartedListener(Object lsnr) {
+ A.notNull(lsnr, "lsnr");
+
+ return qryStartedListeners.remove(lsnr);
+ }
+
+ /**
+ * @param lsnr Listener.
+ */
+ public void registerQueryFinishedListener(Consumer<GridQueryFinishedInfo> lsnr) {
+ A.notNull(lsnr, "lsnr");
+
+ qryFinishedListeners.add(lsnr);
+ }
+
+ /**
+ * @param lsnr Listener.
+ */
+ @SuppressWarnings("SuspiciousMethodCalls")
+ public boolean unregisterQueryFinishedListener(Object lsnr) {
+ A.notNull(lsnr, "lsnr");
+
+ return qryFinishedListeners.remove(lsnr);
+ }
+
/**
* Check belongs running query to an SQL type.
*
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index a208ea0f3de..2463cba9677 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
@@ -84,7 +85,9 @@ import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter;
+import org.apache.ignite.internal.processors.query.GridQueryFinishedInfo;
import org.apache.ignite.internal.processors.query.GridQueryIndexing;
+import org.apache.ignite.internal.processors.query.GridQueryStartedInfo;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryField;
@@ -359,13 +362,29 @@ public class IgniteH2Indexing implements GridQueryIndexing {
H2TableDescriptor tbl = schemaMgr.tableForType(schemaName, cacheName, typeName);
if (tbl != null && tbl.luceneIndex() != null) {
- long qryId = runningQueryManager().register(qry, TEXT, schemaName, true, null, null);
+ long qryId = runningQueryManager().register(
+ qry,
+ TEXT,
+ schemaName,
+ true,
+ null,
+ null,
+ false,
+ false,
+ false
+ );
+ Throwable failReason = null;
try {
return tbl.luceneIndex().query(qry.toUpperCase(), filters, limit);
}
+ catch (Throwable t) {
+ failReason = t;
+
+ throw t;
+ }
finally {
- runningQueryManager().unregister(qryId, null);
+ runningQueryManager().unregister(qryId, failReason);
}
}
@@ -584,7 +603,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
schemaName,
true,
null,
- qryInitiatorId
+ qryInitiatorId,
+ false,
+ false,
+ false
);
Exception failReason = null;
@@ -1459,7 +1481,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
qryDesc.schemaName(),
qryDesc.local(),
cancel,
- qryDesc.queryInitiatorId()
+ qryDesc.queryInitiatorId(),
+ qryDesc.enforceJoinOrder(),
+ qryParams.lazy(),
+ qryDesc.distributedJoins()
);
if (ctx.event().isRecordable(EVT_SQL_QUERY_EXECUTION)) {
@@ -1508,6 +1533,34 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
}
+ /**
+ * @param lsnr Listener.
+ */
+ public void registerQueryStartedListener(Consumer<GridQueryStartedInfo> lsnr) {
+ runningQueryManager().registerQueryStartedListener(lsnr);
+ }
+
+ /**
+ * @param lsnr Listener.
+ */
+ public boolean unregisterQueryStartedListener(Object lsnr) {
+ return runningQueryManager().unregisterQueryStartedListener(lsnr);
+ }
+
+ /**
+ * @param lsnr Listener.
+ */
+ public void registerQueryFinishedListener(Consumer<GridQueryFinishedInfo> lsnr) {
+ runningQueryManager().registerQueryFinishedListener(lsnr);
+ }
+
+ /**
+ * @param lsnr Listener.
+ */
+ public boolean unregisterQueryFinishedListener(Object lsnr) {
+ return runningQueryManager().unregisterQueryFinishedListener(lsnr);
+ }
+
/** {@inheritDoc} */
@Override public UpdateSourceIterator<?> executeUpdateOnDataNodeTransactional(
GridCacheContext<?, ?> cctx,
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/IgniteSqlQueryStartFinishListenerTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/IgniteSqlQueryStartFinishListenerTest.java
new file mode 100644
index 00000000000..db129abe902
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/IgniteSqlQueryStartFinishListenerTest.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.TextQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+import org.apache.ignite.internal.processors.query.GridQueryFinishedInfo;
+import org.apache.ignite.internal.processors.query.GridQueryStartedInfo;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.hamcrest.CustomMatcher;
+import org.hamcrest.Matcher;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.query.QueryUtils.SCHEMA_SYS;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.hamcrest.core.IsNull.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/** Test for SQL query listeners. */
+public class IgniteSqlQueryStartFinishListenerTest extends AbstractIndexingCommonTest {
+ /** Client node name. */
+ private static final String CLIENT_NODE_NAME = "CLIENT_NODE";
+
+ /** Client node name. */
+ private static final String SERVER_NODE_NAME = "SERVER_NODE";
+
+ /** Listeners. */
+ private final List<Object> lsnrs = new ArrayList<>();
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrid(SERVER_NODE_NAME);
+ startClientGrid(CLIENT_NODE_NAME);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /** */
+ @After
+ public void unregisterListeners() {
+ lsnrs.forEach(indexing()::unregisterQueryFinishedListener);
+ lsnrs.forEach(indexing()::unregisterQueryStartedListener);
+
+ lsnrs.clear();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ return super.getConfiguration(gridName)
+ .setSqlSchemas("TEST1")
+ .setCacheConfiguration(
+ new CacheConfiguration<String, String>(DEFAULT_CACHE_NAME)
+ .setQueryEntities(Collections.singleton(new QueryEntity(String.class, String.class)))
+ .setSqlFunctionClasses(GridTestUtils.SqlTestFunctions.class)
+ );
+ }
+
+ /**
+ * Ensure you could register and unregister a listener for query start/finish events:
+ * - register listeners
+ * - execute a query
+ * - ensure both listeneres were notified
+ * - unregister the query start listener
+ * - run a query one more time
+ * - ensure only one listener was notified
+ * - unregister the query finish listener and register new one
+ * - run a query one more time
+ * - ensure only new listener was notified
+ *
+ * @throws Exception In case of error.
+ */
+ @Test
+ public void testRegisterUnregisterQueryListeners() throws Exception {
+ final AtomicInteger qryStarted = new AtomicInteger();
+ final AtomicInteger qryFinished = new AtomicInteger();
+
+ final Consumer<GridQueryStartedInfo> qryStartedLsnr = registerQueryStartedListener(info -> qryStarted.incrementAndGet());
+ final Consumer<GridQueryFinishedInfo> qryFinishedLsnr = registerQueryFinishedListener(info -> qryFinished.incrementAndGet());
+
+ {
+ execSql(SCHEMA_SYS, "select * from caches");
+
+ assertWithTimeout(qryStarted::get, is(equalTo(1)), 1_000);
+ assertWithTimeout(qryFinished::get, is(equalTo(1)), 1_000);
+ }
+
+ {
+ assertTrue(indexing().unregisterQueryStartedListener(qryStartedLsnr));
+
+ execSql(SCHEMA_SYS, "select * from caches");
+
+ assertWithTimeout(qryFinished::get, is(equalTo(2)), 1_000);
+ assertWithTimeout(qryStarted::get, is(equalTo(1)), 1_000);
+ }
+
+ {
+ assertTrue(indexing().unregisterQueryFinishedListener(qryFinishedLsnr));
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ registerQueryFinishedListener(info -> latch.countDown());
+
+ execSql(SCHEMA_SYS, "select * from caches");
+
+ latch.await(1, TimeUnit.SECONDS);
+
+ assertWithTimeout(qryFinished::get, is(equalTo(2)), 1_000);
+ assertWithTimeout(qryStarted::get, is(equalTo(1)), 1_000);
+ }
+ }
+
+ /**
+ * Ensure listeners are notified with an actual query info:
+ * - register listeners
+ * - execute different queries
+ * - verify query info passed to listeners
+ */
+ @Test
+ @SuppressWarnings("ThrowableNotThrown")
+ public void testVerifyQueryInfoPassedToListeners() throws Exception {
+ final AtomicReference<GridQueryStartedInfo> qryStarted = new AtomicReference<>();
+ final AtomicReference<GridQueryFinishedInfo> qryFinished = new AtomicReference<>();
+
+ registerQueryStartedListener(qryStarted::set);
+ registerQueryFinishedListener(qryFinished::set);
+
+ {
+ final long delay = 100;
+ final String qry = "select * from caches where ? = \"default\".delay(?) limit 1";
+
+ execSql(SCHEMA_SYS, qry, delay, delay);
+
+ assertWithTimeout(qryStarted::get, is(notNullValue()), 1_000);
+
+ GridQueryStartedInfo startedInfo = qryStarted.get();
+ assertEquals(SCHEMA_SYS, startedInfo.schemaName());
+ assertEquals(qry, startedInfo.query());
+ assertEquals(grid(SERVER_NODE_NAME).localNode().id(), startedInfo.nodeId());
+ assertEquals(false, startedInfo.local());
+ assertEquals(GridCacheQueryType.SQL_FIELDS, startedInfo.queryType());
+
+ assertWithTimeout(qryFinished::get, is(notNullValue()), 1_000);
+
+ GridQueryFinishedInfo finishedInfo = qryFinished.get();
+ assertEquals(SCHEMA_SYS, finishedInfo.schemaName());
+ assertEquals(qry, finishedInfo.query());
+ assertEquals(grid(SERVER_NODE_NAME).localNode().id(), startedInfo.nodeId());
+ assertEquals(false, finishedInfo.local());
+ assertEquals(GridCacheQueryType.SQL_FIELDS, finishedInfo.queryType());
+ assertEquals(false, finishedInfo.failed());
+ assertThat(finishedInfo.finishTime() - finishedInfo.startTime(), is(greaterOrEqualTo(delay)));
+
+ qryStarted.set(null);
+ qryFinished.set(null);
+ }
+
+ {
+ final String schema = "TEST1";
+ final String qry = "select \"default\".can_fail() from " + SCHEMA_SYS + ".caches where ? = ? limit 1";
+
+ GridTestUtils.SqlTestFunctions.fail = true;
+
+ GridTestUtils.assertThrowsWithCause(() -> execSqlLocal(schema, qry, 1, 1), IgniteSQLException.class);
+
+ assertWithTimeout(qryStarted::get, is(notNullValue()), 1_000);
+
+ GridQueryStartedInfo startedInfo = qryStarted.get();
+ assertEquals(schema, startedInfo.schemaName());
+ assertEquals(qry, startedInfo.query());
+ assertEquals(grid(SERVER_NODE_NAME).localNode().id(), startedInfo.nodeId());
+ assertEquals(true, startedInfo.local());
+ assertEquals(GridCacheQueryType.SQL_FIELDS, startedInfo.queryType());
+
+ assertWithTimeout(qryFinished::get, is(notNullValue()), 1_000);
+
+ GridQueryFinishedInfo finishedInfo = qryFinished.get();
+ assertEquals(schema, finishedInfo.schemaName());
+ assertEquals(qry, finishedInfo.query());
+ assertEquals(grid(SERVER_NODE_NAME).localNode().id(), startedInfo.nodeId());
+ assertEquals(true, finishedInfo.local());
+ assertEquals(GridCacheQueryType.SQL_FIELDS, finishedInfo.queryType());
+ assertEquals(true, finishedInfo.failed());
+ assertThat(finishedInfo.finishTime(), is(greaterOrEqualTo(finishedInfo.startTime())));
+
+ qryStarted.set(null);
+ qryFinished.set(null);
+ }
+
+ {
+ final String qry = "text query";
+
+ IgniteCache<String, Object> cache = grid(CLIENT_NODE_NAME).cache(DEFAULT_CACHE_NAME);
+
+ cache.query(new TextQuery<String, String>(String.class, "text query"));
+
+ assertWithTimeout(qryStarted::get, is(notNullValue()), 1_000);
+
+ GridQueryStartedInfo startedInfo = qryStarted.get();
+ assertEquals(cache.getName(), startedInfo.schemaName());
+ assertEquals(qry, startedInfo.query());
+ assertEquals(grid(SERVER_NODE_NAME).localNode().id(), startedInfo.nodeId());
+ assertEquals(true, startedInfo.local());
+ assertEquals(GridCacheQueryType.TEXT, startedInfo.queryType());
+
+ assertWithTimeout(qryFinished::get, is(notNullValue()), 1_000);
+
+ GridQueryFinishedInfo finishedInfo = qryFinished.get();
+ assertEquals(cache.getName(), finishedInfo.schemaName());
+ assertEquals(qry, finishedInfo.query());
+ assertEquals(grid(SERVER_NODE_NAME).localNode().id(), startedInfo.nodeId());
+ assertEquals(true, finishedInfo.local());
+ assertEquals(GridCacheQueryType.TEXT, finishedInfo.queryType());
+ assertEquals(false, finishedInfo.failed());
+ assertThat(finishedInfo.finishTime(), is(greaterOrEqualTo(finishedInfo.startTime())));
+
+ qryStarted.set(null);
+ qryFinished.set(null);
+ }
+ }
+
+ /**
+ * Ensure listeners do not block query execution
+ * - register blocking listeners
+ * - execute a lot of queries
+ * - verify all queries finished while listeners is still blocked
+ */
+ @Test
+ public void testListeneresNotBlocksQueryExecution() throws IgniteCheckedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicInteger lsnrCalls = new AtomicInteger();
+
+ final int quryRuns = 1_000;
+ final int threadCnt = 20;
+
+ registerQueryStartedListener(info -> {
+ try {
+ latch.await();
+ }
+ catch (InterruptedException ignored) {
+ }
+
+ lsnrCalls.incrementAndGet();
+ });
+ registerQueryFinishedListener(info -> {
+ try {
+ latch.await();
+ }
+ catch (InterruptedException ignored) {
+ }
+
+ lsnrCalls.incrementAndGet();
+ });
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(() -> {
+ for (int i = 0; i < quryRuns; i++)
+ execSql(SCHEMA_SYS, "select * from caches");
+ }, threadCnt, "test-async-query-runner");
+
+ try {
+ fut.get(15_000);
+ }
+ finally {
+ latch.countDown();
+ }
+
+ assertWithTimeout(lsnrCalls::get, is(equalTo(2 * threadCnt * quryRuns)), 15_000);
+ }
+
+ /**
+ * Ensure notification chain is not interrupted if exception was thrown in the middle of the chain
+ * - register several listeners such the listener will throw exception if condition is met
+ * - execute query several times, one of the listeners from each chain should fail
+ * - verify all other listeners were notified
+ */
+ @Test
+ public void testFailedListenereNotAffectOthers() throws IgniteCheckedException {
+ final int lsnrCnt = 3;
+ final long waitTimeout = 1_000;
+
+ boolean[] startLsnrsNotified = new boolean[lsnrCnt];
+ boolean[] finishLsnrsNotified = new boolean[lsnrCnt];
+ boolean[] startLsnrShouldFail = new boolean[lsnrCnt];
+ boolean[] finishLsnrShouldFail = new boolean[lsnrCnt];
+
+ for (int i = 0; i < lsnrCnt; i++) {
+ final int lsnNo = i;
+
+ registerQueryStartedListener(info -> {
+ if (startLsnrShouldFail[lsnNo]) {
+ startLsnrShouldFail[lsnNo] = false;
+
+ throw new RuntimeException("Start listener fails");
+ }
+
+ startLsnrsNotified[lsnNo] = true;
+ });
+
+ registerQueryFinishedListener(info -> {
+ if (finishLsnrShouldFail[lsnNo]) {
+ finishLsnrShouldFail[lsnNo] = false;
+
+ throw new RuntimeException("Finish listener fails");
+ }
+
+ finishLsnrsNotified[lsnNo] = true;
+ });
+ }
+
+ {
+ startLsnrShouldFail[0] = true;
+ finishLsnrShouldFail[1] = true;
+
+ execSql(SCHEMA_SYS, "select * from caches");
+
+ assertWithTimeout(() -> startLsnrsNotified[0], isFalse(), waitTimeout);
+ assertWithTimeout(() -> startLsnrsNotified[1], isTrue(), waitTimeout);
+ assertWithTimeout(() -> startLsnrsNotified[2], isTrue(), waitTimeout);
+
+ assertWithTimeout(() -> finishLsnrsNotified[0], isTrue(), waitTimeout);
+ assertWithTimeout(() -> finishLsnrsNotified[1], isFalse(), waitTimeout);
+ assertWithTimeout(() -> finishLsnrsNotified[2], isTrue(), waitTimeout);
+
+ resetListeners(startLsnrsNotified, finishLsnrsNotified);
+ }
+
+ {
+ startLsnrShouldFail[1] = true;
+ finishLsnrShouldFail[2] = true;
+
+ execSql(SCHEMA_SYS, "select * from caches");
+
+ assertWithTimeout(() -> startLsnrsNotified[0], isTrue(), waitTimeout);
+ assertWithTimeout(() -> startLsnrsNotified[1], isFalse(), waitTimeout);
+ assertWithTimeout(() -> startLsnrsNotified[2], isTrue(), waitTimeout);
+
+ assertWithTimeout(() -> finishLsnrsNotified[0], isTrue(), waitTimeout);
+ assertWithTimeout(() -> finishLsnrsNotified[1], isTrue(), waitTimeout);
+ assertWithTimeout(() -> finishLsnrsNotified[2], isFalse(), waitTimeout);
+
+ resetListeners(startLsnrsNotified, finishLsnrsNotified);
+ }
+
+ {
+ startLsnrShouldFail[2] = true;
+ finishLsnrShouldFail[0] = true;
+
+ execSql(SCHEMA_SYS, "select * from caches");
+
+ assertWithTimeout(() -> startLsnrsNotified[0], isTrue(), waitTimeout);
+ assertWithTimeout(() -> startLsnrsNotified[1], isTrue(), waitTimeout);
+ assertWithTimeout(() -> startLsnrsNotified[2], isFalse(), waitTimeout);
+
+ assertWithTimeout(() -> finishLsnrsNotified[0], isFalse(), waitTimeout);
+ assertWithTimeout(() -> finishLsnrsNotified[1], isTrue(), waitTimeout);
+ assertWithTimeout(() -> finishLsnrsNotified[2], isTrue(), waitTimeout);
+
+ resetListeners(startLsnrsNotified, finishLsnrsNotified);
+ }
+ }
+
+ /**
+ * Sets all elements from each array to {@code false}.
+ */
+ private void resetListeners(boolean[]... arrays) {
+ for (boolean[] arr : arrays)
+ Arrays.fill(arr, false);
+ }
+
+ /** */
+ private IgniteH2Indexing indexing() {
+ return (IgniteH2Indexing)grid(SERVER_NODE_NAME).context().query().getIndexing();
+ }
+
+ /** */
+ private List<List<?>> execSql(String schema, String sql, Object... args) {
+ return grid(SERVER_NODE_NAME).cache(DEFAULT_CACHE_NAME).query(
+ new SqlFieldsQuery(sql).setSchema(schema).setArgs(args).setLocal(false)
+ ).getAll();
+ }
+
+ /** */
+ private List<List<?>> execSqlLocal(String schema, String sql, Object... args) {
+ return grid(SERVER_NODE_NAME).cache(DEFAULT_CACHE_NAME).query(
+ new SqlFieldsQuery(sql).setSchema(schema).setArgs(args).setLocal(true)
+ ).getAll();
+ }
+
+ /**
+ * @param lsnr Listener.
+ */
+ private Consumer<GridQueryStartedInfo> registerQueryStartedListener(Consumer<GridQueryStartedInfo> lsnr) {
+ lsnrs.add(lsnr);
+
+ indexing().registerQueryStartedListener(lsnr);
+
+ return lsnr;
+ }
+
+ /**
+ * @param lsnr Listener.
+ */
+ private Consumer<GridQueryFinishedInfo> registerQueryFinishedListener(Consumer<GridQueryFinishedInfo> lsnr) {
+ lsnrs.add(lsnr);
+
+ indexing().registerQueryFinishedListener(lsnr);
+
+ return lsnr;
+ }
+
+ /**
+ * @param actualSupplier Supplier for value to check.
+ * @param matcher Matcher.
+ * @param timeout Timeout.
+ */
+ private <T> void assertWithTimeout(Supplier<T> actualSupplier, Matcher<? super T> matcher, long timeout)
+ throws IgniteInterruptedCheckedException {
+ GridTestUtils.waitForCondition(() -> matcher.matches(actualSupplier.get()), timeout);
+
+ assertThat(actualSupplier.get(), matcher);
+ }
+
+ /**
+ * @param wanted Wanted.
+ */
+ private static <T extends Comparable<? super T>> Matcher<T> greaterOrEqualTo(T wanted) {
+ return new CustomMatcher<T>("should be greater or equal to " + wanted) {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ @Override public boolean matches(Object item) {
+ return wanted != null && item instanceof Comparable && ((Comparable)item).compareTo(wanted) >= 0;
+ }
+ };
+ }
+
+ /** */
+ private static Matcher<Boolean> isTrue() {
+ return new CustomMatcher<Boolean>("should be true ") {
+ @Override public boolean matches(Object item) {
+ return item instanceof Boolean && (Boolean)item;
+ }
+ };
+ }
+
+ /** */
+ private static Matcher<Boolean> isFalse() {
+ return new CustomMatcher<Boolean>("should be true ") {
+ @Override public boolean matches(Object item) {
+ return item instanceof Boolean && !(Boolean)item;
+ }
+ };
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index c2722a15e0a..24dc627847a 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -137,6 +137,7 @@ import org.apache.ignite.internal.processors.query.h2.GridSubqueryJoinOptimizerS
import org.apache.ignite.internal.processors.query.h2.H2ResultSetIteratorNullifyOnEndSelfTest;
import org.apache.ignite.internal.processors.query.h2.IgniteSqlBigIntegerKeyTest;
import org.apache.ignite.internal.processors.query.h2.IgniteSqlQueryMinMaxTest;
+import org.apache.ignite.internal.processors.query.h2.IgniteSqlQueryStartFinishListenerTest;
import org.apache.ignite.internal.processors.query.h2.QueryDataPageScanTest;
import org.apache.ignite.internal.processors.query.h2.sql.ExplainSelfTest;
import org.apache.ignite.internal.processors.query.h2.sql.GridQueryParsingTest;
@@ -302,6 +303,7 @@ import org.junit.runners.Suite;
IgniteCacheMultipleIndexedTypesTest.class,
IgniteSqlQueryMinMaxTest.class,
+ IgniteSqlQueryStartFinishListenerTest.class,
GridCircularQueueTest.class,
IndexingSpiQueryWithH2IndexingSelfTest.class,