You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pp...@apache.org on 2022/10/17 08:50:06 UTC

[ignite] branch master updated: IGNITE-15245 Fixed JDBC connection leak with cache.invoke() over write-behind enabled cache (#10302)

This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new f3792036b7c IGNITE-15245 Fixed JDBC connection leak with cache.invoke() over write-behind enabled cache (#10302)
f3792036b7c is described below

commit f3792036b7ccc5e41953f55bb5bdc3d6030e84b4
Author: Pavel Pereslegin <xx...@gmail.com>
AuthorDate: Mon Oct 17 11:49:58 2022 +0300

    IGNITE-15245 Fixed JDBC connection leak with cache.invoke() over write-behind enabled cache (#10302)
---
 .../cache/store/GridCacheWriteBehindStore.java     |   4 +-
 ...CacheJdbcPojoWriteBehindConnectionLeakTest.java | 134 +++++++++++++++++++++
 ...teCacheStoreSessionWriteBehindAbstractTest.java |   2 +-
 .../ignite/testsuites/IgniteCacheTestSuite.java    |   2 +
 4 files changed, 140 insertions(+), 2 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
index 0cb1d90d656..ae647957cd8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
@@ -577,7 +577,9 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
 
     /** {@inheritDoc} */
     @Override public void sessionEnd(boolean commit) {
-        // No-op.
+        // To prevent connection leaks, we must call CacheStore#sessionEnd
+        // on stores that manage connections themselves.
+        store.sessionEnd(commit);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoWriteBehindConnectionLeakTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoWriteBehindConnectionLeakTest.java
new file mode 100644
index 00000000000..cb255c7a4bf
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoWriteBehindConnectionLeakTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.store.jdbc.dialect.H2Dialect;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.h2.jdbcx.JdbcConnectionPool;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Ensures that there are no connection leaks when working with an external data source.
+ */
+public class CacheJdbcPojoWriteBehindConnectionLeakTest extends GridCommonAbstractTest {
+    /** Table name. */
+    private static final String TABLE_NAME = "person";
+
+    /** Connection pool. */
+    private static final JdbcConnectionPool pool = (JdbcConnectionPool)new H2DataSourceFactory().create();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName).setCacheConfiguration(cacheConfig());
+    }
+
+    /**
+     * @return Transactional, write-behind cache configuration backed by a JDBC POJO store that uses {@link #pool}.
+     */
+    public CacheConfiguration<Integer, Person> cacheConfig() {
+        CacheConfiguration<Integer, Person> cacheConfig = new CacheConfiguration<Integer, Person>()
+            .setName(DEFAULT_CACHE_NAME)
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+            .setReadThrough(true)
+            .setWriteThrough(true)
+            .setWriteBehindEnabled(true)
+            .setWriteBehindFlushFrequency(1_000);
+
+        CacheJdbcPojoStoreFactory<Integer, Person> cacheStoreFactory = new CacheJdbcPojoStoreFactory<>();
+
+        cacheStoreFactory.setDataSourceFactory(() -> pool)
+            .setDialect(new H2Dialect())
+            .setTypes(new JdbcType()
+                .setCacheName(DEFAULT_CACHE_NAME)
+                .setDatabaseTable(TABLE_NAME)
+                .setKeyType(Integer.class)
+                .setKeyFields(new JdbcTypeField(Types.INTEGER, "id", Integer.class, "id"))
+                .setValueType(Person.class)
+                .setValueFields(new JdbcTypeField(Types.VARCHAR, "name", String.class, "name"))
+            );
+
+        cacheConfig.setCacheStoreFactory(cacheStoreFactory);
+
+        return cacheConfig;
+    }
+
+    /** Creates the test table (if it does not exist yet) before each test. */
+    @Before
+    public void setUp() throws SQLException {
+        execStandaloneQuery("CREATE TABLE IF NOT EXISTS " + TABLE_NAME + "(ID INT UNSIGNED PRIMARY KEY, NAME VARCHAR(20))");
+    }
+
+    /** Drops the test table after each test. */
+    @After
+    public void tearDown() throws SQLException {
+        execStandaloneQuery("DROP TABLE " + TABLE_NAME);
+    }
+
+    /** Checks that {@code cache.invoke()} in a committed transaction leaves no active connections in {@link #pool}. */
+    @Test
+    public void testInvoke() throws Exception {
+        try (Ignite ignite = startGrid(0)) {
+            IgniteCache<Integer, Person> cache = ignite.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+            try (Transaction tx = ignite.transactions().txStart()) {
+                cache.invoke(0, (entry, arg) -> true);
+
+                tx.commit();
+            }
+        }
+
+        // We close ignite before assertion to be sure that write-behind flushing is finished.
+        assertEquals(0, pool.getActiveConnections());
+    }
+
+    /**
+     * @param sql SQL statement to execute on a connection borrowed from {@link #pool} and released afterwards.
+     * @throws SQLException If failed.
+     */
+    private void execStandaloneQuery(String sql) throws SQLException {
+        try (Connection connection = pool.getConnection();
+             PreparedStatement statement = connection.prepareStatement(sql)) {
+            statement.execute();
+        }
+    }
+
+    /** Cache value type used by this test; configured as the store's value type for the test table. */
+    public static class Person implements Serializable {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0;
+
+        /** Person ID. */
+        private Integer id;
+
+        /** Person name. */
+        private String name;
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java
index 52a1d54863d..0a86ed99bb9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java
@@ -184,7 +184,7 @@ public abstract class IgniteCacheStoreSessionWriteBehindAbstractTest extends Ign
 
         /** {@inheritDoc} */
         @Override public void sessionEnd(boolean commit) throws CacheWriterException {
-            fail();
+            // No-op.
         }
 
         /** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index b1d8b7d3f8b..8a2359b10be 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -32,6 +32,7 @@ import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreBinaryMarshallerStor
 import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreBinaryMarshallerWithSqlEscapeSelfTest;
 import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreMultitreadedSelfTest;
 import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreTest;
+import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoWriteBehindConnectionLeakTest;
 import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoWriteBehindStoreWithCoalescingTest;
 import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreMultithreadedSelfTest;
 import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreSelfTest;
@@ -185,6 +186,7 @@ public class IgniteCacheTestSuite {
         GridTestUtils.addTestIfNeeded(suite, CacheJdbcPojoStoreBinaryMarshallerStoreKeepBinaryWithSqlEscapeSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, CacheJdbcPojoStoreMultitreadedSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, CacheJdbcPojoWriteBehindStoreWithCoalescingTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, CacheJdbcPojoWriteBehindConnectionLeakTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridCacheBalancingStoreSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridCacheAffinityApiSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridCacheStoreValueBytesSelfTest.class, ignoredTests);