You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/11 08:24:54 UTC
[6/7] ignite git commit: ignite-3478 Support for removes
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
index c829afb..a1d0127 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
@@ -28,6 +28,9 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
+
/**
*
*/
@@ -66,16 +69,23 @@ public class MvccVersionBasedSearchRow extends SearchRow implements BPlusTree.Tr
{
boolean visible = true;
+ RowLinkIO rowIo = (RowLinkIO)io;
+
+ long crdVerMasked = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
+
if (ver.activeTransactions().size() > 0) {
- RowLinkIO rowIo = (RowLinkIO)io;
+ long rowCrdVer = unmaskCoordinatorVersion(crdVerMasked);
// TODO IGNITE-3478 sort active transactions?
- if (rowIo.getMvccCoordinatorVersion(pageAddr, idx) == ver.coordinatorVersion())
+ if (rowCrdVer == ver.coordinatorVersion())
visible = !ver.activeTransactions().contains(rowIo.getMvccCounter(pageAddr, idx));
}
if (visible) {
- resRow = ((CacheDataTree) tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
+ if (versionForRemovedValue(crdVerMasked))
+ resRow = null;
+ else
+ resRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
return false; // Stop search.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 6ced2f9..30145ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -134,11 +134,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/** Version which is less then any version generated on coordinator. */
private static final MvccCoordinatorVersion ISOLATED_STREAMER_MVCC_VER =
- new MvccCoordinatorVersionResponse(1L, CacheCoordinatorsProcessor.START_VER, 0L) {
- @Override public boolean initialLoad() {
- return true;
- }
- };
+ new MvccCoordinatorVersionResponse(1L, CacheCoordinatorsProcessor.START_VER, 0L);
/** Cache receiver. */
private StreamReceiver<K, V> rcvr = ISOLATED_UPDATER;
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClusterRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClusterRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClusterRestartTest.java
new file mode 100644
index 0000000..ed7b62d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClusterRestartTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ *
+ */
+public class CacheMvccClusterRestartTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setConsistentId(gridName);
+
+ cfg.setMvccEnabled(true);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ MemoryConfiguration memCfg = new MemoryConfiguration();
+ memCfg.setPageSize(1024);
+ memCfg.setDefaultMemoryPolicySize(100 * 1024 * 1024);
+
+ cfg.setMemoryConfiguration(memCfg);
+
+ cfg.setPersistentStoreConfiguration(new PersistentStoreConfiguration().setWalMode(WALMode.LOG_ONLY));
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ GridTestUtils.deleteDbFiles();
+
+ super.afterTestsStopped();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ GridTestUtils.deleteDbFiles();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ GridTestUtils.deleteDbFiles();
+
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRestart1() throws Exception {
+ restart1(3, 3);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRestart2() throws Exception {
+ restart1(1, 3);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRestart3() throws Exception {
+ restart1(3, 1);
+ }
+
+ /**
+ * @param srvBefore Number of servers before restart.
+ * @param srvAfter Number of servers after restart.
+ * @throws Exception If failed.
+ */
+ private void restart1(int srvBefore, int srvAfter) throws Exception {
+ Ignite srv0 = startGridsMultiThreaded(srvBefore);
+
+ srv0.active(true);
+
+ IgniteCache<Object, Object> cache = srv0.createCache(cacheConfiguration());
+
+ Set<Integer> keys = new HashSet<>(primaryKeys(cache, 1, 0));
+
+ try (Transaction tx = srv0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (Integer k : keys)
+ cache.put(k, k);
+
+ tx.commit();
+ }
+
+ stopAllGrids();
+
+ srv0 = startGridsMultiThreaded(srvAfter);
+
+ srv0.active(true);
+
+ cache = srv0.cache(DEFAULT_CACHE_NAME);
+
+ Map<Object, Object> res = cache.getAll(keys);
+
+ assertEquals(keys.size(), res.size());
+
+ for (Integer k : keys)
+ assertEquals(k, cache.get(k));
+
+ try (Transaction tx = srv0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (Integer k : keys)
+ cache.put(k, k + 1);
+
+ tx.commit();
+ }
+
+ for (Integer k : keys)
+ assertEquals(k + 1, cache.get(k));
+ }
+
+ /**
+ * @return Cache configuration.
+ */
+ private CacheConfiguration<Object, Object> cacheConfiguration() {
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setAtomicityMode(TRANSACTIONAL);
+ ccfg.setBackups(2);
+
+ return ccfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 115e8a2..8bf9e39 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -17,7 +17,9 @@
package org.apache.ignite.internal.processors.cache.mvcc;
+import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -45,6 +47,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
@@ -119,6 +122,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
/** */
private String nodeAttr;
+ /** */
+ private static final int PAGE_SIZE = MemoryConfiguration.DFLT_PAGE_SIZE;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -137,6 +143,12 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
if (nodeAttr != null)
cfg.setUserAttributes(F.asMap(nodeAttr, true));
+ MemoryConfiguration memCfg = new MemoryConfiguration();
+
+ memCfg.setPageSize(PAGE_SIZE);
+
+ cfg.setMemoryConfiguration(memCfg);
+
return cfg;
}
@@ -376,6 +388,147 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testSimplePutRemove() throws Exception {
+ simplePutRemove(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSimplePutRemove_LargeKeys() throws Exception {
+ simplePutRemove(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ * @param largeKeys {@code True} to use large keys (not fitting in single page).
+ */
+ private void simplePutRemove(boolean largeKeys) throws Exception {
+ Ignite node = startGrid(0);
+
+ IgniteTransactions txs = node.transactions();
+
+ final IgniteCache<Object, Object> cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 1));
+
+ final int KEYS = 100;
+
+ checkValues(new HashMap<>(), cache);
+
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (int k = 0; k < KEYS; k++)
+ cache.remove(testKey(largeKeys, k));
+
+ tx.commit();
+ }
+
+ checkValues(new HashMap<>(), cache);
+
+ Map<Object, Object> expVals = new HashMap<>();
+
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (int k = 0; k < KEYS; k++) {
+ Object key = testKey(largeKeys, k);
+
+ expVals.put(key, k);
+
+ cache.put(key, k);
+ }
+
+ tx.commit();
+ }
+
+ checkValues(expVals, cache);
+
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (int k = 0; k < KEYS; k++) {
+ if (k % 2 == 0) {
+ Object key = testKey(largeKeys, k);
+
+ cache.remove(key);
+
+ expVals.remove(key);
+ }
+ }
+
+ tx.commit();
+ }
+
+ checkValues(expVals, cache);
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ Object key = testKey(largeKeys, 0);
+
+ for (int i = 0; i < 500; i++) {
+ boolean rmvd;
+
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ if (rnd.nextBoolean()) {
+ cache.remove(key);
+
+ rmvd = true;
+ }
+ else {
+ cache.put(key, i);
+
+ rmvd = false;
+ }
+
+ tx.commit();
+ }
+
+ if (rmvd) {
+ assertNull(cache.get(key));
+ assertTrue(cache.getAll(F.asSet(key)).isEmpty());
+ }
+ else {
+ assertEquals(i, cache.get(key));
+
+ Map<Object, Object> res = cache.getAll(F.asSet(key));
+
+ assertEquals(i, res.get(key));
+ }
+ }
+ }
+
+ /**
+ * @param largeKeys {@code True} to use large keys (not fitting in single page).
+ * @param idx Index.
+ * @return Key instance.
+ */
+ private static Object testKey(boolean largeKeys, int idx) {
+ if (largeKeys) {
+ int payloadSize = PAGE_SIZE + ThreadLocalRandom.current().nextInt(PAGE_SIZE * 10);
+
+ return new TestKey(idx, payloadSize);
+ }
+ else
+ return idx;
+ }
+
+ /**
+ * @param expVals Expected values.
+ * @param cache Cache.
+ */
+ private void checkValues(Map<Object, Object> expVals, IgniteCache<Object, Object> cache) {
+ for (Map.Entry<Object, Object> e : expVals.entrySet())
+ assertEquals(e.getValue(), cache.get(e.getKey()));
+
+ Map<Object, Object> res = cache.getAll(expVals.keySet());
+
+ assertEquals(expVals, res);
+
+ res = new HashMap<>();
+
+ for (IgniteCache.Entry<Object, Object> e : cache)
+ res.put(e.getKey(), e.getValue());
+
+ assertEquals(expVals, res);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testThreadUpdatesAreVisibleForThisThread() throws Exception {
final Ignite ignite = startGrid(0);
@@ -1181,42 +1334,49 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testAccountsTxGetAll_SingleNode() throws Exception {
- accountsTxGetAll(1, 0, 0, 64, ReadMode.GET_ALL);
+ accountsTxGetAll(1, 0, 0, 64, false, ReadMode.GET_ALL);
}
/**
* @throws Exception If failed.
*/
public void testAccountsTxGetAll_SingleNode_SinglePartition() throws Exception {
- accountsTxGetAll(1, 0, 0, 1, ReadMode.GET_ALL);
+ accountsTxGetAll(1, 0, 0, 1, false, ReadMode.GET_ALL);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAccountsTxGetAll_WithRemoves_SingleNode_SinglePartition() throws Exception {
+ accountsTxGetAll(1, 0, 0, 1, true, ReadMode.GET_ALL);
}
/**
* @throws Exception If failed.
*/
public void testAccountsTxGetAll_ClientServer_Backups0() throws Exception {
- accountsTxGetAll(4, 2, 0, 64, ReadMode.GET_ALL);
+ accountsTxGetAll(4, 2, 0, 64, false, ReadMode.GET_ALL);
}
/**
* @throws Exception If failed.
*/
public void testAccountsTxGetAll_ClientServer_Backups1() throws Exception {
- accountsTxGetAll(4, 2, 1, 64, ReadMode.GET_ALL);
+ accountsTxGetAll(4, 2, 1, 64, false, ReadMode.GET_ALL);
}
/**
* @throws Exception If failed.
*/
public void testAccountsTxGetAll_ClientServer_Backups2() throws Exception {
- accountsTxGetAll(4, 2, 2, 64, ReadMode.GET_ALL);
+ accountsTxGetAll(4, 2, 2, 64, false, ReadMode.GET_ALL);
}
/**
* @throws Exception If failed.
*/
public void testAccountsTxScan_SingleNode_SinglePartition() throws Exception {
- accountsTxGetAll(1, 0, 0, 1, ReadMode.SCAN);
+ accountsTxGetAll(1, 0, 0, 1, false, ReadMode.SCAN);
}
/**
@@ -1224,6 +1384,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
* @param clients Number of client nodes.
* @param cacheBackups Number of cache backups.
* @param cacheParts Number of cache partitions.
+ * @param withRmvs If {@code true} then in addition to puts tests also executes removes.
* @param readMode Read mode.
* @throws Exception If failed.
*/
@@ -1232,6 +1393,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
final int clients,
int cacheBackups,
int cacheParts,
+ final boolean withRmvs,
final ReadMode readMode
)
throws Exception
@@ -1261,6 +1423,8 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
}
};
+ final Set<Integer> rmvdIds = new HashSet<>();
+
GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> writer =
new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() {
@Override public void apply(Integer idx, List<IgniteCache> caches, AtomicBoolean stop) {
@@ -1285,8 +1449,8 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
keys.add(id1);
keys.add(id2);
- Integer cntr1;
- Integer cntr2;
+ Integer cntr1 = null;
+ Integer cntr2 = null;
try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
MvccTestAccount a1;
@@ -1297,28 +1461,74 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
a1 = accounts.get(id1);
a2 = accounts.get(id2);
- assertNotNull(a1);
- assertNotNull(a2);
+ if (!withRmvs) {
+ assertNotNull(a1);
+ assertNotNull(a2);
- cntr1 = a1.updateCnt + 1;
- cntr2 = a2.updateCnt + 1;
+ cntr1 = a1.updateCnt + 1;
+ cntr2 = a2.updateCnt + 1;
- cache.put(id1, new MvccTestAccount(a1.val + 1, cntr1));
- cache.put(id2, new MvccTestAccount(a2.val - 1, cntr2));
+ cache.put(id1, new MvccTestAccount(a1.val + 1, cntr1));
+ cache.put(id2, new MvccTestAccount(a2.val - 1, cntr2));
+ }
+ else {
+ if (a1 != null || a2 != null) {
+ if (a1 != null && a2 != null) {
+ Integer rmvd = null;
+
+ if (rnd.nextInt(10) == 0) {
+ synchronized (rmvdIds) {
+ if (rmvdIds.size() < ACCOUNTS / 2) {
+ rmvd = rnd.nextBoolean() ? id1 : id2;
+
+ assertTrue(rmvdIds.add(rmvd));
+ }
+ }
+ }
+
+ if (rmvd != null) {
+ cache.remove(rmvd);
+
+ cache.put(rmvd.equals(id1) ? id2 : id1,
+ new MvccTestAccount(a1.val + a2.val, 1));
+ }
+ else {
+ cache.put(id1, new MvccTestAccount(a1.val + 1, 1));
+ cache.put(id2, new MvccTestAccount(a2.val - 1, 1));
+ }
+ }
+ else {
+ if (a1 == null) {
+ cache.put(id1, new MvccTestAccount(100, 1));
+ cache.put(id2, new MvccTestAccount(a2.val - 100, 1));
+
+ assertTrue(rmvdIds.remove(id1));
+ }
+ else {
+ cache.put(id1, new MvccTestAccount(a1.val - 100, 1));
+ cache.put(id2, new MvccTestAccount(100, 1));
+
+ assertTrue(rmvdIds.remove(id2));
+ }
+ }
+ }
+ }
tx.commit();
}
- Map<Integer, MvccTestAccount> accounts = cache.getAll(keys);
+ if (!withRmvs) {
+ Map<Integer, MvccTestAccount> accounts = cache.getAll(keys);
- MvccTestAccount a1 = accounts.get(id1);
- MvccTestAccount a2 = accounts.get(id2);
+ MvccTestAccount a1 = accounts.get(id1);
+ MvccTestAccount a2 = accounts.get(id2);
- assertNotNull(a1);
- assertNotNull(a2);
+ assertNotNull(a1);
+ assertNotNull(a2);
- assertTrue(a1.updateCnt >= cntr1);
- assertTrue(a2.updateCnt >= cntr2);
+ assertTrue(a1.updateCnt >= cntr1);
+ assertTrue(a2.updateCnt >= cntr2);
+ }
}
info("Writer finished, updates: " + cnt);
@@ -1354,23 +1564,26 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
else
accounts = cache.getAll(keys);
- assertEquals(ACCOUNTS, accounts.size());
+ if (!withRmvs)
+ assertEquals(ACCOUNTS, accounts.size());
int sum = 0;
for (int i = 0; i < ACCOUNTS; i++) {
MvccTestAccount account = accounts.get(i);
- assertNotNull(account);
+ if (account != null) {
+ sum += account.val;
- sum += account.val;
+ Integer cntr = lastUpdateCntrs.get(i);
- Integer cntr = lastUpdateCntrs.get(i);
+ if (cntr != null)
+ assertTrue(cntr <= account.updateCnt);
- if (cntr != null)
- assertTrue(cntr <= account.updateCnt);
-
- lastUpdateCntrs.put(i, cntr);
+ lastUpdateCntrs.put(i, cntr);
+ }
+ else
+ assertTrue(withRmvs);
}
assertEquals(ACCOUNTS * ACCOUNT_START_VAL, sum);
@@ -1386,9 +1599,12 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
for (int i = 0; i < ACCOUNTS; i++) {
MvccTestAccount account = accounts.get(i);
- info("Account [id=" + i + ", val=" + account.val + ']');
+ assertTrue(account != null || withRmvs);
- sum += account.val;
+ info("Account [id=" + i + ", val=" + (account != null ? account.val : null) + ']');
+
+ if (account != null)
+ sum += account.val;
}
info("Sum: " + sum);
@@ -1601,7 +1817,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- public void testRebalance1() throws Exception {
+ public void testSimpleRebalance() throws Exception {
Ignite srv0 = startGrid(0);
IgniteCache<Integer, Integer> cache = (IgniteCache)srv0.createCache(
@@ -1664,6 +1880,58 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testSimpleRebalanceWithRemovedValues() throws Exception {
+ Ignite node = startGrid(0);
+
+ IgniteTransactions txs = node.transactions();
+
+ final IgniteCache<Object, Object> cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, 64));
+
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (int k = 0; k < 100; k++)
+ cache.remove(k);
+
+ tx.commit();
+ }
+
+ Map<Object, Object> expVals = new HashMap<>();
+
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (int k = 100; k < 200; k++) {
+ cache.put(k, k);
+
+ expVals.put(k, k);
+ }
+
+ tx.commit();
+ }
+
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (int k = 100; k < 200; k++) {
+ if (k % 2 == 0) {
+ cache.remove(k);
+
+ expVals.remove(k);
+ }
+ }
+
+ tx.commit();
+ }
+
+ startGrid(1);
+
+ awaitPartitionMapExchange();
+
+ checkValues(expVals, jcache(1));
+
+ stopGrid(0);
+
+ checkValues(expVals, jcache(1));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testCoordinatorFailurePessimisticTx() throws Exception {
testSpi = true;
@@ -2722,9 +2990,55 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
assertEquals(KEYS, cache.size());
}
- // TODO IGNITE-3478: test removes.
- }
+ int size = KEYS;
+
+ for (int i = 0; i < KEYS; i++) {
+ if (i % 2 == 0) {
+ final Integer key = i;
+
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.remove(key);
+
+ tx.commit();
+ }
+
+ size--;
+
+ assertEquals(size, cache.size());
+ }
+ }
+
+ // Check size does not change if remove already removed keys.
+ for (int i = 0; i < KEYS; i++) {
+ if (i % 2 == 0) {
+ final Integer key = i;
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.remove(key);
+
+ tx.commit();
+ }
+
+ assertEquals(size, cache.size());
+ }
+ }
+
+ for (int i = 0; i < KEYS; i++) {
+ if (i % 2 == 0) {
+ final Integer key = i;
+
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(key, i);
+
+ tx.commit();
+ }
+
+ size++;
+
+ assertEquals(size, cache.size());
+ }
+ }
+ }
/**
* @throws IgniteCheckedException If failed.
@@ -2792,7 +3106,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
key0,
vers.get(0).get1());
- MvccCoordinatorVersionResponse ver = version(crd.currentCoordinator().coordinatorVersion(), 100000);
+ MvccCoordinatorVersionResponse ver = version(vers.get(0).get2().coordinatorVersion(), 100000);
for (int v = 0; v < vers.size(); v++) {
MvccCounter cntr = vers.get(v).get2();
@@ -3074,4 +3388,54 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
return null;
}
}
+
+ /**
+ *
+ */
+ static class TestKey implements Serializable {
+ /** */
+ private final int key;
+
+ /** */
+ private final byte[] payload;
+
+ /**
+ * @param key Key.
+ * @param payloadSize Payload size.
+ */
+ public TestKey(int key, int payloadSize) {
+ this.key = key;
+ this.payload = new byte[payloadSize];
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ TestKey testKey = (TestKey)o;
+
+ if (key != testKey.key)
+ return false;
+
+ return Arrays.equals(payload, testKey.payload);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = key;
+
+ res = 31 * res + Arrays.hashCode(payload);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "TestKey [k=" + key + ", payloadLen=" + payload.length + ']';
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
index d0897e1..600c8df 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
@@ -435,6 +435,11 @@ public class FreeListImplSelfTest extends GridCommonAbstractTest {
@Override public long mvccCounter() {
return 0;
}
+
+ /** {@inheritDoc} */
+ @Override public boolean removed() {
+ return false;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
index 392301c..1819cfe 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
@@ -174,4 +174,9 @@ public abstract class GridH2Row implements SearchRow, CacheDataRow, Row {
@Override public long mvccCounter() {
throw new UnsupportedOperationException();
}
+
+ /** {@inheritDoc} */
+ @Override public boolean removed() {
+ throw new UnsupportedOperationException();
+ }
}
\ No newline at end of file