You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/11/21 12:10:16 UTC
[22/47] ignite git commit: ignite-6858 Fail query if thread holds a
cache lock and exchange is in progress
ignite-6858 Fail query if thread holds a cache lock and exchange is in progress
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/caad1e91
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/caad1e91
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/caad1e91
Branch: refs/heads/ignite-zk
Commit: caad1e912192d3da0d74395db6cc3625fe2eb804
Parents: 52b46c3
Author: Aleksei Scherbakov <al...@gmail.com>
Authored: Thu Nov 16 12:45:11 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Nov 16 12:45:11 2017 +0300
----------------------------------------------------------------------
.../cache/distributed/near/GridNearTxLocal.java | 7 +-
.../processors/query/h2/IgniteH2Indexing.java | 17 +++++
.../h2/twostep/GridReduceQueryExecutor.java | 11 ++-
.../IgniteCacheQueryNodeRestartSelfTest2.java | 75 +++++++++++++++++---
.../IgniteCacheQueryNodeRestartTxSelfTest.java | 36 ++++++++++
.../IgniteCacheQuerySelfTestSuite2.java | 2 +
6 files changed, 133 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/caad1e91/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 085f0b7..a3fddaf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -4122,14 +4122,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
/** {@inheritDoc} */
@Override public void onTimeout() {
if (state(MARKED_ROLLBACK, true) || (state() == MARKED_ROLLBACK)) {
- if (log.isDebugEnabled())
- log.debug("Will rollback tx on timeout: " + this);
-
cctx.kernalContext().closure().runLocalSafe(new Runnable() {
@Override public void run() {
- // Note: if rollback asynchonously on timeout should not clear thread map
+ // Note: if rollback asynchronously on timeout should not clear thread map
// since thread started tx still should be able to see this tx.
rollbackNearTxLocalAsync(true);
+
+ U.warn(log, "Transaction was rolled back because the timeout is reached: " + GridNearTxLocal.this);
}
});
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/caad1e91/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 450ee20..333a958 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -71,6 +71,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.query.CacheQueryPartitionInfo;
@@ -2477,6 +2478,22 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
+ * Checks for a still-pending exchange that is newer than {@code readyVer} and was not triggered by a client node.
+ * @param readyVer Ready topology version the pending exchange's initial version is compared against.
+ * @return {@code true} if a pending distributed exchange exists because server topology has changed.
+ */
+ public boolean serverTopologyChanged(AffinityTopologyVersion readyVer) {
+ GridDhtPartitionsExchangeFuture fut = ctx.cache().context().exchange().lastTopologyFuture();
+
+ if (fut.isDone()) // Last topology future has already completed: no exchange is pending.
+ return false;
+
+ AffinityTopologyVersion initVer = fut.initialVersion();
+ // Pending exchange counts only if it starts after readyVer and its first event's node is not a client node.
+ return initVer.compareTo(readyVer) > 0 && !CU.clientNode(fut.firstEvent().node());
+ }
+
+ /**
* @param topVer Topology version.
* @throws IgniteCheckedException If failed.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/caad1e91/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index f85cd94..8e994aa 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -90,6 +90,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.transactions.TransactionException;
import org.h2.command.ddl.CreateTableData;
import org.h2.engine.Session;
import org.h2.index.Index;
@@ -560,9 +561,15 @@ public class GridReduceQueryExecutor {
AffinityTopologyVersion topVer = h2.readyTopologyVersion();
+ // Check if topology is changed while retrying on locked topology.
+ if (h2.serverTopologyChanged(topVer) && ctx.cache().context().lockedTopologyVersion(null) != null) {
+ throw new CacheException(new TransactionException("Server topology is changed during query " +
+ "execution inside a transaction. It's recommended to rollback and retry transaction."));
+ }
+
List<Integer> cacheIds = qry.cacheIds();
- Collection<ClusterNode> nodes = null;
+ Collection<ClusterNode> nodes;
// Explicit partition mapping for unstable topology.
Map<ClusterNode, IntArray> partsMap = null;
@@ -1737,4 +1744,4 @@ public class GridReduceQueryExecutor {
return qryMap;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/caad1e91/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
index 627b3eb..bda503e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
@@ -32,20 +32,25 @@ import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.GridRandom;
import org.apache.ignite.internal.util.typedef.CAX;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.TransactionException;
+import org.apache.ignite.transactions.TransactionTimeoutException;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
@@ -224,7 +229,7 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends GridCommonAbstractTest
IgniteInternalFuture<?> fut1 = multithreadedAsync(new CAX() {
@Override public void applyx() throws IgniteCheckedException {
- GridRandom rnd = new GridRandom();
+ final GridRandom rnd = new GridRandom();
while (!qrysDone.get()) {
int g;
@@ -235,28 +240,43 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends GridCommonAbstractTest
while (!locks.compareAndSet(g, 0, 1));
try {
+ final IgniteEx grid = grid(g);
+
if (rnd.nextBoolean()) { // Partitioned query.
- IgniteCache<?,?> cache = grid(g).cache("pu");
+ final IgniteCache<?,?> cache = grid.cache("pu");
- SqlFieldsQuery qry = new SqlFieldsQuery(PARTITIONED_QRY);
+ final SqlFieldsQuery qry = new SqlFieldsQuery(PARTITIONED_QRY);
boolean smallPageSize = rnd.nextBoolean();
if (smallPageSize)
qry.setPageSize(3);
+ final IgniteCache<Integer, Company> co = grid.cache("co");
+
try {
- assertEquals(pRes, cache.query(qry).getAll());
+ runQuery(grid, new Runnable() {
+ @Override public void run() {
+ if (rnd.nextBoolean())
+ co.get(rnd.nextInt(COMPANY_CNT)); // Randomly read a key first, so that inside a transaction the query runs after a cache operation (presumably with a lock held).
+
+ assertEquals(pRes, cache.query(qry).getAll());
+ }
+ });
} catch (CacheException e) {
// Interruptions are expected here.
- if (e.getCause() instanceof IgniteInterruptedCheckedException)
+ if (e.getCause() instanceof IgniteInterruptedCheckedException ||
+ e.getCause() instanceof InterruptedException ||
+ e.getCause() instanceof ClusterTopologyException ||
+ e.getCause() instanceof TransactionTimeoutException ||
+ e.getCause() instanceof TransactionException)
continue;
if (e.getCause() instanceof QueryCancelledException)
fail("Retry is expected");
if (!smallPageSize)
- e.printStackTrace();
+ U.error(grid.log(), "On large page size must retry.", e);
assertTrue("On large page size must retry.", smallPageSize);
@@ -286,13 +306,13 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends GridCommonAbstractTest
continue;
if (!failedOnRemoteFetch) {
- e.printStackTrace();
+ U.error(grid.log(), "Must fail inside of GridResultPage.fetchNextPage or subclass.", e);
fail("Must fail inside of GridResultPage.fetchNextPage or subclass.");
}
}
} else { // Replicated query.
- IgniteCache<?, ?> cache = grid(g).cache("co");
+ IgniteCache<?, ?> cache = grid.cache("co");
assertEquals(rRes, cache.query(new SqlFieldsQuery(REPLICATED_QRY)).getAll());
}
@@ -358,7 +378,14 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends GridCommonAbstractTest
restartsDone.set(true);
- fut2.get();
+ try {
+ fut2.get(20_000);
+ }
+ catch (IgniteFutureTimeoutCheckedException e) {
+ U.dumpThreads(log);
+
+ fail("Stopping restarts timeout.");
+ }
info("Restarts stopped.");
@@ -380,12 +407,26 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends GridCommonAbstractTest
}
/**
+ * Runs the query closure as-is; a subclass may override this to change how the closure is executed.
+ *
+ * @param grid Grid the query is issued on (not used by this default implementation).
+ * @param qryRunnable Query runnable to execute.
+ */
+ protected void runQuery(IgniteEx grid, Runnable qryRunnable) {
+ qryRunnable.run();
+ }
+
+ /**
*
*/
private static class Person implements Serializable {
+ /** */
@QuerySqlField(index = true)
int id;
+ /**
+ * @param id Person ID.
+ */
Person(int id) {
this.id = id;
}
@@ -395,12 +436,18 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends GridCommonAbstractTest
*
*/
private static class Purchase implements Serializable {
+ /** */
@QuerySqlField(index = true)
int personId;
+ /** */
@QuerySqlField(index = true)
int productId;
+ /**
+ * @param personId Person ID.
+ * @param productId Product ID.
+ */
Purchase(int personId, int productId) {
this.personId = personId;
this.productId = productId;
@@ -411,9 +458,13 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends GridCommonAbstractTest
*
*/
private static class Company implements Serializable {
+ /** */
@QuerySqlField(index = true)
int id;
+ /**
+ * @param id ID.
+ */
Company(int id) {
this.id = id;
}
@@ -423,12 +474,18 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends GridCommonAbstractTest
*
*/
private static class Product implements Serializable {
+ /** */
@QuerySqlField(index = true)
int id;
+ /** */
@QuerySqlField(index = true)
int companyId;
+ /**
+ * @param id ID.
+ * @param companyId Company ID.
+ */
Product(int id, int companyId) {
this.id = id;
this.companyId = companyId;
http://git-wip-us.apache.org/repos/asf/ignite/blob/caad1e91/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartTxSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartTxSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartTxSelfTest.java
new file mode 100644
index 0000000..ae06c42
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartTxSelfTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.transactions.Transaction;
+
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Variant of {@link IgniteCacheQueryNodeRestartSelfTest2} that runs each query closure inside a transaction.
+ */
+public class IgniteCacheQueryNodeRestartTxSelfTest extends IgniteCacheQueryNodeRestartSelfTest2 {
+ /** {@inheritDoc} Wraps the closure in a PESSIMISTIC, REPEATABLE_READ transaction started on {@code grid}. */
+ @Override protected void runQuery(IgniteEx grid, Runnable qryRunnable) {
+ try(Transaction tx = grid.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { // No commit is issued: the tx is only closed on exit.
+ qryRunnable.run();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/caad1e91/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
index 91e4478..abe06ec 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQ
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartDistributedJoinSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartSelfTest2;
+import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartTxSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest;
import org.apache.ignite.internal.processors.cache.index.DynamicColumnsConcurrentAtomicPartitionedSelfTest;
import org.apache.ignite.internal.processors.cache.index.DynamicColumnsConcurrentAtomicReplicatedSelfTest;
@@ -85,6 +86,7 @@ public class IgniteCacheQuerySelfTestSuite2 extends TestSuite {
suite.addTestSuite(IgniteCacheQueryNodeFailTest.class);
suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class);
suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class);
+ suite.addTestSuite(IgniteCacheQueryNodeRestartTxSelfTest.class);
suite.addTestSuite(IgniteCacheSqlQueryMultiThreadedSelfTest.class);
suite.addTestSuite(IgniteCachePartitionedQueryMultiThreadedSelfTest.class);
suite.addTestSuite(CacheScanPartitionQueryFallbackSelfTest.class);