You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/04/05 13:10:12 UTC

[ignite] branch master updated: IGNITE-14471 JDBCv2: fix query cursors leak when node to execute queries is specified (#8966)

This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d28b39  IGNITE-14471 JDBCv2: fix query cursors leak when node to execute queries is specified (#8966)
1d28b39 is described below

commit 1d28b39267ef3342f135ea0bea3d57cd0168070e
Author: tledkov <tl...@gridgain.com>
AuthorDate: Mon Apr 5 16:09:54 2021 +0300

    IGNITE-14471 JDBCv2: fix query cursors leak when node to execute queries is specified (#8966)
---
 .../ignite/internal/jdbc2/JdbcCursorLeaksTest.java | 202 +++++++++++++++++++++
 .../jdbc/suite/IgniteJdbcDriverTestSuite.java      |   1 +
 .../ignite/internal/jdbc2/JdbcCloseCursorTask.java |  46 +++++
 .../ignite/internal/jdbc2/JdbcConnection.java      |  11 ++
 .../ignite/internal/jdbc2/JdbcResultSet.java       |  18 +-
 .../ignite/internal/jdbc2/JdbcStatement.java       |  21 +--
 .../main/resources/META-INF/classnames.properties  |   1 +
 7 files changed, 276 insertions(+), 24 deletions(-)

diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcCursorLeaksTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcCursorLeaksTest.java
new file mode 100644
index 0000000..91b4bf5
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcCursorLeaksTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.IgniteJdbcDriver.CFG_URL_PREFIX;
+
+/**
+ * Tests for query cursor leaks in JDBC v2.
+ */
+@RunWith(Parameterized.class)
+public class JdbcCursorLeaksTest extends AbstractIndexingCommonTest {
+    /** Keys count. */
+    private static final int KEYS = 100;
+
+    /** */
+    @Parameterized.Parameter
+    public boolean remote;
+
+    /** */
+    @Parameterized.Parameter(1)
+    public boolean multipleStatement;
+
+    /** */
+    @Parameterized.Parameter(2)
+    public boolean distributedJoin;
+
+    /**
+     * @return Test parameters.
+     */
+    @Parameterized.Parameters(name = "remote={0}, multipleStatement={1}, distributedJoin={2}")
+    public static Collection parameters() {
+        Set<Object[]> paramsSet = new LinkedHashSet<>();
+
+        for (int i = 0; i < 8; ++i) {
+            Object[] params = new Object[3];
+
+            params[0] = (i & 1) != 0;
+            params[1] = (i & 2) != 0;
+            params[2] = (i & 4) != 0;
+
+            paramsSet.add(params);
+        }
+
+        return paramsSet;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrids(3);
+
+        sql("CREATE TABLE A(ID INT PRIMARY KEY, JID INT)");
+        sql("CREATE TABLE B(ID INT PRIMARY KEY, JID INT)");
+
+        for (int i = 0; i < KEYS; ++i) {
+            sql("INSERT INTO A VALUES (?, ?)", i, i);
+            sql("INSERT INTO B VALUES (?, ?)", i, i + 1);
+        }
+    }
+
+    /**
+     * @throws Exception On error.
+     */
+    @Test
+    public void testSingleQuery() throws Exception {
+        checkQuery("SELECT 1");
+    }
+
+    /**
+     * @throws Exception On error.
+     */
+    @Test
+    public void testMultipleStatement0() throws Exception {
+        // Skip the test when multiple statements are not allowed.
+        if (!multipleStatement)
+            return;
+
+        checkQuery("SELECT 1; SELECT 2");
+    }
+
+    /**
+     * @throws Exception On error.
+     */
+    @Test
+    public void testMultipleStatement1() throws Exception {
+        // Skip the test when multiple statements are not allowed.
+        if (!multipleStatement)
+            return;
+
+        checkQuery("SELECT 1; SELECT 2; SELECT 3");
+    }
+
+    /**
+     * @throws Exception On error.
+     */
+    @Test
+    public void testJoin() throws Exception {
+        checkQuery("SELECT * FROM A JOIN B on A.JID=B.JID");
+    }
+
+    /**
+     * @param sql Query string.
+     * @throws Exception On error.
+     */
+    private void checkQuery(String sql) throws Exception {
+        try (Connection conn = DriverManager.getConnection(url())) {
+            for (int i = 0; i < 10; i++) {
+                try (Statement stmt = conn.createStatement()) {
+                    stmt.execute(sql);
+                }
+            }
+
+            checkThereAreNoRunningQueries(1000);
+        }
+    }
+
+    /**
+     * @param timeout Timeout to finish running queries.
+     */
+    private void checkThereAreNoRunningQueries(int timeout) {
+        for (Ignite ign : G.allGrids())
+            checkThereAreNoRunningQueries((IgniteEx)ign, timeout);
+    }
+
+    /**
+     * @param ign Node to check for running queries.
+     * @param timeout Timeout to finish running queries.
+     */
+    private void checkThereAreNoRunningQueries(IgniteEx ign, int timeout) {
+        long t0 = U.currentTimeMillis();
+
+        while (true) {
+            List<List<?>> res = ign.context().query().querySqlFields(
+                new SqlFieldsQuery("SELECT * FROM SYS.SQL_QUERIES"), false).getAll();
+
+            if (res.size() == 1)
+                return;
+
+            if (U.currentTimeMillis() - t0 > timeout)
+                fail("Timeout. There are unexpected running queries [node=" + ign.name() + ", queries= " + res + ']');
+        }
+    }
+
+    /**
+     * @return JDBCv2 URL connection string.
+     */
+    private String url() {
+        StringBuilder params = new StringBuilder();
+
+        params.append("multipleStatementsAllowed=").append(multipleStatement);
+        params.append(":");
+        params.append("distributedJoin=").append(distributedJoin);
+        params.append(":");
+
+        if (remote)
+            params.append("nodeId=").append(grid(0).cluster().localNode().id());
+
+        return CFG_URL_PREFIX + params + "@modules/clients/src/test/config/jdbc-config.xml";
+    }
+
+    /**
+     * @param sql SQL query.
+     * @param args Query parameters.
+     * @return Results cursor.
+     */
+    private FieldsQueryCursor<List<?>> sql(String sql, Object... args) {
+        return grid(0).context().query().querySqlFields(new SqlFieldsQuery(sql)
+            .setArgs(args), false);
+    }
+}
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
index 36c98a8..a3f6005 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
@@ -126,6 +126,7 @@ import org.junit.runners.Suite;
     JdbcAuthorizationTest.class,
 
     // Ignite client node based driver tests
+    org.apache.ignite.internal.jdbc2.JdbcCursorLeaksTest.class,
     org.apache.ignite.internal.jdbc2.JdbcConnectionSelfTest.class,
     org.apache.ignite.internal.jdbc2.JdbcSpringSelfTest.class,
     org.apache.ignite.internal.jdbc2.JdbcStatementSelfTest.class,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcCloseCursorTask.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcCloseCursorTask.java
new file mode 100644
index 0000000..f93d6de
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcCloseCursorTask.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import java.util.UUID;
+import org.apache.ignite.lang.IgniteCallable;
+
+/**
+ * Task to close a query cursor on a remote node.
+ */
+class JdbcCloseCursorTask implements IgniteCallable<Void> {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Cursor ID to close. */
+    private final UUID curId;
+
+    /**
+     * @param curId Cursor ID.
+     */
+    public JdbcCloseCursorTask(UUID curId) {
+        this.curId = curId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Void call() throws Exception {
+        JdbcQueryTask.remove(curId);
+
+        return null;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
index 48befd2..89dc054 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
@@ -111,6 +111,10 @@ public class JdbcConnection implements Connection {
     private static final IgniteProductVersion MULTIPLE_STATEMENTS_TASK_V2_SUPPORTED_SINCE =
         IgniteProductVersion.fromString("2.8.0");
 
+    /** Version since which the close remote cursor task ({@link JdbcCloseCursorTask}) is supported. */
+    private static final IgniteProductVersion CLOSE_CURSOR_TASK_SUPPORTED_SINCE =
+        IgniteProductVersion.fromString("2.11.0");
+
     /**
      * Ignite nodes cache.
      *
@@ -879,6 +883,13 @@ public class JdbcConnection implements Connection {
     }
 
     /**
+     * @return {@code true} if closing a remote cursor is supported.
+     */
+    boolean isCloseCursorTaskSupported() {
+        return U.isOldestNodeVersionAtLeast(CLOSE_CURSOR_TASK_SUPPORTED_SINCE, ignite.cluster().nodes());
+    }
+
+    /**
      * @return {@code true} if update on server is enabled, {@code false} otherwise.
      */
     boolean skipReducerOnUpdate() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
index 3852453..c6aecd8 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
@@ -252,9 +252,6 @@ public class JdbcResultSet implements ResultSet {
 
     /** {@inheritDoc} */
     @Override public void close() throws SQLException {
-        if (uuid != null)
-            stmt.resSets.remove(this);
-
         closeInternal();
     }
 
@@ -264,8 +261,19 @@ public class JdbcResultSet implements ResultSet {
      * @throws SQLException On error.
      */
     void closeInternal() throws SQLException {
-        if (((JdbcConnection)stmt.getConnection()).nodeId() == null && uuid != null)
-            JdbcQueryTask.remove(uuid);
+        if (uuid != null) {
+            if (((JdbcConnection)stmt.getConnection()).nodeId() == null)
+                JdbcQueryTask.remove(uuid);
+            else {
+                JdbcConnection conn = (JdbcConnection)stmt.getConnection();
+
+                if (conn.isCloseCursorTaskSupported()) {
+                    Ignite ignite = conn.ignite();
+
+                    ignite.compute(ignite.cluster().forNodeId(conn.nodeId())).call(new JdbcCloseCursorTask(uuid));
+                }
+            }
+        }
 
         closed = true;
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
index fb4d8e9..244a9f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
@@ -25,13 +25,9 @@ import java.sql.SQLWarning;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
+
 import org.apache.ignite.Ignite;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.internal.processors.odbc.SqlStateCode;
@@ -65,12 +61,6 @@ public class JdbcStatement implements Statement {
     /** Fetch size. */
     private int fetchSize = DFLT_FETCH_SIZE;
 
-    /** Result sets. */
-    final Set<JdbcResultSet> resSets = new HashSet<>();
-
-    /** Fields indexes. */
-    Map<String, Integer> fieldsIdxs = new HashMap<>();
-
     /** Batch of statements. */
     private List<String> batch;
 
@@ -239,13 +229,7 @@ public class JdbcStatement implements Statement {
      * @throws SQLException On error.
      */
     void closeInternal() throws SQLException {
-        for (Iterator<JdbcResultSet> it = resSets.iterator(); it.hasNext(); ) {
-            JdbcResultSet rs = it.next();
-
-            rs.closeInternal();
-
-            it.remove();
-        }
+        closeResults();
 
         closed = true;
     }
@@ -696,5 +680,4 @@ public class JdbcStatement implements Statement {
             curRes = 0;
         }
     }
-
 }
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 2396d1d..ea469c3 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -387,6 +387,7 @@ org.apache.ignite.internal.jdbc.thin.ConnectionPropertiesImpl$NumberProperty
 org.apache.ignite.internal.jdbc.thin.ConnectionPropertiesImpl$PropertyValidator
 org.apache.ignite.internal.jdbc.thin.ConnectionPropertiesImpl$StringProperty
 org.apache.ignite.internal.jdbc2.JdbcBatchUpdateTask
+org.apache.ignite.internal.jdbc2.JdbcCloseCursorTask
 org.apache.ignite.internal.jdbc2.JdbcConnection$JdbcConnectionValidationTask
 org.apache.ignite.internal.jdbc2.JdbcDatabaseMetadata$UpdateMetadataTask
 org.apache.ignite.internal.jdbc2.JdbcQueryMultipleStatementsTask