You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by an...@apache.org on 2022/09/05 09:18:56 UTC

[ignite] branch master updated: IGNITE-17594 Provide ability to register listeners for query start/finish events. (#10227)

This is an automated email from the ASF dual-hosted git repository.

anovikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new d9640648cdf IGNITE-17594 Provide ability to register listeners for query start/finish events. (#10227)
d9640648cdf is described below

commit d9640648cdfc87959d9fe80f67e22d2ba5438379
Author: Andrey Novikov <an...@gridgain.com>
AuthorDate: Mon Sep 5 16:18:40 2022 +0700

    IGNITE-17594 Provide ability to register listeners for query start/finish events. (#10227)
---
 .../query/calcite/QueryRegistryImpl.java           |   2 +-
 ...ngQueryInfo.java => GridQueryFinishedInfo.java} | 151 +++----
 ...ingQueryInfo.java => GridQueryStartedInfo.java} | 134 ++----
 .../processors/query/GridRunningQueryInfo.java     |  48 ++
 .../processors/query/RunningQueryManager.java      | 133 +++++-
 .../processors/query/h2/IgniteH2Indexing.java      |  61 ++-
 .../h2/IgniteSqlQueryStartFinishListenerTest.java  | 495 +++++++++++++++++++++
 .../IgniteBinaryCacheQueryTestSuite.java           |   2 +
 8 files changed, 854 insertions(+), 172 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
index 58c5df1bf25..d023a258e1a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
@@ -63,7 +63,7 @@ public class QueryRegistryImpl extends AbstractService implements QueryRegistry
             String initiatorId = fieldsQry != null ? fieldsQry.getQueryInitiatorId() : null;
 
             long locId = qryMgr.register(rootQry.sql(), GridCacheQueryType.SQL_FIELDS, rootQry.context().schemaName(),
-                false, createCancelToken(qry), initiatorId);
+                false, createCancelToken(qry), initiatorId, false, false, false);
 
             rootQry.localQueryId(locId);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFinishedInfo.java
similarity index 58%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFinishedInfo.java
index bdc29e9f6ae..8983f8230a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFinishedInfo.java
@@ -19,13 +19,13 @@ package org.apache.ignite.internal.processors.query;
 
 import java.util.UUID;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
-import org.apache.ignite.internal.processors.tracing.MTC;
-import org.apache.ignite.internal.processors.tracing.Span;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
 
 /**
- * Query descriptor.
+ * Info about finished query.
  */
-public class GridRunningQueryInfo {
+public class GridQueryFinishedInfo {
     /** */
     private final long id;
 
@@ -44,29 +44,30 @@ public class GridRunningQueryInfo {
     /** */
     private final long startTime;
 
-    /** Query start time in nanoseconds to measure duration. */
-    private final long startTimeNanos;
-
     /** */
-    private final GridQueryCancel cancel;
+    private final long finishTime;
 
     /** */
     private final boolean loc;
 
-    /** */
-    private final QueryRunningFuture fut = new QueryRunningFuture();
+    /** Enforce join order query flag. */
+    private boolean enforceJoinOrder;
 
-    /** Span of the running query. */
-    private final Span span;
+    /** Lazy query flag. */
+    private boolean lazy;
 
-    /** Originator. */
-    private final String qryInitiatorId;
+    /** Distributed joins query flag. */
+    private boolean distributedJoins;
 
-    /** Request ID. */
-    private long reqId;
+    /** Whether query is failed or not. */
+    private final boolean failed;
 
-    /** Subject ID. */
-    private final UUID subjId;
+    /** Exception that caused query execution fail. */
+    @Nullable
+    private final Throwable failReason;
+
+    /** Originator. */
+    private final String qryInitiatorId;
 
     /**
      * Constructor.
@@ -77,23 +78,30 @@ public class GridRunningQueryInfo {
      * @param qryType Query type.
      * @param schemaName Schema name.
      * @param startTime Query start time.
-     * @param startTimeNanos Query start time in nanoseconds.
-     * @param cancel Query cancel.
+     * @param finishTime Query finish time.
      * @param loc Local query flag.
-     * @param subjId Subject ID.
-     */
-    public GridRunningQueryInfo(
-        long id,
+     * @param enforceJoinOrder Local query flag.
+     * @param lazy Local query flag.
+     * @param distributedJoins Local query flag.
+     * @param failed Whether query is failed or not.
+     * @param failReason Exception that caused query execution fail.
+     * @param qryInitiatorId Query's initiator identifier.
+     */
+    public GridQueryFinishedInfo(
+        Long id,
         UUID nodeId,
         String qry,
         GridCacheQueryType qryType,
         String schemaName,
         long startTime,
-        long startTimeNanos,
-        GridQueryCancel cancel,
+        long finishTime,
         boolean loc,
-        String qryInitiatorId,
-        UUID subjId
+        boolean enforceJoinOrder,
+        boolean lazy,
+        boolean distributedJoins,
+        boolean failed,
+        @Nullable Throwable failReason,
+        String qryInitiatorId
     ) {
         this.id = id;
         this.nodeId = nodeId;
@@ -101,26 +109,28 @@ public class GridRunningQueryInfo {
         this.qryType = qryType;
         this.schemaName = schemaName;
         this.startTime = startTime;
-        this.startTimeNanos = startTimeNanos;
-        this.cancel = cancel;
+        this.finishTime = finishTime;
         this.loc = loc;
-        this.span = MTC.span();
+        this.enforceJoinOrder = enforceJoinOrder;
+        this.lazy = lazy;
+        this.distributedJoins = distributedJoins;
+        this.failed = failed;
+        this.failReason = failReason;
         this.qryInitiatorId = qryInitiatorId;
-        this.subjId = subjId;
     }
 
     /**
      * @return Query ID.
      */
-    public long id() {
+    public Long id() {
         return id;
     }
 
     /**
-     * @return Global query ID.
+     * @return Node ID.
      */
-    public String globalQueryId() {
-        return QueryUtils.globalQueryId(nodeId, id);
+    public UUID nodeId() {
+        return nodeId;
     }
 
     /**
@@ -152,67 +162,53 @@ public class GridRunningQueryInfo {
     }
 
     /**
-     * @return Query start time in nanoseconds.
-     */
-    public long startTimeNanos() {
-        return startTimeNanos;
-    }
-
-    /**
-     * @param curTime Current time.
-     * @param duration Duration of long query.
-     * @return {@code true} if this query should be considered as long running query.
+     * @return Query finish time.
      */
-    public boolean longQuery(long curTime, long duration) {
-        return curTime - startTime > duration;
+    public long finishTime() {
+        return finishTime;
     }
 
     /**
-     * Cancel query.
+     * @return {@code true} if query is local.
      */
-    public void cancel() {
-        if (cancel != null)
-            cancel.cancel();
+    public boolean local() {
+        return loc;
     }
 
     /**
-     * @return Query running future.
+     * @return Enforce join order flag.
      */
-    public QueryRunningFuture runningFuture() {
-        return fut;
+    public boolean enforceJoinOrder() {
+        return enforceJoinOrder;
     }
 
     /**
-     * @return {@code true} if query can be cancelled.
+     * @return Lazy flag.
      */
-    public boolean cancelable() {
-        return cancel != null;
+    public boolean lazy() {
+        return lazy;
     }
 
     /**
-     * @return {@code true} if query is local.
+     * @return Distributed joins.
      */
-    public boolean local() {
-        return loc;
+    public boolean distributedJoins() {
+        return distributedJoins;
     }
 
     /**
-     * @return Originating node ID.
+     * @return {@code true} if query is failed.
      */
-    public UUID nodeId() {
-        return nodeId;
+    public boolean failed() {
+        return failed;
     }
 
     /**
-     * @return Span of the running query.
+     * @return Exception that caused query execution fail, or {@code null} if query succeded.
      */
-    public Span span() {
-        return span;
-    }
-
-    /** @return Request ID. */
-    public long requestId() {
-        return reqId;
+    @Nullable
+    public Throwable failReason() {
+        return failReason;
     }
 
     /**
@@ -223,13 +219,8 @@ public class GridRunningQueryInfo {
         return qryInitiatorId;
     }
 
-    /** @param reqId Request ID. */
-    public void requestId(long reqId) {
-        this.reqId = reqId;
-    }
-
-    /** @return Subject ID. */
-    public UUID subjectId() {
-        return subjId;
+    /**{@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridQueryFinishedInfo.class, this);
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryStartedInfo.java
similarity index 58%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryStartedInfo.java
index bdc29e9f6ae..f0e83794dbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryStartedInfo.java
@@ -19,13 +19,12 @@ package org.apache.ignite.internal.processors.query;
 
 import java.util.UUID;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
-import org.apache.ignite.internal.processors.tracing.MTC;
-import org.apache.ignite.internal.processors.tracing.Span;
+import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
- * Query descriptor.
+ * Info about new started query.
  */
-public class GridRunningQueryInfo {
+public class GridQueryStartedInfo {
     /** */
     private final long id;
 
@@ -44,30 +43,24 @@ public class GridRunningQueryInfo {
     /** */
     private final long startTime;
 
-    /** Query start time in nanoseconds to measure duration. */
-    private final long startTimeNanos;
-
-    /** */
-    private final GridQueryCancel cancel;
+    /** Query cancellable flag. */
+    private boolean cancellable;
 
     /** */
     private final boolean loc;
 
-    /** */
-    private final QueryRunningFuture fut = new QueryRunningFuture();
+    /** Enforce join order query flag. */
+    private boolean enforceJoinOrder;
+
+    /** Lazy query flag. */
+    private boolean lazy;
 
-    /** Span of the running query. */
-    private final Span span;
+    /** Distributed joins query flag. */
+    private boolean distributedJoins;
 
     /** Originator. */
     private final String qryInitiatorId;
 
-    /** Request ID. */
-    private long reqId;
-
-    /** Subject ID. */
-    private final UUID subjId;
-
     /**
      * Constructor.
      *
@@ -77,23 +70,26 @@ public class GridRunningQueryInfo {
      * @param qryType Query type.
      * @param schemaName Schema name.
      * @param startTime Query start time.
-     * @param startTimeNanos Query start time in nanoseconds.
-     * @param cancel Query cancel.
+     * @param cancellable Query cancellable flag.
      * @param loc Local query flag.
-     * @param subjId Subject ID.
+     * @param enforceJoinOrder Local query flag.
+     * @param lazy Local query flag.
+     * @param distributedJoins Local query flag.
+     * @param qryInitiatorId Query's initiator identifier.
      */
-    public GridRunningQueryInfo(
-        long id,
+    public GridQueryStartedInfo(
+        Long id,
         UUID nodeId,
         String qry,
         GridCacheQueryType qryType,
         String schemaName,
         long startTime,
-        long startTimeNanos,
-        GridQueryCancel cancel,
+        boolean cancellable,
         boolean loc,
-        String qryInitiatorId,
-        UUID subjId
+        boolean enforceJoinOrder,
+        boolean lazy,
+        boolean distributedJoins,
+        String qryInitiatorId
     ) {
         this.id = id;
         this.nodeId = nodeId;
@@ -101,26 +97,26 @@ public class GridRunningQueryInfo {
         this.qryType = qryType;
         this.schemaName = schemaName;
         this.startTime = startTime;
-        this.startTimeNanos = startTimeNanos;
-        this.cancel = cancel;
+        this.cancellable = cancellable;
         this.loc = loc;
-        this.span = MTC.span();
+        this.enforceJoinOrder = enforceJoinOrder;
+        this.lazy = lazy;
+        this.distributedJoins = distributedJoins;
         this.qryInitiatorId = qryInitiatorId;
-        this.subjId = subjId;
     }
 
     /**
      * @return Query ID.
      */
-    public long id() {
+    public Long id() {
         return id;
     }
 
     /**
-     * @return Global query ID.
+     * @return Node ID.
      */
-    public String globalQueryId() {
-        return QueryUtils.globalQueryId(nodeId, id);
+    public UUID nodeId() {
+        return nodeId;
     }
 
     /**
@@ -151,42 +147,11 @@ public class GridRunningQueryInfo {
         return startTime;
     }
 
-    /**
-     * @return Query start time in nanoseconds.
-     */
-    public long startTimeNanos() {
-        return startTimeNanos;
-    }
-
-    /**
-     * @param curTime Current time.
-     * @param duration Duration of long query.
-     * @return {@code true} if this query should be considered as long running query.
-     */
-    public boolean longQuery(long curTime, long duration) {
-        return curTime - startTime > duration;
-    }
-
-    /**
-     * Cancel query.
-     */
-    public void cancel() {
-        if (cancel != null)
-            cancel.cancel();
-    }
-
-    /**
-     * @return Query running future.
-     */
-    public QueryRunningFuture runningFuture() {
-        return fut;
-    }
-
     /**
      * @return {@code true} if query can be cancelled.
      */
-    public boolean cancelable() {
-        return cancel != null;
+    public boolean cancellable() {
+        return cancellable;
     }
 
     /**
@@ -197,22 +162,24 @@ public class GridRunningQueryInfo {
     }
 
     /**
-     * @return Originating node ID.
+     * @return Enforce join order flag.
      */
-    public UUID nodeId() {
-        return nodeId;
+    public boolean enforceJoinOrder() {
+        return enforceJoinOrder;
     }
 
     /**
-     * @return Span of the running query.
+     * @return Lazy flag.
      */
-    public Span span() {
-        return span;
+    public boolean lazy() {
+        return lazy;
     }
 
-    /** @return Request ID. */
-    public long requestId() {
-        return reqId;
+    /**
+     * @return Distributed joins.
+     */
+    public boolean distributedJoins() {
+        return distributedJoins;
     }
 
     /**
@@ -223,13 +190,8 @@ public class GridRunningQueryInfo {
         return qryInitiatorId;
     }
 
-    /** @param reqId Request ID. */
-    public void requestId(long reqId) {
-        this.reqId = reqId;
-    }
-
-    /** @return Subject ID. */
-    public UUID subjectId() {
-        return subjId;
+    /**{@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridQueryStartedInfo.class, this);
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
index bdc29e9f6ae..927239533cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
@@ -21,6 +21,8 @@ import java.util.UUID;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
 import org.apache.ignite.internal.processors.tracing.MTC;
 import org.apache.ignite.internal.processors.tracing.Span;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
  * Query descriptor.
@@ -54,6 +56,7 @@ public class GridRunningQueryInfo {
     private final boolean loc;
 
     /** */
+    @GridToStringExclude
     private final QueryRunningFuture fut = new QueryRunningFuture();
 
     /** Span of the running query. */
@@ -62,6 +65,15 @@ public class GridRunningQueryInfo {
     /** Originator. */
     private final String qryInitiatorId;
 
+    /** Enforce join order flag. */
+    private final boolean enforceJoinOrder;
+
+    /** Lazy flag. */
+    private final boolean lazy;
+
+    /** Distributed joins flag. */
+    private final boolean distributedJoins;
+
     /** Request ID. */
     private long reqId;
 
@@ -80,6 +92,10 @@ public class GridRunningQueryInfo {
      * @param startTimeNanos Query start time in nanoseconds.
      * @param cancel Query cancel.
      * @param loc Local query flag.
+     * @param qryInitiatorId Query's initiator identifier.
+     * @param enforceJoinOrder Enforce join order flag.
+     * @param lazy Lazy flag.
+     * @param distributedJoins Distributed joins flag.
      * @param subjId Subject ID.
      */
     public GridRunningQueryInfo(
@@ -93,6 +109,9 @@ public class GridRunningQueryInfo {
         GridQueryCancel cancel,
         boolean loc,
         String qryInitiatorId,
+        boolean enforceJoinOrder,
+        boolean lazy,
+        boolean distributedJoins,
         UUID subjId
     ) {
         this.id = id;
@@ -106,6 +125,9 @@ public class GridRunningQueryInfo {
         this.loc = loc;
         this.span = MTC.span();
         this.qryInitiatorId = qryInitiatorId;
+        this.enforceJoinOrder = enforceJoinOrder;
+        this.lazy = lazy;
+        this.distributedJoins = distributedJoins;
         this.subjId = subjId;
     }
 
@@ -223,6 +245,27 @@ public class GridRunningQueryInfo {
         return qryInitiatorId;
     }
 
+    /**
+     * @return Distributed joins.
+     */
+    public boolean distributedJoins() {
+        return distributedJoins;
+    }
+
+    /**
+     * @return Enforce join order flag.
+     */
+    public boolean enforceJoinOrder() {
+        return enforceJoinOrder;
+    }
+
+    /**
+     * @return Lazy flag.
+     */
+    public boolean lazy() {
+        return lazy;
+    }
+
     /** @param reqId Request ID. */
     public void requestId(long reqId) {
         this.reqId = reqId;
@@ -232,4 +275,9 @@ public class GridRunningQueryInfo {
     public UUID subjectId() {
         return subjId;
     }
+
+    /**{@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridRunningQueryInfo.class, this);
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
index 11f1c8709df..c98bf4cf87c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
@@ -25,10 +25,13 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cluster.ClusterNode;
@@ -43,6 +46,7 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.managers.systemview.walker.SqlQueryHistoryViewWalker;
 import org.apache.ignite.internal.managers.systemview.walker.SqlQueryViewWalker;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
 import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
@@ -53,6 +57,7 @@ import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridPlainRunnable;
 import org.apache.ignite.internal.util.typedef.CIX2;
+import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -89,6 +94,9 @@ public class RunningQueryManager {
     /** Undefined query ID value. */
     public static final long UNDEFINED_QUERY_ID = 0L;
 
+    /** */
+    private final GridClosureProcessor closure;
+
     /** Keep registered user queries. */
     private final ConcurrentMap<Long, GridRunningQueryInfo> runs = new ConcurrentHashMap<>();
 
@@ -147,6 +155,12 @@ public class RunningQueryManager {
         }
     };
 
+    /** */
+    private final List<Consumer<GridQueryStartedInfo>> qryStartedListeners = new CopyOnWriteArrayList<>();
+
+    /** */
+    private final List<Consumer<GridQueryFinishedInfo>> qryFinishedListeners = new CopyOnWriteArrayList<>();
+
     /**
      * Constructor.
      *
@@ -160,6 +174,7 @@ public class RunningQueryManager {
         localNodeId = ctx.localNodeId();
 
         histSz = ctx.config().getSqlConfiguration().getSqlQueryHistorySize();
+        closure = ctx.closure();
 
         qryHistTracker = new QueryHistoryTracker(histSz);
 
@@ -229,11 +244,14 @@ public class RunningQueryManager {
      * @param schemaName Schema name.
      * @param loc Local query flag.
      * @param cancel Query cancel. Should be passed in case query is cancelable, or {@code null} otherwise.
+     * @param enforceJoinOrder Enforce join order flag.
+     * @param lazy Lazy flag.
+     * @param distributedJoins Distributed joins flag.
      * @return Id of registered query. Id is a positive number.
      */
     public long register(String qry, GridCacheQueryType qryType, String schemaName, boolean loc,
         @Nullable GridQueryCancel cancel,
-        String qryInitiatorId) {
+        String qryInitiatorId, boolean enforceJoinOrder, boolean lazy, boolean distributedJoins) {
         long qryId = qryIdGen.incrementAndGet();
 
         if (qryInitiatorId == null)
@@ -250,6 +268,9 @@ public class RunningQueryManager {
             cancel,
             loc,
             qryInitiatorId,
+            enforceJoinOrder,
+            lazy,
+            distributedJoins,
             securitySubjectId(ctx)
         );
 
@@ -262,6 +283,41 @@ public class RunningQueryManager {
 
         run.span().addTag(SQL_QRY_ID, run::globalQueryId);
 
+        if (!qryStartedListeners.isEmpty()) {
+            GridQueryStartedInfo info = new GridQueryStartedInfo(
+                run.id(),
+                localNodeId,
+                run.query(),
+                run.queryType(),
+                run.schemaName(),
+                run.startTime(),
+                run.cancelable(),
+                run.local(),
+                run.enforceJoinOrder(),
+                run.lazy(),
+                run.distributedJoins(),
+                run.queryInitiatorId()
+            );
+
+            try {
+                closure.runLocal(
+                    () -> qryStartedListeners.forEach(lsnr -> {
+                        try {
+                            lsnr.accept(info);
+                        }
+                        catch (Exception ex) {
+                            log.error("Listener fails during handling query started" +
+                                " event [qryId=" + qryId + "]", ex);
+                        }
+                    }),
+                    GridIoPolicy.PUBLIC_POOL
+                );
+            }
+            catch (IgniteCheckedException ex) {
+                throw new IgniteException(ex.getMessage(), ex);
+            }
+        }
+
         return qryId;
     }
 
@@ -289,6 +345,43 @@ public class RunningQueryManager {
             if (failed)
                 qrySpan.addTag(ERROR, failReason::getMessage);
 
+            if (!qryFinishedListeners.isEmpty()) {
+                GridQueryFinishedInfo info = new GridQueryFinishedInfo(
+                    qry.id(),
+                    localNodeId,
+                    qry.query(),
+                    qry.queryType(),
+                    qry.schemaName(),
+                    qry.startTime(),
+                    System.currentTimeMillis(),
+                    qry.local(),
+                    qry.enforceJoinOrder(),
+                    qry.lazy(),
+                    qry.distributedJoins(),
+                    failed,
+                    failReason,
+                    qry.queryInitiatorId()
+                );
+
+                try {
+                    closure.runLocal(
+                        () -> qryFinishedListeners.forEach(lsnr -> {
+                            try {
+                                lsnr.accept(info);
+                            }
+                            catch (Exception ex) {
+                                log.error("Listener fails during handling query finished" +
+                                    " event [qryId=" + qryId + "]", ex);
+                            }
+                        }),
+                        GridIoPolicy.PUBLIC_POOL
+                    );
+                }
+                catch (IgniteCheckedException ex) {
+                    throw new IgniteException(ex.getMessage(), ex);
+                }
+            }
+
             //We need to collect query history and metrics only for SQL queries.
             if (isSqlQuery(qry)) {
                 qry.runningFuture().onDone();
@@ -349,6 +442,44 @@ public class RunningQueryManager {
         return res;
     }
 
+    /**
+     * @param lsnr Listener.
+     */
+    public void registerQueryStartedListener(Consumer<GridQueryStartedInfo> lsnr) {
+        A.notNull(lsnr, "lsnr");
+
+        qryStartedListeners.add(lsnr);
+    }
+
+    /**
+     * @param lsnr Listener.
+     */
+    @SuppressWarnings("SuspiciousMethodCalls")
+    public boolean unregisterQueryStartedListener(Object lsnr) {
+        A.notNull(lsnr, "lsnr");
+
+        return qryStartedListeners.remove(lsnr);
+    }
+
+    /**
+     * @param lsnr Listener.
+     */
+    public void registerQueryFinishedListener(Consumer<GridQueryFinishedInfo> lsnr) {
+        A.notNull(lsnr, "lsnr");
+
+        qryFinishedListeners.add(lsnr);
+    }
+
+    /**
+     * @param lsnr Listener.
+     */
+    @SuppressWarnings("SuspiciousMethodCalls")
+    public boolean unregisterQueryFinishedListener(Object lsnr) {
+        A.notNull(lsnr, "lsnr");
+
+        return qryFinishedListeners.remove(lsnr);
+    }
+
     /**
      * Check belongs running query to an SQL type.
      *
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index a208ea0f3de..2463cba9677 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
@@ -84,7 +85,9 @@ import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
 import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter;
+import org.apache.ignite.internal.processors.query.GridQueryFinishedInfo;
 import org.apache.ignite.internal.processors.query.GridQueryIndexing;
+import org.apache.ignite.internal.processors.query.GridQueryStartedInfo;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryField;
@@ -359,13 +362,29 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         H2TableDescriptor tbl = schemaMgr.tableForType(schemaName, cacheName, typeName);
 
         if (tbl != null && tbl.luceneIndex() != null) {
-            long qryId = runningQueryManager().register(qry, TEXT, schemaName, true, null, null);
+            long qryId = runningQueryManager().register(
+                qry,
+                TEXT,
+                schemaName,
+                true,
+                null,
+                null,
+                false,
+                false,
+                false
+            );
 
+            Throwable failReason = null;
             try {
                 return tbl.luceneIndex().query(qry.toUpperCase(), filters, limit);
             }
+            catch (Throwable t) {
+                failReason = t;
+
+                throw t;
+            }
             finally {
-                runningQueryManager().unregister(qryId, null);
+                runningQueryManager().unregister(qryId, failReason);
             }
         }
 
@@ -584,7 +603,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             schemaName,
             true,
             null,
-            qryInitiatorId
+            qryInitiatorId,
+            false,
+            false,
+            false
         );
 
         Exception failReason = null;
@@ -1459,7 +1481,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             qryDesc.schemaName(),
             qryDesc.local(),
             cancel,
-            qryDesc.queryInitiatorId()
+            qryDesc.queryInitiatorId(),
+            qryDesc.enforceJoinOrder(),
+            qryParams.lazy(),
+            qryDesc.distributedJoins()
         );
 
         if (ctx.event().isRecordable(EVT_SQL_QUERY_EXECUTION)) {
@@ -1508,6 +1533,34 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         }
     }
 
+    /**
+     * @param lsnr Listener.
+     */
+    public void registerQueryStartedListener(Consumer<GridQueryStartedInfo> lsnr) {
+        runningQueryManager().registerQueryStartedListener(lsnr);
+    }
+
+    /**
+     * @param lsnr Listener.
+     */
+    public boolean unregisterQueryStartedListener(Object lsnr) {
+        return runningQueryManager().unregisterQueryStartedListener(lsnr);
+    }
+
+    /**
+     * @param lsnr Listener.
+     */
+    public void registerQueryFinishedListener(Consumer<GridQueryFinishedInfo> lsnr) {
+        runningQueryManager().registerQueryFinishedListener(lsnr);
+    }
+
+    /**
+     * @param lsnr Listener.
+     */
+    public boolean unregisterQueryFinishedListener(Object lsnr) {
+        return runningQueryManager().unregisterQueryFinishedListener(lsnr);
+    }
+
     /** {@inheritDoc} */
     @Override public UpdateSourceIterator<?> executeUpdateOnDataNodeTransactional(
         GridCacheContext<?, ?> cctx,
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/IgniteSqlQueryStartFinishListenerTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/IgniteSqlQueryStartFinishListenerTest.java
new file mode 100644
index 00000000000..db129abe902
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/IgniteSqlQueryStartFinishListenerTest.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.TextQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+import org.apache.ignite.internal.processors.query.GridQueryFinishedInfo;
+import org.apache.ignite.internal.processors.query.GridQueryStartedInfo;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.hamcrest.CustomMatcher;
+import org.hamcrest.Matcher;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.query.QueryUtils.SCHEMA_SYS;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.hamcrest.core.IsNull.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/** Test for SQL query listeners. */
+public class IgniteSqlQueryStartFinishListenerTest extends AbstractIndexingCommonTest {
+    /** Client node name. */
+    private static final String CLIENT_NODE_NAME = "CLIENT_NODE";
+
+    /** Client node name. */
+    private static final String SERVER_NODE_NAME = "SERVER_NODE";
+
+    /** Listeners. */
+    private final List<Object> lsnrs = new ArrayList<>();
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrid(SERVER_NODE_NAME);
+        startClientGrid(CLIENT_NODE_NAME);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /** */
+    @After
+    public void unregisterListeners() {
+        lsnrs.forEach(indexing()::unregisterQueryFinishedListener);
+        lsnrs.forEach(indexing()::unregisterQueryStartedListener);
+
+        lsnrs.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        return super.getConfiguration(gridName)
+            .setSqlSchemas("TEST1")
+            .setCacheConfiguration(
+                new CacheConfiguration<String, String>(DEFAULT_CACHE_NAME)
+                    .setQueryEntities(Collections.singleton(new QueryEntity(String.class, String.class)))
+                    .setSqlFunctionClasses(GridTestUtils.SqlTestFunctions.class)
+            );
+    }
+
+    /**
+     * Ensure you could register and unregister a listener for query start/finish events:
+     *     - register listeners
+     *     - execute a query
+     *     - ensure both listeneres were notified
+     *     - unregister the query start listener
+     *     - run a query one more time
+     *     - ensure only one listener was notified
+     *     - unregister the query finish listener and register new one
+     *     - run a query one more time
+     *     - ensure only new listener was notified
+     *
+     * @throws Exception In case of error.
+     */
+    @Test
+    public void testRegisterUnregisterQueryListeners() throws Exception {
+        final AtomicInteger qryStarted = new AtomicInteger();
+        final AtomicInteger qryFinished = new AtomicInteger();
+
+        final Consumer<GridQueryStartedInfo> qryStartedLsnr = registerQueryStartedListener(info -> qryStarted.incrementAndGet());
+        final Consumer<GridQueryFinishedInfo> qryFinishedLsnr = registerQueryFinishedListener(info -> qryFinished.incrementAndGet());
+
+        {
+            execSql(SCHEMA_SYS, "select * from caches");
+
+            assertWithTimeout(qryStarted::get, is(equalTo(1)), 1_000);
+            assertWithTimeout(qryFinished::get, is(equalTo(1)), 1_000);
+        }
+
+        {
+            assertTrue(indexing().unregisterQueryStartedListener(qryStartedLsnr));
+
+            execSql(SCHEMA_SYS, "select * from caches");
+
+            assertWithTimeout(qryFinished::get, is(equalTo(2)), 1_000);
+            assertWithTimeout(qryStarted::get, is(equalTo(1)), 1_000);
+        }
+
+        {
+            assertTrue(indexing().unregisterQueryFinishedListener(qryFinishedLsnr));
+
+            final CountDownLatch latch = new CountDownLatch(1);
+
+            registerQueryFinishedListener(info -> latch.countDown());
+
+            execSql(SCHEMA_SYS, "select * from caches");
+
+            latch.await(1, TimeUnit.SECONDS);
+
+            assertWithTimeout(qryFinished::get, is(equalTo(2)), 1_000);
+            assertWithTimeout(qryStarted::get, is(equalTo(1)), 1_000);
+        }
+    }
+
+    /**
+     * Ensure listeners are notified with an actual query info:
+     *     - register listeners
+     *     - execute different queries
+     *     - verify query info passed to listeners
+     */
+    @Test
+    @SuppressWarnings("ThrowableNotThrown")
+    public void testVerifyQueryInfoPassedToListeners() throws Exception {
+        final AtomicReference<GridQueryStartedInfo> qryStarted = new AtomicReference<>();
+        final AtomicReference<GridQueryFinishedInfo> qryFinished = new AtomicReference<>();
+
+        registerQueryStartedListener(qryStarted::set);
+        registerQueryFinishedListener(qryFinished::set);
+
+        {
+            final long delay = 100;
+            final String qry = "select * from caches where ? = \"default\".delay(?) limit 1";
+
+            execSql(SCHEMA_SYS, qry, delay, delay);
+
+            assertWithTimeout(qryStarted::get, is(notNullValue()), 1_000);
+
+            GridQueryStartedInfo startedInfo = qryStarted.get();
+            assertEquals(SCHEMA_SYS, startedInfo.schemaName());
+            assertEquals(qry, startedInfo.query());
+            assertEquals(grid(SERVER_NODE_NAME).localNode().id(), startedInfo.nodeId());
+            assertEquals(false, startedInfo.local());
+            assertEquals(GridCacheQueryType.SQL_FIELDS, startedInfo.queryType());
+
+            assertWithTimeout(qryFinished::get, is(notNullValue()), 1_000);
+
+            GridQueryFinishedInfo finishedInfo = qryFinished.get();
+            assertEquals(SCHEMA_SYS, finishedInfo.schemaName());
+            assertEquals(qry, finishedInfo.query());
+            assertEquals(grid(SERVER_NODE_NAME).localNode().id(), startedInfo.nodeId());
+            assertEquals(false, finishedInfo.local());
+            assertEquals(GridCacheQueryType.SQL_FIELDS, finishedInfo.queryType());
+            assertEquals(false, finishedInfo.failed());
+            assertThat(finishedInfo.finishTime() - finishedInfo.startTime(), is(greaterOrEqualTo(delay)));
+
+            qryStarted.set(null);
+            qryFinished.set(null);
+        }
+
+        {
+            final String schema = "TEST1";
+            final String qry = "select \"default\".can_fail() from " + SCHEMA_SYS + ".caches where ? = ? limit 1";
+
+            GridTestUtils.SqlTestFunctions.fail = true;
+
+            GridTestUtils.assertThrowsWithCause(() -> execSqlLocal(schema, qry, 1, 1), IgniteSQLException.class);
+
+            assertWithTimeout(qryStarted::get, is(notNullValue()), 1_000);
+
+            GridQueryStartedInfo startedInfo = qryStarted.get();
+            assertEquals(schema, startedInfo.schemaName());
+            assertEquals(qry, startedInfo.query());
+            assertEquals(grid(SERVER_NODE_NAME).localNode().id(), startedInfo.nodeId());
+            assertEquals(true, startedInfo.local());
+            assertEquals(GridCacheQueryType.SQL_FIELDS, startedInfo.queryType());
+
+            assertWithTimeout(qryFinished::get, is(notNullValue()), 1_000);
+
+            GridQueryFinishedInfo finishedInfo = qryFinished.get();
+            assertEquals(schema, finishedInfo.schemaName());
+            assertEquals(qry, finishedInfo.query());
+            assertEquals(grid(SERVER_NODE_NAME).localNode().id(), startedInfo.nodeId());
+            assertEquals(true, finishedInfo.local());
+            assertEquals(GridCacheQueryType.SQL_FIELDS, finishedInfo.queryType());
+            assertEquals(true, finishedInfo.failed());
+            assertThat(finishedInfo.finishTime(), is(greaterOrEqualTo(finishedInfo.startTime())));
+
+            qryStarted.set(null);
+            qryFinished.set(null);
+        }
+
+        {
+            final String qry = "text query";
+
+            IgniteCache<String, Object> cache = grid(CLIENT_NODE_NAME).cache(DEFAULT_CACHE_NAME);
+
+            cache.query(new TextQuery<String, String>(String.class, "text query"));
+
+            assertWithTimeout(qryStarted::get, is(notNullValue()), 1_000);
+
+            GridQueryStartedInfo startedInfo = qryStarted.get();
+            assertEquals(cache.getName(), startedInfo.schemaName());
+            assertEquals(qry, startedInfo.query());
+            assertEquals(grid(SERVER_NODE_NAME).localNode().id(), startedInfo.nodeId());
+            assertEquals(true, startedInfo.local());
+            assertEquals(GridCacheQueryType.TEXT, startedInfo.queryType());
+
+            assertWithTimeout(qryFinished::get, is(notNullValue()), 1_000);
+
+            GridQueryFinishedInfo finishedInfo = qryFinished.get();
+            assertEquals(cache.getName(), finishedInfo.schemaName());
+            assertEquals(qry, finishedInfo.query());
+            assertEquals(grid(SERVER_NODE_NAME).localNode().id(), startedInfo.nodeId());
+            assertEquals(true, finishedInfo.local());
+            assertEquals(GridCacheQueryType.TEXT, finishedInfo.queryType());
+            assertEquals(false, finishedInfo.failed());
+            assertThat(finishedInfo.finishTime(), is(greaterOrEqualTo(finishedInfo.startTime())));
+
+            qryStarted.set(null);
+            qryFinished.set(null);
+        }
+    }
+
+    /**
+     * Ensure listeners do not block query execution
+     *     - register blocking listeners
+     *     - execute a lot of queries
+     *     - verify all queries finished while listeners is still blocked
+     */
+    @Test
+    public void testListeneresNotBlocksQueryExecution() throws IgniteCheckedException {
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicInteger lsnrCalls = new AtomicInteger();
+
+        final int quryRuns = 1_000;
+        final int threadCnt = 20;
+
+        registerQueryStartedListener(info -> {
+            try {
+                latch.await();
+            }
+            catch (InterruptedException ignored) {
+            }
+
+            lsnrCalls.incrementAndGet();
+        });
+        registerQueryFinishedListener(info -> {
+            try {
+                latch.await();
+            }
+            catch (InterruptedException ignored) {
+            }
+
+            lsnrCalls.incrementAndGet();
+        });
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(() -> {
+            for (int i = 0; i < quryRuns; i++)
+                execSql(SCHEMA_SYS, "select * from caches");
+        }, threadCnt, "test-async-query-runner");
+
+        try {
+            fut.get(15_000);
+        }
+        finally {
+            latch.countDown();
+        }
+
+        assertWithTimeout(lsnrCalls::get, is(equalTo(2 * threadCnt * quryRuns)), 15_000);
+    }
+
+    /**
+     * Ensure notification chain is not interrupted if exception was thrown in the middle of the chain
+     *     - register several listeners such the listener will throw exception if condition is met
+     *     - execute query several times, one of the listeners from each chain should fail
+     *     - verify all other listeners were notified
+     */
+    @Test
+    public void testFailedListenereNotAffectOthers() throws IgniteCheckedException {
+        final int lsnrCnt = 3;
+        final long waitTimeout = 1_000;
+
+        boolean[] startLsnrsNotified = new boolean[lsnrCnt];
+        boolean[] finishLsnrsNotified = new boolean[lsnrCnt];
+        boolean[] startLsnrShouldFail = new boolean[lsnrCnt];
+        boolean[] finishLsnrShouldFail = new boolean[lsnrCnt];
+
+        for (int i = 0; i < lsnrCnt; i++) {
+            final int lsnNo = i;
+
+            registerQueryStartedListener(info -> {
+                if (startLsnrShouldFail[lsnNo]) {
+                    startLsnrShouldFail[lsnNo] = false;
+
+                    throw new RuntimeException("Start listener fails");
+                }
+
+                startLsnrsNotified[lsnNo] = true;
+            });
+
+            registerQueryFinishedListener(info -> {
+                if (finishLsnrShouldFail[lsnNo]) {
+                    finishLsnrShouldFail[lsnNo] = false;
+
+                    throw new RuntimeException("Finish listener fails");
+                }
+
+                finishLsnrsNotified[lsnNo] = true;
+            });
+        }
+
+        {
+            startLsnrShouldFail[0] = true;
+            finishLsnrShouldFail[1] = true;
+
+            execSql(SCHEMA_SYS, "select * from caches");
+
+            assertWithTimeout(() -> startLsnrsNotified[0], isFalse(), waitTimeout);
+            assertWithTimeout(() -> startLsnrsNotified[1], isTrue(), waitTimeout);
+            assertWithTimeout(() -> startLsnrsNotified[2], isTrue(), waitTimeout);
+
+            assertWithTimeout(() -> finishLsnrsNotified[0], isTrue(), waitTimeout);
+            assertWithTimeout(() -> finishLsnrsNotified[1], isFalse(), waitTimeout);
+            assertWithTimeout(() -> finishLsnrsNotified[2], isTrue(), waitTimeout);
+
+            resetListeners(startLsnrsNotified, finishLsnrsNotified);
+        }
+
+        {
+            startLsnrShouldFail[1] = true;
+            finishLsnrShouldFail[2] = true;
+
+            execSql(SCHEMA_SYS, "select * from caches");
+
+            assertWithTimeout(() -> startLsnrsNotified[0], isTrue(), waitTimeout);
+            assertWithTimeout(() -> startLsnrsNotified[1], isFalse(), waitTimeout);
+            assertWithTimeout(() -> startLsnrsNotified[2], isTrue(), waitTimeout);
+
+            assertWithTimeout(() -> finishLsnrsNotified[0], isTrue(), waitTimeout);
+            assertWithTimeout(() -> finishLsnrsNotified[1], isTrue(), waitTimeout);
+            assertWithTimeout(() -> finishLsnrsNotified[2], isFalse(), waitTimeout);
+
+            resetListeners(startLsnrsNotified, finishLsnrsNotified);
+        }
+
+        {
+            startLsnrShouldFail[2] = true;
+            finishLsnrShouldFail[0] = true;
+
+            execSql(SCHEMA_SYS, "select * from caches");
+
+            assertWithTimeout(() -> startLsnrsNotified[0], isTrue(), waitTimeout);
+            assertWithTimeout(() -> startLsnrsNotified[1], isTrue(), waitTimeout);
+            assertWithTimeout(() -> startLsnrsNotified[2], isFalse(), waitTimeout);
+
+            assertWithTimeout(() -> finishLsnrsNotified[0], isFalse(), waitTimeout);
+            assertWithTimeout(() -> finishLsnrsNotified[1], isTrue(), waitTimeout);
+            assertWithTimeout(() -> finishLsnrsNotified[2], isTrue(), waitTimeout);
+
+            resetListeners(startLsnrsNotified, finishLsnrsNotified);
+        }
+    }
+
+    /**
+     * Sets all elements from each array to {@code false}.
+     */
+    private void resetListeners(boolean[]... arrays) {
+        for (boolean[] arr : arrays)
+            Arrays.fill(arr, false);
+    }
+
+    /** */
+    private IgniteH2Indexing indexing() {
+        return (IgniteH2Indexing)grid(SERVER_NODE_NAME).context().query().getIndexing();
+    }
+
+    /** */
+    private List<List<?>> execSql(String schema, String sql, Object... args) {
+        return grid(SERVER_NODE_NAME).cache(DEFAULT_CACHE_NAME).query(
+            new SqlFieldsQuery(sql).setSchema(schema).setArgs(args).setLocal(false)
+        ).getAll();
+    }
+
+    /** */
+    private List<List<?>> execSqlLocal(String schema, String sql, Object... args) {
+        return grid(SERVER_NODE_NAME).cache(DEFAULT_CACHE_NAME).query(
+            new SqlFieldsQuery(sql).setSchema(schema).setArgs(args).setLocal(true)
+        ).getAll();
+    }
+
+    /**
+     * @param lsnr Listener.
+     */
+    private Consumer<GridQueryStartedInfo> registerQueryStartedListener(Consumer<GridQueryStartedInfo> lsnr) {
+        lsnrs.add(lsnr);
+
+        indexing().registerQueryStartedListener(lsnr);
+
+        return lsnr;
+    }
+
+    /**
+     * @param lsnr Listener.
+     */
+    private Consumer<GridQueryFinishedInfo> registerQueryFinishedListener(Consumer<GridQueryFinishedInfo> lsnr) {
+        lsnrs.add(lsnr);
+
+        indexing().registerQueryFinishedListener(lsnr);
+
+        return lsnr;
+    }
+
+    /**
+     * @param actualSupplier Supplier for value to check.
+     * @param matcher Matcher.
+     * @param timeout Timeout.
+     */
+    private <T> void assertWithTimeout(Supplier<T> actualSupplier, Matcher<? super T> matcher, long timeout)
+        throws IgniteInterruptedCheckedException {
+        GridTestUtils.waitForCondition(() -> matcher.matches(actualSupplier.get()), timeout);
+
+        assertThat(actualSupplier.get(), matcher);
+    }
+
+    /**
+     * @param wanted Wanted.
+     */
+    private static <T extends Comparable<? super T>> Matcher<T> greaterOrEqualTo(T wanted) {
+        return new CustomMatcher<T>("should be greater or equal to " + wanted) {
+            @SuppressWarnings({"unchecked", "rawtypes"})
+            @Override public boolean matches(Object item) {
+                return wanted != null && item instanceof Comparable && ((Comparable)item).compareTo(wanted) >= 0;
+            }
+        };
+    }
+
+    /** */
+    private static Matcher<Boolean> isTrue() {
+        return new CustomMatcher<Boolean>("should be true ") {
+            @Override public boolean matches(Object item) {
+                return item instanceof Boolean && (Boolean)item;
+            }
+        };
+    }
+
+    /** */
+    private static Matcher<Boolean> isFalse() {
+        return new CustomMatcher<Boolean>("should be true ") {
+            @Override public boolean matches(Object item) {
+                return item instanceof Boolean && !(Boolean)item;
+            }
+        };
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index c2722a15e0a..24dc627847a 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -137,6 +137,7 @@ import org.apache.ignite.internal.processors.query.h2.GridSubqueryJoinOptimizerS
 import org.apache.ignite.internal.processors.query.h2.H2ResultSetIteratorNullifyOnEndSelfTest;
 import org.apache.ignite.internal.processors.query.h2.IgniteSqlBigIntegerKeyTest;
 import org.apache.ignite.internal.processors.query.h2.IgniteSqlQueryMinMaxTest;
+import org.apache.ignite.internal.processors.query.h2.IgniteSqlQueryStartFinishListenerTest;
 import org.apache.ignite.internal.processors.query.h2.QueryDataPageScanTest;
 import org.apache.ignite.internal.processors.query.h2.sql.ExplainSelfTest;
 import org.apache.ignite.internal.processors.query.h2.sql.GridQueryParsingTest;
@@ -302,6 +303,7 @@ import org.junit.runners.Suite;
 
     IgniteCacheMultipleIndexedTypesTest.class,
     IgniteSqlQueryMinMaxTest.class,
+    IgniteSqlQueryStartFinishListenerTest.class,
 
     GridCircularQueueTest.class,
     IndexingSpiQueryWithH2IndexingSelfTest.class,