You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/04/07 07:35:46 UTC
[ignite] branch master updated: IGNITE-14461 Track down those who
initiated a query (#8965)
This is an automated email from the ASF dual-hosted git repository.
tledkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 6b74e4fc IGNITE-14461 Track down those who initiated a query (#8965)
6b74e4fc is described below
commit 6b74e4fce5b92520970a7ba3cd26b7d007d61a11
Author: tledkov <tl...@gridgain.com>
AuthorDate: Wed Apr 7 10:35:24 2021 +0300
IGNITE-14461 Track down those who initiated a query (#8965)
---
.../common/RunningQueryInfoCheckInitiatorTest.java | 420 +++++++++++++++++++++
.../jdbc/suite/IgniteJdbcDriverTestSuite.java | 3 +
.../ignite/jdbc/thin/JdbcThinMetadataSelfTest.java | 2 +
.../thin/JdbcThinStreamingAbstractSelfTest.java | 4 +-
.../apache/ignite/cache/query/SqlFieldsQuery.java | 53 +++
.../ignite/internal/jdbc2/JdbcConnection.java | 37 ++
.../jdbc2/JdbcQueryMultipleStatementsTask.java | 11 +
.../jdbc2/JdbcQueryMultipleStatementsTaskV3.java | 72 ++++
.../ignite/internal/jdbc2/JdbcStatement.java | 8 +-
.../jdbc2/JdbcStreamedPreparedStatement.java | 2 +-
.../systemview/walker/SqlQueryViewWalker.java | 4 +-
.../internal/processors/job/GridJobWorker.java | 5 +
.../ClientListenerAbstractConnectionContext.java | 42 ++-
.../processors/odbc/ClientListenerNioListener.java | 12 +-
.../odbc/jdbc/JdbcConnectionContext.java | 7 +-
.../processors/odbc/jdbc/JdbcRequestHandler.java | 4 +-
.../odbc/odbc/OdbcConnectionContext.java | 9 +-
.../processors/odbc/odbc/OdbcRequestHandler.java | 13 +-
.../platform/client/ClientConnectionContext.java | 6 +-
.../cache/ClientCacheSqlFieldsQueryRequest.java | 2 +
.../processors/query/GridQueryIndexing.java | 6 +-
.../processors/query/GridQueryProcessor.java | 9 +-
.../processors/query/GridRunningQueryInfo.java | 15 +-
.../processors/query/RunningQueryManager.java | 11 +-
.../ignite/spi/systemview/view/SqlQueryView.java | 6 +
.../main/resources/META-INF/classnames.properties | 1 +
.../processors/query/DummyQueryIndexing.java | 6 +-
.../processors/query/h2/IgniteH2Indexing.java | 20 +-
.../processors/query/h2/QueryDescriptor.java | 14 +-
.../internal/processors/query/h2/QueryParser.java | 3 +-
.../org/apache/ignite/spark/impl/QueryHelper.scala | 2 +-
.../org/apache/ignite/spark/impl/QueryHelper.scala | 2 +-
32 files changed, 766 insertions(+), 45 deletions(-)
diff --git a/modules/clients/src/test/java/org/apache/ignite/common/RunningQueryInfoCheckInitiatorTest.java b/modules/clients/src/test/java/org/apache/ignite/common/RunningQueryInfoCheckInitiatorTest.java
new file mode 100644
index 0000000..16799ee
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/common/RunningQueryInfoCheckInitiatorTest.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.common;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.regex.Pattern;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.jdbc.thin.JdbcThinAbstractSelfTest;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.IgniteJdbcDriver.CFG_URL_PREFIX;
+
+/**
+ * Tests for query originator.
+ */
+public class RunningQueryInfoCheckInitiatorTest extends JdbcThinAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ // Default data region with persistence switched on.
+ DataRegionConfiguration dfltRegionCfg = new DataRegionConfiguration()
+ .setPersistenceEnabled(true);
+
+ DataStorageConfiguration storageCfg = new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(dfltRegionCfg);
+
+ // Cache "test" exposes the custom SQL functions of TestSQLFunctions under schema TEST.
+ CacheConfiguration cacheCfg = new CacheConfiguration()
+ .setName("test")
+ .setSqlSchema("TEST")
+ .setSqlFunctionClasses(TestSQLFunctions.class)
+ .setIndexedTypes(Integer.class, Integer.class);
+
+ return super.getConfiguration(igniteInstanceName)
+ .setDataStorageConfiguration(storageCfg)
+ .setAuthenticationEnabled(true)
+ .setCacheConfiguration(cacheCfg);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ // Presumably wipes persistence files left by earlier runs (persistence is enabled in getConfiguration).
+ cleanPersistenceDir();
+
+ // Node 0 is started with startGrid, node 1 with startClientGrid.
+ startGrid(0);
+ startClientGrid(1);
+
+ // Explicit activation of the cluster before any test runs.
+ grid(0).cluster().active(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ // Destroy every cache except the statically configured "test" cache.
+ for (String cacheName : grid(0).cacheNames()) {
+ if (cacheName.equals("test"))
+ continue;
+
+ grid(0).cache(cacheName).destroy();
+ }
+
+ super.afterTest();
+ }
+
+ /**
+ * Checks that an initiator ID set explicitly on {@link SqlFieldsQuery} is reported for the running query.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testUserDefinedInitiatorId() throws Exception {
+ final String expInitiatorId = "TestUserSpecifiedOriginator";
+
+ // Runs the SQL asynchronously on node 0 with the user-defined initiator ID attached.
+ Consumer<String> qryRunner = sql -> GridTestUtils.runAsync(() -> {
+ try {
+ SqlFieldsQuery qry = new SqlFieldsQuery(sql).setQueryInitiatorId(expInitiatorId);
+
+ grid(0).context().query().querySqlFields(qry, false).getAll();
+ }
+ catch (Exception e) {
+ log.error("Unexpected exception", e);
+ fail("Unexpected exception");
+ }
+ });
+
+ Consumer<String> idChecker = actualId -> assertEquals(expInitiatorId, actualId);
+
+ check(qryRunner, idChecker);
+ }
+
+ /**
+ * Checks that a user-defined initiator ID is reported for each statement of a multi-statement query.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testMultipleStatementsUserDefinedInitiatorId() throws Exception {
+ final String initiatorId = "TestUserSpecifiedOriginator";
+
+ // NOTE(review): the returned future is dropped, so a failure inside the async task is not reported.
+ GridTestUtils.runAsync(() -> {
+ List<FieldsQueryCursor<List<?>>> curs = grid(0).context().query().querySqlFields(
+ new SqlFieldsQuery("SELECT 'qry0', test.await(); SELECT 'qry1', test.await()")
+ .setQueryInitiatorId(initiatorId), false, false);
+
+ for (FieldsQueryCursor<List<?>> cur : curs)
+ cur.getAll();
+ });
+
+ // The first statement is blocked inside test.await() while its initiator ID is checked.
+ // NOTE(review): unlike checkInitiatorId, the unlock calls below are not in a finally block, so a failed
+ // assertion leaves the query parked on the shared phaser.
+ assertEquals(initiatorId, initiatorId(grid(0), "qry0", 1000));
+
+ TestSQLFunctions.unlockQuery();
+
+ // Same check for the second statement.
+ assertEquals(initiatorId, initiatorId(grid(0), "qry1", 1000));
+
+ TestSQLFunctions.unlockQuery();
+
+ checkRunningQueriesCount(grid(0), 0, 1000);
+ }
+
+
+ /**
+ * Checks the initiator ID reported for queries executed through the thin JDBC driver.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testJdbcThinInitiatorId() throws Exception {
+ Consumer<String> sqlExec = sql -> GridTestUtils.runAsync(() -> {
+ String url = "jdbc:ignite:thin://127.0.0.1:" + clientPort(grid(0)) + "/?user=ignite&password=ignite";
+
+ // Resources are closed in reverse order: the statement first, then the connection.
+ try (Connection conn = DriverManager.getConnection(url); Statement stmt = conn.createStatement()) {
+ stmt.execute(sql);
+ }
+ catch (SQLException e) {
+ log.error("Unexpected exception", e);
+ }
+ });
+
+ // Expected format: "jdbc-thin:<host>:<port>@<user>".
+ Consumer<String> initiatorChecker = initiatorId -> assertTrue(
+ "Invalid initiator ID: " + initiatorId,
+ Pattern.matches("jdbc-thin:127\\.0\\.0\\.1:[0-9]+@ignite", initiatorId));
+
+ check(sqlExec, initiatorChecker);
+ }
+
+ /**
+ * Checks the initiator ID reported for queries executed through the thin client.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testThinClientInitiatorId() throws Exception {
+ Consumer<String> sqlExec = sql -> GridTestUtils.runAsync(() -> {
+ try (IgniteClient cli = Ignition.startClient(
+ new ClientConfiguration()
+ .setAddresses("127.0.0.1:" + clientPort(grid(0)))
+ .setUserName("ignite")
+ .setUserPassword("ignite"))) {
+ SqlFieldsQuery qry = new SqlFieldsQuery(sql);
+
+ cli.query(qry).getAll();
+ }
+ catch (Exception e) {
+ log.error("Unexpected exception", e);
+ }
+ });
+
+ // Expected format: "cli:<host>:<port>@<user>".
+ Consumer<String> initiatorChecker = initiatorId -> assertTrue(
+ "Invalid initiator ID: " + initiatorId,
+ Pattern.matches("cli:127\\.0\\.0\\.1:[0-9]+@ignite", initiatorId));
+
+ check(sqlExec, initiatorChecker);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testJobDefaultInitiatorId() throws Exception {
+ Consumer<String> sqlExec = sql -> {
+ grid(1).cluster().forServers().ignite().compute().runAsync(new TestSqlJob(sql));
+ };
+
+ Consumer<String> initiatorChecker = initiatorId -> {
+ assertTrue("Invalid initiator ID: " + initiatorId,
+ initiatorId.startsWith("task:" + TestSqlJob.class.getName()) &&
+ initiatorId.endsWith(grid(1).context().localNodeId().toString()));
+ };
+
+ check(sqlExec, initiatorChecker);
+ }
+
+ /**
+ * Checks the initiator ID reported for queries executed through the JDBC v2 (node-based) driver.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testJdbcV2InitiatorId() throws Exception {
+ Consumer<String> sqlExec = sql -> {
+ final UUID grid0NodeId = grid(0).cluster().localNode().id();
+
+ final String url = CFG_URL_PREFIX + "nodeId=" + grid0NodeId + "@modules/clients/src/test/config/jdbc-config.xml";
+
+ GridTestUtils.runAsync(() -> {
+ try (Connection conn = DriverManager.getConnection(url); Statement stmt = conn.createStatement()) {
+ stmt.execute(sql);
+ }
+ catch (SQLException e) {
+ log.error("Unexpected exception", e);
+ }
+ });
+ };
+
+ // Expected format: "jdbc-v2:<address>:<driver node instance name>".
+ Consumer<String> initiatorChecker = initiatorId -> assertTrue(
+ "Invalid initiator ID: " + initiatorId,
+ Pattern.matches("jdbc-v2:127\\.0\\.0\\.1:sqlGrid-ignite-jdbc-driver-[0-9a-fA-F-]+", initiatorId));
+
+ check(sqlExec, initiatorChecker);
+ }
+
+ /**
+ * Checks the initiator ID reported for INSERTs sent through a thin JDBC connection after
+ * {@code SET STREAMING ON}.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testJdbcThinStreamerInitiatorId() throws Exception {
+ final AtomicBoolean end = new AtomicBoolean();
+
+ // Background thread keeps inserting rows until the 'end' flag is raised.
+ IgniteInternalFuture<?> f = GridTestUtils.runAsync(() -> {
+ try (Connection conn = DriverManager.getConnection(
+ "jdbc:ignite:thin://127.0.0.1:" + clientPort(grid(0)) + "/?user=ignite&password=ignite")) {
+ try (Statement stmt = conn.createStatement()) {
+ stmt.execute("CREATE TABLE T (ID INT PRIMARY KEY, VAL INT)");
+
+ stmt.execute("SET STREAMING ON");
+
+ for (int i = 0; !end.get(); ++i)
+ stmt.execute("INSERT INTO T VALUES(" + i + " , 0)");
+ }
+ }
+ catch (SQLException e) {
+ log.error("Unexpected exception", e);
+ }
+ });
+
+ Consumer<String> initiatorChecker = initiatorId -> {
+ assertTrue("Invalid initiator ID: " + initiatorId,
+ Pattern.compile("jdbc-thin:127\\.0\\.0\\.1:[0-9]+@ignite").matcher(initiatorId).matches());
+ };
+
+ try {
+ initiatorChecker.accept(initiatorId(grid(0), "INSERT", 2000));
+ }
+ finally {
+ // Stop the insert loop even when the check fails or times out; otherwise the background
+ // thread would keep inserting after the test has finished.
+ end.set(true);
+ }
+
+ f.get();
+ }
+
+ /**
+ * Runs the initiator check for a SELECT, then creates table T and repeats the check for INSERT and UPDATE.
+ *
+ * @param sqlExec Starts execution of the given SQL through the client under test.
+ * @param initiatorChecker Validates the initiator ID reported for the running query.
+ * @throws Exception If failed.
+ */
+ private void check(Consumer<String> sqlExec, Consumer<String> initiatorChecker) throws Exception {
+ checkInitiatorId(sqlExec, initiatorChecker, "SELECT test.await()", "await");
+
+ SqlFieldsQuery createTbl = new SqlFieldsQuery("CREATE TABLE T (ID INT PRIMARY KEY, VAL INT)");
+
+ grid(0).context().query().querySqlFields(createTbl, false).getAll();
+
+ U.sleep(500);
+
+ String[] dmlQueries = {
+ "INSERT INTO T VALUES (0, test.await())",
+ "UPDATE T SET VAL=test.await() WHERE ID = 0"
+ };
+
+ for (String dml : dmlQueries)
+ checkInitiatorId(sqlExec, initiatorChecker, dml, "await");
+ }
+
+ /**
+ * Starts the query, validates the initiator ID reported for it while it is blocked, then releases it
+ * and waits until no running queries are left.
+ *
+ * @param sqlExecutor Starts execution of the given SQL.
+ * @param initiatorChecker Validates the initiator ID found for the running query.
+ * @param sql SQL text to execute.
+ * @param sqlMatch Substring used to find the query in the running queries view.
+ * @throws Exception If failed.
+ */
+ private void checkInitiatorId(Consumer<String> sqlExecutor, Consumer<String> initiatorChecker,
+ String sql, String sqlMatch)
+ throws Exception {
+ try {
+ sqlExecutor.accept(sql);
+
+ initiatorChecker.accept(initiatorId(grid(0), sqlMatch, 2000));
+ }
+ finally {
+ // Release the query even if the check above failed.
+ TestSQLFunctions.unlockQuery();
+ }
+
+ checkRunningQueriesCount(grid(0), 0, 2000);
+ }
+
+ /**
+ * @param node Ignite target node where query must be executed.
+ * @param sqlMatch string to match SQL query with specified initiator ID.
+ * @param timeout Timeout.
+ * @return initiator ID.
+ */
+ private String initiatorId(IgniteEx node, String sqlMatch, int timeout) throws Exception {
+ long t0 = U.currentTimeMillis();
+
+ while (true) {
+ if (U.currentTimeMillis() - t0 > timeout)
+ fail("Timeout. Cannot find query with: " + sqlMatch);
+
+ List<List<?>> res = node.context().query().querySqlFields(
+ new SqlFieldsQuery("SELECT sql, initiator_id FROM SYS.SQL_QUERIES"), false).getAll();
+
+ for (List<?> row : res) {
+ if (((String)row.get(0)).toUpperCase().contains(sqlMatch.toUpperCase()))
+ return (String)row.get(1);
+ }
+
+ U.sleep(200);
+ }
+ }
+
+ /**
+ * Polls SYS.SQL_QUERIES on the given node until it reports the expected number of running queries.
+ * Fails the test when the timeout expires first.
+ *
+ * @param node Node to check running queries.
+ * @param expectedQryCount Expected number of running queries, not counting the polling query.
+ * @param timeout Timeout to finish running queries.
+ * @throws Exception If the pause between polls is interrupted.
+ */
+ private void checkRunningQueriesCount(IgniteEx node, int expectedQryCount, int timeout) throws Exception {
+ long t0 = U.currentTimeMillis();
+
+ while (true) {
+ List<List<?>> res = node.context().query().querySqlFields(
+ new SqlFieldsQuery("SELECT * FROM SYS.SQL_QUERIES"), false).getAll();
+
+ // "+ 1": presumably the polling query itself is listed in the view while it runs.
+ if (res.size() == expectedQryCount + 1)
+ return;
+
+ if (U.currentTimeMillis() - t0 > timeout)
+ fail("Timeout. There are unexpected running queries: " + res);
+
+ // Pause between polls instead of spinning, as initiatorId() does.
+ U.sleep(200);
+ }
+ }
+
+ /**
+ * @param ign Node.
+ * @return Port reported by the node's SQL listener; the tests use it to connect thin clients.
+ */
+ private static int clientPort(IgniteEx ign) {
+ return ign.context().sqlListener().port();
+ }
+
+ /**
+ * Utility class with custom SQL functions.
+ */
+ public static class TestSQLFunctions {
+ /** Two-party phaser: a query calling {@link #await()} and the test calling {@link #unlockQuery()}. */
+ static final Phaser ph = new Phaser(2);
+
+ /**
+ * Arrives at the phaser from the test side and waits for the other party. This releases a query
+ * blocked in {@link #await()}, and blocks the caller until such a query has arrived.
+ */
+ static void unlockQuery() {
+ ph.arriveAndAwaitAdvance();
+ }
+
+ /**
+ * SQL function that arrives at the phaser and blocks the calling query until the other party
+ * arrives through {@link #unlockQuery()}.
+ *
+ * @return 0;
+ */
+ @QuerySqlFunction
+ public static long await() {
+ try {
+ ph.arriveAndAwaitAdvance();
+ }
+ catch (Exception ignored) {
+ // No-op.
+ }
+
+ return 0;
+ }
+ }
+
+ /** */
+ public static class TestSqlJob implements IgniteRunnable {
+ /** */
+ String sql;
+
+ /** */
+ @IgniteInstanceResource
+ Ignite ign;
+
+ /** */
+ public TestSqlJob(String sql) {
+ this.sql = sql;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ ((IgniteEx)ign).context().query().querySqlFields(
+ new SqlFieldsQuery(sql), false).getAll();
+ }
+ }
+}
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
index a3f6005..e1c3ed8 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
@@ -18,6 +18,7 @@
package org.apache.ignite.jdbc.suite;
import java.security.Security;
+import org.apache.ignite.common.RunningQueryInfoCheckInitiatorTest;
import org.apache.ignite.internal.jdbc2.JdbcBlobTest;
import org.apache.ignite.internal.jdbc2.JdbcBulkLoadSelfTest;
import org.apache.ignite.internal.jdbc2.JdbcConnectionReopenTest;
@@ -110,6 +111,8 @@ import org.junit.runners.Suite;
@Suite.SuiteClasses({
QaJdbcTestSuite.class,
+ RunningQueryInfoCheckInitiatorTest.class,
+
JdbcConnectionSelfTest.class,
JdbcStatementSelfTest.class,
JdbcPreparedStatementSelfTest.class,
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinMetadataSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinMetadataSelfTest.java
index b6840f5..d4a3ee0 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinMetadataSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinMetadataSelfTest.java
@@ -37,6 +37,7 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
+
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
@@ -760,6 +761,7 @@ public class JdbcThinMetadataSelfTest extends JdbcThinAbstractSelfTest {
"SYS.SQL_QUERIES.START_TIME.null.26.6",
"SYS.SQL_QUERIES.DURATION.null.19",
"SYS.SQL_QUERIES.ORIGIN_NODE_ID.null.2147483647",
+ "SYS.SQL_QUERIES.INITIATOR_ID.null.2147483647",
"SYS.SCAN_QUERIES.START_TIME.null.19",
"SYS.SCAN_QUERIES.TRANSFORMER.null.2147483647",
"SYS.SCAN_QUERIES.LOCAL.null.1",
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
index 5196818..cbb2326 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
@@ -621,10 +621,10 @@ public abstract class JdbcThinStreamingAbstractSelfTest extends JdbcStreamingSel
/** {@inheritDoc} */
@Override public List<Long> streamBatchedUpdateQuery(String schemaName, String qry, List<Object[]> params,
- SqlClientContext cliCtx) throws IgniteCheckedException {
+ SqlClientContext cliCtx, String qryInitiatorId) throws IgniteCheckedException {
IndexingWithContext.cliCtx = cliCtx;
- return super.streamBatchedUpdateQuery(schemaName, qry, params, cliCtx);
+ return super.streamBatchedUpdateQuery(schemaName, qry, params, cliCtx, qryInitiatorId);
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
index 882fd1f..4983288 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
@@ -54,6 +54,9 @@ public class SqlFieldsQuery extends Query<List<?>> {
/** Default value of Query timeout. Default is -1 means no timeout is set. */
private static final int DFLT_QUERY_TIMEOUT = -1;
+ /** Threaded query originator. */
+ private static ThreadLocal<String> threadedQryInitiatorId = new ThreadLocal<>();
+
/** Do not remove. For tests only. */
@SuppressWarnings("NonConstantFieldWithUpperCaseName")
private static boolean DFLT_LAZY;
@@ -96,6 +99,12 @@ public class SqlFieldsQuery extends Query<List<?>> {
private int updateBatchSize = DFLT_UPDATE_BATCH_SIZE;
/**
+ * Query's originator string (client host+port, user name,
+ * job name or any user's information about query initiator).
+ */
+ private String qryInitiatorId;
+
+ /**
* Copy constructs SQL fields query.
*
* @param qry SQL query.
@@ -112,6 +121,7 @@ public class SqlFieldsQuery extends Query<List<?>> {
parts = qry.parts;
schema = qry.schema;
updateBatchSize = qry.updateBatchSize;
+ qryInitiatorId = qry.qryInitiatorId;
}
/**
@@ -417,12 +427,55 @@ public class SqlFieldsQuery extends Query<List<?>> {
}
/**
+ * @return Query's initiator identifier string (client host+port, user name,
+ * job name or any user's information about query initiator).
+ */
+ public String getQueryInitiatorId() {
+ return qryInitiatorId;
+ }
+
+ /**
+ * @param qryInitiatorId Query's initiator identifier string (client host+port, user name,
+ * job name or any user's information about query initiator).
+ *
+ * @return {@code this} for chaining.
+ */
+ public SqlFieldsQuery setQueryInitiatorId(String qryInitiatorId) {
+ this.qryInitiatorId = qryInitiatorId;
+
+ return this;
+ }
+
+ /**
* @return Copy of this query.
*/
public SqlFieldsQuery copy() {
return new SqlFieldsQuery(this);
}
+ /**
+ * Used at the Job worker to setup originator by default for current thread.
+ *
+ * @param originator Query's originator string.
+ */
+ public static void setThreadedQueryInitiatorId(String originator) {
+ threadedQryInitiatorId.set(originator);
+ }
+
+ /**
+ * Used at the job worker to clear originator for current thread.
+ */
+ public static void resetThreadedQueryInitiatorId() {
+ threadedQryInitiatorId.remove();
+ }
+
+ /**
+ * @return originator set up by the job worker.
+ */
+ public static String threadedQueryInitiatorId() {
+ return threadedQryInitiatorId.get();
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SqlFieldsQuery.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
index 89dc054..edf30fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
@@ -115,6 +115,10 @@ public class JdbcConnection implements Connection {
private static final IgniteProductVersion CLOSE_CURSOR_TASK_SUPPORTED_SINCE =
IgniteProductVersion.fromString("2.11.0");
+ /** Multiple statements V3 task supported since version. */
+ private static final IgniteProductVersion MULTIPLE_STATEMENTS_TASK_V3_SUPPORTED_SINCE =
+ IgniteProductVersion.fromString("2.11.0");
+
/**
* Ignite nodes cache.
*
@@ -192,6 +196,16 @@ public class JdbcConnection implements Connection {
final Set<JdbcStatement> statements = new HashSet<>();
/**
+ * Describes the client connection:
+ * - thin cli: "cli:host:port@user_name"
+ * - thin JDBC: "jdbc-thin:host:port@user_name"
+ * - ODBC: "odbc:host:port@user_name"
+ * - JDBC v2 (this connection): "jdbc-v2:address:instance_name"
+ *
+ * Used by the running query view to display query initiator.
+ */
+ private final String clientDesc;
+
+ /**
* Creates new connection.
*
* @param url Connection URL.
@@ -263,6 +277,8 @@ public class JdbcConnection implements Connection {
if (schemaName == null)
schemaName = QueryUtils.DFLT_SCHEMA;
}
+
+ clientDesc = "jdbc-v2:" + F.first(ignite.cluster().localNode().addresses()) + ":" + ignite.name();
}
catch (Exception e) {
close();
@@ -890,6 +906,13 @@ public class JdbcConnection implements Connection {
}
/**
+ * @return {@code true} if the oldest node in the cluster is at least the version that supports
+ * the multiple statements task V3, {@code false} otherwise.
+ */
+ boolean isMultipleStatementsTaskV3Supported() {
+ return U.isOldestNodeVersionAtLeast(MULTIPLE_STATEMENTS_TASK_V3_SUPPORTED_SINCE, ignite.cluster().nodes());
+ }
+
+ /**
* @return {@code true} if update on server is enabled, {@code false} otherwise.
*/
boolean skipReducerOnUpdate() {
@@ -950,6 +973,20 @@ public class JdbcConnection implements Connection {
}
/**
+ * Describes the client connection:
+ * - thin cli: "cli:host:port@user_name"
+ * - thin JDBC: "jdbc-thin:host:port@user_name"
+ * - ODBC: "odbc:host:port@user_name"
+ * - JDBC v2 (this connection): "jdbc-v2:address:instance_name"
+ *
+ * Used by the running query view to display query initiator.
+ *
+ * @return Client descriptor string.
+ */
+ String clientDescriptor() {
+ return clientDesc;
+ }
+
+ /**
* JDBC connection validation task.
*/
private static class JdbcConnectionValidationTask implements IgniteCallable<Boolean> {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryMultipleStatementsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryMultipleStatementsTask.java
index 91af6bc..0287ceb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryMultipleStatementsTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryMultipleStatementsTask.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.resources.IgniteInstanceResource;
@@ -124,6 +125,9 @@ class JdbcQueryMultipleStatementsTask implements IgniteCallable<List<JdbcStateme
qry.setLazy(lazy);
qry.setSchema(schemaName);
+ if (!F.isEmpty(queryInitiatorId()))
+ qry.setQueryInitiatorId(queryInitiatorId());
+
GridKernalContext ctx = ((IgniteKernal)ignite).context();
List<FieldsQueryCursor<List<?>>> curs = ctx.query().querySqlFields(
@@ -181,4 +185,11 @@ class JdbcQueryMultipleStatementsTask implements IgniteCallable<List<JdbcStateme
protected boolean allowMultipleStatements() {
return true;
}
+
+ /**
+ * @return query initiator identifier.
+ */
+ protected String queryInitiatorId() {
+ return null;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryMultipleStatementsTaskV3.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryMultipleStatementsTaskV3.java
new file mode 100644
index 0000000..95c32da
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryMultipleStatementsTaskV3.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteJdbcDriver;
+
+/**
+ * Task for SQL queries execution through {@link IgniteJdbcDriver}.
+ * The query can contains several SQL statements.
+ */
+class JdbcQueryMultipleStatementsTaskV3 extends JdbcQueryMultipleStatementsTask {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Allow multiple statements. */
+ private boolean allowMultipleStatements;
+
+ /** Query initiator ID. */
+ private String qryInitiatorId;
+
+ /**
+ * @param ignite Ignite.
+ * @param schemaName Schema name.
+ * @param sql Sql query.
+ * @param isQry Operation type flag - query or not - to enforce query type check.
+ * @param loc Local execution flag.
+ * @param args Args.
+ * @param fetchSize Fetch size.
+ * @param locQry Local query flag.
+ * @param collocatedQry Collocated query flag.
+ * @param distributedJoins Distributed joins flag.
+ * @param enforceJoinOrder Enforce joins order flag.
+ * @param lazy Lazy query execution flag.
+ * @param allowMultipleStatements Allow multiple statements flag.
+ * @param qryInitiatorId Query initiator ID.
+ */
+ public JdbcQueryMultipleStatementsTaskV3(Ignite ignite, String schemaName, String sql,
+ Boolean isQry, boolean loc, Object[] args, int fetchSize, boolean locQry, boolean collocatedQry,
+ boolean distributedJoins, boolean enforceJoinOrder, boolean lazy,
+ boolean allowMultipleStatements, String qryInitiatorId) {
+ super(ignite, schemaName, sql, isQry, loc, args, fetchSize, locQry, collocatedQry, distributedJoins, enforceJoinOrder, lazy);
+
+ this.allowMultipleStatements = allowMultipleStatements;
+ this.qryInitiatorId = qryInitiatorId;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean allowMultipleStatements() {
+ return allowMultipleStatements;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected String queryInitiatorId() {
+ return qryInitiatorId;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
index 244a9f7..20554d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
@@ -108,7 +108,13 @@ public class JdbcStatement implements Statement {
boolean loc = nodeId == null;
JdbcQueryMultipleStatementsTask qryTask;
- if (!conn.isMultipleStatementsAllowed() && conn.isMultipleStatementsTaskV2Supported()) {
+ if (conn.isMultipleStatementsTaskV3Supported()) {
+ qryTask = new JdbcQueryMultipleStatementsTaskV3(loc ? ignite : null, conn.schemaName(),
+ sql, isQuery, loc, getArgs(), fetchSize, conn.isLocalQuery(),
+ conn.isCollocatedQuery(), conn.isDistributedJoins(), conn.isEnforceJoinOrder(), conn.isLazy(),
+ conn.isMultipleStatementsAllowed(), conn.clientDescriptor());
+ }
+ else if (!conn.isMultipleStatementsAllowed() && conn.isMultipleStatementsTaskV2Supported()) {
qryTask = new JdbcQueryMultipleStatementsNotAllowTask(loc ? ignite : null, conn.schemaName(),
sql, isQuery, loc, getArgs(), fetchSize, conn.isLocalQuery(), conn.isCollocatedQuery(),
conn.isDistributedJoins(), conn.isEnforceJoinOrder(), conn.isLazy());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java
index 9b838ad..49a4e83 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java
@@ -53,7 +53,7 @@ class JdbcStreamedPreparedStatement extends JdbcPreparedStatement {
assert isQuery == null || !isQuery;
long updCnt = conn.ignite().context().query().streamUpdateQuery(conn.cacheName(), conn.schemaName(),
- streamer, sql, getArgs());
+ streamer, sql, getArgs(), conn.clientDescriptor());
JdbcResultSet rs = new JdbcResultSet(this, updCnt);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/walker/SqlQueryViewWalker.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/walker/SqlQueryViewWalker.java
index c7f08bf..83e6425 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/walker/SqlQueryViewWalker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/walker/SqlQueryViewWalker.java
@@ -38,6 +38,7 @@ public class SqlQueryViewWalker implements SystemViewRowAttributeWalker<SqlQuery
v.accept(4, "duration", long.class);
v.accept(5, "local", boolean.class);
v.accept(6, "schemaName", String.class);
+ v.accept(7, "initiatorId", String.class);
}
/** {@inheritDoc} */
@@ -49,10 +50,11 @@ public class SqlQueryViewWalker implements SystemViewRowAttributeWalker<SqlQuery
v.acceptLong(4, "duration", row.duration());
v.acceptBoolean(5, "local", row.local());
v.accept(6, "schemaName", String.class, row.schemaName());
+ v.accept(7, "initiatorId", String.class, row.initiatorId());
}
/** {@inheritDoc} */
@Override public int count() {
- return 7;
+ return 8;
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index 1ff0daa..fa8add3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeExecutionRejectedException;
import org.apache.ignite.compute.ComputeJob;
@@ -536,6 +537,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
// Make sure flag is not set for current thread.
HOLD.set(false);
+ SqlFieldsQuery.setThreadedQueryInitiatorId("task:" + ses.getTaskName() + ":" + getJobId());
+
try {
if (partsReservation != null) {
try {
@@ -672,6 +675,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
}
}
finally {
+ SqlFieldsQuery.resetThreadedQueryInitiatorId();
+
if (partsReservation != null)
partsReservation.release();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerAbstractConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerAbstractConnectionContext.java
index 3682596..81c61dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerAbstractConnectionContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerAbstractConnectionContext.java
@@ -40,6 +40,9 @@ public abstract class ClientListenerAbstractConnectionContext implements ClientL
/** Kernal context. */
protected final GridKernalContext ctx;
+ /** Nio session. */
+ protected final GridNioSession ses;
+
/** Security context or {@code null} if security is disabled. */
private SecurityContext secCtx;
@@ -53,14 +56,27 @@ public abstract class ClientListenerAbstractConnectionContext implements ClientL
protected Map<String, String> userAttrs;
/**
+ * Describes the client connection:
+ * - thin cli: "cli:host:port@user_name"
+ * - thin JDBC: "jdbc-thin:host:port@user_name"
+ * - ODBC: "odbc:host:port@user_name"
+ *
+ * Used by the running query view to display query initiator.
+ */
+ private String clientDesc;
+
+ /**
* Constructor.
*
* @param ctx Kernal context.
- * @param connId Connection id.
+ * @param ses Client's NIO session.
+ * @param connId Connection ID.
*/
- protected ClientListenerAbstractConnectionContext(GridKernalContext ctx, long connId) {
+ protected ClientListenerAbstractConnectionContext(
+ GridKernalContext ctx, GridNioSession ses, long connId) {
this.ctx = ctx;
this.connId = connId;
+ this.ses = ses;
}
/**
@@ -142,4 +158,26 @@ public abstract class ClientListenerAbstractConnectionContext implements ClientL
if (ctx.security().enabled())
ctx.security().onSessionExpired(secCtx.subject().id());
}
+
+ /** Initializes the client descriptor string: "prefix:host:port[@user_name]". */
+ protected void initClientDescriptor(String prefix) {
+ clientDesc = prefix + ":" + ses.remoteAddress().getHostString() + ":" + ses.remoteAddress().getPort();
+
+ if (authCtx != null)
+ clientDesc += "@" + authCtx.userName();
+ }
+
+ /**
+ * Describes the client connection:
+ * - thin cli: "cli:host:port@user_name"
+ * - thin JDBC: "jdbc-thin:host:port@user_name"
+ * - ODBC: "odbc:host:port@user_name"
+ *
+ * Used by the running query view to display query initiator.
+ *
+ * @return Client descriptor string.
+ */
+ public String clientDescriptor() {
+ return clientDesc;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
index 393383d..c88844b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
@@ -321,7 +321,7 @@ public class ClientListenerNioListener extends GridNioServerListenerAdapter<Clie
ClientListenerConnectionContext connCtx = null;
try {
- connCtx = prepareContext(clientType);
+ connCtx = prepareContext(clientType, ses);
ensureClientPermissions(clientType);
@@ -377,22 +377,24 @@ public class ClientListenerNioListener extends GridNioServerListenerAdapter<Clie
/**
* Prepare context.
*
+ * @param ses Client's NIO session.
* @param clientType Client type.
* @return Context.
* @throws IgniteCheckedException If failed.
*/
- private ClientListenerConnectionContext prepareContext(byte clientType) throws IgniteCheckedException {
+ private ClientListenerConnectionContext prepareContext(byte clientType, GridNioSession ses)
+ throws IgniteCheckedException {
long connId = nextConnectionId();
switch (clientType) {
case ODBC_CLIENT:
- return new OdbcConnectionContext(ctx, busyLock, connId, maxCursors);
+ return new OdbcConnectionContext(ctx, ses, busyLock, connId, maxCursors);
case JDBC_CLIENT:
- return new JdbcConnectionContext(ctx, busyLock, connId, maxCursors);
+ return new JdbcConnectionContext(ctx, ses, busyLock, connId, maxCursors);
case THIN_CLIENT:
- return new ClientConnectionContext(ctx, connId, maxCursors, thinCfg);
+ return new ClientConnectionContext(ctx, ses, connId, maxCursors, thinCfg);
}
throw new IgniteCheckedException("Unknown client type: " + clientType);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
index 64cf609..695166b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
@@ -110,13 +110,14 @@ public class JdbcConnectionContext extends ClientListenerAbstractConnectionConte
/**
* Constructor.
* @param ctx Kernal Context.
+ * @param ses Client's NIO session.
* @param busyLock Shutdown busy lock.
* @param connId Connection ID.
* @param maxCursors Maximum allowed cursors.
*/
- public JdbcConnectionContext(GridKernalContext ctx, GridSpinBusyLock busyLock, long connId,
+ public JdbcConnectionContext(GridKernalContext ctx, GridNioSession ses, GridSpinBusyLock busyLock, long connId,
int maxCursors) {
- super(ctx, connId);
+ super(ctx, ses, connId);
this.busyLock = busyLock;
this.maxCursors = maxCursors;
@@ -208,6 +209,8 @@ public class JdbcConnectionContext extends ClientListenerAbstractConnectionConte
protoCtx = new JdbcProtocolContext(ver, features, true);
+ initClientDescriptor("jdbc-thin");
+
parser = new JdbcMessageParser(ctx, protoCtx);
ClientListenerResponseSender sender = new ClientListenerResponseSender() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
index ecf6fbf..224a9ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
@@ -1008,6 +1008,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
qry.setLazy(cliCtx.isLazy());
qry.setNestedTxMode(nestedTxMode);
qry.setSchema(schemaName);
+ qry.setQueryInitiatorId(connCtx.clientDescriptor());
if (cliCtx.updateBatchSize() != null)
qry.setUpdateBatchSize(cliCtx.updateBatchSize());
@@ -1031,7 +1032,8 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
qry.getSchema(),
cliCtx,
qry.getSql(),
- qry.batchedArguments()
+ qry.batchedArguments(),
+ connCtx.clientDescriptor()
);
for (int i = 0; i < cnt.size(); i++)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java
index 5b78b6e..0a94d24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java
@@ -93,12 +93,13 @@ public class OdbcConnectionContext extends ClientListenerAbstractConnectionConte
/**
* Constructor.
* @param ctx Kernal Context.
+ * @param ses Client's NIO session.
* @param busyLock Shutdown busy lock.
* @param connId Connection ID.
* @param maxCursors Maximum allowed cursors.
*/
- public OdbcConnectionContext(GridKernalContext ctx, GridSpinBusyLock busyLock, long connId, int maxCursors) {
- super(ctx, connId);
+ public OdbcConnectionContext(GridKernalContext ctx, GridNioSession ses, GridSpinBusyLock busyLock, long connId, int maxCursors) {
+ super(ctx, ses, connId);
this.busyLock = busyLock;
this.maxCursors = maxCursors;
@@ -166,8 +167,10 @@ public class OdbcConnectionContext extends ClientListenerAbstractConnectionConte
}
};
+ initClientDescriptor("odbc");
+
handler = new OdbcRequestHandler(ctx, busyLock, sender, maxCursors, distributedJoins, enforceJoinOrder,
- replicatedOnly, collocated, lazy, skipReducerOnUpdate, actx, nestedTxMode, ver);
+ replicatedOnly, collocated, lazy, skipReducerOnUpdate, actx, nestedTxMode, ver, this);
parser = new OdbcMessageParser(ctx, ver);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
index 8cc24a1..f80aac3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
@@ -128,6 +128,9 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler {
/** Response sender. */
private final ClientListenerResponseSender sender;
+ /** Connection context. */
+ private final OdbcConnectionContext connCtx;
+
/**
* Constructor.
* @param ctx Context.
@@ -155,8 +158,12 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler {
boolean collocated,
boolean lazy,
boolean skipReducerOnUpdate,
- AuthorizationContext actx, NestedTxMode nestedTxMode, ClientListenerProtocolVersion ver) {
+ AuthorizationContext actx,
+ NestedTxMode nestedTxMode,
+ ClientListenerProtocolVersion ver,
+ OdbcConnectionContext connCtx) {
this.ctx = ctx;
+ this.connCtx = connCtx;
Factory<GridWorker> orderedFactory = new Factory<GridWorker>() {
@Override public GridWorker create() {
@@ -376,6 +383,7 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler {
qry.setSchema(OdbcUtils.prepareSchema(schema));
qry.setSkipReducerOnUpdate(cliCtx.isSkipReducerOnUpdate());
qry.setNestedTxMode(nestedTxMode);
+ qry.setQueryInitiatorId(connCtx.clientDescriptor());
return qry;
}
@@ -581,7 +589,8 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler {
OdbcUtils.prepareSchema(qry.getSchema()),
cliCtx,
qry.getSql(),
- qry.batchedArguments()
+ qry.batchedArguments(),
+ connCtx.clientDescriptor()
);
}
catch (Exception e) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java
index aea4a7c..2a3b5dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java
@@ -144,8 +144,8 @@ public class ClientConnectionContext extends ClientListenerAbstractConnectionCon
* @param maxCursors Max active cursors.
* @param thinCfg Thin-client configuration.
*/
- public ClientConnectionContext(GridKernalContext ctx, long connId, int maxCursors, ThinClientConfiguration thinCfg) {
- super(ctx, connId);
+ public ClientConnectionContext(GridKernalContext ctx, GridNioSession ses, long connId, int maxCursors, ThinClientConfiguration thinCfg) {
+ super(ctx, ses, connId);
this.maxCursors = maxCursors;
maxActiveTxCnt = thinCfg.getMaxActiveTxPerConnection();
@@ -216,6 +216,8 @@ public class ClientConnectionContext extends ClientListenerAbstractConnectionCon
AuthorizationContext authCtx = authenticate(ses, user, pwd);
+ initClientDescriptor("cli");
+
handler = new ClientRequestHandler(this, authCtx, currentProtocolContext);
parser = new ClientMessageParser(this, currentProtocolContext);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheSqlFieldsQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheSqlFieldsQueryRequest.java
index 6a701f8..ed0cb79 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheSqlFieldsQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheSqlFieldsQueryRequest.java
@@ -131,6 +131,8 @@ public class ClientCacheSqlFieldsQueryRequest extends ClientCacheDataRequest imp
ctx.incrementCursors();
try {
+ qry.setQueryInitiatorId(ctx.clientDescriptor());
+
// If cacheId is provided, we must check the cache for existence.
if (cacheId() != 0) {
DynamicCacheDescriptor desc = cacheDescriptor(ctx);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index fe13d81..404895c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -105,11 +105,12 @@ public interface GridQueryIndexing {
* @param qry Query.
* @param params Query parameters.
* @param streamer Data streamer to feed data to.
+ * @param qryInitiatorId Query initiator ID.
* @return Update counter.
* @throws IgniteCheckedException If failed.
*/
public long streamUpdateQuery(String schemaName, String qry, @Nullable Object[] params,
- IgniteDataStreamer<?, ?> streamer) throws IgniteCheckedException;
+ IgniteDataStreamer<?, ?> streamer, String qryInitiatorId) throws IgniteCheckedException;
/**
* Execute a batched INSERT statement using data streamer as receiver.
@@ -118,11 +119,12 @@ public interface GridQueryIndexing {
* @param qry Query.
* @param params Query parameters.
* @param cliCtx Client connection context.
+ * @param qryInitiatorId Query initiator ID.
* @return Update counters.
* @throws IgniteCheckedException If failed.
*/
public List<Long> streamBatchedUpdateQuery(String schemaName, String qry, List<Object[]> params,
- SqlClientContext cliCtx) throws IgniteCheckedException;
+ SqlClientContext cliCtx, String qryInitiatorId) throws IgniteCheckedException;
/**
* Executes text query.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 92b99d8..f335444 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -2912,7 +2912,8 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @return Update counter.
*/
public long streamUpdateQuery(@Nullable final String cacheName, final String schemaName,
- final IgniteDataStreamer<?, ?> streamer, final String qry, final Object[] args) {
+ final IgniteDataStreamer<?, ?> streamer, final String qry, final Object[] args,
+ String qryInitiatorId) {
assert streamer != null;
if (!busyLock.enterBusy())
@@ -2923,7 +2924,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
return executeQuery(GridCacheQueryType.SQL_FIELDS, qry, cctx, new IgniteOutClosureX<Long>() {
@Override public Long applyx() throws IgniteCheckedException {
- return idx.streamUpdateQuery(schemaName, qry, args, streamer);
+ return idx.streamUpdateQuery(schemaName, qry, args, streamer, qryInitiatorId);
}
}, true);
}
@@ -2943,7 +2944,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @return Update counters.
*/
public List<Long> streamBatchedUpdateQuery(final String schemaName, final SqlClientContext cliCtx,
- final String qry, final List<Object[]> args) {
+ final String qry, final List<Object[]> args, String qryInitiatorId) {
checkxEnabled();
if (!busyLock.enterBusy())
@@ -2952,7 +2953,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
try {
return executeQuery(GridCacheQueryType.SQL_FIELDS, qry, null, new IgniteOutClosureX<List<Long>>() {
@Override public List<Long> applyx() throws IgniteCheckedException {
- return idx.streamBatchedUpdateQuery(schemaName, qry, args, cliCtx);
+ return idx.streamBatchedUpdateQuery(schemaName, qry, args, cliCtx, qryInitiatorId);
}
}, true);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
index e8906500..2b21960 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
@@ -59,6 +59,9 @@ public class GridRunningQueryInfo {
/** Span of the running query. */
private final Span span;
+ /** Query initiator ID: client host+port, user name, job name or other user-supplied initiator info. */
+ private final String qryInitiatorId;
+
/** Request ID. */
private long reqId;
@@ -84,7 +87,8 @@ public class GridRunningQueryInfo {
long startTime,
long startTimeNanos,
GridQueryCancel cancel,
- boolean loc
+ boolean loc,
+ String qryInitiatorId
) {
this.id = id;
this.nodeId = nodeId;
@@ -96,6 +100,7 @@ public class GridRunningQueryInfo {
this.cancel = cancel;
this.loc = loc;
this.span = MTC.span();
+ this.qryInitiatorId = qryInitiatorId;
}
/**
@@ -204,6 +209,14 @@ public class GridRunningQueryInfo {
return reqId;
}
+ /**
+ * @return Query's originator string (client host+port, user name,
+ * job name or any user's information about query initiator).
+ */
+ public String queryInitiatorId() {
+ return qryInitiatorId;
+ }
+
/** @param reqId Request ID. */
public void requestId(long reqId) {
this.reqId = reqId;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
index 98af2dd..d6ae245 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
@@ -26,6 +26,8 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.configuration.SqlConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.systemview.walker.SqlQueryHistoryViewWalker;
@@ -145,9 +147,13 @@ public class RunningQueryManager {
* @return Id of registered query.
*/
public Long register(String qry, GridCacheQueryType qryType, String schemaName, boolean loc,
- @Nullable GridQueryCancel cancel) {
+ @Nullable GridQueryCancel cancel,
+ String qryInitiatorId) {
long qryId = qryIdGen.incrementAndGet();
+ if (qryInitiatorId == null)
+ qryInitiatorId = SqlFieldsQuery.threadedQueryInitiatorId();
+
final GridRunningQueryInfo run = new GridRunningQueryInfo(
qryId,
localNodeId,
@@ -157,7 +163,8 @@ public class RunningQueryManager {
System.currentTimeMillis(),
ctx.performanceStatistics().enabled() ? System.nanoTime() : 0,
cancel,
- loc
+ loc,
+ qryInitiatorId
);
GridRunningQueryInfo preRun = runs.putIfAbsent(qryId, run);
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SqlQueryView.java b/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SqlQueryView.java
index ece63f8..e68d844 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SqlQueryView.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SqlQueryView.java
@@ -72,6 +72,12 @@ public class SqlQueryView {
return U.currentTimeMillis() - qry.startTime();
}
+ /** @return Query initiator ID. */
+ @Order(7)
+ public String initiatorId() {
+ return qry.queryInitiatorId();
+ }
+
/** @return {@code True} if query is local. */
public boolean local() {
return qry.local();
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index ea469c3..c20a6c9 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -391,6 +391,7 @@ org.apache.ignite.internal.jdbc2.JdbcCloseCursorTask
org.apache.ignite.internal.jdbc2.JdbcConnection$JdbcConnectionValidationTask
org.apache.ignite.internal.jdbc2.JdbcDatabaseMetadata$UpdateMetadataTask
org.apache.ignite.internal.jdbc2.JdbcQueryMultipleStatementsTask
+org.apache.ignite.internal.jdbc2.JdbcQueryMultipleStatementsTaskV3
org.apache.ignite.internal.jdbc2.JdbcQueryTask
org.apache.ignite.internal.jdbc2.JdbcQueryTask$1
org.apache.ignite.internal.jdbc2.JdbcQueryTaskResult
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
index c48517e..d7b9856 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
@@ -83,7 +83,8 @@ public class DummyQueryIndexing implements GridQueryIndexing {
String schemaName,
String qry,
@Nullable Object[] params,
- IgniteDataStreamer<?, ?> streamer
+ IgniteDataStreamer<?, ?> streamer,
+ String qryInitiatorId
) throws IgniteCheckedException {
return 0;
}
@@ -93,7 +94,8 @@ public class DummyQueryIndexing implements GridQueryIndexing {
String schemaName,
String qry,
List<Object[]> params,
- SqlClientContext cliCtx
+ SqlClientContext cliCtx,
+ String qryInitiatorId
) throws IgniteCheckedException {
return null;
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 88a375b..50d9970 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -507,7 +507,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
H2TableDescriptor tbl = schemaMgr.tableForType(schemaName, cacheName, typeName);
if (tbl != null && tbl.luceneIndex() != null) {
- Long qryId = runningQueryManager().register(qry, TEXT, schemaName, true, null);
+ Long qryId = runningQueryManager().register(qry, TEXT, schemaName, true, null, null);
try {
return tbl.luceneIndex().query(qry.toUpperCase(), filters, limit);
@@ -658,11 +658,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
String schemaName,
String qry,
@Nullable Object[] params,
- IgniteDataStreamer<?, ?> streamer
+ IgniteDataStreamer<?, ?> streamer,
+ String qryInitiatorId
) throws IgniteCheckedException {
QueryParserResultDml dml = streamerParse(schemaName, qry);
- return streamQuery0(qry, schemaName, streamer, dml, params);
+ return streamQuery0(qry, schemaName, streamer, dml, params, qryInitiatorId);
}
/** {@inheritDoc} */
@@ -671,7 +672,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
String schemaName,
String qry,
List<Object[]> params,
- SqlClientContext cliCtx
+ SqlClientContext cliCtx,
+ String qryInitiatorId
) throws IgniteCheckedException {
if (cliCtx == null || !cliCtx.isStream()) {
U.warn(log, "Connection is not in streaming mode.");
@@ -688,7 +690,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
List<Long> ress = new ArrayList<>(params.size());
for (int i = 0; i < params.size(); i++) {
- long res = streamQuery0(qry, schemaName, streamer, dml, params.get(i));
+ long res = streamQuery0(qry, schemaName, streamer, dml, params.get(i), qryInitiatorId);
ress.add(res);
}
@@ -709,13 +711,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
*/
@SuppressWarnings({"unchecked"})
private long streamQuery0(String qry, String schemaName, IgniteDataStreamer streamer, QueryParserResultDml dml,
- final Object[] args) throws IgniteCheckedException {
+ final Object[] args, String qryInitiatorId) throws IgniteCheckedException {
Long qryId = runningQryMgr.register(
QueryUtils.INCLUDE_SENSITIVE ? qry : sqlWithoutConst(dml.statement()),
GridCacheQueryType.SQL_FIELDS,
schemaName,
true,
- null
+ null,
+ qryInitiatorId
);
Exception failReason = null;
@@ -1577,7 +1580,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
GridCacheQueryType.SQL_FIELDS,
qryDesc.schemaName(),
qryDesc.local(),
- cancel
+ cancel,
+ qryDesc.queryInitiatorId()
);
if (ctx.event().isRecordable(EVT_SQL_QUERY_EXECUTION)) {
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryDescriptor.java
index 2b1ceaf..372352a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryDescriptor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryDescriptor.java
@@ -45,6 +45,9 @@ public class QueryDescriptor {
/** Batched flag. */
private final boolean batched;
+ /** Query initiator identifier. */
+ private final String qryInitiatorId;
+
/**
* @param schemaName Schema name.
* @param sql Sql.
@@ -62,7 +65,8 @@ public class QueryDescriptor {
boolean enforceJoinOrder,
boolean loc,
boolean skipReducerOnUpdate,
- boolean batched
+ boolean batched,
+ String qryInitiatorId
) {
this.schemaName = schemaName;
this.sql = sql;
@@ -72,6 +76,7 @@ public class QueryDescriptor {
this.loc = loc;
this.skipReducerOnUpdate = skipReducerOnUpdate;
this.batched = batched;
+ this.qryInitiatorId = qryInitiatorId;
}
/**
@@ -130,6 +135,13 @@ public class QueryDescriptor {
return batched;
}
+ /**
+ * @return Query's originator.
+ */
+ public String queryInitiatorId() {
+ return qryInitiatorId;
+ }
+
/** {@inheritDoc} */
@SuppressWarnings("SimplifiableIfStatement")
@Override public boolean equals(Object o) {
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
index b1e5bd1..9f01516 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
@@ -802,7 +802,8 @@ public class QueryParser {
qry.isEnforceJoinOrder(),
qry.isLocal(),
skipReducerOnUpdate,
- batched
+ batched,
+ qry.getQueryInitiatorId()
);
}
}
diff --git a/modules/spark-2.4/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala b/modules/spark-2.4/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
index d123b01..625f449 100644
--- a/modules/spark-2.4/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
+++ b/modules/spark-2.4/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
@@ -192,7 +192,7 @@ private[apache] object QueryHelper {
}
qryProcessor.streamUpdateQuery(tblInfo.cacheName,
- tblInfo.schemaName, streamer, insertQry, args.toArray)
+ tblInfo.schemaName, streamer, insertQry, args.toArray, "spark")
}
}
finally {
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
index d123b01..625f449 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
@@ -192,7 +192,7 @@ private[apache] object QueryHelper {
}
qryProcessor.streamUpdateQuery(tblInfo.cacheName,
- tblInfo.schemaName, streamer, insertQry, args.toArray)
+ tblInfo.schemaName, streamer, insertQry, args.toArray, "spark")
}
}
finally {