You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pp...@apache.org on 2022/10/17 08:50:06 UTC
[ignite] branch master updated: IGNITE-15245 Fixed JDBC connection leak with cache.invoke() over write-behind enabled cache (#10302)
This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new f3792036b7c IGNITE-15245 Fixed JDBC connection leak with cache.invoke() over write-behind enabled cache (#10302)
f3792036b7c is described below
commit f3792036b7ccc5e41953f55bb5bdc3d6030e84b4
Author: Pavel Pereslegin <xx...@gmail.com>
AuthorDate: Mon Oct 17 11:49:58 2022 +0300
IGNITE-15245 Fixed JDBC connection leak with cache.invoke() over write-behind enabled cache (#10302)
---
.../cache/store/GridCacheWriteBehindStore.java | 4 +-
...CacheJdbcPojoWriteBehindConnectionLeakTest.java | 134 +++++++++++++++++++++
...teCacheStoreSessionWriteBehindAbstractTest.java | 2 +-
.../ignite/testsuites/IgniteCacheTestSuite.java | 2 +
4 files changed, 140 insertions(+), 2 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
index 0cb1d90d656..ae647957cd8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
@@ -577,7 +577,9 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
/** {@inheritDoc} */
@Override public void sessionEnd(boolean commit) {
- // No-op.
+ // To prevent connection leaks, we must call CacheStore#sessionEnd
+ // on stores that manage connections themselves.
+ store.sessionEnd(commit);
}
/** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoWriteBehindConnectionLeakTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoWriteBehindConnectionLeakTest.java
new file mode 100644
index 00000000000..cb255c7a4bf
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoWriteBehindConnectionLeakTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.store.jdbc.dialect.H2Dialect;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.h2.jdbcx.JdbcConnectionPool;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Ensures that no JDBC connections leak when {@code cache.invoke()} is used on a write-behind cache backed by an external data source.
+ */
+public class CacheJdbcPojoWriteBehindConnectionLeakTest extends GridCommonAbstractTest {
+ /** Table name. */
+ private static final String TABLE_NAME = "person";
+
+ /** H2 connection pool shared by the cache store and by the standalone queries of this test. */
+ private static final JdbcConnectionPool pool = (JdbcConnectionPool)new H2DataSourceFactory().create();
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName).setCacheConfiguration(cacheConfig());
+ }
+
+ /**
+ * @return Transactional cache configuration with write-behind enabled and a JDBC POJO store that uses the shared pool.
+ */
+ public CacheConfiguration<Integer, Person> cacheConfig() {
+ CacheConfiguration<Integer, Person> cacheConfig = new CacheConfiguration<Integer, Person>()
+ .setName(DEFAULT_CACHE_NAME)
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+ .setReadThrough(true)
+ .setWriteThrough(true)
+ .setWriteBehindEnabled(true)
+ .setWriteBehindFlushFrequency(1_000);
+
+ CacheJdbcPojoStoreFactory<Integer, Person> cacheStoreFactory = new CacheJdbcPojoStoreFactory<>();
+
+ cacheStoreFactory.setDataSourceFactory(() -> pool)
+ .setDialect(new H2Dialect())
+ .setTypes(new JdbcType()
+ .setCacheName(DEFAULT_CACHE_NAME)
+ .setDatabaseTable(TABLE_NAME)
+ .setKeyType(Integer.class)
+ .setKeyFields(new JdbcTypeField(Types.INTEGER, "id", Integer.class, "id"))
+ .setValueType(Person.class)
+ .setValueFields(new JdbcTypeField(Types.VARCHAR, "name", String.class, "name"))
+ );
+
+ cacheConfig.setCacheStoreFactory(cacheStoreFactory);
+
+ return cacheConfig;
+ }
+
+ /** Creates the test table before each test if it does not exist yet. */
+ @Before
+ public void setUp() throws SQLException {
+ execStandaloneQuery("CREATE TABLE IF NOT EXISTS " + TABLE_NAME + "(ID INT UNSIGNED PRIMARY KEY, NAME VARCHAR(20))");
+ }
+
+ /** Drops the test table after each test. */
+ @After
+ public void tearDown() throws SQLException {
+ execStandaloneQuery("DROP TABLE " + TABLE_NAME);
+ }
+
+ /** Checks that {@code cache.invoke()} in a committed transaction leaves no active connections in the pool. */
+ @Test
+ public void testInvoke() throws Exception {
+ try (Ignite ignite = startGrid(0)) {
+ IgniteCache<Integer, Person> cache = ignite.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ try (Transaction tx = ignite.transactions().txStart()) {
+ cache.invoke(0, (entry, arg) -> true);
+
+ tx.commit();
+ }
+ }
+
+ // We close Ignite before the assertion to be sure that write-behind flushing has finished.
+ assertEquals(0, pool.getActiveConnections());
+ }
+
+ /**
+ * @param sql SQL statement to execute on a connection borrowed from the pool; the connection is closed afterwards.
+ * @throws SQLException If failed.
+ */
+ private void execStandaloneQuery(String sql) throws SQLException {
+ try (Connection connection = pool.getConnection();
+ PreparedStatement statement = connection.prepareStatement(sql)) {
+ statement.execute();
+ }
+ }
+
+ /** Cache value type configured for the JDBC POJO store and mapped to the test table. */
+ public static class Person implements Serializable {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0;
+
+ /** Person identifier. */
+ private Integer id;
+
+ /** Person name. */
+ private String name;
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java
index 52a1d54863d..0a86ed99bb9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java
@@ -184,7 +184,7 @@ public abstract class IgniteCacheStoreSessionWriteBehindAbstractTest extends Ign
/** {@inheritDoc} */
@Override public void sessionEnd(boolean commit) throws CacheWriterException {
- fail();
+ // No-op.
}
/** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index b1d8b7d3f8b..8a2359b10be 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -32,6 +32,7 @@ import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreBinaryMarshallerStor
import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreBinaryMarshallerWithSqlEscapeSelfTest;
import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreMultitreadedSelfTest;
import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreTest;
+import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoWriteBehindConnectionLeakTest;
import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoWriteBehindStoreWithCoalescingTest;
import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreMultithreadedSelfTest;
import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreSelfTest;
@@ -185,6 +186,7 @@ public class IgniteCacheTestSuite {
GridTestUtils.addTestIfNeeded(suite, CacheJdbcPojoStoreBinaryMarshallerStoreKeepBinaryWithSqlEscapeSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CacheJdbcPojoStoreMultitreadedSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CacheJdbcPojoWriteBehindStoreWithCoalescingTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, CacheJdbcPojoWriteBehindConnectionLeakTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, GridCacheBalancingStoreSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, GridCacheAffinityApiSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, GridCacheStoreValueBytesSelfTest.class, ignoredTests);