You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2018/10/23 15:01:40 UTC
[36/50] [abbrv] ignite git commit: IGNITE-9944: MVCC: Fixed "first
request" handling for DHT transactions. This closes #5040.
IGNITE-9944: MVCC: Fixed "first request" handling for DHT transactions. This closes #5040.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7504880a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7504880a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7504880a
Branch: refs/heads/ignite-gg-14206
Commit: 7504880a443737bf03bb03e455dbea7d8757ae27
Parents: 4bc08da
Author: ipavlukhin <vo...@gmail.com>
Authored: Mon Oct 22 22:15:20 2018 +0300
Committer: devozerov <pp...@gmail.com>
Committed: Mon Oct 22 22:15:20 2018 +0300
----------------------------------------------------------------------
.../dht/GridDhtTxAbstractEnlistFuture.java | 8 +-
.../CacheMvccRemoteTxOnNearNodeStartTest.java | 90 ++++++++++++++++++++
.../testsuites/IgniteCacheMvccTestSuite.java | 3 +
3 files changed, 100 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7504880a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
index e2c8237..68669b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
@@ -193,6 +193,9 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
/** Moving partitions. */
private Map<Integer, Boolean> movingParts;
+ /** Map for tracking nodes to which first request was already sent in order to send smaller subsequent requests. */
+ private final Set<ClusterNode> firstReqSent = new HashSet<>();
+
/**
* @param nearNodeId Near node ID.
* @param nearLockVer Near lock version.
@@ -823,9 +826,12 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
GridDhtTxQueryEnlistRequest req;
- if (newRemoteTx(node)) {
+ if (newRemoteTx(node))
addNewRemoteTxNode(node);
+ if (!firstReqSent.contains(node)) {
+ firstReqSent.add(node);
+
// If this is a first request to this node, send full info.
req = new GridDhtTxQueryFirstEnlistRequest(cctx.cacheId(),
futId,
http://git-wip-us.apache.org/repos/asf/ignite/blob/7504880a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccRemoteTxOnNearNodeStartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccRemoteTxOnNearNodeStartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccRemoteTxOnNearNodeStartTest.java
new file mode 100644
index 0000000..ee23e38
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccRemoteTxOnNearNodeStartTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/** */
+public class CacheMvccRemoteTxOnNearNodeStartTest extends CacheMvccAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return CacheMode.PARTITIONED;
+ }
+
+ /**
+ * Ensures that remote transaction on near node is started
+ * when first request is sent to OWNING partition and second to MOVING partition.
+ * @throws Exception if failed.
+ */
+ public void testRemoteTxOnNearNodeIsStartedIfPartitionIsMoving() throws Exception {
+ startGridsMultiThreaded(3);
+
+ IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT)
+ .setCacheMode(cacheMode())
+ .setBackups(1)
+ );
+
+ ArrayList<Integer> keys = new ArrayList<>();
+
+ Affinity<Object> aff = grid(0).affinity(DEFAULT_CACHE_NAME);
+
+ for (int i = 0; i < 100; i++) {
+ if (aff.isPrimary(grid(1).localNode(), i) && aff.isBackup(grid(0).localNode(), i)) {
+ keys.add(i);
+ break;
+ }
+ }
+
+ for (int i = 0; i < 100; i++) {
+ if (aff.isPrimary(grid(1).localNode(), i) && aff.isBackup(grid(2).localNode(), i)) {
+ keys.add(i);
+ break;
+ }
+ }
+
+ assert keys.size() == 2;
+
+ stopGrid(2);
+
+ try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.putAll(ImmutableMap.of(
+ keys.get(0), 0,
+ keys.get(1), 1)
+ );
+
+ tx.commit();
+ }
+
+ // assert transaction was committed without errors
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7504880a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java
index 8585ebe..0146344 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccOperationChecks
import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccPartitionedCoordinatorFailoverTest;
import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccProcessorLazyStartTest;
import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccProcessorTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccRemoteTxOnNearNodeStartTest;
import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccReplicatedCoordinatorFailoverTest;
import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccScanQueryWithConcurrentTransactionTest;
import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSizeWithConcurrentTransactionTest;
@@ -52,6 +53,8 @@ public class IgniteCacheMvccTestSuite extends TestSuite {
suite.addTestSuite(DataStreamProcessorMvccSelfTest.class);
suite.addTestSuite(CacheMvccOperationChecksTest.class);
+ suite.addTestSuite(CacheMvccRemoteTxOnNearNodeStartTest.class);
+
// Concurrent ops tests.
suite.addTestSuite(CacheMvccIteratorWithConcurrentTransactionTest.class);
suite.addTestSuite(CacheMvccLocalEntriesWithConcurrentTransactionTest.class);