You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by an...@apache.org on 2022/08/31 07:37:11 UTC

[ignite] branch ignite-17594 created (now 81402f8d953)

This is an automated email from the ASF dual-hosted git repository.

anovikov pushed a change to branch ignite-17594
in repository https://gitbox.apache.org/repos/asf/ignite.git


      at 81402f8d953 IGNITE-17594 Provide ability to register listeners for query start/finish events.

This branch includes the following new commits:

     new 81402f8d953 IGNITE-17594 Provide ability to register listeners for query start/finish events.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[ignite] 01/01: IGNITE-17594 Provide ability to register listeners for query start/finish events.

Posted by an...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

anovikov pushed a commit to branch ignite-17594
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 81402f8d953ef5f791500f7997b97dc3c8ff9378
Author: Andrey Novikov <an...@gridgain.com>
AuthorDate: Wed Aug 31 14:34:28 2022 +0700

    IGNITE-17594 Provide ability to register listeners for query start/finish events.
---
 .../query/calcite/QueryRegistryImpl.java           |   2 +-
 ...ngQueryInfo.java => GridQueryFinishedInfo.java} | 151 +++----
 ...ingQueryInfo.java => GridQueryStartedInfo.java} | 134 ++----
 .../processors/query/GridRunningQueryInfo.java     |  48 ++
 .../processors/query/RunningQueryManager.java      | 134 +++++-
 .../processors/query/h2/IgniteH2Indexing.java      |  62 ++-
 .../h2/IgniteSqlQueryStartFinishListenerTest.java  | 496 +++++++++++++++++++++
 .../IgniteBinaryCacheQueryTestSuite.java           |   2 +
 8 files changed, 857 insertions(+), 172 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
index 58c5df1bf25..d023a258e1a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
@@ -63,7 +63,7 @@ public class QueryRegistryImpl extends AbstractService implements QueryRegistry
             String initiatorId = fieldsQry != null ? fieldsQry.getQueryInitiatorId() : null;
 
             long locId = qryMgr.register(rootQry.sql(), GridCacheQueryType.SQL_FIELDS, rootQry.context().schemaName(),
-                false, createCancelToken(qry), initiatorId);
+                false, createCancelToken(qry), initiatorId, false, false, false);
 
             rootQry.localQueryId(locId);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFinishedInfo.java
similarity index 58%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFinishedInfo.java
index bdc29e9f6ae..8983f8230a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFinishedInfo.java
@@ -19,13 +19,13 @@ package org.apache.ignite.internal.processors.query;
 
 import java.util.UUID;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
-import org.apache.ignite.internal.processors.tracing.MTC;
-import org.apache.ignite.internal.processors.tracing.Span;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
 
 /**
- * Query descriptor.
+ * Info about finished query.
  */
-public class GridRunningQueryInfo {
+public class GridQueryFinishedInfo {
     /** */
     private final long id;
 
@@ -44,29 +44,30 @@ public class GridRunningQueryInfo {
     /** */
     private final long startTime;
 
-    /** Query start time in nanoseconds to measure duration. */
-    private final long startTimeNanos;
-
     /** */
-    private final GridQueryCancel cancel;
+    private final long finishTime;
 
     /** */
     private final boolean loc;
 
-    /** */
-    private final QueryRunningFuture fut = new QueryRunningFuture();
+    /** Enforce join order query flag. */
+    private boolean enforceJoinOrder;
 
-    /** Span of the running query. */
-    private final Span span;
+    /** Lazy query flag. */
+    private boolean lazy;
 
-    /** Originator. */
-    private final String qryInitiatorId;
+    /** Distributed joins query flag. */
+    private boolean distributedJoins;
 
-    /** Request ID. */
-    private long reqId;
+    /** Whether query is failed or not. */
+    private final boolean failed;
 
-    /** Subject ID. */
-    private final UUID subjId;
+    /** Exception that caused query execution fail. */
+    @Nullable
+    private final Throwable failReason;
+
+    /** Originator. */
+    private final String qryInitiatorId;
 
     /**
      * Constructor.
@@ -77,23 +78,30 @@ public class GridRunningQueryInfo {
      * @param qryType Query type.
      * @param schemaName Schema name.
      * @param startTime Query start time.
-     * @param startTimeNanos Query start time in nanoseconds.
-     * @param cancel Query cancel.
+     * @param finishTime Query finish time.
      * @param loc Local query flag.
-     * @param subjId Subject ID.
-     */
-    public GridRunningQueryInfo(
-        long id,
+     * @param enforceJoinOrder Enforce join order flag.
+     * @param lazy Lazy flag.
+     * @param distributedJoins Distributed joins flag.
+     * @param failed Whether query is failed or not.
+     * @param failReason Exception that caused query execution fail.
+     * @param qryInitiatorId Query's initiator identifier.
+     */
+    public GridQueryFinishedInfo(
+        Long id,
         UUID nodeId,
         String qry,
         GridCacheQueryType qryType,
         String schemaName,
         long startTime,
-        long startTimeNanos,
-        GridQueryCancel cancel,
+        long finishTime,
         boolean loc,
-        String qryInitiatorId,
-        UUID subjId
+        boolean enforceJoinOrder,
+        boolean lazy,
+        boolean distributedJoins,
+        boolean failed,
+        @Nullable Throwable failReason,
+        String qryInitiatorId
     ) {
         this.id = id;
         this.nodeId = nodeId;
@@ -101,26 +109,28 @@ public class GridRunningQueryInfo {
         this.qryType = qryType;
         this.schemaName = schemaName;
         this.startTime = startTime;
-        this.startTimeNanos = startTimeNanos;
-        this.cancel = cancel;
+        this.finishTime = finishTime;
         this.loc = loc;
-        this.span = MTC.span();
+        this.enforceJoinOrder = enforceJoinOrder;
+        this.lazy = lazy;
+        this.distributedJoins = distributedJoins;
+        this.failed = failed;
+        this.failReason = failReason;
         this.qryInitiatorId = qryInitiatorId;
-        this.subjId = subjId;
     }
 
     /**
      * @return Query ID.
      */
-    public long id() {
+    public Long id() {
         return id;
     }
 
     /**
-     * @return Global query ID.
+     * @return Node ID.
      */
-    public String globalQueryId() {
-        return QueryUtils.globalQueryId(nodeId, id);
+    public UUID nodeId() {
+        return nodeId;
     }
 
     /**
@@ -152,67 +162,53 @@ public class GridRunningQueryInfo {
     }
 
     /**
-     * @return Query start time in nanoseconds.
-     */
-    public long startTimeNanos() {
-        return startTimeNanos;
-    }
-
-    /**
-     * @param curTime Current time.
-     * @param duration Duration of long query.
-     * @return {@code true} if this query should be considered as long running query.
+     * @return Query finish time.
      */
-    public boolean longQuery(long curTime, long duration) {
-        return curTime - startTime > duration;
+    public long finishTime() {
+        return finishTime;
     }
 
     /**
-     * Cancel query.
+     * @return {@code true} if query is local.
      */
-    public void cancel() {
-        if (cancel != null)
-            cancel.cancel();
+    public boolean local() {
+        return loc;
     }
 
     /**
-     * @return Query running future.
+     * @return Enforce join order flag.
      */
-    public QueryRunningFuture runningFuture() {
-        return fut;
+    public boolean enforceJoinOrder() {
+        return enforceJoinOrder;
     }
 
     /**
-     * @return {@code true} if query can be cancelled.
+     * @return Lazy flag.
      */
-    public boolean cancelable() {
-        return cancel != null;
+    public boolean lazy() {
+        return lazy;
     }
 
     /**
-     * @return {@code true} if query is local.
+     * @return Distributed joins.
      */
-    public boolean local() {
-        return loc;
+    public boolean distributedJoins() {
+        return distributedJoins;
     }
 
     /**
-     * @return Originating node ID.
+     * @return {@code true} if query is failed.
      */
-    public UUID nodeId() {
-        return nodeId;
+    public boolean failed() {
+        return failed;
     }
 
     /**
-     * @return Span of the running query.
+     * @return Exception that caused query execution to fail, or {@code null} if query succeeded.
      */
-    public Span span() {
-        return span;
-    }
-
-    /** @return Request ID. */
-    public long requestId() {
-        return reqId;
+    @Nullable
+    public Throwable failReason() {
+        return failReason;
     }
 
     /**
@@ -223,13 +219,8 @@ public class GridRunningQueryInfo {
         return qryInitiatorId;
     }
 
-    /** @param reqId Request ID. */
-    public void requestId(long reqId) {
-        this.reqId = reqId;
-    }
-
-    /** @return Subject ID. */
-    public UUID subjectId() {
-        return subjId;
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridQueryFinishedInfo.class, this);
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryStartedInfo.java
similarity index 58%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryStartedInfo.java
index bdc29e9f6ae..f0e83794dbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryStartedInfo.java
@@ -19,13 +19,12 @@ package org.apache.ignite.internal.processors.query;
 
 import java.util.UUID;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
-import org.apache.ignite.internal.processors.tracing.MTC;
-import org.apache.ignite.internal.processors.tracing.Span;
+import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
- * Query descriptor.
+ * Info about newly started query.
  */
-public class GridRunningQueryInfo {
+public class GridQueryStartedInfo {
     /** */
     private final long id;
 
@@ -44,30 +43,24 @@ public class GridRunningQueryInfo {
     /** */
     private final long startTime;
 
-    /** Query start time in nanoseconds to measure duration. */
-    private final long startTimeNanos;
-
-    /** */
-    private final GridQueryCancel cancel;
+    /** Query cancellable flag. */
+    private boolean cancellable;
 
     /** */
     private final boolean loc;
 
-    /** */
-    private final QueryRunningFuture fut = new QueryRunningFuture();
+    /** Enforce join order query flag. */
+    private boolean enforceJoinOrder;
+
+    /** Lazy query flag. */
+    private boolean lazy;
 
-    /** Span of the running query. */
-    private final Span span;
+    /** Distributed joins query flag. */
+    private boolean distributedJoins;
 
     /** Originator. */
     private final String qryInitiatorId;
 
-    /** Request ID. */
-    private long reqId;
-
-    /** Subject ID. */
-    private final UUID subjId;
-
     /**
      * Constructor.
      *
@@ -77,23 +70,26 @@ public class GridRunningQueryInfo {
      * @param qryType Query type.
      * @param schemaName Schema name.
      * @param startTime Query start time.
-     * @param startTimeNanos Query start time in nanoseconds.
-     * @param cancel Query cancel.
+     * @param cancellable Query cancellable flag.
      * @param loc Local query flag.
-     * @param subjId Subject ID.
+     * @param enforceJoinOrder Enforce join order flag.
+     * @param lazy Lazy flag.
+     * @param distributedJoins Distributed joins flag.
+     * @param qryInitiatorId Query's initiator identifier.
      */
-    public GridRunningQueryInfo(
-        long id,
+    public GridQueryStartedInfo(
+        Long id,
         UUID nodeId,
         String qry,
         GridCacheQueryType qryType,
         String schemaName,
         long startTime,
-        long startTimeNanos,
-        GridQueryCancel cancel,
+        boolean cancellable,
         boolean loc,
-        String qryInitiatorId,
-        UUID subjId
+        boolean enforceJoinOrder,
+        boolean lazy,
+        boolean distributedJoins,
+        String qryInitiatorId
     ) {
         this.id = id;
         this.nodeId = nodeId;
@@ -101,26 +97,26 @@ public class GridRunningQueryInfo {
         this.qryType = qryType;
         this.schemaName = schemaName;
         this.startTime = startTime;
-        this.startTimeNanos = startTimeNanos;
-        this.cancel = cancel;
+        this.cancellable = cancellable;
         this.loc = loc;
-        this.span = MTC.span();
+        this.enforceJoinOrder = enforceJoinOrder;
+        this.lazy = lazy;
+        this.distributedJoins = distributedJoins;
         this.qryInitiatorId = qryInitiatorId;
-        this.subjId = subjId;
     }
 
     /**
      * @return Query ID.
      */
-    public long id() {
+    public Long id() {
         return id;
     }
 
     /**
-     * @return Global query ID.
+     * @return Node ID.
      */
-    public String globalQueryId() {
-        return QueryUtils.globalQueryId(nodeId, id);
+    public UUID nodeId() {
+        return nodeId;
     }
 
     /**
@@ -151,42 +147,11 @@ public class GridRunningQueryInfo {
         return startTime;
     }
 
-    /**
-     * @return Query start time in nanoseconds.
-     */
-    public long startTimeNanos() {
-        return startTimeNanos;
-    }
-
-    /**
-     * @param curTime Current time.
-     * @param duration Duration of long query.
-     * @return {@code true} if this query should be considered as long running query.
-     */
-    public boolean longQuery(long curTime, long duration) {
-        return curTime - startTime > duration;
-    }
-
-    /**
-     * Cancel query.
-     */
-    public void cancel() {
-        if (cancel != null)
-            cancel.cancel();
-    }
-
-    /**
-     * @return Query running future.
-     */
-    public QueryRunningFuture runningFuture() {
-        return fut;
-    }
-
     /**
      * @return {@code true} if query can be cancelled.
      */
-    public boolean cancelable() {
-        return cancel != null;
+    public boolean cancellable() {
+        return cancellable;
     }
 
     /**
@@ -197,22 +162,24 @@ public class GridRunningQueryInfo {
     }
 
     /**
-     * @return Originating node ID.
+     * @return Enforce join order flag.
      */
-    public UUID nodeId() {
-        return nodeId;
+    public boolean enforceJoinOrder() {
+        return enforceJoinOrder;
     }
 
     /**
-     * @return Span of the running query.
+     * @return Lazy flag.
      */
-    public Span span() {
-        return span;
+    public boolean lazy() {
+        return lazy;
     }
 
-    /** @return Request ID. */
-    public long requestId() {
-        return reqId;
+    /**
+     * @return Distributed joins.
+     */
+    public boolean distributedJoins() {
+        return distributedJoins;
     }
 
     /**
@@ -223,13 +190,8 @@ public class GridRunningQueryInfo {
         return qryInitiatorId;
     }
 
-    /** @param reqId Request ID. */
-    public void requestId(long reqId) {
-        this.reqId = reqId;
-    }
-
-    /** @return Subject ID. */
-    public UUID subjectId() {
-        return subjId;
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridQueryStartedInfo.class, this);
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
index bdc29e9f6ae..927239533cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
@@ -21,6 +21,8 @@ import java.util.UUID;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
 import org.apache.ignite.internal.processors.tracing.MTC;
 import org.apache.ignite.internal.processors.tracing.Span;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
  * Query descriptor.
@@ -54,6 +56,7 @@ public class GridRunningQueryInfo {
     private final boolean loc;
 
     /** */
+    @GridToStringExclude
     private final QueryRunningFuture fut = new QueryRunningFuture();
 
     /** Span of the running query. */
@@ -62,6 +65,15 @@ public class GridRunningQueryInfo {
     /** Originator. */
     private final String qryInitiatorId;
 
+    /** Enforce join order flag. */
+    private final boolean enforceJoinOrder;
+
+    /** Lazy flag. */
+    private final boolean lazy;
+
+    /** Distributed joins flag. */
+    private final boolean distributedJoins;
+
     /** Request ID. */
     private long reqId;
 
@@ -80,6 +92,10 @@ public class GridRunningQueryInfo {
      * @param startTimeNanos Query start time in nanoseconds.
      * @param cancel Query cancel.
      * @param loc Local query flag.
+     * @param qryInitiatorId Query's initiator identifier.
+     * @param enforceJoinOrder Enforce join order flag.
+     * @param lazy Lazy flag.
+     * @param distributedJoins Distributed joins flag.
      * @param subjId Subject ID.
      */
     public GridRunningQueryInfo(
@@ -93,6 +109,9 @@ public class GridRunningQueryInfo {
         GridQueryCancel cancel,
         boolean loc,
         String qryInitiatorId,
+        boolean enforceJoinOrder,
+        boolean lazy,
+        boolean distributedJoins,
         UUID subjId
     ) {
         this.id = id;
@@ -106,6 +125,9 @@ public class GridRunningQueryInfo {
         this.loc = loc;
         this.span = MTC.span();
         this.qryInitiatorId = qryInitiatorId;
+        this.enforceJoinOrder = enforceJoinOrder;
+        this.lazy = lazy;
+        this.distributedJoins = distributedJoins;
         this.subjId = subjId;
     }
 
@@ -223,6 +245,27 @@ public class GridRunningQueryInfo {
         return qryInitiatorId;
     }
 
+    /**
+     * @return Distributed joins.
+     */
+    public boolean distributedJoins() {
+        return distributedJoins;
+    }
+
+    /**
+     * @return Enforce join order flag.
+     */
+    public boolean enforceJoinOrder() {
+        return enforceJoinOrder;
+    }
+
+    /**
+     * @return Lazy flag.
+     */
+    public boolean lazy() {
+        return lazy;
+    }
+
     /** @param reqId Request ID. */
     public void requestId(long reqId) {
         this.reqId = reqId;
@@ -232,4 +275,9 @@ public class GridRunningQueryInfo {
     public UUID subjectId() {
         return subjId;
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridRunningQueryInfo.class, this);
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
index 11f1c8709df..5dcf7b66a47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
@@ -25,10 +25,13 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cluster.ClusterNode;
@@ -37,15 +40,18 @@ import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.managers.systemview.walker.SqlQueryHistoryViewWalker;
 import org.apache.ignite.internal.managers.systemview.walker.SqlQueryViewWalker;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
 import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
+import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.processors.query.messages.GridQueryKillRequest;
 import org.apache.ignite.internal.processors.query.messages.GridQueryKillResponse;
 import org.apache.ignite.internal.processors.tracing.Span;
@@ -89,6 +95,9 @@ public class RunningQueryManager {
     /** Undefined query ID value. */
     public static final long UNDEFINED_QUERY_ID = 0L;
 
+    /** Closure processor used to run query listeners in the public pool. */
+    private final GridClosureProcessor closure;
+
     /** Keep registered user queries. */
     private final ConcurrentMap<Long, GridRunningQueryInfo> runs = new ConcurrentHashMap<>();
 
@@ -147,6 +156,12 @@ public class RunningQueryManager {
         }
     };
 
+    /** Listeners notified when a query is registered (started). */
+    private final List<Consumer<GridQueryStartedInfo>> qryStartedListeners = new CopyOnWriteArrayList<>();
+
+    /** Listeners notified when a query is unregistered (finished). */
+    private final List<Consumer<GridQueryFinishedInfo>> qryFinishedListeners = new CopyOnWriteArrayList<>();
+
     /**
      * Constructor.
      *
@@ -160,6 +175,7 @@ public class RunningQueryManager {
         localNodeId = ctx.localNodeId();
 
         histSz = ctx.config().getSqlConfiguration().getSqlQueryHistorySize();
+        closure = ctx.closure();
 
         qryHistTracker = new QueryHistoryTracker(histSz);
 
@@ -229,11 +245,14 @@ public class RunningQueryManager {
      * @param schemaName Schema name.
      * @param loc Local query flag.
      * @param cancel Query cancel. Should be passed in case query is cancelable, or {@code null} otherwise.
+     * @param enforceJoinOrder Enforce join order flag.
+     * @param lazy Lazy flag.
+     * @param distributedJoins Distributed joins flag.
      * @return Id of registered query. Id is a positive number.
      */
     public long register(String qry, GridCacheQueryType qryType, String schemaName, boolean loc,
         @Nullable GridQueryCancel cancel,
-        String qryInitiatorId) {
+        String qryInitiatorId, boolean enforceJoinOrder, boolean lazy, boolean distributedJoins) {
         long qryId = qryIdGen.incrementAndGet();
 
         if (qryInitiatorId == null)
@@ -250,6 +269,9 @@ public class RunningQueryManager {
             cancel,
             loc,
             qryInitiatorId,
+            enforceJoinOrder,
+            lazy,
+            distributedJoins,
             securitySubjectId(ctx)
         );
 
@@ -262,6 +284,41 @@ public class RunningQueryManager {
 
         run.span().addTag(SQL_QRY_ID, run::globalQueryId);
 
+        if (!qryStartedListeners.isEmpty()) {
+            GridQueryStartedInfo info = new GridQueryStartedInfo(
+                run.id(),
+                localNodeId,
+                run.query(),
+                run.queryType(),
+                run.schemaName(),
+                run.startTime(),
+                run.cancelable(),
+                run.local(),
+                run.enforceJoinOrder(),
+                run.lazy(),
+                run.distributedJoins(),
+                run.queryInitiatorId()
+            );
+
+            try {
+                closure.runLocal(
+                    () -> qryStartedListeners.forEach(lsnr -> {
+                        try {
+                            lsnr.accept(info);
+                        }
+                        catch (Exception ex) {
+                            log.error("Listener fails during handling query started" +
+                                " event [qryId=" + qryId + "]", ex);
+                        }
+                    }),
+                    GridIoPolicy.PUBLIC_POOL
+                );
+            }
+            catch (IgniteCheckedException ex) {
+                throw new IgniteException(ex.getMessage(), ex);
+            }
+        }
+
         return qryId;
     }
 
@@ -289,6 +346,43 @@ public class RunningQueryManager {
             if (failed)
                 qrySpan.addTag(ERROR, failReason::getMessage);
 
+            if (!qryFinishedListeners.isEmpty()) {
+                GridQueryFinishedInfo info = new GridQueryFinishedInfo(
+                    qry.id(),
+                    localNodeId,
+                    qry.query(),
+                    qry.queryType(),
+                    qry.schemaName(),
+                    qry.startTime(),
+                    System.currentTimeMillis(),
+                    qry.local(),
+                    qry.enforceJoinOrder(),
+                    qry.lazy(),
+                    qry.distributedJoins(),
+                    failed,
+                    failReason,
+                    qry.queryInitiatorId()
+                );
+
+                try {
+                    closure.runLocal(
+                        () -> qryFinishedListeners.forEach(lsnr -> {
+                            try {
+                                lsnr.accept(info);
+                            }
+                            catch (Exception ex) {
+                                log.error("Listener fails during handling query finished" +
+                                    " event [qryId=" + qryId + "]", ex);
+                            }
+                        }),
+                        GridIoPolicy.PUBLIC_POOL
+                    );
+                }
+                catch (IgniteCheckedException ex) {
+                    throw new IgniteException(ex.getMessage(), ex);
+                }
+            }
+
             //We need to collect query history and metrics only for SQL queries.
             if (isSqlQuery(qry)) {
                 qry.runningFuture().onDone();
@@ -349,6 +443,44 @@ public class RunningQueryManager {
         return res;
     }
 
+    /**
+     * @param lsnr Listener.
+     */
+    public void registerQueryStartedListener(Consumer<GridQueryStartedInfo> lsnr) {
+        A.notNull(lsnr, "lsnr");
+
+        qryStartedListeners.add(lsnr);
+    }
+
+    /**
+     * @param lsnr Listener.
+     */
+    @SuppressWarnings("SuspiciousMethodCalls")
+    public boolean unregisterQueryStartedListener(Object lsnr) {
+        A.notNull(lsnr, "lsnr");
+
+        return qryStartedListeners.remove(lsnr);
+    }
+
+    /**
+     * @param lsnr Listener.
+     */
+    public void registerQueryFinishedListener(Consumer<GridQueryFinishedInfo> lsnr) {
+        A.notNull(lsnr, "lsnr");
+
+        qryFinishedListeners.add(lsnr);
+    }
+
+    /**
+     * @param lsnr Listener.
+     */
+    @SuppressWarnings("SuspiciousMethodCalls")
+    public boolean unregisterQueryFinishedListener(Object lsnr) {
+        A.notNull(lsnr, "lsnr");
+
+        return qryFinishedListeners.remove(lsnr);
+    }
+
     /**
      * Check belongs running query to an SQL type.
      *
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 7a5c824e60e..9788f2987bc 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -36,6 +36,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import java.util.stream.Stream;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
@@ -101,8 +102,11 @@ import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
 import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter;
+import org.apache.ignite.internal.processors.query.GridQueryFinishedInfo;
 import org.apache.ignite.internal.processors.query.GridQueryIndexing;
 import org.apache.ignite.internal.processors.query.GridQueryProperty;
+import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
+import org.apache.ignite.internal.processors.query.GridQueryStartedInfo;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryField;
@@ -512,13 +516,29 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         H2TableDescriptor tbl = schemaMgr.tableForType(schemaName, cacheName, typeName);
 
         if (tbl != null && tbl.luceneIndex() != null) {
-            long qryId = runningQueryManager().register(qry, TEXT, schemaName, true, null, null);
+            long qryId = runningQueryManager().register(
+                qry,
+                TEXT,
+                schemaName,
+                true,
+                null,
+                null,
+                false,
+                false,
+                false
+            );
 
+            Throwable failReason = null;
             try {
                 return tbl.luceneIndex().query(qry.toUpperCase(), filters, limit);
             }
+            catch (Throwable t) {
+                failReason = t;
+
+                throw t;
+            }
             finally {
-                runningQueryManager().unregister(qryId, null);
+                runningQueryManager().unregister(qryId, failReason);
             }
         }
 
@@ -737,7 +757,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             schemaName,
             true,
             null,
-            qryInitiatorId
+            qryInitiatorId,
+            false,
+            false,
+            false
         );
 
         Exception failReason = null;
@@ -1612,7 +1635,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             qryDesc.schemaName(),
             qryDesc.local(),
             cancel,
-            qryDesc.queryInitiatorId()
+            qryDesc.queryInitiatorId(),
+            qryDesc.enforceJoinOrder(),
+            qryParams.lazy(),
+            qryDesc.distributedJoins()
         );
 
         if (ctx.event().isRecordable(EVT_SQL_QUERY_EXECUTION)) {
@@ -1661,6 +1687,34 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         }
     }
 
+    /**
+     * @param lsnr Query started listener; it is handed over as is to the running query manager for registration.
+     */
+    public void registerQueryStartedListener(Consumer<GridQueryStartedInfo> lsnr) {
+        runningQueryManager().registerQueryStartedListener(lsnr);
+    }
+
+    /**
+     * @param lsnr Query started listener to unregister; the returned flag is what the running query manager reports.
+     */
+    public boolean unregisterQueryStartedListener(Object lsnr) {
+        return runningQueryManager().unregisterQueryStartedListener(lsnr);
+    }
+
+    /**
+     * @param lsnr Query finished listener; it is handed over as is to the running query manager for registration.
+     */
+    public void registerQueryFinishedListener(Consumer<GridQueryFinishedInfo> lsnr) {
+        runningQueryManager().registerQueryFinishedListener(lsnr);
+    }
+
+    /**
+     * @param lsnr Query finished listener to unregister; the returned flag is what the running query manager reports.
+     */
+    public boolean unregisterQueryFinishedListener(Object lsnr) {
+        return runningQueryManager().unregisterQueryFinishedListener(lsnr);
+    }
+
     /** {@inheritDoc} */
     @Override public UpdateSourceIterator<?> executeUpdateOnDataNodeTransactional(
         GridCacheContext<?, ?> cctx,
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/IgniteSqlQueryStartFinishListenerTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/IgniteSqlQueryStartFinishListenerTest.java
new file mode 100644
index 00000000000..5e9e3dfa27e
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/IgniteSqlQueryStartFinishListenerTest.java
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.TextQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+import org.apache.ignite.internal.processors.query.GridQueryFinishedInfo;
+import org.apache.ignite.internal.processors.query.GridQueryStartedInfo;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.hamcrest.CustomMatcher;
+import org.hamcrest.Matcher;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.query.QueryUtils.SCHEMA_SYS;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.hamcrest.core.IsNull.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/** Test for SQL query listeners. */
+public class IgniteSqlQueryStartFinishListenerTest extends AbstractIndexingCommonTest {
+    /** Client node name. */
+    private static final String CLIENT_NODE_NAME = "CLIENT_NODE";
+
+    /** Server node name. */
+    private static final String SERVER_NODE_NAME = "SERVER_NODE";
+
+    /** Listeners registered through the helper methods of this test; they are unregistered after each test. */
+    private final List<Object> lsnrs = new ArrayList<>();
+
+    /** {@inheritDoc} Additionally starts the server node and then the client node shared by the tests. */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrid(SERVER_NODE_NAME);
+        startClientGrid(CLIENT_NODE_NAME);
+    }
+
+    /** {@inheritDoc} Additionally stops all grids after the parent cleanup has run. */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /** Offers every tracked listener to both unregister methods of the server indexing, then clears the list. */
+    @After
+    public void unregisterListeners() {
+        lsnrs.forEach(indexing()::unregisterQueryFinishedListener);
+        lsnrs.forEach(indexing()::unregisterQueryStartedListener);
+
+        lsnrs.clear();
+    }
+
+    /** {@inheritDoc} Adds the TEST1 SQL schema and a default cache with a String query entity and test SQL functions. */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        return super.getConfiguration(gridName)
+            .setSqlSchemas("TEST1")
+            .setCacheConfiguration(
+                new CacheConfiguration<String, String>(DEFAULT_CACHE_NAME)
+                    .setQueryEntities(Collections.singleton(new QueryEntity(String.class, String.class)))
+                    .setSqlFunctionClasses(GridTestUtils.SqlTestFunctions.class)
+            );
+    }
+
+    /**
+     * Ensure you could register and unregister a listener for query start/finish events:
+     *     - register listeners
+     *     - execute a query
+     *     - ensure both listeners were notified
+     *     - unregister the query start listener
+     *     - run a query one more time
+     *     - ensure only the finish listener was notified
+     *     - unregister the query finish listener and register new one
+     *     - run a query one more time
+     *     - ensure the new listener was notified (latch released) and the unregistered ones were not
+     *
+     * @throws Exception In case of error.
+     */
+    @Test
+    public void testRegisterUnregisterQueryListeners() throws Exception {
+        final AtomicInteger qryStarted = new AtomicInteger();
+        final AtomicInteger qryFinished = new AtomicInteger();
+
+        final Consumer<GridQueryStartedInfo> qryStartedLsnr = registerQueryStartedListener(info -> qryStarted.incrementAndGet());
+        final Consumer<GridQueryFinishedInfo> qryFinishedLsnr = registerQueryFinishedListener(info -> qryFinished.incrementAndGet());
+
+        {
+            execSql(SCHEMA_SYS, "select * from caches");
+
+            assertWithTimeout(qryStarted::get, is(equalTo(1)), 1_000);
+            assertWithTimeout(qryFinished::get, is(equalTo(1)), 1_000);
+        }
+
+        {
+            assertTrue(indexing().unregisterQueryStartedListener(qryStartedLsnr));
+
+            execSql(SCHEMA_SYS, "select * from caches");
+
+            assertWithTimeout(qryFinished::get, is(equalTo(2)), 1_000);
+            assertWithTimeout(qryStarted::get, is(equalTo(1)), 1_000);
+        }
+
+        {
+            assertTrue(indexing().unregisterQueryFinishedListener(qryFinishedLsnr));
+
+            final CountDownLatch latch = new CountDownLatch(1);
+
+            registerQueryFinishedListener(info -> latch.countDown());
+
+            execSql(SCHEMA_SYS, "select * from caches");
+
+            assertTrue(latch.await(1, TimeUnit.SECONDS));
+
+            assertWithTimeout(qryFinished::get, is(equalTo(2)), 1_000);
+            assertWithTimeout(qryStarted::get, is(equalTo(1)), 1_000);
+        }
+    }
+
+    /**
+     * Ensure listeners are notified with an actual query info:
+     *     - register listeners
+     *     - execute different queries (distributed SQL, failing local SQL, text query)
+     *     - verify query info passed to both the start and the finish listener
+     */
+    @Test
+    @SuppressWarnings("ThrowableNotThrown")
+    public void testVerifyQueryInfoPassedToListeners() throws Exception {
+        final AtomicReference<GridQueryStartedInfo> qryStarted = new AtomicReference<>();
+        final AtomicReference<GridQueryFinishedInfo> qryFinished = new AtomicReference<>();
+
+        registerQueryStartedListener(qryStarted::set);
+        registerQueryFinishedListener(qryFinished::set);
+
+        {
+            final long delay = 100;
+            final String qry = "select * from caches where ? = \"default\".delay(?) limit 1";
+
+            execSql(SCHEMA_SYS, qry, delay, delay);
+
+            assertWithTimeout(qryStarted::get, is(notNullValue()), 1_000);
+
+            GridQueryStartedInfo startedInfo = qryStarted.get();
+            assertEquals(SCHEMA_SYS, startedInfo.schemaName());
+            assertEquals(qry, startedInfo.query());
+            assertEquals(grid(SERVER_NODE_NAME).localNode().id(), startedInfo.nodeId());
+            assertEquals(false, startedInfo.local());
+            assertEquals(GridCacheQueryType.SQL_FIELDS, startedInfo.queryType());
+
+            assertWithTimeout(qryFinished::get, is(notNullValue()), 1_000);
+
+            GridQueryFinishedInfo finishedInfo = qryFinished.get();
+            assertEquals(SCHEMA_SYS, finishedInfo.schemaName());
+            assertEquals(qry, finishedInfo.query());
+            assertEquals(grid(SERVER_NODE_NAME).localNode().id(), finishedInfo.nodeId());
+            assertEquals(false, finishedInfo.local());
+            assertEquals(GridCacheQueryType.SQL_FIELDS, finishedInfo.queryType());
+            assertEquals(false, finishedInfo.failed());
+            assertThat(finishedInfo.finishTime() - finishedInfo.startTime(), is(greaterOrEqualTo(delay)));
+
+            qryStarted.set(null);
+            qryFinished.set(null);
+        }
+
+        {
+            final String schema = "TEST1";
+            final String qry = "select \"default\".can_fail() from " + SCHEMA_SYS + ".caches where ? = ? limit 1";
+
+            GridTestUtils.SqlTestFunctions.fail = true;
+
+            GridTestUtils.assertThrowsWithCause(() -> execSqlLocal(schema, qry, 1, 1), IgniteSQLException.class);
+
+            assertWithTimeout(qryStarted::get, is(notNullValue()), 1_000);
+
+            GridQueryStartedInfo startedInfo = qryStarted.get();
+            assertEquals(schema, startedInfo.schemaName());
+            assertEquals(qry, startedInfo.query());
+            assertEquals(grid(SERVER_NODE_NAME).localNode().id(), startedInfo.nodeId());
+            assertEquals(true, startedInfo.local());
+            assertEquals(GridCacheQueryType.SQL_FIELDS, startedInfo.queryType());
+
+            assertWithTimeout(qryFinished::get, is(notNullValue()), 1_000);
+
+            GridQueryFinishedInfo finishedInfo = qryFinished.get();
+            assertEquals(schema, finishedInfo.schemaName());
+            assertEquals(qry, finishedInfo.query());
+            assertEquals(grid(SERVER_NODE_NAME).localNode().id(), finishedInfo.nodeId());
+            assertEquals(true, finishedInfo.local());
+            assertEquals(GridCacheQueryType.SQL_FIELDS, finishedInfo.queryType());
+            assertEquals(true, finishedInfo.failed());
+            assertThat(finishedInfo.finishTime(), is(greaterOrEqualTo(finishedInfo.startTime())));
+
+            qryStarted.set(null);
+            qryFinished.set(null);
+        }
+
+        {
+            final String qry = "text query";
+
+            IgniteCache<String, Object> cache = grid(CLIENT_NODE_NAME).cache(DEFAULT_CACHE_NAME);
+
+            cache.query(new TextQuery<String, String>(String.class, qry));
+
+            assertWithTimeout(qryStarted::get, is(notNullValue()), 1_000);
+
+            GridQueryStartedInfo startedInfo = qryStarted.get();
+            assertEquals(cache.getName(), startedInfo.schemaName());
+            assertEquals(qry, startedInfo.query());
+            assertEquals(grid(SERVER_NODE_NAME).localNode().id(), startedInfo.nodeId());
+            assertEquals(true, startedInfo.local());
+            assertEquals(GridCacheQueryType.TEXT, startedInfo.queryType());
+
+            assertWithTimeout(qryFinished::get, is(notNullValue()), 1_000);
+
+            GridQueryFinishedInfo finishedInfo = qryFinished.get();
+            assertEquals(cache.getName(), finishedInfo.schemaName());
+            assertEquals(qry, finishedInfo.query());
+            assertEquals(grid(SERVER_NODE_NAME).localNode().id(), finishedInfo.nodeId());
+            assertEquals(true, finishedInfo.local());
+            assertEquals(GridCacheQueryType.TEXT, finishedInfo.queryType());
+            assertEquals(false, finishedInfo.failed());
+            assertThat(finishedInfo.finishTime(), is(greaterOrEqualTo(finishedInfo.startTime())));
+
+            qryStarted.set(null);
+            qryFinished.set(null);
+        }
+    }
+
+    /**
+     * Ensure listeners do not block query execution
+     *     - register blocking listeners
+     *     - execute a lot of queries
+     *     - verify all queries finished while listeners are still blocked
+     */
+    @Test
+    public void testListeneresNotBlocksQueryExecution() throws IgniteCheckedException {
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicInteger lsnrCalls = new AtomicInteger();
+
+        final int qryRuns = 1_000;
+        final int threadCnt = 20;
+
+        // Both listeners behave the same: park until the latch is released, then count the call.
+        final Runnable blockThenCount = () -> {
+            try {
+                latch.await();
+            }
+            catch (InterruptedException ignored) {
+                // Keep the interrupt status for the owner of the thread instead of swallowing it.
+                Thread.currentThread().interrupt();
+            }
+
+            lsnrCalls.incrementAndGet();
+        };
+
+        registerQueryStartedListener(info -> blockThenCount.run());
+        registerQueryFinishedListener(info -> blockThenCount.run());
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(() -> {
+            for (int i = 0; i < qryRuns; i++)
+                execSql(SCHEMA_SYS, "select * from caches");
+        }, threadCnt, "test-async-query-runner");
+
+        // All queries must complete while every listener invocation is still parked on the latch.
+        try {
+            fut.get(15_000);
+        }
+        finally {
+            // Release the listeners even if the queries timed out, so blocked invocations do not hang.
+            latch.countDown();
+        }
+
+        // Each query is expected to produce one start and one finish notification.
+        assertWithTimeout(lsnrCalls::get, is(equalTo(2 * threadCnt * qryRuns)), 15_000);
+    }
+
+    /**
+     * Ensure notification chain is not interrupted if exception was thrown in the middle of the chain
+     *     - register several listeners, each of which throws an exception once when its fail flag is set
+     *     - execute query several times, one of the listeners from each chain should fail
+     *     - verify all other listeners were notified
+     */
+    @Test
+    public void testFailedListenereNotAffectOthers() throws IgniteCheckedException {
+        final int lsnrCnt = 3;
+        final long waitTimeout = 1_000;
+
+        boolean[] startLsnrsNotified = new boolean[lsnrCnt];
+        boolean[] finishLsnrsNotified = new boolean[lsnrCnt];
+        boolean[] startLsnrShouldFail = new boolean[lsnrCnt];
+        boolean[] finishLsnrShouldFail = new boolean[lsnrCnt];
+
+        for (int i = 0; i < lsnrCnt; i++) {
+            final int lsnNo = i;
+
+            registerQueryStartedListener(info -> {
+                if (startLsnrShouldFail[lsnNo]) {
+                    startLsnrShouldFail[lsnNo] = false;
+
+                    throw new RuntimeException("Start listener fails");
+                }
+
+                startLsnrsNotified[lsnNo] = true;
+            });
+
+            registerQueryFinishedListener(info -> {
+                if (finishLsnrShouldFail[lsnNo]) {
+                    finishLsnrShouldFail[lsnNo] = false;
+
+                    throw new RuntimeException("Finish listener fails");
+                }
+
+                finishLsnrsNotified[lsnNo] = true;
+            });
+        }
+
+        {
+            startLsnrShouldFail[0] = true;
+            finishLsnrShouldFail[1] = true;
+
+            execSql(SCHEMA_SYS, "select * from caches");
+
+            assertWithTimeout(() -> startLsnrsNotified[0], isFalse(), waitTimeout);
+            assertWithTimeout(() -> startLsnrsNotified[1], isTrue(), waitTimeout);
+            assertWithTimeout(() -> startLsnrsNotified[2], isTrue(), waitTimeout);
+
+            assertWithTimeout(() -> finishLsnrsNotified[0], isTrue(), waitTimeout);
+            assertWithTimeout(() -> finishLsnrsNotified[1], isFalse(), waitTimeout);
+            assertWithTimeout(() -> finishLsnrsNotified[2], isTrue(), waitTimeout);
+
+            resetListeners(startLsnrsNotified, finishLsnrsNotified);
+        }
+
+        {
+            startLsnrShouldFail[1] = true;
+            finishLsnrShouldFail[2] = true;
+
+            execSql(SCHEMA_SYS, "select * from caches");
+
+            assertWithTimeout(() -> startLsnrsNotified[0], isTrue(), waitTimeout);
+            assertWithTimeout(() -> startLsnrsNotified[1], isFalse(), waitTimeout);
+            assertWithTimeout(() -> startLsnrsNotified[2], isTrue(), waitTimeout);
+
+            assertWithTimeout(() -> finishLsnrsNotified[0], isTrue(), waitTimeout);
+            assertWithTimeout(() -> finishLsnrsNotified[1], isTrue(), waitTimeout);
+            assertWithTimeout(() -> finishLsnrsNotified[2], isFalse(), waitTimeout);
+
+            resetListeners(startLsnrsNotified, finishLsnrsNotified);
+        }
+
+        {
+            startLsnrShouldFail[2] = true;
+            finishLsnrShouldFail[0] = true;
+
+            execSql(SCHEMA_SYS, "select * from caches");
+
+            assertWithTimeout(() -> startLsnrsNotified[0], isTrue(), waitTimeout);
+            assertWithTimeout(() -> startLsnrsNotified[1], isTrue(), waitTimeout);
+            assertWithTimeout(() -> startLsnrsNotified[2], isFalse(), waitTimeout);
+
+            assertWithTimeout(() -> finishLsnrsNotified[0], isFalse(), waitTimeout);
+            assertWithTimeout(() -> finishLsnrsNotified[1], isTrue(), waitTimeout);
+            assertWithTimeout(() -> finishLsnrsNotified[2], isTrue(), waitTimeout);
+
+            resetListeners(startLsnrsNotified, finishLsnrsNotified);
+        }
+    }
+
+    /**
+     * Clears the given notification flags: every element of every passed array becomes {@code false}.
+     */
+    private void resetListeners(boolean[]... arrays) {
+        for (int idx = 0; idx < arrays.length; idx++)
+            Arrays.fill(arrays[idx], 0, arrays[idx].length, false);
+    }
+
+    /** Returns the query indexing of the server node cast to {@link IgniteH2Indexing}. */
+    private IgniteH2Indexing indexing() {
+        return (IgniteH2Indexing)grid(SERVER_NODE_NAME).context().query().getIndexing();
+    }
+
+    /** Runs a non-local SQL fields query via the default cache of the server node and returns all fetched rows. */
+    private List<List<?>> execSql(String schema, String sql, Object... args) {
+        return grid(SERVER_NODE_NAME).cache(DEFAULT_CACHE_NAME).query(
+            new SqlFieldsQuery(sql).setSchema(schema).setArgs(args).setLocal(false)
+        ).getAll();
+    }
+
+    /** Runs a local SQL fields query via the default cache of the server node and returns all fetched rows. */
+    private List<List<?>> execSqlLocal(String schema, String sql, Object... args) {
+        return grid(SERVER_NODE_NAME).cache(DEFAULT_CACHE_NAME).query(
+            new SqlFieldsQuery(sql).setSchema(schema).setArgs(args).setLocal(true)
+        ).getAll();
+    }
+
+    /**
+     * @param lsnr Start listener; remembered for cleanup, registered on the server indexing and returned as is.
+     */
+    private Consumer<GridQueryStartedInfo> registerQueryStartedListener(Consumer<GridQueryStartedInfo> lsnr) {
+        lsnrs.add(lsnr);
+
+        indexing().registerQueryStartedListener(lsnr);
+
+        return lsnr;
+    }
+
+
+    /**
+     * @param lsnr Finish listener; remembered for cleanup, registered on the server indexing and returned as is.
+     */
+    private Consumer<GridQueryFinishedInfo> registerQueryFinishedListener(Consumer<GridQueryFinishedInfo> lsnr) {
+        lsnrs.add(lsnr);
+
+        indexing().registerQueryFinishedListener(lsnr);
+
+        return lsnr;
+    }
+
+    /**
+     * @param actualSupplier Supplier for value to check; read by the wait condition and once more for the assertion.
+     * @param matcher Matcher used both as the wait condition and for the final assertion.
+     * @param timeout Timeout passed to {@code GridTestUtils.waitForCondition}.
+     */
+    private <T> void assertWithTimeout(Supplier<T> actualSupplier, Matcher<? super T> matcher, long timeout)
+        throws IgniteInterruptedCheckedException {
+        GridTestUtils.waitForCondition(() -> matcher.matches(actualSupplier.get()), timeout);
+
+        assertThat(actualSupplier.get(), matcher);
+    }
+
+    /**
+     * @param wanted Inclusive lower bound; the matcher rejects every item when the bound is {@code null}.
+     */
+    private static <T extends Comparable<? super T>> Matcher<T> greaterOrEqualTo(T wanted) {
+        return new CustomMatcher<T>("should be greater or equal to " + wanted) {
+            @SuppressWarnings({"unchecked", "rawtypes"})
+            @Override public boolean matches(Object item) {
+                return wanted != null && item instanceof Comparable && ((Comparable)item).compareTo(wanted) >= 0;
+            }
+        };
+    }
+
+    /** Creates a matcher accepting only a {@link Boolean} item equal to {@code true}. */
+    private static Matcher<Boolean> isTrue() {
+        return new CustomMatcher<Boolean>("should be true ") {
+            @Override public boolean matches(Object candidate) {
+                return Boolean.TRUE.equals(candidate);
+            }
+        };
+    }
+
+    /** Creates a matcher accepting only a {@link Boolean} item equal to {@code false}. */
+    private static Matcher<Boolean> isFalse() {
+        return new CustomMatcher<Boolean>("should be false") {
+            @Override public boolean matches(Object item) {
+                return Boolean.FALSE.equals(item);
+            }
+        };
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index c2722a15e0a..24dc627847a 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -137,6 +137,7 @@ import org.apache.ignite.internal.processors.query.h2.GridSubqueryJoinOptimizerS
 import org.apache.ignite.internal.processors.query.h2.H2ResultSetIteratorNullifyOnEndSelfTest;
 import org.apache.ignite.internal.processors.query.h2.IgniteSqlBigIntegerKeyTest;
 import org.apache.ignite.internal.processors.query.h2.IgniteSqlQueryMinMaxTest;
+import org.apache.ignite.internal.processors.query.h2.IgniteSqlQueryStartFinishListenerTest;
 import org.apache.ignite.internal.processors.query.h2.QueryDataPageScanTest;
 import org.apache.ignite.internal.processors.query.h2.sql.ExplainSelfTest;
 import org.apache.ignite.internal.processors.query.h2.sql.GridQueryParsingTest;
@@ -302,6 +303,7 @@ import org.junit.runners.Suite;
 
     IgniteCacheMultipleIndexedTypesTest.class,
     IgniteSqlQueryMinMaxTest.class,
+    IgniteSqlQueryStartFinishListenerTest.class,
 
     GridCircularQueueTest.class,
     IndexingSpiQueryWithH2IndexingSelfTest.class,