You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/17 08:24:21 UTC
[17/24] ignite git commit: ignite-5489 Fixed possible connection
leaks when loadPreviousValue set to true
ignite-5489 Fixed possible connection leaks when loadPreviousValue set to true
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1b2b26a8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1b2b26a8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1b2b26a8
Branch: refs/heads/ignite-5578
Commit: 1b2b26a82ea286472134a22619952c662b95033f
Parents: 7283edb
Author: Tikhonov Nikolay <ti...@gmail.com>
Authored: Wed Jun 21 17:55:05 2017 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri Jul 14 15:43:31 2017 +0300
----------------------------------------------------------------------
.../store/GridCacheStoreManagerAdapter.java | 7 +-
.../cache/CacheConnectionLeakStoreTxTest.java | 291 +++++++++++++++++++
.../testsuites/IgniteCacheTestSuite4.java | 2 +
3 files changed, 299 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b2b26a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index c02e2c7..99541ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -327,7 +327,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
throw new IgniteCheckedException(new CacheLoaderException(e));
}
finally {
- sessionEnd0(tx, threwEx);
+ IgniteInternalTx tx0 = tx;
+
+ if (tx0 != null && (tx0.dht() && tx0.local()))
+ tx0 = null;
+
+ sessionEnd0(tx0, threwEx);
}
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b2b26a8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheConnectionLeakStoreTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheConnectionLeakStoreTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheConnectionLeakStoreTxTest.java
new file mode 100644
index 0000000..611f2cd
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheConnectionLeakStoreTxTest.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.cache.store.CacheStoreSession;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.resources.CacheStoreSessionResource;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.cache.TestCacheSession;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Verifies that cache store sessions are not leaked when a cache is configured with
+ * {@code loadPreviousValue = true}.
+ * <p>
+ * The topology is one server node (index 0) and one client node ({@link #CLIENT_NODE}); every cache
+ * operation is issued from the client. {@link TestStore} records a session each time one of its
+ * store methods is called and forgets it in {@link TestStore#sessionEnd(boolean)}, so any entry left
+ * in {@link TestStore#sessions} after the operations means a session was started but never ended.
+ * <p>
+ * NOTE(review): the test method names mention "OneBackup", but the cache configuration built in
+ * {@code checkConnectionLeak} never calls {@code setBackups(...)} - confirm whether a backup was intended.
+ */
+public class CacheConnectionLeakStoreTxTest extends GridCommonAbstractTest {
+    /** IP finder used by the discovery SPI of every node started by this test. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "cache";
+
+    /** Index of the client node; all cache operations are issued from it. */
+    private static final int CLIENT_NODE = 1;
+
+    /** Whether the next node configured by {@link #getConfiguration(String)} starts in client mode. */
+    private static boolean client;
+
+    /** Whether {@link TestStore#load(Integer)} returns a value ({@code true}) or {@code null} ({@code false}). */
+    private static volatile boolean isLoadFromStore;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(disco);
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        // The flag is static and would still be 'true' after a previous run of this class in the same JVM.
+        client = false;
+
+        startGrid(0);
+
+        client = true;
+
+        startGrid(CLIENT_NODE);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        // Every test starts with the store returning nothing and with no recorded sessions.
+        isLoadFromStore = false;
+        TestStore.sessions.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        try {
+            stopAllGrids();
+        }
+        finally {
+            // Do not leave the static flag switched on for whatever runs next in this JVM.
+            client = false;
+        }
+    }
+
+    /**
+     * Atomic cache; the store has no value for the key.
+     *
+     * @throws Exception If failed.
+     */
+    public void testConnectionLeakOneBackupAtomic() throws Exception {
+        checkConnectionLeak(CacheAtomicityMode.ATOMIC, null, null);
+    }
+
+    /**
+     * Atomic cache; the store returns a value for the key.
+     *
+     * @throws Exception If failed.
+     */
+    public void testConnectionLeakOneBackupAtomicLoadFromStore() throws Exception {
+        isLoadFromStore = true;
+
+        checkConnectionLeak(CacheAtomicityMode.ATOMIC, null, null);
+    }
+
+    /**
+     * Transactional cache, OPTIMISTIC / REPEATABLE_READ; the store has no value for the key.
+     *
+     * @throws Exception If failed.
+     */
+    public void testConnectionLeakOneBackupOptimisticRepeatableRead() throws Exception {
+        checkConnectionLeak(CacheAtomicityMode.TRANSACTIONAL, OPTIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * Transactional cache, OPTIMISTIC / REPEATABLE_READ; the store returns a value for the key.
+     *
+     * @throws Exception If failed.
+     */
+    public void testConnectionLeakOneBackupOptimisticRepeatableReadLoadFromStore() throws Exception {
+        isLoadFromStore = true;
+
+        checkConnectionLeak(CacheAtomicityMode.TRANSACTIONAL, OPTIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * Transactional cache, OPTIMISTIC / READ_COMMITTED; the store has no value for the key.
+     *
+     * @throws Exception If failed.
+     */
+    public void testConnectionLeakOneBackupOptimisticReadCommitted() throws Exception {
+        checkConnectionLeak(CacheAtomicityMode.TRANSACTIONAL, OPTIMISTIC, READ_COMMITTED);
+    }
+
+    /**
+     * Transactional cache, OPTIMISTIC / READ_COMMITTED; the store returns a value for the key.
+     *
+     * @throws Exception If failed.
+     */
+    public void testConnectionLeakOneBackupOptimisticReadCommittedLoadFromStore() throws Exception {
+        isLoadFromStore = true;
+
+        checkConnectionLeak(CacheAtomicityMode.TRANSACTIONAL, OPTIMISTIC, READ_COMMITTED);
+    }
+
+    /**
+     * Transactional cache, PESSIMISTIC / REPEATABLE_READ; the store has no value for the key.
+     *
+     * @throws Exception If failed.
+     */
+    public void testConnectionLeakOneBackupPessimisticRepeatableRead() throws Exception {
+        checkConnectionLeak(CacheAtomicityMode.TRANSACTIONAL, PESSIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * Transactional cache, PESSIMISTIC / READ_COMMITTED; the store has no value for the key.
+     *
+     * @throws Exception If failed.
+     */
+    public void testConnectionLeakOneBackupPessimisticReadCommitted() throws Exception {
+        checkConnectionLeak(CacheAtomicityMode.TRANSACTIONAL, PESSIMISTIC, READ_COMMITTED);
+    }
+
+    /**
+     * Transactional cache, PESSIMISTIC / READ_COMMITTED; the store returns a value for the key.
+     *
+     * @throws Exception If failed.
+     */
+    public void testConnectionLeakOneBackupPessimisticReadCommittedLoadFromStore() throws Exception {
+        isLoadFromStore = true;
+
+        checkConnectionLeak(CacheAtomicityMode.TRANSACTIONAL, PESSIMISTIC, READ_COMMITTED);
+    }
+
+    /**
+     * Creates a partitioned cache from the client node with read-through enabled, write-through
+     * disabled and {@code loadPreviousValue} enabled, runs {@link #cacheOp(IgniteCache)} against it
+     * (inside an explicit transaction for transactional caches) and asserts that no store session
+     * remains recorded afterwards. The cache is destroyed whether or not the check passes.
+     *
+     * @param atomicityMode Atomicity mode.
+     * @param txConcurrency Transaction concurrency; only used for transactional caches.
+     * @param txIsolation Transaction isolation; only used for transactional caches.
+     *
+     * @throws Exception If failed.
+     */
+    private void checkConnectionLeak(
+        CacheAtomicityMode atomicityMode,
+        TransactionConcurrency txConcurrency,
+        TransactionIsolation txIsolation
+    ) throws Exception {
+        CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();
+
+        cacheCfg.setName(CACHE_NAME);
+        cacheCfg.setCacheMode(CacheMode.PARTITIONED);
+        cacheCfg.setAtomicityMode(atomicityMode);
+        cacheCfg.setCacheStoreFactory(new TestStoreFactory());
+        cacheCfg.setReadThrough(true);
+        cacheCfg.setWriteThrough(false);
+        cacheCfg.setLoadPreviousValue(true);
+
+        Ignite ignite = ignite(CLIENT_NODE);
+        IgniteCache<Integer, Integer> cache = ignite.createCache(cacheCfg);
+
+        try {
+            assertEquals(0, cache.size());
+
+            if (atomicityMode == CacheAtomicityMode.TRANSACTIONAL) {
+                try (Transaction tx = ignite.transactions().txStart(txConcurrency, txIsolation)) {
+                    cacheOp(cache);
+
+                    tx.commit();
+                }
+            }
+            else {
+                cacheOp(cache);
+            }
+
+            // Every session recorded by a store call must have been removed by sessionEnd() by now.
+            assertTrue("Sessions leaked on nodes: " + TestStore.sessions, TestStore.sessions.isEmpty());
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     * Performs a {@code putIfAbsent} followed by a {@code get} on key 42 and logs both results.
+     *
+     * @param cache Cache.
+     */
+    private void cacheOp(IgniteCache<Integer, Integer> cache) {
+        boolean b = cache.putIfAbsent(42, 42);
+
+        log.info("PutIfAbsent: " + b);
+
+        Integer val = cache.get(42);
+
+        log.info("Get: " + val);
+    }
+
+    /**
+     * Factory producing a new {@link TestStore} on every call.
+     */
+    private static class TestStoreFactory implements Factory<CacheStoreAdapter<Integer, Integer>> {
+        /** {@inheritDoc} */
+        @Override public CacheStoreAdapter<Integer, Integer> create() {
+            return new TestStore();
+        }
+    }
+
+    /**
+     * Store that keeps no data and only tracks sessions: {@code load}, {@code write} and
+     * {@code delete} record the current session in {@link #sessions}, and
+     * {@link #sessionEnd(boolean)} removes it again.
+     */
+    private static class TestStore extends CacheStoreAdapter<Integer, Integer> implements Serializable {
+        /** Ignite instance; its local node is stored as the value of a recorded session. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** Current store session; may be {@code null}, in which case {@link #NULL} is tracked instead. */
+        @CacheStoreSessionResource
+        private CacheStoreSession ses;
+
+        /** Per-store placeholder key used when {@link #ses} is {@code null} (the map rejects null keys). */
+        private final CacheStoreSession NULL = new TestCacheSession();
+
+        /** Sessions recorded by a store call and not yet ended, mapped to the node the call ran on. */
+        public static final ConcurrentHashMap<CacheStoreSession, ClusterNode> sessions = new ConcurrentHashMap<>();
+
+        /** {@inheritDoc} */
+        @Override public Integer load(Integer key) throws CacheLoaderException {
+            addSession();
+
+            return isLoadFromStore ? key : null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> e) throws CacheWriterException {
+            addSession();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) throws CacheWriterException {
+            addSession();
+        }
+
+        /** Records the current session together with the local node. */
+        private void addSession() {
+            sessions.put(sessionKey(), ignite.cluster().localNode());
+        }
+
+        /**
+         * @return Key under which the current session is tracked: {@link #ses} when it is set,
+         *      otherwise the {@link #NULL} placeholder.
+         */
+        private CacheStoreSession sessionKey() {
+            return ses != null ? ses : NULL;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sessionEnd(boolean commit) {
+            sessions.remove(sessionKey());
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b2b26a8/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 45f575e..e7f38be 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -21,6 +21,7 @@ import junit.framework.TestSuite;
import org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListenerSelfTest;
import org.apache.ignite.internal.processors.GridCacheTxLoadFromStoreOnLockSelfTest;
import org.apache.ignite.internal.processors.cache.CacheClientStoreSelfTest;
+import org.apache.ignite.internal.processors.cache.CacheConnectionLeakStoreTxTest;
import org.apache.ignite.internal.processors.cache.CacheGetEntryOptimisticReadCommittedSeltTest;
import org.apache.ignite.internal.processors.cache.CacheGetEntryOptimisticRepeatableReadSeltTest;
import org.apache.ignite.internal.processors.cache.CacheGetEntryOptimisticSerializableSeltTest;
@@ -279,6 +280,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
suite.addTestSuite(CacheStoreUsageMultinodeStaticStartTxTest.class);
suite.addTestSuite(CacheStoreUsageMultinodeDynamicStartAtomicTest.class);
suite.addTestSuite(CacheStoreUsageMultinodeDynamicStartTxTest.class);
+ suite.addTestSuite(CacheConnectionLeakStoreTxTest.class);
suite.addTestSuite(GridCacheStoreManagerDeserializationTest.class);
suite.addTestSuite(GridLocalCacheStoreManagerDeserializationTest.class);