You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/06/18 08:54:18 UTC
[11/50] incubator-ignite git commit: ignite-484-1 - collocated partitions + replicated caches
ignite-484-1 - collocated partitions + replicated caches
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d389ada8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d389ada8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d389ada8
Branch: refs/heads/ignite-sprint-6
Commit: d389ada8b9546994d3dec5da00349f831f81fcb0
Parents: 357b4c0
Author: S.Vladykin <sv...@gridgain.com>
Authored: Wed Jun 3 22:55:16 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Wed Jun 3 22:55:16 2015 +0300
----------------------------------------------------------------------
.../processors/query/GridQueryIndexing.java | 3 +-
.../h2/twostep/messages/GridQueryRequest.java | 13 +-
.../processors/query/h2/IgniteH2Indexing.java | 12 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 15 +-
.../h2/twostep/GridReduceQueryExecutor.java | 248 +++++++++++++------
.../IgniteCacheQueryNodeRestartSelfTest.java | 10 +-
6 files changed, 206 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d389ada8/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index de35201..98c2af7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -243,6 +243,5 @@ public interface GridQueryIndexing {
* @param parts Partitions.
* @return Backup filter.
*/
- public IndexingQueryFilter backupFilter(List<String> caches, AffinityTopologyVersion topVer,
- List<int[]> parts);
+ public IndexingQueryFilter backupFilter(List<String> caches, AffinityTopologyVersion topVer, int[] parts);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d389ada8/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
index 99ef094..6465bbc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
@@ -57,8 +57,7 @@ public class GridQueryRequest implements Message {
private List<String> extraSpaces;
/** */
- @GridDirectCollection(int[].class)
- private List<int[]> parts;
+ private int[] parts;
/**
* Default constructor.
@@ -83,7 +82,7 @@ public class GridQueryRequest implements Message {
Collection<GridCacheSqlQuery> qrys,
AffinityTopologyVersion topVer,
List<String> extraSpaces,
- List<int[]> parts) {
+ int[] parts) {
this.reqId = reqId;
this.pageSize = pageSize;
this.space = space;
@@ -110,14 +109,14 @@ public class GridQueryRequest implements Message {
/**
* @return All the needed partitions for {@link #space()} and {@link #extraSpaces()}.
*/
- public List<int[]> partitions() {
+ public int[] partitions() {
return parts;
}
/**
* @param parts All the needed partitions for {@link #space()} and {@link #extraSpaces()}.
*/
- public void partitions(List<int[]> parts) {
+ public void partitions(int[] parts) {
this.parts = parts;
}
@@ -217,7 +216,7 @@ public class GridQueryRequest implements Message {
writer.incrementState();
case 6:
- if (!writer.writeCollection("partitions", parts, MessageCollectionItemType.INT_ARR))
+ if (!writer.writeIntArray("partitions", parts))
return false;
writer.incrementState();
@@ -283,7 +282,7 @@ public class GridQueryRequest implements Message {
reader.incrementState();
case 6:
- parts = reader.readCollection("partitions", MessageCollectionItemType.INT_ARR);
+ parts = reader.readIntArray("partitions");
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d389ada8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index ffedfb3..da497a2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1357,7 +1357,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
@Override public IndexingQueryFilter backupFilter(
@Nullable final List<String> caches,
@Nullable final AffinityTopologyVersion topVer,
- @Nullable final List<int[]> parts
+ @Nullable final int[] parts
) {
final AffinityTopologyVersion topVer0 = topVer != null ? topVer : AffinityTopologyVersion.NONE;
@@ -1371,16 +1371,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
final GridCacheAffinityManager aff = cache.context().affinity();
if (parts != null) {
- int idx = caches.indexOf(spaceName);
-
- final int[] parts0 = parts.get(idx);
-
- if (parts0.length < 64) { // Fast scan for small arrays.
+ if (parts.length < 64) { // Fast scan for small arrays.
return new IgniteBiPredicate<K,V>() {
@Override public boolean apply(K k, V v) {
int p = aff.partition(k);
- for (int p0 : parts0) {
+ for (int p0 : parts) {
if (p0 == p)
return true;
@@ -1397,7 +1393,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
@Override public boolean apply(K k, V v) {
int p = aff.partition(k);
- return Arrays.binarySearch(parts0, p) >= 0;
+ return Arrays.binarySearch(parts, p) >= 0;
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d389ada8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 9d9060a..ede0e2e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -222,21 +222,24 @@ public class GridMapQueryExecutor {
* @return {@code true} If all the needed partitions successfully reserved.
* @throws IgniteCheckedException If failed.
*/
- private boolean reservePartitions(Collection<String> cacheNames, AffinityTopologyVersion topVer, List<int[]> parts,
+ private boolean reservePartitions(Collection<String> cacheNames, AffinityTopologyVersion topVer, int[] parts,
List<GridDhtLocalPartition> reserved) throws IgniteCheckedException {
- assert parts == null || parts.size() == cacheNames.size();
-
- int i = 0;
+ Collection<Integer> partIds = parts == null ? null : wrap(parts);
for (String cacheName : cacheNames) {
GridCacheContext<?,?> cctx = cacheContext(cacheName, topVer);
- Collection<Integer> partIds = parts != null ? wrap(parts.get(i++)) :
- cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer);
+ int partsCnt = cctx.affinity().partitions();
+
+ if (parts == null)
+ partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer);
for (int partId : partIds) {
GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false);
+ if (partId >= partsCnt)
+ break; // We can have more partitions because `parts` array is shared for all caches.
+
if (part != null) {
// Await for owning state.
part.owningFuture().get();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d389ada8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index c445844..87ac2f4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -56,6 +56,8 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
+import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.*;
+
/**
* Reduce query executor.
*/
@@ -279,35 +281,85 @@ public class GridReduceQueryExecutor {
}
/**
- * @param set Set.
- * @return Array.
+ * @param readyTop Latest ready topology.
+ * @return {@code true} If preloading is active.
*/
- private static int[] unbox(Set<Integer> set) {
- if (set == null)
- return null;
+ private boolean isPreloadingActive(AffinityTopologyVersion readyTop) {
+ AffinityTopologyVersion freshTop = ctx.discovery().topologyVersionEx();
- int[] arr = new int[set.size()];
+ int res = readyTop.compareTo(freshTop);
- int i = 0;
+ assert res <= 0 : readyTop + " " + freshTop;
- for (int x : set)
- arr[i++] = x;
+ return res < 0;
+ }
- return arr;
+ /**
+ * @param name Cache name.
+ * @return Cache context.
+ */
+ private GridCacheContext<?,?> cacheContext(String name) {
+ return ctx.cache().internalCache(name).context();
}
/**
- * @param readyTop Latest ready topology.
- * @return {@code true} If preloading is active.
+ * @param topVer Topology version.
+ * @param cctx Cache context for main space.
+ * @param extraSpaces Extra spaces.
+ * @return Data nodes.
*/
- private boolean isPreloadingActive(AffinityTopologyVersion readyTop) {
- AffinityTopologyVersion freshTop = ctx.discovery().topologyVersionEx();
+ private Collection<ClusterNode> dataNodes(
+ AffinityTopologyVersion topVer,
+ final GridCacheContext<?,?> cctx,
+ List<String> extraSpaces
+ ) {
+ String space = cctx.name();
- int res = readyTop.compareTo(freshTop);
+ Set<ClusterNode> nodes = new HashSet<>(ctx.discovery().cacheAffinityNodes(space, topVer));
- assert res <= 0 : readyTop + " " + freshTop;
+ if (F.isEmpty(nodes))
+ throw new CacheException("No data nodes found for cache: " + space);
- return res < 0;
+ if (!F.isEmpty(extraSpaces)) {
+ for (String extraSpace : extraSpaces) {
+ GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
+
+ if (extraCctx.isLocal())
+ continue;
+
+ if (cctx.isReplicated() && !extraCctx.isReplicated())
+ throw new CacheException("Queries running on replicated cache should not contain JOINs " +
+ "with partitioned tables.");
+
+ Collection<ClusterNode> extraNodes = ctx.discovery().cacheAffinityNodes(extraSpace, topVer);
+
+ if (F.isEmpty(extraNodes))
+ throw new CacheException("No data nodes found for cache: " + extraSpace);
+
+ if (cctx.isReplicated() && extraCctx.isReplicated()) {
+ nodes.retainAll(extraNodes);
+
+ if (nodes.isEmpty() && !isPreloadingActive(topVer))
+ throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
+ "' have distinct set of data nodes.");
+ }
+ else if (!cctx.isReplicated() && extraCctx.isReplicated()) {
+ if (!extraNodes.containsAll(nodes) && !isPreloadingActive(topVer))
+ throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
+ "' have distinct set of data nodes.");
+ }
+ else if (!cctx.isReplicated() && !extraCctx.isReplicated()) {
+ if ((extraNodes.size() != nodes.size() || !nodes.containsAll(extraNodes)) &&
+ !isPreloadingActive(topVer))
+ throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
+ "' have distinct set of data nodes.");
+ }
+ else
+ throw new IllegalStateException();
+ }
+ }
+
+ return nodes;
}
/**
@@ -331,15 +383,12 @@ public class GridReduceQueryExecutor {
AffinityTopologyVersion topVer = h2.readyTopologyVersion();
- Collection<ClusterNode> nodes = ctx.discovery().cacheAffinityNodes(space, topVer);
-
- if (F.isEmpty(nodes))
- throw new CacheException("No data nodes found for cache: " + space);
-
List<String> extraSpaces = extraSpaces(space, qry.spaces());
- // Explicit partition mapping for unstable topology: {nodeId -> {cacheName -> {parts}}}
- Map<ClusterNode, Map<String, Set<Integer>>> gridPartsMap = null;
+ Collection<ClusterNode> nodes = dataNodes(topVer, cctx, extraSpaces);
+
+ // Explicit partition mapping for unstable topology.
+ Map<ClusterNode, IntArray> gridPartsMap = null;
if (cctx.isReplicated() || qry.explain()) {
assert qry.explain() || !nodes.contains(ctx.cluster().get().localNode()) : "We must be on a client node.";
@@ -348,14 +397,10 @@ public class GridReduceQueryExecutor {
nodes = Collections.singleton(F.rand(nodes));
}
else if (isPreloadingActive(topVer)) {
- gridPartsMap = new HashMap<>(nodes.size(), 1f);
-
- collectPartitionOwners(gridPartsMap, cctx);
+ gridPartsMap = partitionLocations(cctx, extraSpaces);
- if (extraSpaces != null) {
- for (String extraSpace : extraSpaces)
- collectPartitionOwners(gridPartsMap, ctx.cache().internalCache(extraSpace).context());
- }
+ if (gridPartsMap == null)
+ continue; // Retry.
nodes = gridPartsMap.keySet();
}
@@ -462,46 +507,114 @@ public class GridReduceQueryExecutor {
}
}
- /**
- * Collects actual partition owners for the cache context int the given map.
- *
- * @param gridPartsMap Target map.
- * @param cctx Cache context.
- */
- private void collectPartitionOwners(
- Map<ClusterNode,Map<String,Set<Integer>>> gridPartsMap,
- GridCacheContext<?,?> cctx
- ) {
- int partsCnt = cctx.affinity().partitions();
+ @SuppressWarnings("unchecked")
+ private Map<ClusterNode, IntArray> partitionLocations(final GridCacheContext<?,?> cctx, List<String> extraSpaces) {
+ assert !cctx.isReplicated() && !cctx.isLocal() : cctx.name() + " must be partitioned";
+
+ int maxParts = cctx.affinity().partitions();
+
+ if (extraSpaces != null) { // Find max number of partitions for partitioned caches.
+ for (String extraSpace : extraSpaces) {
+ GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
+
+ if (extraCctx.isReplicated() || extraCctx.isLocal())
+ continue;
+
+ int parts = extraCctx.affinity().partitions();
+
+ if (parts > maxParts)
+ maxParts = parts;
+ }
+ }
+
+ Set<ClusterNode>[] partLocs = new Set[maxParts];
- for (int p = 0; p < partsCnt; p++) {
- // We don't care about exact topology version here, we just need to get all the needed partition
- // owners in actual state.
+ // Fill partition locations for main cache.
+ for (int p = 0, parts = cctx.affinity().partitions(); p < parts; p++) {
List<ClusterNode> owners = cctx.topology().owners(p);
if (F.isEmpty(owners))
- continue; // All primary and backup nodes are dead now for this partition. We sorrow.
+ throw new CacheException("No data nodes found for cache '" + cctx.name() + "' for partition " + p);
+
+ partLocs[p] = new HashSet<>(owners);
+ }
+
+ if (extraSpaces != null) {
+ // Find owner intersections for each participating partitioned cache partition.
+ // We need this for logical collocation between different partitioned caches with the same affinity.
+ for (String extraSpace : extraSpaces) {
+ GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
- ClusterNode owner = F.rand(owners);
+ if (extraCctx.isReplicated() || extraCctx.isLocal())
+ continue;
+
+ for (int p = 0, parts = extraCctx.affinity().partitions(); p < parts; p++) {
+ List<ClusterNode> owners = extraCctx.topology().owners(p);
- Map<String, Set<Integer>> nodePartsMap = gridPartsMap.get(owner);
+ if (F.isEmpty(owners))
+ throw new CacheException("No data nodes found for cache '" + extraSpace +
+ "' for partition " + p);
- if (nodePartsMap == null) {
- nodePartsMap = new HashMap<>();
+ if (partLocs[p] == null)
+ partLocs[p] = new HashSet<>(owners);
+ else {
+ partLocs[p].retainAll(owners); // Intersection of owners.
- gridPartsMap.put(owner, nodePartsMap);
+ if (partLocs[p].isEmpty())
+ return null; // Intersection is empty -> retry.
+ }
+ }
}
- Set<Integer> parts = nodePartsMap.get(cctx.name());
+ // Filter nodes where not all the replicated caches loaded.
+ for (String extraSpace : extraSpaces) {
+ GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
- if (parts == null) {
- parts = new TreeSet<>(); // We need them sorted.
+ if (!extraCctx.isReplicated())
+ continue;
- nodePartsMap.put(cctx.name(), parts);
+ Set<ClusterNode> dataNodes = new HashSet<>(ctx.discovery().cacheAffinityNodes(extraSpace, NONE));
+
+ if (dataNodes.isEmpty())
+ throw new CacheException("No data nodes found for cache '" + extraSpace + "'");
+
+ for (int p = 0, extraParts = extraCctx.affinity().partitions(); p < extraParts; p++) {
+ dataNodes.retainAll(extraCctx.topology().owners(p));
+
+ if (dataNodes.isEmpty())
+ throw new CacheException("No data nodes found for cache '" + extraSpace +
+ "' for partition " + p);
+ }
+
+ for (Set<ClusterNode> partLoc : partLocs) {
+ partLoc.retainAll(dataNodes);
+
+ if (partLoc.isEmpty())
+ return null; // Intersection is empty -> retry.
+ }
}
+ }
+
+ // Collect the final partitions mapping.
+ Map<ClusterNode, IntArray> res = new HashMap<>();
+
+ // Here partitions in all IntArray's will be sorted in ascending order, this is important.
+ for (int p = 0; p < partLocs.length; p++) {
+ Set<ClusterNode> pl = partLocs[p];
+
+ assert !F.isEmpty(pl) : pl;
+
+ ClusterNode n = pl.size() == 1 ? F.first(pl) : F.rand(pl);
+
+ IntArray parts = res.get(n);
+
+ if (parts == null)
+ res.put(n, parts = new IntArray());
parts.add(p);
}
+
+ return res;
}
/**
@@ -580,7 +693,7 @@ public class GridReduceQueryExecutor {
private void send(
Collection<ClusterNode> nodes,
Message msg,
- Map<ClusterNode, Map<String, Set<Integer>>> gridPartsMap
+ Map<ClusterNode,IntArray> gridPartsMap
) {
boolean locNodeFound = false;
@@ -595,7 +708,7 @@ public class GridReduceQueryExecutor {
ctx.io().send(node, GridTopic.TOPIC_QUERY, copy(msg, node, gridPartsMap), GridIoPolicy.PUBLIC_POOL);
}
catch (IgniteCheckedException e) {
- U.error(log, "Failed to send message to node: " + node, e);
+ U.warn(log, e.getMessage());
}
}
@@ -609,28 +722,21 @@ public class GridReduceQueryExecutor {
* @param gridPartsMap Partitions map.
* @return Copy of message with partitions set.
*/
- private Message copy(Message msg, ClusterNode node, Map<ClusterNode, Map<String, Set<Integer>>> gridPartsMap) {
+ private Message copy(Message msg, ClusterNode node, Map<ClusterNode,IntArray> gridPartsMap) {
if (gridPartsMap == null)
return msg;
- Map<String,Set<Integer>> nodeParts = gridPartsMap.get(node);
-
- assert nodeParts != null;
+ GridQueryRequest res = new GridQueryRequest((GridQueryRequest)msg);
- GridQueryRequest req = (GridQueryRequest)msg;
+ IntArray parts = gridPartsMap.get(node);
- List<int[]> parts = new ArrayList<>(nodeParts.size());
+ assert parts != null : node;
- parts.add(unbox(nodeParts.get(req.space())));
-
- if (req.extraSpaces() != null) {
- for (String extraSpace : req.extraSpaces())
- parts.add(unbox(nodeParts.get(extraSpace)));
- }
+ int[] partsArr = new int[parts.size()];
- GridQueryRequest res = new GridQueryRequest(req);
+ parts.toArray(partsArr);
- res.partitions(parts);
+ res.partitions(partsArr);
return res;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d389ada8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
index 128e148..edba352 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
@@ -176,11 +176,19 @@ public class IgniteCacheQueryNodeRestartSelfTest extends GridCacheAbstractSelfTe
Thread.sleep(duration);
+ info("Stopping..");
+
done.set(true);
- fut1.get();
fut2.get();
+ info("Restarts stopped.");
+
+ fut1.get();
+
+ info("Queries stopped.");
+
+
info("Awaiting rebalance events [restartCnt=" + restartCnt.get() + ']');
boolean success = lsnr.awaitEvents(GRID_CNT * 2 * restartCnt.get(), 15000);