You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/18 21:48:43 UTC
[33/50] incubator-ignite git commit: ignite-484-1 - improved retry
ignite-484-1 - improved retry
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/94060c9e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/94060c9e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/94060c9e
Branch: refs/heads/ignite-980
Commit: 94060c9ef41161c7262a28044ddb176f86814b01
Parents: 10febf2
Author: S.Vladykin <sv...@gridgain.com>
Authored: Wed Jun 17 19:46:42 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Wed Jun 17 19:46:42 2015 +0300
----------------------------------------------------------------------
.../query/h2/twostep/GridMapQueryExecutor.java | 26 ++++--
.../h2/twostep/GridReduceQueryExecutor.java | 86 ++++++++++++++------
...lientQueryReplicatedNodeRestartSelfTest.java | 50 ++++++++++--
3 files changed, 125 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/94060c9e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index aaf64ee..2503a87 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -48,6 +48,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.*;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.*;
@@ -230,6 +231,15 @@ public class GridMapQueryExecutor {
}
/**
+ * @param cctx Cache context.
+ * @param p Partition ID.
+ * @return Local partition looked up with the {@code NONE} topology version; callers check it for {@code null}.
+ */
+ private GridDhtLocalPartition partition(GridCacheContext<?, ?> cctx, int p) {
+ return cctx.topology().localPartition(p, NONE, false);
+ }
+
+ /**
* @param cacheNames Cache names.
* @param topVer Topology version.
* @param explicitParts Explicit partitions list.
@@ -263,10 +273,12 @@ public class GridMapQueryExecutor {
GridReservable r = reservations.get(grpKey);
if (explicitParts == null && r != null) { // Try to reserve group partition if any and no explicits.
- if (!r.reserve())
- return false; // We need explicit partitions here -> retry.
+ if (r != ReplicatedReservation.INSTANCE) {
+ if (!r.reserve())
+ return false; // We need explicit partitions here -> retry.
- reserved.add(r);
+ reserved.add(r);
+ }
}
else { // Try to reserve partitions one by one.
int partsCnt = cctx.affinity().partitions();
@@ -274,7 +286,7 @@ public class GridMapQueryExecutor {
if (cctx.isReplicated()) { // Check all the partitions are in owning state for replicated cache.
if (r == null) { // Check only once.
for (int p = 0; p < partsCnt; p++) {
- GridDhtLocalPartition part = cctx.topology().localPartition(p, topVer, false);
+ GridDhtLocalPartition part = partition(cctx, p);
// We don't need to reserve partitions because they will not be evicted in replicated caches.
if (part == null || part.state() != OWNING)
@@ -290,7 +302,7 @@ public class GridMapQueryExecutor {
partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer);
for (int partId : partIds) {
- GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false);
+ GridDhtLocalPartition part = partition(cctx, partId);
if (part == null || part.state() != OWNING || !part.reserve())
return false;
@@ -806,12 +818,12 @@ public class GridMapQueryExecutor {
/** {@inheritDoc} */
@Override public boolean reserve() {
- return true;
+ throw new IllegalStateException();
}
/** {@inheritDoc} */
@Override public void release() {
- // No-op.
+ throw new IllegalStateException();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/94060c9e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index c570d24..6635dde 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -335,7 +335,7 @@ public class GridReduceQueryExecutor {
) {
String space = cctx.name();
- Set<ClusterNode> nodes = new HashSet<>(ctx.discovery().cacheAffinityNodes(space, topVer));
+ Set<ClusterNode> nodes = new HashSet<>(dataNodes(space, topVer));
if (F.isEmpty(nodes))
throw new CacheException("No data nodes found for cache: " + space);
@@ -351,7 +351,7 @@ public class GridReduceQueryExecutor {
throw new CacheException("Queries running on replicated cache should not contain JOINs " +
"with partitioned tables.");
- Collection<ClusterNode> extraNodes = ctx.discovery().cacheAffinityNodes(extraSpace, topVer);
+ Collection<ClusterNode> extraNodes = dataNodes(extraSpace, topVer);
if (F.isEmpty(extraNodes))
throw new CacheException("No data nodes found for cache: " + extraSpace);
@@ -398,7 +398,18 @@ public class GridReduceQueryExecutor {
* @return Cursor.
*/
public Iterator<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry, boolean keepPortable) {
- for (;;) {
+ for (int attempt = 0;; attempt++) {
+ if (attempt != 0) {
+ try {
+ Thread.sleep(attempt * 10); // Wait for exchange.
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new CacheException("Query was interrupted.", e);
+ }
+ }
+
long qryReqId = reqIdGen.incrementAndGet();
QueryRun r = new QueryRun();
@@ -422,9 +433,9 @@ public class GridReduceQueryExecutor {
if (isPreloadingActive(cctx, extraSpaces)) {
if (cctx.isReplicated())
- nodes = replicatedDataNodes(cctx, extraSpaces);
+ nodes = replicatedUnstableDataNodes(cctx, extraSpaces);
else {
- partsMap = partitionLocations(cctx, extraSpaces);
+ partsMap = partitionedUnstableDataNodes(cctx, extraSpaces);
nodes = partsMap == null ? null : partsMap.keySet();
}
@@ -538,9 +549,6 @@ public class GridReduceQueryExecutor {
catch (IgniteCheckedException | RuntimeException e) {
U.closeQuiet(r.conn);
- if (e instanceof CacheException)
- throw (CacheException)e;
-
throw new CacheException("Failed to run reduce query locally.", e);
}
finally {
@@ -559,10 +567,14 @@ public class GridReduceQueryExecutor {
* @param extraSpaces Extra spaces.
* @return Collection of all data nodes owning all the caches or {@code null} for retry.
*/
- private Collection<ClusterNode> replicatedDataNodes(final GridCacheContext<?,?> cctx, List<String> extraSpaces) {
+ private Collection<ClusterNode> replicatedUnstableDataNodes(final GridCacheContext<?,?> cctx,
+ List<String> extraSpaces) {
assert cctx.isReplicated() : cctx.name() + " must be replicated";
- Set<ClusterNode> nodes = owningReplicatedDataNodes(cctx);
+ Set<ClusterNode> nodes = replicatedUnstableDataNodes(cctx);
+
+ if (F.isEmpty(nodes))
+ return null; // Retry.
if (!F.isEmpty(extraSpaces)) {
for (String extraSpace : extraSpaces) {
@@ -575,7 +587,12 @@ public class GridReduceQueryExecutor {
throw new CacheException("Queries running on replicated cache should not contain JOINs " +
"with partitioned tables.");
- nodes.retainAll(owningReplicatedDataNodes(extraCctx));
+ Set<ClusterNode> extraOwners = replicatedUnstableDataNodes(extraCctx);
+
+ if (F.isEmpty(extraOwners))
+ return null; // Retry.
+
+ nodes.retainAll(extraOwners);
if (nodes.isEmpty())
return null; // Retry.
@@ -586,34 +603,43 @@ public class GridReduceQueryExecutor {
}
/**
+ * @param space Cache name.
+ * @param topVer Topology version.
+ * @return Collection of data nodes, or an empty set if discovery returns {@code null}; never {@code null}.
+ */
+ private Collection<ClusterNode> dataNodes(String space, AffinityTopologyVersion topVer) {
+ Collection<ClusterNode> res = ctx.discovery().cacheAffinityNodes(space, topVer);
+
+ return res != null ? res : Collections.<ClusterNode>emptySet();
+ }
+
+ /**
* Collects all the nodes owning all the partitions for the given replicated cache.
*
* @param cctx Cache context.
- * @return Owning nodes.
+ * @return Owning nodes or {@code null} if some partition has no owners or no node owns all the partitions.
*/
- private Set<ClusterNode> owningReplicatedDataNodes(GridCacheContext<?,?> cctx) {
+ private Set<ClusterNode> replicatedUnstableDataNodes(GridCacheContext<?,?> cctx) {
assert cctx.isReplicated() : cctx.name() + " must be replicated";
String space = cctx.name();
- Set<ClusterNode> dataNodes = new HashSet<>(ctx.discovery().cacheAffinityNodes(space, NONE));
+ Set<ClusterNode> dataNodes = new HashSet<>(dataNodes(space, NONE));
if (dataNodes.isEmpty())
throw new CacheException("No data nodes found for cache '" + space + "'");
// Find all the nodes owning all the partitions for replicated cache.
- for (int p = 0, extraParts = cctx.affinity().partitions(); p < extraParts; p++) {
+ for (int p = 0, parts = cctx.affinity().partitions(); p < parts; p++) {
List<ClusterNode> owners = cctx.topology().owners(p);
- if (owners.isEmpty())
- throw new CacheException("No data nodes found for cache '" + space +
- "' for partition " + p);
+ if (F.isEmpty(owners))
+ return null; // Retry.
dataNodes.retainAll(owners);
if (dataNodes.isEmpty())
- throw new CacheException("No data nodes found for cache '" + space +
- "' owning all the partitions.");
+ return null; // Retry.
}
return dataNodes;
@@ -627,7 +653,8 @@ public class GridReduceQueryExecutor {
* @return Partition mapping or {@code null} if we can't calculate it due to repartitioning and we need to retry.
*/
@SuppressWarnings("unchecked")
- private Map<ClusterNode, IntArray> partitionLocations(final GridCacheContext<?,?> cctx, List<String> extraSpaces) {
+ private Map<ClusterNode, IntArray> partitionedUnstableDataNodes(final GridCacheContext<?,?> cctx,
+ List<String> extraSpaces) {
assert !cctx.isReplicated() && !cctx.isLocal() : cctx.name() + " must be partitioned";
final int partsCnt = cctx.affinity().partitions();
@@ -653,8 +680,12 @@ public class GridReduceQueryExecutor {
for (int p = 0, parts = cctx.affinity().partitions(); p < parts; p++) {
List<ClusterNode> owners = cctx.topology().owners(p);
- if (F.isEmpty(owners))
+ if (F.isEmpty(owners)) {
+ if (!F.isEmpty(dataNodes(cctx.name(), NONE)))
+ return null; // Retry.
+
throw new CacheException("No data nodes found for cache '" + cctx.name() + "' for partition " + p);
+ }
partLocs[p] = new HashSet<>(owners);
}
@@ -671,9 +702,13 @@ public class GridReduceQueryExecutor {
for (int p = 0, parts = extraCctx.affinity().partitions(); p < parts; p++) {
List<ClusterNode> owners = extraCctx.topology().owners(p);
- if (F.isEmpty(owners))
+ if (F.isEmpty(owners)) {
+ if (!F.isEmpty(dataNodes(extraSpace, NONE)))
+ return null; // Retry.
+
throw new CacheException("No data nodes found for cache '" + extraSpace +
"' for partition " + p);
+ }
if (partLocs[p] == null)
partLocs[p] = new HashSet<>(owners);
@@ -693,7 +728,10 @@ public class GridReduceQueryExecutor {
if (!extraCctx.isReplicated())
continue;
- Set<ClusterNode> dataNodes = owningReplicatedDataNodes(extraCctx);
+ Set<ClusterNode> dataNodes = replicatedUnstableDataNodes(extraCctx);
+
+ if (F.isEmpty(dataNodes))
+ return null; // Retry.
for (Set<ClusterNode> partLoc : partLocs) {
partLoc.retainAll(dataNodes);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/94060c9e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java
index 23f44c0..3f23005 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java
@@ -64,6 +64,9 @@ public class IgniteCacheClientQueryReplicatedNodeRestartSelfTest extends GridCom
};
/** */
+ private static final List<List<?>> FAKE = new LinkedList<>();
+
+ /** */
private static final int GRID_CNT = 5;
/** */
@@ -191,7 +194,7 @@ public class IgniteCacheClientQueryReplicatedNodeRestartSelfTest extends GridCom
public void testRestarts() throws Exception {
int duration = 90 * 1000;
int qryThreadNum = 5;
- int restartThreadsNum = 2; // 2 of 4 data nodes
+ int restartThreadsNum = 3; // 3 of 4 data nodes
final int nodeLifeTime = 2 * 1000;
final int logFreq = 10;
@@ -212,13 +215,32 @@ public class IgniteCacheClientQueryReplicatedNodeRestartSelfTest extends GridCom
final AtomicInteger qryCnt = new AtomicInteger();
final AtomicBoolean qrysDone = new AtomicBoolean();
+ final List<Integer> cacheSize = new ArrayList<>(4);
+
for (int i = 0; i < GRID_CNT - 1; i++) {
- for (String cacheName : F.asList("co", "pr", "pe", "pu"))
- assertClient(grid(i).cache(cacheName), false);
+ int j = 0;
+
+ for (String cacheName : F.asList("co", "pr", "pe", "pu")) {
+ IgniteCache<?,?> cache = grid(i).cache(cacheName);
+
+ assertClient(cache, false);
+
+ if (i == 0)
+ cacheSize.add(cache.size());
+ else
+ assertEquals(cacheSize.get(j++).intValue(), cache.size());
+ }
}
- for (String cacheName : F.asList("co", "pr", "pe", "pu"))
- assertClient(grid(GRID_CNT - 1).cache(cacheName), true);
+ int j = 0;
+
+ for (String cacheName : F.asList("co", "pr", "pe", "pu")) {
+ IgniteCache<?,?> cache = grid(GRID_CNT - 1).cache(cacheName);
+
+ assertClient(cache, true);
+
+ assertEquals(cacheSize.get(j++).intValue(), cache.size());
+ }
final IgniteCache<?,?> clientCache = grid(GRID_CNT - 1).cache("pu");
@@ -234,8 +256,10 @@ public class IgniteCacheClientQueryReplicatedNodeRestartSelfTest extends GridCom
if (smallPageSize)
qry.setPageSize(3);
+ List<List<?>> res;
+
try {
- assertEquals(pRes, clientCache.query(qry).getAll());
+ res = clientCache.query(qry).getAll();
}
catch (CacheException e) {
assertTrue("On large page size must retry.", smallPageSize);
@@ -259,6 +283,20 @@ public class IgniteCacheClientQueryReplicatedNodeRestartSelfTest extends GridCom
fail("Must fail inside of GridResultPage.fetchNextPage or subclass.");
}
+
+ res = FAKE;
+ }
+
+ if (res != FAKE && !res.equals(pRes)) {
+ int j = 0;
+
+ // Check for data loss.
+ for (String cacheName : F.asList("co", "pr", "pe", "pu")) {
+ assertEquals(cacheName, cacheSize.get(j++).intValue(),
+ grid(GRID_CNT - 1).cache(cacheName).size());
+ }
+
+ assertEquals(pRes, res); // Fail with nice message.
}
int c = qryCnt.incrementAndGet();