You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/06/27 09:37:46 UTC
[04/10] ignite git commit: IGNITE-3339 - Fixed entries eviction.
IGNITE-3339 - Fixed entries eviction.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b634a680
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b634a680
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b634a680
Branch: refs/heads/ignite-3361
Commit: b634a680735a83cfe4132aadfbda49dc8cb3bb0d
Parents: 8218a2a
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Wed Jun 22 06:34:57 2016 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Jun 22 06:34:57 2016 -0700
----------------------------------------------------------------------
.../distributed/near/GridNearGetFuture.java | 8 +-
.../transactions/IgniteTxLocalAdapter.java | 12 +-
.../cache/transactions/IgniteTxManager.java | 16 +-
.../IgniteCacheTxIteratorSelfTest.java | 241 +++++++++++++++++++
.../testsuites/IgniteCacheTestSuite5.java | 2 +
5 files changed, 266 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b634a680/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 73b9d38..d94839c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -62,8 +62,6 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
-
/**
*
*/
@@ -630,7 +628,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
catch (GridCacheEntryRemovedException ignored) {
// Retry.
}
- catch (GridDhtInvalidPartitionException e) {
+ catch (GridDhtInvalidPartitionException ignored) {
return false;
}
catch (IgniteCheckedException e) {
@@ -639,7 +637,9 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
return false;
}
finally {
- if (dhtEntry != null && (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED)))
+ if (dhtEntry != null)
+ // Near cache is enabled, so near entry will be enlisted in the transaction.
+ // Always touch DHT entry in this case.
dht.context().evicts().touch(dhtEntry, topVer);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b634a680/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 28ecda5..105a582 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1401,11 +1401,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
log.debug("Got removed entry in transaction getAllAsync(..) (will retry): " + key);
}
finally {
- if (cacheCtx.isNear() && entry != null && readCommitted()) {
- if (cacheCtx.affinity().belongs(cacheCtx.localNode(), entry.partition(), topVer)) {
- if (entry.markObsolete(xidVer))
- cacheCtx.cache().removeEntry(entry);
+ if (entry != null && readCommitted()) {
+ if (cacheCtx.isNear()) {
+ if (cacheCtx.affinity().belongs(cacheCtx.localNode(), entry.partition(), topVer)) {
+ if (entry.markObsolete(xidVer))
+ cacheCtx.cache().removeEntry(entry);
+ }
}
+ else
+ entry.context().evicts().touch(entry, topVer);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b634a680/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 6e8f9fa..2aa174a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1252,7 +1252,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @param tx Transaction to finish.
* @param commit {@code True} if transaction is committed, {@code false} if rolled back.
*/
- public void fastFinishTx(IgniteInternalTx tx, boolean commit) {
+ public void fastFinishTx(GridNearTxLocal tx, boolean commit) {
assert tx != null;
assert tx.writeMap().isEmpty();
assert tx.optimistic() || tx.readMap().isEmpty();
@@ -1263,16 +1263,22 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
// 1. Notify evictions.
notifyEvitions(tx);
- // 2. Remove obsolete entries.
+ // 2. Evict near entries.
+ if (!tx.readMap().isEmpty()) {
+ for (IgniteTxEntry entry : tx.readMap().values())
+ tx.evictNearEntry(entry, false);
+ }
+
+ // 3. Remove obsolete entries.
removeObsolete(tx);
- // 3. Remove from per-thread storage.
+ // 4. Remove from per-thread storage.
clearThreadMap(tx);
- // 4. Clear context.
+ // 5. Clear context.
resetContext();
- // 5. Update metrics.
+ // 6. Update metrics.
if (!tx.dht() && tx.local()) {
if (!tx.system()) {
if (commit)
http://git-wip-us.apache.org/repos/asf/ignite/blob/b634a680/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxIteratorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxIteratorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxIteratorSelfTest.java
new file mode 100644
index 0000000..769a5f6
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxIteratorSelfTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicy;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.configuration.TransactionConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import javax.cache.Cache;
+
+/**
+ *
+ */
+public class IgniteCacheTxIteratorSelfTest extends GridCommonAbstractTest {
+ /** */
+ public static final String CACHE_NAME = "testCache";
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ final IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ final TransactionConfiguration txCfg = new TransactionConfiguration();
+
+ txCfg.setDefaultTxIsolation(TransactionIsolation.READ_COMMITTED);
+
+ cfg.setTransactionConfiguration(txCfg);
+
+ return cfg;
+ }
+
+ /**
+ * @return Cache configuration.
+ */
+ private CacheConfiguration<String, TestClass> cacheConfiguration(
+ CacheMode mode,
+ CacheAtomicityMode atomMode,
+ CacheMemoryMode memMode,
+ boolean nearEnabled,
+ boolean useEvictPlc
+ ) {
+ final CacheConfiguration<String, TestClass> ccfg = new CacheConfiguration<>(CACHE_NAME);
+
+ ccfg.setAtomicityMode(atomMode);
+ ccfg.setCacheMode(mode);
+ ccfg.setMemoryMode(memMode);
+ ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+
+ if (nearEnabled)
+ ccfg.setNearConfiguration(new NearCacheConfiguration<String, TestClass>());
+
+ if (memMode == CacheMemoryMode.ONHEAP_TIERED && useEvictPlc) {
+ ccfg.setOffHeapMaxMemory(10 * 1024 * 1024);
+ ccfg.setEvictionPolicy(new FifoEvictionPolicy(50));
+ }
+
+ return ccfg;
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testModesSingleNode() throws Exception {
+ checkModes(1);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testModesMultiNode() throws Exception {
+ checkModes(3);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void checkModes(int gridCnt) throws Exception {
+ startGrids(gridCnt);
+
+ try {
+ for (CacheMode mode : CacheMode.values()) {
+ for (CacheAtomicityMode atomMode : CacheAtomicityMode.values()) {
+ for (CacheMemoryMode memMode : CacheMemoryMode.values()) {
+ if (mode == CacheMode.PARTITIONED) {
+ // Near cache makes sense only for partitioned cache.
+ checkTxCache(CacheMode.PARTITIONED, atomMode, memMode, true, false);
+ }
+
+ if (memMode == CacheMemoryMode.ONHEAP_TIERED)
+ checkTxCache(mode, atomMode, CacheMemoryMode.ONHEAP_TIERED, false, true);
+
+ checkTxCache(mode, atomMode, memMode, false, false);
+ }
+ }
+ }
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void checkTxCache(
+ CacheMode mode,
+ CacheAtomicityMode atomMode,
+ CacheMemoryMode memMode,
+ boolean nearEnabled,
+ boolean useEvicPlc
+ ) throws Exception {
+ final Ignite ignite = grid(0);
+
+ final CacheConfiguration<String, TestClass> ccfg = cacheConfiguration(
+ mode,
+ atomMode,
+ memMode,
+ nearEnabled,
+ useEvicPlc);
+
+ final IgniteCache<String, TestClass> cache = ignite.createCache(ccfg);
+
+ info("Checking cache [mode=" + mode + ", atomMode=" + atomMode + ", memMode=" + memMode +
+ ", near=" + nearEnabled + ']');
+
+ try {
+ for (int i = 0; i < 30; i++) {
+ final TestClass val = new TestClass("data");
+ final String key = "key-" + i;
+
+ cache.put(key, val);
+
+ assertEquals(i + 1, cache.size());
+
+ for (TransactionIsolation iso : TransactionIsolation.values()) {
+ for (TransactionConcurrency con : TransactionConcurrency.values()) {
+ try (Transaction transaction = ignite.transactions().txStart(con, iso)) {
+ assertEquals(val, cache.get(key));
+
+ transaction.commit();
+ }
+
+ int cnt = iterateOverKeys(cache);
+
+ assertEquals("Failed [con=" + con + ", iso=" + iso + ']', i + 1, cnt);
+
+ assertEquals("Failed [con=" + con + ", iso=" + iso + ']', i + 1, cache.size());
+ }
+ }
+ }
+ }
+ finally {
+ grid(0).destroyCache(CACHE_NAME);
+ }
+ }
+
+ /**
+ * @param cache Cache.
+ */
+ @SuppressWarnings("TypeMayBeWeakened")
+ private int iterateOverKeys(final IgniteCache<String, TestClass> cache) {
+ int cnt = 0;
+
+ for (final Cache.Entry<String, TestClass> ignore : cache)
+ cnt++;
+
+ return cnt;
+ }
+
+ /**
+ *
+ */
+ private static class TestClass {
+ /** */
+ private String data;
+
+ /**
+ * @param data Data.
+ */
+ private TestClass(String data) {
+ this.data = data;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ final TestClass testCls = (TestClass)o;
+
+ return data != null ? data.equals(testCls.data) : testCls.data == null;
+
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public int hashCode() {
+ return data != null ? data.hashCode() : 0;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String toString() {
+ return S.toString(TestClass.class, this);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b634a680/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
index c5c5c67..98a3c44 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.cache.CacheSerializableTransactions
import org.apache.ignite.internal.processors.cache.EntryVersionConsistencyReadThroughTest;
import org.apache.ignite.internal.processors.cache.IgniteCachePutStackOverflowSelfTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheStoreCollectionTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheTxIteratorSelfTest;
import org.apache.ignite.internal.processors.cache.store.IgniteCacheWriteBehindNoUpdateSelfTest;
/**
@@ -42,6 +43,7 @@ public class IgniteCacheTestSuite5 extends TestSuite {
suite.addTestSuite(IgniteCacheWriteBehindNoUpdateSelfTest.class);
suite.addTestSuite(IgniteCachePutStackOverflowSelfTest.class);
suite.addTestSuite(EntryVersionConsistencyReadThroughTest.class);
+ suite.addTestSuite(IgniteCacheTxIteratorSelfTest.class);
return suite;
}