You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/16 09:13:06 UTC
[4/6] ignite git commit: IGNITE-2325 - Fixed assertion in optimistic
tx prepare future on remap - Fixes #434.
IGNITE-2325 - Fixed assertion in optimistic tx prepare future on remap - Fixes #434.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6247ac71
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6247ac71
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6247ac71
Branch: refs/heads/ignite-1232
Commit: 6247ac719a5a7643a317c8e4574565c15fe2e588
Parents: 0e0d94c
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Mon Feb 15 20:12:04 2016 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Feb 15 20:12:04 2016 +0300
----------------------------------------------------------------------
.../near/GridNearOptimisticTxPrepareFuture.java | 18 +-
.../IgniteCacheNearRestartRollbackSelfTest.java | 276 +++++++++++++++++++
.../testsuites/IgniteCacheRestartTestSuite.java | 2 +
3 files changed, 288 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6247ac71/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 8476dc3..f146071 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -263,9 +263,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
IgniteTxEntry singleWrite = tx.singleWrite();
if (singleWrite != null)
- prepareSingle(singleWrite, topLocked);
+ prepareSingle(singleWrite, topLocked, remap);
else
- prepare(tx.writeEntries(), topLocked);
+ prepare(tx.writeEntries(), topLocked, remap);
markInitialized();
}
@@ -278,7 +278,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
* @param write Write.
* @param topLocked {@code True} if thread already acquired lock preventing topology change.
*/
- private void prepareSingle(IgniteTxEntry write, boolean topLocked) {
+ private void prepareSingle(IgniteTxEntry write, boolean topLocked, boolean remap) {
write.clearEntryReadVersion();
AffinityTopologyVersion topVer = tx.topologyVersion();
@@ -287,7 +287,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
txMapping = new GridDhtTxMapping();
- GridDistributedTxMapping mapping = map(write, topVer, null, topLocked);
+ GridDistributedTxMapping mapping = map(write, topVer, null, topLocked, remap);
if (mapping.node().isLocal()) {
if (write.context().isNear())
@@ -325,7 +325,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
*/
private void prepare(
Iterable<IgniteTxEntry> writes,
- boolean topLocked
+ boolean topLocked,
+ boolean remap
) {
AffinityTopologyVersion topVer = tx.topologyVersion();
@@ -343,7 +344,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
for (IgniteTxEntry write : writes) {
write.clearEntryReadVersion();
- GridDistributedTxMapping updated = map(write, topVer, cur, topLocked);
+ GridDistributedTxMapping updated = map(write, topVer, cur, topLocked, remap);
if (cur != updated) {
mappings.offer(updated);
@@ -508,7 +509,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
IgniteTxEntry entry,
AffinityTopologyVersion topVer,
@Nullable GridDistributedTxMapping cur,
- boolean topLocked
+ boolean topLocked,
+ boolean remap
) {
GridCacheContext cacheCtx = entry.context();
@@ -542,7 +544,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
entry.cached(cacheCtx.local().entryEx(entry.key(), topVer));
if (cacheCtx.isNear() || cacheCtx.isLocal()) {
- if (entry.explicitVersion() == null) {
+ if (entry.explicitVersion() == null && !remap) {
if (keyLockFut == null) {
keyLockFut = new KeyLockFuture();
http://git-wip-us.apache.org/repos/asf/ignite/blob/6247ac71/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java
new file mode 100644
index 0000000..6941bcc
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.Cache;
+import javax.cache.CacheException;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionRollbackException;
+
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ *
+ */
+public class IgniteCacheNearRestartRollbackSelfTest extends GridCommonAbstractTest {
+ /** Shared IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /**
+ * The number of entries to put to the test cache.
+ */
+ private static final int ENTRY_COUNT = 100;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(discoSpi);
+
+ cfg.setCacheConfiguration(cacheConfiguration(gridName));
+
+ if (getTestGridName(3).equals(gridName))
+ cfg.setClientMode(true);
+
+ TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+ commSpi.setSharedMemoryPort(-1);
+
+ cfg.setCommunicationSpi(commSpi);
+
+ return cfg;
+ }
+
+ /**
+ * @param gridName Grid name.
+ * @return Cache configuration.
+ */
+ protected CacheConfiguration<Object, Object> cacheConfiguration(String gridName) {
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+ ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+
+ ccfg.setBackups(1);
+
+ ccfg.setNearConfiguration(new NearCacheConfiguration<>());
+
+ ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+
+ ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+
+ return ccfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+ public void testRestarts() throws Exception {
+ startGrids(4);
+
+ Ignite tester = ignite(3);
+
+ final AtomicLong lastUpdateTs = new AtomicLong(System.currentTimeMillis());
+
+ try {
+ Set<Integer> keys = new LinkedHashSet<>();
+
+ for (int i = 0; i < ENTRY_COUNT; i++)
+ keys.add(i);
+
+ IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ for (int i = 0; i < 50; i++) {
+ stopGrid(0);
+
+ startGrid(0);
+
+ stopGrid(1);
+
+ startGrid(1);
+
+ stopGrid(2);
+
+ startGrid(2);
+
+ synchronized (lastUpdateTs) {
+ while (System.currentTimeMillis() - lastUpdateTs.get() > 1_000) {
+ info("Will wait for an update operation to finish.");
+
+ lastUpdateTs.wait(1_000);
+ }
+ }
+ }
+
+ return null;
+ }
+ });
+
+ int currentValue = 0;
+ boolean invoke = false;
+
+ while (!fut.isDone()) {
+ updateCache(tester, currentValue, invoke, false, keys);
+
+ updateCache(tester, currentValue + 1, invoke, true, keys);
+
+ invoke = !invoke;
+ currentValue++;
+
+ synchronized (lastUpdateTs) {
+ lastUpdateTs.set(System.currentTimeMillis());
+
+ lastUpdateTs.notifyAll();
+ }
+ }
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * Updates the cache or rollback the update.
+ *
+ * @param ignite Ignite instance to use.
+ * @param newValue the new value to put to the entries
+ * @param invoke whether to use invokeAll() or putAll()
+ * @param rollback whether to rollback the changes or commit
+ * @param keys Collection of keys to update.
+ */
+ private void updateCache(
+ Ignite ignite,
+ int newValue,
+ boolean invoke,
+ boolean rollback,
+ Set<Integer> keys
+ ) {
+ final IgniteCache<Integer, Integer> cache = ignite.cache(null);
+
+ if (rollback) {
+ while (true) {
+ try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ updateEntries(cache, newValue, invoke, keys);
+
+ tx.rollback();
+
+ break;
+ }
+ catch (CacheException e) {
+ if (e.getCause() instanceof ClusterTopologyException) {
+ ClusterTopologyException topEx = (ClusterTopologyException)e.getCause();
+
+ topEx.retryReadyFuture().get();
+ }
+ else
+ throw e;
+ }
+ catch (ClusterTopologyException e) {
+ IgniteFuture<?> fut = e.retryReadyFuture();
+
+ fut.get();
+ }
+ catch (TransactionRollbackException ignore) {
+ // Safe to retry right away.
+ }
+ }
+ }
+ else
+ updateEntries(cache, newValue, invoke, keys);
+ }
+
+ /**
+ * Update the cache using either invokeAll() or putAll().
+ *
+ * @param cache the cache
+ * @param newValue the new value to put to the entries
+ * @param invoke whether to use invokeAll() or putAll()
+ */
+ private void updateEntries(
+ Cache<Integer, Integer> cache,
+ int newValue,
+ boolean invoke,
+ Set<Integer> keys
+ ) {
+ if (invoke)
+ cache.invokeAll(keys, new IntegerSetValue(newValue));
+ else {
+ final Map<Integer, Integer> entries = new HashMap<>(ENTRY_COUNT);
+
+ for (final Integer key : keys)
+ entries.put(key, newValue);
+
+ cache.putAll(entries);
+ }
+ }
+
+ /**
+ * {@link EntryProcessor} used to update the entry value.
+ */
+ private static class IntegerSetValue implements EntryProcessor<Integer, Integer, Boolean>, Serializable {
+ /** */
+ private final int newValue;
+
+ /**
+ * @param newValue New value.
+ */
+ private IntegerSetValue(final int newValue) {
+ this.newValue = newValue;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean process(MutableEntry<Integer, Integer> entry, Object... arguments)
+ throws EntryProcessorException {
+ entry.setValue(newValue);
+
+ return Boolean.TRUE;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6247ac71/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
index a6bd785..9040ea5 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
import junit.framework.TestSuite;
import org.apache.ignite.internal.processors.cache.IgniteCacheCreateRestartSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheNearRestartRollbackSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedNearDisabledOptimisticTxNodeRestartTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedNodeRestartTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedOptimisticTxNodeRestartTest;
@@ -39,6 +40,7 @@ public class IgniteCacheRestartTestSuite extends TestSuite {
suite.addTestSuite(GridCachePartitionedOptimisticTxNodeRestartTest.class);
suite.addTestSuite(GridCacheReplicatedNodeRestartSelfTest.class);
suite.addTestSuite(GridCachePartitionedNearDisabledOptimisticTxNodeRestartTest.class);
+ suite.addTestSuite(IgniteCacheNearRestartRollbackSelfTest.class);
suite.addTestSuite(IgniteCacheCreateRestartSelfTest.class);