You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/04/05 13:10:12 UTC
[ignite] branch master updated: IGNITE-14471 JDBCv2: fix query
cursors leak when node to execute queries is specified (#8966)
This is an automated email from the ASF dual-hosted git repository.
tledkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 1d28b39 IGNITE-14471 JDBCv2: fix query cursors leak when node to execute queries is specified (#8966)
1d28b39 is described below
commit 1d28b39267ef3342f135ea0bea3d57cd0168070e
Author: tledkov <tl...@gridgain.com>
AuthorDate: Mon Apr 5 16:09:54 2021 +0300
IGNITE-14471 JDBCv2: fix query cursors leak when node to execute queries is specified (#8966)
---
.../ignite/internal/jdbc2/JdbcCursorLeaksTest.java | 202 +++++++++++++++++++++
.../jdbc/suite/IgniteJdbcDriverTestSuite.java | 1 +
.../ignite/internal/jdbc2/JdbcCloseCursorTask.java | 46 +++++
.../ignite/internal/jdbc2/JdbcConnection.java | 11 ++
.../ignite/internal/jdbc2/JdbcResultSet.java | 18 +-
.../ignite/internal/jdbc2/JdbcStatement.java | 21 +--
.../main/resources/META-INF/classnames.properties | 1 +
7 files changed, 276 insertions(+), 24 deletions(-)
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcCursorLeaksTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcCursorLeaksTest.java
new file mode 100644
index 0000000..91b4bf5
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcCursorLeaksTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.IgniteJdbcDriver.CFG_URL_PREFIX;
+
+/**
+ * Test for cursors leak on JDBC v2.
+ */
+@RunWith(Parameterized.class)
+public class JdbcCursorLeaksTest extends AbstractIndexingCommonTest {
+ /** Keys count. */
+ private static final int KEYS = 100;
+
+ /** */
+ @Parameterized.Parameter
+ public boolean remote;
+
+ /** */
+ @Parameterized.Parameter(1)
+ public boolean multipleStatement;
+
+ /** */
+ @Parameterized.Parameter(2)
+ public boolean distributedJoin;
+
+ /**
+ * @return Test parameters.
+ */
+ @Parameterized.Parameters(name = "remote={0}, multipleStatement={1}, distributedJoin={2}")
+ public static Collection parameters() {
+ Set<Object[]> paramsSet = new LinkedHashSet<>();
+
+ for (int i = 0; i < 8; ++i) {
+ Object[] params = new Object[3];
+
+ params[0] = (i & 1) != 0;
+ params[1] = (i & 2) != 0;
+ params[2] = (i & 4) != 0;
+
+ paramsSet.add(params);
+ }
+
+ return paramsSet;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGrids(3);
+
+ sql("CREATE TABLE A(ID INT PRIMARY KEY, JID INT)");
+ sql("CREATE TABLE B(ID INT PRIMARY KEY, JID INT)");
+
+ for (int i = 0; i < KEYS; ++i) {
+ sql("INSERT INTO A VALUES (?, ?)", i, i);
+ sql("INSERT INTO B VALUES (?, ?)", i, i + 1);
+ }
+ }
+
+ /**
+ * @throws Exception On error.
+ */
+ @Test
+ public void testSingleQuery() throws Exception {
+ checkQuery("SELECT 1");
+ }
+
+ /**
+ * @throws Exception On error.
+ */
+ @Test
+ public void testMultipleStatement0() throws Exception {
+ // Skip the test when multiple statement not allowed
+ if (!multipleStatement)
+ return;
+
+ checkQuery("SELECT 1; SELECT 2");
+ }
+
+ /**
+ * @throws Exception On error.
+ */
+ @Test
+ public void testMultipleStatement1() throws Exception {
+ // Skip the test when multiple statement not allowed
+ if (!multipleStatement)
+ return;
+
+ checkQuery("SELECT 1; SELECT 2; SELECT 3");
+ }
+
+ /**
+ * @throws Exception On error.
+ */
+ @Test
+ public void testJoin() throws Exception {
+ checkQuery("SELECT * FROM A JOIN B on A.JID=B.JID");
+ }
+
+ /**
+ * @param sql Query string.
+ * @throws Exception Orn error.
+ */
+ private void checkQuery(String sql) throws Exception {
+ try (Connection conn = DriverManager.getConnection(url())) {
+ for (int i = 0; i < 10; i++) {
+ try (Statement stmt = conn.createStatement()) {
+ stmt.execute(sql);
+ }
+ }
+
+ checkThereAreNoRunningQueries(1000);
+ }
+ }
+
+ /**
+ * @param timeout Timeout to finish running queries.
+ */
+ private void checkThereAreNoRunningQueries(int timeout) {
+ for (Ignite ign : G.allGrids())
+ checkThereAreNoRunningQueries((IgniteEx)ign, timeout);
+ }
+
+ /**
+ * @param ign Noe to check running queries.
+ * @param timeout Timeout to finish running queries.
+ */
+ private void checkThereAreNoRunningQueries(IgniteEx ign, int timeout) {
+ long t0 = U.currentTimeMillis();
+
+ while (true) {
+ List<List<?>> res = ign.context().query().querySqlFields(
+ new SqlFieldsQuery("SELECT * FROM SYS.SQL_QUERIES"), false).getAll();
+
+ if (res.size() == 1)
+ return;
+
+ if (U.currentTimeMillis() - t0 > timeout)
+ fail("Timeout. There are unexpected running queries [node=" + ign.name() + ", queries= " + res + ']');
+ }
+ }
+
+ /**
+ * @return JDBCv2 URL connection string.
+ */
+ private String url() {
+ StringBuilder params = new StringBuilder();
+
+ params.append("multipleStatementsAllowed=").append(multipleStatement);
+ params.append(":");
+ params.append("distributedJoin=").append(distributedJoin);
+ params.append(":");
+
+ if (remote)
+ params.append("nodeId=").append(grid(0).cluster().localNode().id());
+
+ return CFG_URL_PREFIX + params + "@modules/clients/src/test/config/jdbc-config.xml";
+ }
+
+ /**
+ * @param sql SQL query.
+ * @param args Query parameters.
+ * @return Results cursor.
+ */
+ private FieldsQueryCursor<List<?>> sql(String sql, Object... args) {
+ return grid(0).context().query().querySqlFields(new SqlFieldsQuery(sql)
+ .setArgs(args), false);
+ }
+}
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
index 36c98a8..a3f6005 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
@@ -126,6 +126,7 @@ import org.junit.runners.Suite;
JdbcAuthorizationTest.class,
// Ignite client node based driver tests
+ org.apache.ignite.internal.jdbc2.JdbcCursorLeaksTest.class,
org.apache.ignite.internal.jdbc2.JdbcConnectionSelfTest.class,
org.apache.ignite.internal.jdbc2.JdbcSpringSelfTest.class,
org.apache.ignite.internal.jdbc2.JdbcStatementSelfTest.class,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcCloseCursorTask.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcCloseCursorTask.java
new file mode 100644
index 0000000..f93d6de
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcCloseCursorTask.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import java.util.UUID;
+import org.apache.ignite.lang.IgniteCallable;
+
+/**
+ * Task for close query cursor on remote node.
+ */
+class JdbcCloseCursorTask implements IgniteCallable<Void> {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Cursor ID to close. */
+ private final UUID curId;
+
+ /**
+ * @param curId Cursor ID.
+ */
+ public JdbcCloseCursorTask(UUID curId) {
+ this.curId = curId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void call() throws Exception {
+ JdbcQueryTask.remove(curId);
+
+ return null;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
index 48befd2..89dc054 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
@@ -111,6 +111,10 @@ public class JdbcConnection implements Connection {
private static final IgniteProductVersion MULTIPLE_STATEMENTS_TASK_V2_SUPPORTED_SINCE =
IgniteProductVersion.fromString("2.8.0");
+ /** Close remote cursor task is supported since version. {@link JdbcCloseCursorTask}*/
+ private static final IgniteProductVersion CLOSE_CURSOR_TASK_SUPPORTED_SINCE =
+ IgniteProductVersion.fromString("2.11.0");
+
/**
* Ignite nodes cache.
*
@@ -879,6 +883,13 @@ public class JdbcConnection implements Connection {
}
/**
+ * @return {@code true} if close remote cursor is supported.
+ */
+ boolean isCloseCursorTaskSupported() {
+ return U.isOldestNodeVersionAtLeast(CLOSE_CURSOR_TASK_SUPPORTED_SINCE, ignite.cluster().nodes());
+ }
+
+ /**
* @return {@code true} if update on server is enabled, {@code false} otherwise.
*/
boolean skipReducerOnUpdate() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
index 3852453..c6aecd8 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
@@ -252,9 +252,6 @@ public class JdbcResultSet implements ResultSet {
/** {@inheritDoc} */
@Override public void close() throws SQLException {
- if (uuid != null)
- stmt.resSets.remove(this);
-
closeInternal();
}
@@ -264,8 +261,19 @@ public class JdbcResultSet implements ResultSet {
* @throws SQLException On error.
*/
void closeInternal() throws SQLException {
- if (((JdbcConnection)stmt.getConnection()).nodeId() == null && uuid != null)
- JdbcQueryTask.remove(uuid);
+ if (uuid != null) {
+ if (((JdbcConnection)stmt.getConnection()).nodeId() == null)
+ JdbcQueryTask.remove(uuid);
+ else {
+ JdbcConnection conn = (JdbcConnection)stmt.getConnection();
+
+ if (conn.isCloseCursorTaskSupported()) {
+ Ignite ignite = conn.ignite();
+
+ ignite.compute(ignite.cluster().forNodeId(conn.nodeId())).call(new JdbcCloseCursorTask(uuid));
+ }
+ }
+ }
closed = true;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
index fb4d8e9..244a9f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
@@ -25,13 +25,9 @@ import java.sql.SQLWarning;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
import java.util.UUID;
+
import org.apache.ignite.Ignite;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.processors.odbc.SqlStateCode;
@@ -65,12 +61,6 @@ public class JdbcStatement implements Statement {
/** Fetch size. */
private int fetchSize = DFLT_FETCH_SIZE;
- /** Result sets. */
- final Set<JdbcResultSet> resSets = new HashSet<>();
-
- /** Fields indexes. */
- Map<String, Integer> fieldsIdxs = new HashMap<>();
-
/** Batch of statements. */
private List<String> batch;
@@ -239,13 +229,7 @@ public class JdbcStatement implements Statement {
* @throws SQLException On error.
*/
void closeInternal() throws SQLException {
- for (Iterator<JdbcResultSet> it = resSets.iterator(); it.hasNext(); ) {
- JdbcResultSet rs = it.next();
-
- rs.closeInternal();
-
- it.remove();
- }
+ closeResults();
closed = true;
}
@@ -696,5 +680,4 @@ public class JdbcStatement implements Statement {
curRes = 0;
}
}
-
}
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 2396d1d..ea469c3 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -387,6 +387,7 @@ org.apache.ignite.internal.jdbc.thin.ConnectionPropertiesImpl$NumberProperty
org.apache.ignite.internal.jdbc.thin.ConnectionPropertiesImpl$PropertyValidator
org.apache.ignite.internal.jdbc.thin.ConnectionPropertiesImpl$StringProperty
org.apache.ignite.internal.jdbc2.JdbcBatchUpdateTask
+org.apache.ignite.internal.jdbc2.JdbcCloseCursorTask
org.apache.ignite.internal.jdbc2.JdbcConnection$JdbcConnectionValidationTask
org.apache.ignite.internal.jdbc2.JdbcDatabaseMetadata$UpdateMetadataTask
org.apache.ignite.internal.jdbc2.JdbcQueryMultipleStatementsTask