You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/04/07 07:35:46 UTC

[ignite] branch master updated: IGNITE-14461 Track down those who initiated a query (#8965)

This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b74e4fc IGNITE-14461 Track down those who initiated a query (#8965)
6b74e4fc is described below

commit 6b74e4fce5b92520970a7ba3cd26b7d007d61a11
Author: tledkov <tl...@gridgain.com>
AuthorDate: Wed Apr 7 10:35:24 2021 +0300

    IGNITE-14461 Track down those who initiated a query (#8965)
---
 .../common/RunningQueryInfoCheckInitiatorTest.java | 420 +++++++++++++++++++++
 .../jdbc/suite/IgniteJdbcDriverTestSuite.java      |   3 +
 .../ignite/jdbc/thin/JdbcThinMetadataSelfTest.java |   2 +
 .../thin/JdbcThinStreamingAbstractSelfTest.java    |   4 +-
 .../apache/ignite/cache/query/SqlFieldsQuery.java  |  53 +++
 .../ignite/internal/jdbc2/JdbcConnection.java      |  37 ++
 .../jdbc2/JdbcQueryMultipleStatementsTask.java     |  11 +
 .../jdbc2/JdbcQueryMultipleStatementsTaskV3.java   |  72 ++++
 .../ignite/internal/jdbc2/JdbcStatement.java       |   8 +-
 .../jdbc2/JdbcStreamedPreparedStatement.java       |   2 +-
 .../systemview/walker/SqlQueryViewWalker.java      |   4 +-
 .../internal/processors/job/GridJobWorker.java     |   5 +
 .../ClientListenerAbstractConnectionContext.java   |  42 ++-
 .../processors/odbc/ClientListenerNioListener.java |  12 +-
 .../odbc/jdbc/JdbcConnectionContext.java           |   7 +-
 .../processors/odbc/jdbc/JdbcRequestHandler.java   |   4 +-
 .../odbc/odbc/OdbcConnectionContext.java           |   9 +-
 .../processors/odbc/odbc/OdbcRequestHandler.java   |  13 +-
 .../platform/client/ClientConnectionContext.java   |   6 +-
 .../cache/ClientCacheSqlFieldsQueryRequest.java    |   2 +
 .../processors/query/GridQueryIndexing.java        |   6 +-
 .../processors/query/GridQueryProcessor.java       |   9 +-
 .../processors/query/GridRunningQueryInfo.java     |  15 +-
 .../processors/query/RunningQueryManager.java      |  11 +-
 .../ignite/spi/systemview/view/SqlQueryView.java   |   6 +
 .../main/resources/META-INF/classnames.properties  |   1 +
 .../processors/query/DummyQueryIndexing.java       |   6 +-
 .../processors/query/h2/IgniteH2Indexing.java      |  20 +-
 .../processors/query/h2/QueryDescriptor.java       |  14 +-
 .../internal/processors/query/h2/QueryParser.java  |   3 +-
 .../org/apache/ignite/spark/impl/QueryHelper.scala |   2 +-
 .../org/apache/ignite/spark/impl/QueryHelper.scala |   2 +-
 32 files changed, 766 insertions(+), 45 deletions(-)

diff --git a/modules/clients/src/test/java/org/apache/ignite/common/RunningQueryInfoCheckInitiatorTest.java b/modules/clients/src/test/java/org/apache/ignite/common/RunningQueryInfoCheckInitiatorTest.java
new file mode 100644
index 0000000..16799ee
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/common/RunningQueryInfoCheckInitiatorTest.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.common;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.regex.Pattern;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.jdbc.thin.JdbcThinAbstractSelfTest;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.IgniteJdbcDriver.CFG_URL_PREFIX;
+
+/**
+ * Tests for query originator.
+ */
+public class RunningQueryInfoCheckInitiatorTest extends JdbcThinAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setDataStorageConfiguration(new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                    .setPersistenceEnabled(true)))
+            .setAuthenticationEnabled(true)
+            .setCacheConfiguration(new CacheConfiguration()
+                .setName("test")
+                .setSqlSchema("TEST")
+                .setSqlFunctionClasses(TestSQLFunctions.class)
+                .setIndexedTypes(Integer.class, Integer.class)
+            );
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        cleanPersistenceDir();
+
+        startGrid(0);
+        startClientGrid(1);
+
+        grid(0).cluster().active(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        for (String cache : grid(0).cacheNames()) {
+            if (!cache.equals("test"))
+                grid(0).cache(cache).destroy();
+        }
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testUserDefinedInitiatorId() throws Exception {
+        final String initiatorId = "TestUserSpecifiedOriginator";
+
+        Consumer<String> sqlExec = sql -> GridTestUtils.runAsync(() -> {
+            try {
+                grid(0).context().query().querySqlFields(
+                    new SqlFieldsQuery(sql).setQueryInitiatorId(initiatorId), false).getAll();
+            }
+            catch (Exception e) {
+                log.error("Unexpected exception", e);
+                fail("Unexpected exception");
+            }
+        });
+
+        Consumer<String> initiatorChecker = initId0 -> assertEquals(initiatorId, initId0);
+
+        check(sqlExec, initiatorChecker);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testMultipleStatementsUserDefinedInitiatorId() throws Exception {
+        final String initiatorId = "TestUserSpecifiedOriginator";
+
+        GridTestUtils.runAsync(() -> {
+            List<FieldsQueryCursor<List<?>>> curs = grid(0).context().query().querySqlFields(
+                new SqlFieldsQuery("SELECT 'qry0', test.await(); SELECT 'qry1', test.await()")
+                    .setQueryInitiatorId(initiatorId), false, false);
+
+            for (FieldsQueryCursor<List<?>> cur : curs)
+                cur.getAll();
+        });
+
+        assertEquals(initiatorId, initiatorId(grid(0), "qry0", 1000));
+
+        TestSQLFunctions.unlockQuery();
+
+        assertEquals(initiatorId, initiatorId(grid(0), "qry1", 1000));
+
+        TestSQLFunctions.unlockQuery();
+
+        checkRunningQueriesCount(grid(0), 0, 1000);
+    }
+
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testJdbcThinInitiatorId() throws Exception {
+        Consumer<String> sqlExec = sql -> {
+            GridTestUtils.runAsync(() -> {
+                    try (Connection conn = DriverManager.getConnection(
+                        "jdbc:ignite:thin://127.0.0.1:" + clientPort(grid(0)) + "/?user=ignite&password=ignite")) {
+                        try (Statement stmt = conn.createStatement()) {
+                            stmt.execute(sql);
+                        }
+                    }
+                    catch (SQLException e) {
+                        log.error("Unexpected exception", e);
+                    }
+                }
+            );
+        };
+
+        Consumer<String> initiatorChecker = initiatorId -> {
+            assertTrue("Invalid initiator ID: " + initiatorId,
+                Pattern.compile("jdbc-thin:127\\.0\\.0\\.1:[0-9]+@ignite").matcher(initiatorId).matches());
+        };
+
+        check(sqlExec, initiatorChecker);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testThinClientInitiatorId() throws Exception {
+        Consumer<String> sqlExec = sql -> {
+            GridTestUtils.runAsync(() -> {
+                    try (IgniteClient cli = Ignition.startClient(
+                        new ClientConfiguration()
+                            .setAddresses("127.0.0.1:" + clientPort(grid(0)))
+                            .setUserName("ignite")
+                            .setUserPassword("ignite"))) {
+                        cli.query(new SqlFieldsQuery(sql)).getAll();
+                    }
+                    catch (Exception e) {
+                        log.error("Unexpected exception", e);
+                    }
+                }
+            );
+        };
+
+        Consumer<String> initiatorChecker = initiatorId -> {
+            assertTrue("Invalid initiator ID: " + initiatorId, Pattern.compile("cli:127\\.0\\.0\\.1:[0-9]+@ignite")
+                .matcher(initiatorId).matches());
+        };
+
+        check(sqlExec, initiatorChecker);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testJobDefaultInitiatorId() throws Exception {
+        Consumer<String> sqlExec = sql -> {
+            grid(1).cluster().forServers().ignite().compute().runAsync(new TestSqlJob(sql));
+        };
+
+        Consumer<String> initiatorChecker = initiatorId -> {
+            assertTrue("Invalid initiator ID: " + initiatorId,
+                initiatorId.startsWith("task:" + TestSqlJob.class.getName()) &&
+                    initiatorId.endsWith(grid(1).context().localNodeId().toString()));
+        };
+
+        check(sqlExec, initiatorChecker);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testJdbcV2InitiatorId() throws Exception {
+        Consumer<String> sqlExec = sql -> {
+            final UUID grid0NodeId = grid(0).cluster().localNode().id();
+
+            GridTestUtils.runAsync(() -> {
+                    try (Connection conn = DriverManager.getConnection(
+                        CFG_URL_PREFIX + "nodeId=" + grid0NodeId + "@modules/clients/src/test/config/jdbc-config.xml")) {
+                        try (Statement stmt = conn.createStatement()) {
+                            stmt.execute(sql);
+                        }
+                    }
+                    catch (SQLException e) {
+                        log.error("Unexpected exception", e);
+                    }
+                }
+            );
+        };
+
+        Consumer<String> initiatorChecker = initiatorId -> {
+            assertTrue("Invalid initiator ID: " + initiatorId,
+                Pattern.compile("jdbc-v2:127\\.0\\.0\\.1:sqlGrid-ignite-jdbc-driver-[0-9a-fA-F-]+").matcher(initiatorId).matches());
+        };
+
+        check(sqlExec, initiatorChecker);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testJdbcThinStreamerInitiatorId() throws Exception {
+        final AtomicBoolean end = new AtomicBoolean();
+
+        IgniteInternalFuture f = GridTestUtils.runAsync(() -> {
+                try (Connection conn = DriverManager.getConnection(
+                    "jdbc:ignite:thin://127.0.0.1:" + clientPort(grid(0)) + "/?user=ignite&password=ignite")) {
+                    try (Statement stmt = conn.createStatement()) {
+                        stmt.execute("CREATE TABLE T (ID INT PRIMARY KEY, VAL INT)");
+
+                        stmt.execute("SET STREAMING ON");
+
+                        for (int i = 0; !end.get(); ++i)
+                            stmt.execute("INSERT INTO T VALUES(" + i + " , 0)");
+                    }
+                }
+                catch (SQLException e) {
+                    log.error("Unexpected exception", e);
+                }
+            });
+
+        Consumer<String> initiatorChecker = initiatorId -> {
+            assertTrue("Invalid initiator ID: " + initiatorId,
+                Pattern.compile("jdbc-thin:127\\.0\\.0\\.1:[0-9]+@ignite").matcher(initiatorId).matches());
+        };
+
+        initiatorChecker.accept(initiatorId(grid(0), "INSERT", 2000));
+
+        end.set(true);
+
+        f.get();
+    }
+
+    /** */
+    private void check(Consumer<String> sqlExec, Consumer<String> initiatorChecker) throws Exception {
+        checkInitiatorId(sqlExec, initiatorChecker, "SELECT test.await()", "await");
+
+        grid(0).context().query().querySqlFields(
+            new SqlFieldsQuery("CREATE TABLE T (ID INT PRIMARY KEY, VAL INT)"), false).getAll();
+
+        U.sleep(500);
+
+        checkInitiatorId(sqlExec, initiatorChecker, "INSERT INTO T VALUES (0, test.await())", "await");
+
+        checkInitiatorId(sqlExec, initiatorChecker, "UPDATE T SET VAL=test.await() WHERE ID = 0", "await");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkInitiatorId(Consumer<String> sqlExecutor, Consumer<String> initiatorChecker,
+        String sql, String sqlMatch)
+        throws Exception {
+        try {
+            sqlExecutor.accept(sql);
+
+            initiatorChecker.accept(initiatorId(grid(0), sqlMatch, 2000));
+        }
+        finally {
+            TestSQLFunctions.unlockQuery();
+        }
+
+        checkRunningQueriesCount(grid(0), 0, 2000);
+    }
+
+    /**
+     * @param node Ignite target node where query must be executed.
+     * @param sqlMatch string to match SQL query with specified initiator ID.
+     * @param timeout Timeout.
+     * @return initiator ID.
+     */
+    private String initiatorId(IgniteEx node, String sqlMatch, int timeout) throws Exception {
+        long t0 = U.currentTimeMillis();
+
+        while (true) {
+            if (U.currentTimeMillis() - t0 > timeout)
+                fail("Timeout. Cannot find query with: " + sqlMatch);
+
+            List<List<?>> res = node.context().query().querySqlFields(
+                new SqlFieldsQuery("SELECT sql, initiator_id FROM SYS.SQL_QUERIES"), false).getAll();
+
+            for (List<?> row : res) {
+                if (((String)row.get(0)).toUpperCase().contains(sqlMatch.toUpperCase()))
+                    return (String)row.get(1);
+            }
+
+            U.sleep(200);
+        }
+    }
+
+    /**
+     * @param node Node to check running queries.
+     * @param timeout Timeout to finish running queries.
+     */
+    private void checkRunningQueriesCount(IgniteEx node, int expectedQryCount, int timeout) {
+        long t0 = U.currentTimeMillis();
+
+        while (true) {
+            List<List<?>> res = node.context().query().querySqlFields(
+                new SqlFieldsQuery("SELECT * FROM SYS.SQL_QUERIES"), false).getAll();
+
+            res.stream().forEach(System.out::println);
+
+            if (res.size() == expectedQryCount + 1)
+                return;
+
+            if (U.currentTimeMillis() - t0 > timeout)
+                fail("Timeout. There are unexpected running queries: " + res);
+        }
+    }
+
+    /** */
+    private static int clientPort(IgniteEx ign) {
+        return ign.context().sqlListener().port();
+    }
+
+    /**
+     * Utility class with custom SQL functions.
+     */
+    public static class TestSQLFunctions {
+        static final Phaser ph = new Phaser(2);
+
+        /**
+         * Arrives at the shared phaser and waits for the other party, releasing a query blocked in {@link #await()}.
+         */
+        static void unlockQuery() {
+            ph.arriveAndAwaitAdvance();
+        }
+
+        /**
+         * Blocks the calling query on the shared phaser until the other party arrives (see {@link #unlockQuery()}).
+         *
+         * @return 0;
+         */
+        @QuerySqlFunction
+        public static long await() {
+            try {
+                ph.arriveAndAwaitAdvance();
+            }
+            catch (Exception ignored) {
+                // No-op.
+            }
+
+            return 0;
+        }
+    }
+
+    /** */
+    public static class TestSqlJob implements IgniteRunnable {
+        /** */
+        String sql;
+
+        /** */
+        @IgniteInstanceResource
+        Ignite ign;
+
+        /** */
+        public TestSqlJob(String sql) {
+            this.sql = sql;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            ((IgniteEx)ign).context().query().querySqlFields(
+                new SqlFieldsQuery(sql), false).getAll();
+        }
+    }
+}
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
index a3f6005..e1c3ed8 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.jdbc.suite;
 
 import java.security.Security;
+import org.apache.ignite.common.RunningQueryInfoCheckInitiatorTest;
 import org.apache.ignite.internal.jdbc2.JdbcBlobTest;
 import org.apache.ignite.internal.jdbc2.JdbcBulkLoadSelfTest;
 import org.apache.ignite.internal.jdbc2.JdbcConnectionReopenTest;
@@ -110,6 +111,8 @@ import org.junit.runners.Suite;
 @Suite.SuiteClasses({
     QaJdbcTestSuite.class,
 
+    RunningQueryInfoCheckInitiatorTest.class,
+
     JdbcConnectionSelfTest.class,
     JdbcStatementSelfTest.class,
     JdbcPreparedStatementSelfTest.class,
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinMetadataSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinMetadataSelfTest.java
index b6840f5..d4a3ee0 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinMetadataSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinMetadataSelfTest.java
@@ -37,6 +37,7 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
+
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.cache.QueryIndex;
@@ -760,6 +761,7 @@ public class JdbcThinMetadataSelfTest extends JdbcThinAbstractSelfTest {
                 "SYS.SQL_QUERIES.START_TIME.null.26.6",
                 "SYS.SQL_QUERIES.DURATION.null.19",
                 "SYS.SQL_QUERIES.ORIGIN_NODE_ID.null.2147483647",
+                "SYS.SQL_QUERIES.INITIATOR_ID.null.2147483647",
                 "SYS.SCAN_QUERIES.START_TIME.null.19",
                 "SYS.SCAN_QUERIES.TRANSFORMER.null.2147483647",
                 "SYS.SCAN_QUERIES.LOCAL.null.1",
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
index 5196818..cbb2326 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
@@ -621,10 +621,10 @@ public abstract class JdbcThinStreamingAbstractSelfTest extends JdbcStreamingSel
 
         /** {@inheritDoc} */
         @Override public List<Long> streamBatchedUpdateQuery(String schemaName, String qry, List<Object[]> params,
-            SqlClientContext cliCtx) throws IgniteCheckedException {
+            SqlClientContext cliCtx, String qryInitiatorId) throws IgniteCheckedException {
             IndexingWithContext.cliCtx = cliCtx;
 
-            return super.streamBatchedUpdateQuery(schemaName, qry, params, cliCtx);
+            return super.streamBatchedUpdateQuery(schemaName, qry, params, cliCtx, qryInitiatorId);
         }
 
         /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
index 882fd1f..4983288 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
@@ -54,6 +54,9 @@ public class SqlFieldsQuery extends Query<List<?>> {
     /** Default value of Query timeout. Default is -1 means no timeout is set. */
     private static final int DFLT_QUERY_TIMEOUT = -1;
 
+    /** Thread-local query initiator ID, used by the job worker to set up a default originator for the current thread. */
+    private static ThreadLocal<String> threadedQryInitiatorId = new ThreadLocal<>();
+
     /** Do not remove. For tests only. */
     @SuppressWarnings("NonConstantFieldWithUpperCaseName")
     private static boolean DFLT_LAZY;
@@ -96,6 +99,12 @@ public class SqlFieldsQuery extends Query<List<?>> {
     private int updateBatchSize = DFLT_UPDATE_BATCH_SIZE;
 
     /**
+     * Query's originator string (client host+port, user name,
+     * job name or any user's information about query initiator).
+     */
+    private String qryInitiatorId;
+
+    /**
      * Copy constructs SQL fields query.
      *
      * @param qry SQL query.
@@ -112,6 +121,7 @@ public class SqlFieldsQuery extends Query<List<?>> {
         parts = qry.parts;
         schema = qry.schema;
         updateBatchSize = qry.updateBatchSize;
+        qryInitiatorId = qry.qryInitiatorId;
     }
 
     /**
@@ -417,12 +427,55 @@ public class SqlFieldsQuery extends Query<List<?>> {
     }
 
     /**
+     * @return Query's initiator identifier string (client host+port, user name,
+     *       job name or any user's information about query initiator).
+     */
+    public String getQueryInitiatorId() {
+        return qryInitiatorId;
+    }
+
+    /**
+     * @param qryInitiatorId Query's initiator identifier string (client host+port, user name,
+     *      job name or any user's information about query initiator).
+     *
+     * @return {@code this} for chaining.
+     */
+    public SqlFieldsQuery setQueryInitiatorId(String qryInitiatorId) {
+        this.qryInitiatorId = qryInitiatorId;
+
+        return this;
+    }
+
+    /**
      * @return Copy of this query.
      */
     public SqlFieldsQuery copy() {
         return new SqlFieldsQuery(this);
     }
 
+    /**
+     * Used at the Job worker to setup originator by default for current thread.
+     *
+     * @param originator Query's originator string.
+     */
+    public static void setThreadedQueryInitiatorId(String originator) {
+        threadedQryInitiatorId.set(originator);
+    }
+
+    /**
+     * Used at the job worker to clear originator for current thread.
+     */
+    public static void resetThreadedQueryInitiatorId() {
+        threadedQryInitiatorId.remove();
+    }
+
+    /**
+     * @return originator set up by the job worker.
+     */
+    public static String threadedQueryInitiatorId() {
+        return threadedQryInitiatorId.get();
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(SqlFieldsQuery.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
index 89dc054..edf30fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
@@ -115,6 +115,10 @@ public class JdbcConnection implements Connection {
     private static final IgniteProductVersion CLOSE_CURSOR_TASK_SUPPORTED_SINCE =
         IgniteProductVersion.fromString("2.11.0");
 
+    /** Multiple statements V3 task supported since version. */
+    private static final IgniteProductVersion MULTIPLE_STATEMENTS_TASK_V3_SUPPORTED_SINCE =
+        IgniteProductVersion.fromString("2.11.0");
+
     /**
      * Ignite nodes cache.
      *
@@ -192,6 +196,16 @@ public class JdbcConnection implements Connection {
     final Set<JdbcStatement> statements = new HashSet<>();
 
     /**
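+     * Describes the client connection. For this JDBC v2 driver: "jdbc-v2:address:ignite_instance_name".
+     * Descriptors of other client types, for comparison:
+     * - thin cli: "cli:host:port@user_name", thin JDBC: "jdbc-thin:host:port@user_name"
+     * - ODBC: "odbc:host:port@user_name"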
+     *
+     * Used by the running query view to display query initiator.
+     */
+    private final String clientDesc;
+
+    /**
      * Creates new connection.
      *
      * @param url Connection URL.
@@ -263,6 +277,8 @@ public class JdbcConnection implements Connection {
                 if (schemaName == null)
                     schemaName = QueryUtils.DFLT_SCHEMA;
             }
+
+            clientDesc = "jdbc-v2:" + F.first(ignite.cluster().localNode().addresses()) + ":" + ignite.name();
         }
         catch (Exception e) {
             close();
@@ -890,6 +906,13 @@ public class JdbcConnection implements Connection {
     }
 
     /**
+     * @return {@code true} if the multiple statements V3 task is supported by all cluster nodes, {@code false} otherwise.
+     */
+    boolean isMultipleStatementsTaskV3Supported() {
+        return U.isOldestNodeVersionAtLeast(MULTIPLE_STATEMENTS_TASK_V3_SUPPORTED_SINCE, ignite.cluster().nodes());
+    }
+
+    /**
      * @return {@code true} if update on server is enabled, {@code false} otherwise.
      */
     boolean skipReducerOnUpdate() {
@@ -950,6 +973,20 @@ public class JdbcConnection implements Connection {
     }
 
     /**
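+     * Describes the client connection. For this JDBC v2 driver: "jdbc-v2:address:ignite_instance_name".
+     * Descriptors of other client types, for comparison:
+     * - thin cli: "cli:host:port@user_name", thin JDBC: "jdbc-thin:host:port@user_name"
+     * - ODBC: "odbc:host:port@user_name"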
+     *
+     * Used by the running query view to display query initiator.
+     *
+     * @return Client descriptor string.
+     */
+    String clientDescriptor() {
+        return clientDesc;
+    }
+
+    /**
      * JDBC connection validation task.
      */
     private static class JdbcConnectionValidationTask implements IgniteCallable<Boolean> {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryMultipleStatementsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryMultipleStatementsTask.java
index 91af6bc..0287ceb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryMultipleStatementsTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryMultipleStatementsTask.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.resources.IgniteInstanceResource;
@@ -124,6 +125,9 @@ class JdbcQueryMultipleStatementsTask implements IgniteCallable<List<JdbcStateme
         qry.setLazy(lazy);
         qry.setSchema(schemaName);
 
+        if (!F.isEmpty(queryInitiatorId()))
+            qry.setQueryInitiatorId(queryInitiatorId());
+
         GridKernalContext ctx = ((IgniteKernal)ignite).context();
 
         List<FieldsQueryCursor<List<?>>> curs = ctx.query().querySqlFields(
@@ -181,4 +185,11 @@ class JdbcQueryMultipleStatementsTask implements IgniteCallable<List<JdbcStateme
     protected boolean allowMultipleStatements() {
         return true;
     }
+
+    /**
+     * @return query initiator identifier.
+     */
+    protected String queryInitiatorId() {
+        return null;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryMultipleStatementsTaskV3.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryMultipleStatementsTaskV3.java
new file mode 100644
index 0000000..95c32da
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryMultipleStatementsTaskV3.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteJdbcDriver;
+
+/**
+ * Task for SQL queries execution through {@link IgniteJdbcDriver}.
+ * The query can contain several SQL statements.
+ */
+class JdbcQueryMultipleStatementsTaskV3 extends JdbcQueryMultipleStatementsTask {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Allow multiple statements. */
+    private boolean allowMultipleStatements;
+
+    /** Query initiator ID. */
+    private String qryInitiatorId;
+
+    /**
+     * @param ignite Ignite.
+     * @param schemaName Schema name.
+     * @param sql Sql query.
+     * @param isQry Operation type flag - query or not - to enforce query type check.
+     * @param loc Local execution flag.
+     * @param args Args.
+     * @param fetchSize Fetch size.
+     * @param locQry Local query flag.
+     * @param collocatedQry Collocated query flag.
+     * @param distributedJoins Distributed joins flag.
+     * @param enforceJoinOrder Enforce joins order flag.
+     * @param lazy Lazy query execution flag.
+     * @param allowMultipleStatements Allow multiple statements flag.
+     * @param qryInitiatorId Query initiator ID.
+     */
+    public JdbcQueryMultipleStatementsTaskV3(Ignite ignite, String schemaName, String sql,
+        Boolean isQry, boolean loc, Object[] args, int fetchSize, boolean locQry, boolean collocatedQry,
+        boolean distributedJoins, boolean enforceJoinOrder, boolean lazy,
+        boolean allowMultipleStatements, String qryInitiatorId) {
+        super(ignite, schemaName, sql, isQry, loc, args, fetchSize, locQry, collocatedQry, distributedJoins, enforceJoinOrder, lazy);
+
+        this.allowMultipleStatements = allowMultipleStatements;
+        this.qryInitiatorId = qryInitiatorId;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean allowMultipleStatements() {
+        return allowMultipleStatements;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected String queryInitiatorId() {
+        return qryInitiatorId;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
index 244a9f7..20554d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
@@ -108,7 +108,13 @@ public class JdbcStatement implements Statement {
         boolean loc = nodeId == null;
         JdbcQueryMultipleStatementsTask qryTask;
 
-        if (!conn.isMultipleStatementsAllowed() && conn.isMultipleStatementsTaskV2Supported()) {
+        if (conn.isMultipleStatementsTaskV3Supported()) {
+            qryTask = new JdbcQueryMultipleStatementsTaskV3(loc ? ignite : null, conn.schemaName(),
+                sql, isQuery, loc, getArgs(), fetchSize, conn.isLocalQuery(),
+                conn.isCollocatedQuery(), conn.isDistributedJoins(), conn.isEnforceJoinOrder(), conn.isLazy(),
+                conn.isMultipleStatementsAllowed(), conn.clientDescriptor());
+        }
+        else if (!conn.isMultipleStatementsAllowed() && conn.isMultipleStatementsTaskV2Supported()) {
             qryTask = new JdbcQueryMultipleStatementsNotAllowTask(loc ? ignite : null, conn.schemaName(),
                 sql, isQuery, loc, getArgs(), fetchSize, conn.isLocalQuery(), conn.isCollocatedQuery(),
                 conn.isDistributedJoins(), conn.isEnforceJoinOrder(), conn.isLazy());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java
index 9b838ad..49a4e83 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java
@@ -53,7 +53,7 @@ class JdbcStreamedPreparedStatement extends JdbcPreparedStatement {
         assert isQuery == null || !isQuery;
 
         long updCnt = conn.ignite().context().query().streamUpdateQuery(conn.cacheName(), conn.schemaName(),
-            streamer, sql, getArgs());
+            streamer, sql, getArgs(), conn.clientDescriptor());
 
         JdbcResultSet rs = new JdbcResultSet(this, updCnt);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/walker/SqlQueryViewWalker.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/walker/SqlQueryViewWalker.java
index c7f08bf..83e6425 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/walker/SqlQueryViewWalker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/walker/SqlQueryViewWalker.java
@@ -38,6 +38,7 @@ public class SqlQueryViewWalker implements SystemViewRowAttributeWalker<SqlQuery
         v.accept(4, "duration", long.class);
         v.accept(5, "local", boolean.class);
         v.accept(6, "schemaName", String.class);
+        v.accept(7, "initiatorId", String.class);
     }
 
     /** {@inheritDoc} */
@@ -49,10 +50,11 @@ public class SqlQueryViewWalker implements SystemViewRowAttributeWalker<SqlQuery
         v.acceptLong(4, "duration", row.duration());
         v.acceptBoolean(5, "local", row.local());
         v.accept(6, "schemaName", String.class, row.schemaName());
+        v.accept(7, "initiatorId", String.class, row.initiatorId());
     }
 
     /** {@inheritDoc} */
     @Override public int count() {
-        return 7;
+        return 8;
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index 1ff0daa..fa8add3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeExecutionRejectedException;
 import org.apache.ignite.compute.ComputeJob;
@@ -536,6 +537,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
         // Make sure flag is not set for current thread.
         HOLD.set(false);
 
+        SqlFieldsQuery.setThreadedQueryInitiatorId("task:" + ses.getTaskName() + ":" + getJobId());
+
         try {
             if (partsReservation != null) {
                 try {
@@ -672,6 +675,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
             }
         }
         finally {
+            SqlFieldsQuery.resetThreadedQueryInitiatorId();
+
             if (partsReservation != null)
                 partsReservation.release();
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerAbstractConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerAbstractConnectionContext.java
index 3682596..81c61dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerAbstractConnectionContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerAbstractConnectionContext.java
@@ -40,6 +40,9 @@ public abstract class ClientListenerAbstractConnectionContext implements ClientL
     /** Kernal context. */
     protected final GridKernalContext ctx;
 
+    /** Nio session. */
+    protected final GridNioSession ses;
+
     /** Security context or {@code null} if security is disabled. */
     private SecurityContext secCtx;
 
@@ -53,14 +56,27 @@ public abstract class ClientListenerAbstractConnectionContext implements ClientL
     protected Map<String, String> userAttrs;
 
     /**
+     * Describes the client connection:
+     * - thin cli: "cli:host:port@user_name"
+     * - thin JDBC: "jdbc-thin:host:port@user_name"
+     * - ODBC: "odbc:host:port@user_name"
+     *
+     * Used by the running query view to display query initiator.
+     */
+    private String clientDesc;
+
+    /**
      * Constructor.
      *
      * @param ctx Kernal context.
-     * @param connId Connection id.
+     * @param ses Client's NIO session.
+     * @param connId Connection ID.
      */
-    protected ClientListenerAbstractConnectionContext(GridKernalContext ctx, long connId) {
+    protected ClientListenerAbstractConnectionContext(
+        GridKernalContext ctx, GridNioSession ses, long connId) {
         this.ctx = ctx;
         this.connId = connId;
+        this.ses = ses;
     }
 
     /**
@@ -142,4 +158,26 @@ public abstract class ClientListenerAbstractConnectionContext implements ClientL
         if (ctx.security().enabled())
             ctx.security().onSessionExpired(secCtx.subject().id());
     }
+
+    /** Builds the client descriptor as "prefix:host:port", appending "@user_name" when {@code authCtx} is set. */
+    protected void initClientDescriptor(String prefix) {
+        clientDesc = prefix + ":" + ses.remoteAddress().getHostString() + ":" + ses.remoteAddress().getPort();
+
+        if (authCtx != null)
+            clientDesc += "@" + authCtx.userName();
+    }
+
+    /**
+     * Describes the client connection:
+     * - thin cli: "cli:host:port@user_name"
+     * - thin JDBC: "jdbc-thin:host:port@user_name"
+     * - ODBC: "odbc:host:port@user_name"
+     *
+     * Used by the running query view to display query initiator.
+     *
+     * @return Client descriptor string, or {@code null} if {@code initClientDescriptor} has not been called yet.
+     */
+    public String clientDescriptor() {
+        return clientDesc;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
index 393383d..c88844b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
@@ -321,7 +321,7 @@ public class ClientListenerNioListener extends GridNioServerListenerAdapter<Clie
         ClientListenerConnectionContext connCtx = null;
 
         try {
-            connCtx = prepareContext(clientType);
+            connCtx = prepareContext(clientType, ses);
 
             ensureClientPermissions(clientType);
 
@@ -377,22 +377,24 @@ public class ClientListenerNioListener extends GridNioServerListenerAdapter<Clie
     /**
      * Prepare context.
      *
+     * @param ses Client's NIO session.
      * @param clientType Client type.
      * @return Context.
      * @throws IgniteCheckedException If failed.
      */
-    private ClientListenerConnectionContext prepareContext(byte clientType) throws IgniteCheckedException {
+    private ClientListenerConnectionContext prepareContext(byte clientType, GridNioSession ses)
+        throws IgniteCheckedException {
         long connId = nextConnectionId();
 
         switch (clientType) {
             case ODBC_CLIENT:
-                return new OdbcConnectionContext(ctx, busyLock, connId, maxCursors);
+                return new OdbcConnectionContext(ctx, ses, busyLock, connId, maxCursors);
 
             case JDBC_CLIENT:
-                return new JdbcConnectionContext(ctx, busyLock, connId, maxCursors);
+                return new JdbcConnectionContext(ctx, ses, busyLock, connId, maxCursors);
 
             case THIN_CLIENT:
-                return new ClientConnectionContext(ctx, connId, maxCursors, thinCfg);
+                return new ClientConnectionContext(ctx, ses, connId, maxCursors, thinCfg);
         }
 
         throw new IgniteCheckedException("Unknown client type: " + clientType);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
index 64cf609..695166b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
@@ -110,13 +110,14 @@ public class JdbcConnectionContext extends ClientListenerAbstractConnectionConte
     /**
      * Constructor.
      * @param ctx Kernal Context.
+     * @param ses Client's NIO session.
      * @param busyLock Shutdown busy lock.
      * @param connId Connection ID.
      * @param maxCursors Maximum allowed cursors.
      */
-    public JdbcConnectionContext(GridKernalContext ctx, GridSpinBusyLock busyLock, long connId,
+    public JdbcConnectionContext(GridKernalContext ctx, GridNioSession ses, GridSpinBusyLock busyLock, long connId,
         int maxCursors) {
-        super(ctx, connId);
+        super(ctx, ses, connId);
 
         this.busyLock = busyLock;
         this.maxCursors = maxCursors;
@@ -208,6 +209,8 @@ public class JdbcConnectionContext extends ClientListenerAbstractConnectionConte
 
         protoCtx = new JdbcProtocolContext(ver, features, true);
 
+        initClientDescriptor("jdbc-thin");
+
         parser = new JdbcMessageParser(ctx, protoCtx);
 
         ClientListenerResponseSender sender = new ClientListenerResponseSender() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
index ecf6fbf..224a9ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
@@ -1008,6 +1008,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
         qry.setLazy(cliCtx.isLazy());
         qry.setNestedTxMode(nestedTxMode);
         qry.setSchema(schemaName);
+        qry.setQueryInitiatorId(connCtx.clientDescriptor());
 
         if (cliCtx.updateBatchSize() != null)
             qry.setUpdateBatchSize(cliCtx.updateBatchSize());
@@ -1031,7 +1032,8 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
                     qry.getSchema(),
                     cliCtx,
                     qry.getSql(),
-                    qry.batchedArguments()
+                    qry.batchedArguments(),
+                    connCtx.clientDescriptor()
                 );
 
                 for (int i = 0; i < cnt.size(); i++)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java
index 5b78b6e..0a94d24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java
@@ -93,12 +93,13 @@ public class OdbcConnectionContext extends ClientListenerAbstractConnectionConte
     /**
      * Constructor.
      * @param ctx Kernal Context.
+     * @param ses Client's NIO session.
      * @param busyLock Shutdown busy lock.
      * @param connId Connection ID.
      * @param maxCursors Maximum allowed cursors.
      */
-    public OdbcConnectionContext(GridKernalContext ctx, GridSpinBusyLock busyLock, long connId, int maxCursors) {
-        super(ctx, connId);
+    public OdbcConnectionContext(GridKernalContext ctx, GridNioSession ses, GridSpinBusyLock busyLock, long connId, int maxCursors) {
+        super(ctx, ses, connId);
 
         this.busyLock = busyLock;
         this.maxCursors = maxCursors;
@@ -166,8 +167,10 @@ public class OdbcConnectionContext extends ClientListenerAbstractConnectionConte
             }
         };
 
+        initClientDescriptor("odbc");
+
         handler = new OdbcRequestHandler(ctx, busyLock, sender, maxCursors, distributedJoins, enforceJoinOrder,
-            replicatedOnly, collocated, lazy, skipReducerOnUpdate, actx, nestedTxMode, ver);
+            replicatedOnly, collocated, lazy, skipReducerOnUpdate, actx, nestedTxMode, ver, this);
 
         parser = new OdbcMessageParser(ctx, ver);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
index 8cc24a1..f80aac3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
@@ -128,6 +128,9 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler {
     /** Response sender. */
     private final ClientListenerResponseSender sender;
 
+    /** Connection context. */
+    private final OdbcConnectionContext connCtx;
+
     /**
      * Constructor.
      * @param ctx Context.
@@ -155,8 +158,12 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler {
         boolean collocated,
         boolean lazy,
         boolean skipReducerOnUpdate,
-        AuthorizationContext actx, NestedTxMode nestedTxMode, ClientListenerProtocolVersion ver) {
+        AuthorizationContext actx,
+        NestedTxMode nestedTxMode,
+        ClientListenerProtocolVersion ver,
+        OdbcConnectionContext connCtx) {
         this.ctx = ctx;
+        this.connCtx = connCtx;
 
         Factory<GridWorker> orderedFactory = new Factory<GridWorker>() {
             @Override public GridWorker create() {
@@ -376,6 +383,7 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler {
         qry.setSchema(OdbcUtils.prepareSchema(schema));
         qry.setSkipReducerOnUpdate(cliCtx.isSkipReducerOnUpdate());
         qry.setNestedTxMode(nestedTxMode);
+        qry.setQueryInitiatorId(connCtx.clientDescriptor());
 
         return qry;
     }
@@ -581,7 +589,8 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler {
                 OdbcUtils.prepareSchema(qry.getSchema()),
                 cliCtx,
                 qry.getSql(),
-                qry.batchedArguments()
+                qry.batchedArguments(),
+                connCtx.clientDescriptor()
             );
         }
         catch (Exception e) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java
index aea4a7c..2a3b5dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java
@@ -144,8 +144,8 @@ public class ClientConnectionContext extends ClientListenerAbstractConnectionCon
      * @param maxCursors Max active cursors.
      * @param thinCfg Thin-client configuration.
      */
-    public ClientConnectionContext(GridKernalContext ctx, long connId, int maxCursors, ThinClientConfiguration thinCfg) {
-        super(ctx, connId);
+    public ClientConnectionContext(GridKernalContext ctx, GridNioSession ses, long connId, int maxCursors, ThinClientConfiguration thinCfg) {
+        super(ctx, ses, connId);
 
         this.maxCursors = maxCursors;
         maxActiveTxCnt = thinCfg.getMaxActiveTxPerConnection();
@@ -216,6 +216,8 @@ public class ClientConnectionContext extends ClientListenerAbstractConnectionCon
 
         AuthorizationContext authCtx = authenticate(ses, user, pwd);
 
+        initClientDescriptor("cli");
+
         handler = new ClientRequestHandler(this, authCtx, currentProtocolContext);
         parser = new ClientMessageParser(this, currentProtocolContext);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheSqlFieldsQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheSqlFieldsQueryRequest.java
index 6a701f8..ed0cb79 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheSqlFieldsQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheSqlFieldsQueryRequest.java
@@ -131,6 +131,8 @@ public class ClientCacheSqlFieldsQueryRequest extends ClientCacheDataRequest imp
         ctx.incrementCursors();
 
         try {
+            qry.setQueryInitiatorId(ctx.clientDescriptor());
+
             // If cacheId is provided, we must check the cache for existence.
             if (cacheId() != 0) {
                 DynamicCacheDescriptor desc = cacheDescriptor(ctx);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index fe13d81..404895c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -105,11 +105,12 @@ public interface GridQueryIndexing {
      * @param qry Query.
      * @param params Query parameters.
      * @param streamer Data streamer to feed data to.
+     * @param qryInitiatorId Query initiator ID.
      * @return Update counter.
      * @throws IgniteCheckedException If failed.
      */
     public long streamUpdateQuery(String schemaName, String qry, @Nullable Object[] params,
-        IgniteDataStreamer<?, ?> streamer) throws IgniteCheckedException;
+        IgniteDataStreamer<?, ?> streamer, String qryInitiatorId) throws IgniteCheckedException;
 
     /**
      * Execute a batched INSERT statement using data streamer as receiver.
@@ -118,11 +119,12 @@ public interface GridQueryIndexing {
      * @param qry Query.
      * @param params Query parameters.
      * @param cliCtx Client connection context.
+     * @param qryInitiatorId Query initiator ID.
      * @return Update counters.
      * @throws IgniteCheckedException If failed.
      */
     public List<Long> streamBatchedUpdateQuery(String schemaName, String qry, List<Object[]> params,
-        SqlClientContext cliCtx) throws IgniteCheckedException;
+        SqlClientContext cliCtx, String qryInitiatorId) throws IgniteCheckedException;
 
     /**
      * Executes text query.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 92b99d8..f335444 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -2912,7 +2912,8 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @return Update counter.
      */
     public long streamUpdateQuery(@Nullable final String cacheName, final String schemaName,
-        final IgniteDataStreamer<?, ?> streamer, final String qry, final Object[] args) {
+        final IgniteDataStreamer<?, ?> streamer, final String qry, final Object[] args,
+        String qryInitiatorId) {
         assert streamer != null;
 
         if (!busyLock.enterBusy())
@@ -2923,7 +2924,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
 
             return executeQuery(GridCacheQueryType.SQL_FIELDS, qry, cctx, new IgniteOutClosureX<Long>() {
                 @Override public Long applyx() throws IgniteCheckedException {
-                    return idx.streamUpdateQuery(schemaName, qry, args, streamer);
+                    return idx.streamUpdateQuery(schemaName, qry, args, streamer, qryInitiatorId);
                 }
             }, true);
         }
@@ -2943,7 +2944,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @return Update counters.
      */
     public List<Long> streamBatchedUpdateQuery(final String schemaName, final SqlClientContext cliCtx,
-        final String qry, final List<Object[]> args) {
+        final String qry, final List<Object[]> args, String qryInitiatorId) {
         checkxEnabled();
 
         if (!busyLock.enterBusy())
@@ -2952,7 +2953,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         try {
             return executeQuery(GridCacheQueryType.SQL_FIELDS, qry, null, new IgniteOutClosureX<List<Long>>() {
                 @Override public List<Long> applyx() throws IgniteCheckedException {
-                    return idx.streamBatchedUpdateQuery(schemaName, qry, args, cliCtx);
+                    return idx.streamBatchedUpdateQuery(schemaName, qry, args, cliCtx, qryInitiatorId);
                 }
             }, true);
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
index e8906500..2b21960 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
@@ -59,6 +59,9 @@ public class GridRunningQueryInfo {
     /** Span of the running query. */
     private final Span span;
 
+    /** Query initiator ID (see {@link #queryInitiatorId()}). */
+    private final String qryInitiatorId;
+
     /** Request ID. */
     private long reqId;
 
@@ -84,7 +87,8 @@ public class GridRunningQueryInfo {
         long startTime,
         long startTimeNanos,
         GridQueryCancel cancel,
-        boolean loc
+        boolean loc,
+        String qryInitiatorId
     ) {
         this.id = id;
         this.nodeId = nodeId;
@@ -96,6 +100,7 @@ public class GridRunningQueryInfo {
         this.cancel = cancel;
         this.loc = loc;
         this.span = MTC.span();
+        this.qryInitiatorId = qryInitiatorId;
     }
 
     /**
@@ -204,6 +209,14 @@ public class GridRunningQueryInfo {
         return reqId;
     }
 
+    /**
+     * @return Query's originator string (client host+port, user name,
+     * job name or any user's information about query initiator).
+     */
+    public String queryInitiatorId() {
+        return qryInitiatorId;
+    }
+
     /** @param reqId Request ID. */
     public void requestId(long reqId) {
         this.reqId = reqId;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
index 98af2dd..d6ae245 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
@@ -26,6 +26,8 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.configuration.SqlConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.managers.systemview.walker.SqlQueryHistoryViewWalker;
@@ -145,9 +147,13 @@ public class RunningQueryManager {
      * @return Id of registered query.
      */
     public Long register(String qry, GridCacheQueryType qryType, String schemaName, boolean loc,
-        @Nullable GridQueryCancel cancel) {
+        @Nullable GridQueryCancel cancel,
+        String qryInitiatorId) {
         long qryId = qryIdGen.incrementAndGet();
 
+        if (qryInitiatorId == null)
+            qryInitiatorId = SqlFieldsQuery.threadedQueryInitiatorId();
+
         final GridRunningQueryInfo run = new GridRunningQueryInfo(
             qryId,
             localNodeId,
@@ -157,7 +163,8 @@ public class RunningQueryManager {
             System.currentTimeMillis(),
             ctx.performanceStatistics().enabled() ? System.nanoTime() : 0,
             cancel,
-            loc
+            loc,
+            qryInitiatorId
         );
 
         GridRunningQueryInfo preRun = runs.putIfAbsent(qryId, run);
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SqlQueryView.java b/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SqlQueryView.java
index ece63f8..e68d844 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SqlQueryView.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SqlQueryView.java
@@ -72,6 +72,12 @@ public class SqlQueryView {
         return U.currentTimeMillis() - qry.startTime();
     }
 
+    /** @return Query initiator ID. */
+    @Order(7)
+    public String initiatorId() {
+        return qry.queryInitiatorId();
+    }
+
     /** @return {@code True} if query is local. */
     public boolean local() {
         return qry.local();
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index ea469c3..c20a6c9 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -391,6 +391,7 @@ org.apache.ignite.internal.jdbc2.JdbcCloseCursorTask
 org.apache.ignite.internal.jdbc2.JdbcConnection$JdbcConnectionValidationTask
 org.apache.ignite.internal.jdbc2.JdbcDatabaseMetadata$UpdateMetadataTask
 org.apache.ignite.internal.jdbc2.JdbcQueryMultipleStatementsTask
+org.apache.ignite.internal.jdbc2.JdbcQueryMultipleStatementsTaskV3
 org.apache.ignite.internal.jdbc2.JdbcQueryTask
 org.apache.ignite.internal.jdbc2.JdbcQueryTask$1
 org.apache.ignite.internal.jdbc2.JdbcQueryTaskResult
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
index c48517e..d7b9856 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
@@ -83,7 +83,8 @@ public class DummyQueryIndexing implements GridQueryIndexing {
         String schemaName,
         String qry,
         @Nullable Object[] params,
-        IgniteDataStreamer<?, ?> streamer
+        IgniteDataStreamer<?, ?> streamer,
+        String qryInitiatorId
     ) throws IgniteCheckedException {
         return 0;
     }
@@ -93,7 +94,8 @@ public class DummyQueryIndexing implements GridQueryIndexing {
         String schemaName,
         String qry,
         List<Object[]> params,
-        SqlClientContext cliCtx
+        SqlClientContext cliCtx,
+        String qryInitiatorId
     ) throws IgniteCheckedException {
         return null;
     }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 88a375b..50d9970 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -507,7 +507,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         H2TableDescriptor tbl = schemaMgr.tableForType(schemaName, cacheName, typeName);
 
         if (tbl != null && tbl.luceneIndex() != null) {
-            Long qryId = runningQueryManager().register(qry, TEXT, schemaName, true, null);
+            Long qryId = runningQueryManager().register(qry, TEXT, schemaName, true, null, null);
 
             try {
                 return tbl.luceneIndex().query(qry.toUpperCase(), filters, limit);
@@ -658,11 +658,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         String schemaName,
         String qry,
         @Nullable Object[] params,
-        IgniteDataStreamer<?, ?> streamer
+        IgniteDataStreamer<?, ?> streamer,
+        String qryInitiatorId
     ) throws IgniteCheckedException {
         QueryParserResultDml dml = streamerParse(schemaName, qry);
 
-        return streamQuery0(qry, schemaName, streamer, dml, params);
+        return streamQuery0(qry, schemaName, streamer, dml, params, qryInitiatorId);
     }
 
     /** {@inheritDoc} */
@@ -671,7 +672,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         String schemaName,
         String qry,
         List<Object[]> params,
-        SqlClientContext cliCtx
+        SqlClientContext cliCtx,
+        String qryInitiatorId
     ) throws IgniteCheckedException {
         if (cliCtx == null || !cliCtx.isStream()) {
             U.warn(log, "Connection is not in streaming mode.");
@@ -688,7 +690,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         List<Long> ress = new ArrayList<>(params.size());
 
         for (int i = 0; i < params.size(); i++) {
-            long res = streamQuery0(qry, schemaName, streamer, dml, params.get(i));
+            long res = streamQuery0(qry, schemaName, streamer, dml, params.get(i), qryInitiatorId);
 
             ress.add(res);
         }
@@ -709,13 +711,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      */
     @SuppressWarnings({"unchecked"})
     private long streamQuery0(String qry, String schemaName, IgniteDataStreamer streamer, QueryParserResultDml dml,
-        final Object[] args) throws IgniteCheckedException {
+        final Object[] args, String qryInitiatorId) throws IgniteCheckedException {
         Long qryId = runningQryMgr.register(
             QueryUtils.INCLUDE_SENSITIVE ? qry : sqlWithoutConst(dml.statement()),
             GridCacheQueryType.SQL_FIELDS,
             schemaName,
             true,
-            null
+            null,
+            qryInitiatorId
         );
 
         Exception failReason = null;
@@ -1577,7 +1580,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             GridCacheQueryType.SQL_FIELDS,
             qryDesc.schemaName(),
             qryDesc.local(),
-            cancel
+            cancel,
+            qryDesc.queryInitiatorId()
         );
 
         if (ctx.event().isRecordable(EVT_SQL_QUERY_EXECUTION)) {
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryDescriptor.java
index 2b1ceaf..372352a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryDescriptor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryDescriptor.java
@@ -45,6 +45,9 @@ public class QueryDescriptor {
     /** Batched flag. */
     private final boolean batched;
 
+    /** Query initiator identifier. */
+    private final String qryInitiatorId;
+
     /**
      * @param schemaName Schema name.
      * @param sql Sql.
@@ -62,7 +65,8 @@ public class QueryDescriptor {
         boolean enforceJoinOrder,
         boolean loc,
         boolean skipReducerOnUpdate,
-        boolean batched
+        boolean batched,
+        String qryInitiatorId
     ) {
         this.schemaName = schemaName;
         this.sql = sql;
@@ -72,6 +76,7 @@ public class QueryDescriptor {
         this.loc = loc;
         this.skipReducerOnUpdate = skipReducerOnUpdate;
         this.batched = batched;
+        this.qryInitiatorId = qryInitiatorId;
     }
 
     /**
@@ -130,6 +135,13 @@ public class QueryDescriptor {
         return batched;
     }
 
+    /**
+     * @return Query initiator identifier.
+     */
+    public String queryInitiatorId() {
+        return qryInitiatorId;
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings("SimplifiableIfStatement")
     @Override public boolean equals(Object o) {
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
index b1e5bd1..9f01516 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
@@ -802,7 +802,8 @@ public class QueryParser {
             qry.isEnforceJoinOrder(),
             qry.isLocal(),
             skipReducerOnUpdate,
-            batched
+            batched,
+            qry.getQueryInitiatorId()
         );
     }
 }
diff --git a/modules/spark-2.4/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala b/modules/spark-2.4/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
index d123b01..625f449 100644
--- a/modules/spark-2.4/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
+++ b/modules/spark-2.4/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
@@ -192,7 +192,7 @@ private[apache] object QueryHelper {
                 }
 
                 qryProcessor.streamUpdateQuery(tblInfo.cacheName,
-                    tblInfo.schemaName, streamer, insertQry, args.toArray)
+                    tblInfo.schemaName, streamer, insertQry, args.toArray, "spark")
             }
         }
         finally {
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
index d123b01..625f449 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
@@ -192,7 +192,7 @@ private[apache] object QueryHelper {
                 }
 
                 qryProcessor.streamUpdateQuery(tblInfo.cacheName,
-                    tblInfo.schemaName, streamer, insertQry, args.toArray)
+                    tblInfo.schemaName, streamer, insertQry, args.toArray, "spark")
             }
         }
         finally {