You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/06/18 08:54:14 UTC
[07/50] incubator-ignite git commit: ignite-484 - explicit partitions
list
ignite-484 - explicit partitions list
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/20ec22b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/20ec22b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/20ec22b8
Branch: refs/heads/ignite-sprint-6
Commit: 20ec22b85cddf8dfb5c453758aa83dcb49fbbd32
Parents: a12aadf
Author: S.Vladykin <sv...@gridgain.com>
Authored: Mon May 25 03:20:54 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Mon May 25 03:20:54 2015 +0300
----------------------------------------------------------------------
.../processors/query/GridQueryIndexing.java | 5 ++-
.../processors/query/GridQueryProcessor.java | 4 +-
.../h2/twostep/messages/GridQueryRequest.java | 36 +++++++++++++--
.../processors/query/h2/IgniteH2Indexing.java | 43 ++++++++++++++++--
.../query/h2/twostep/GridMapQueryExecutor.java | 47 ++++++++++++++++++--
.../h2/twostep/GridReduceQueryExecutor.java | 33 +++++++++++++-
6 files changed, 151 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/20ec22b8/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 6b1401d..216773e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -222,8 +222,11 @@ public interface GridQueryIndexing {
/**
* Returns backup filter.
*
+ * @param caches List of caches.
* @param topVer Topology version.
+ * @param parts Partitions.
* @return Backup filter.
*/
- public IndexingQueryFilter backupFilter(AffinityTopologyVersion topVer);
+ public IndexingQueryFilter backupFilter(List<String> caches, AffinityTopologyVersion topVer,
+ List<int[]> parts);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/20ec22b8/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index afd0386..202ec75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -603,7 +603,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
throw new CacheException("Failed to find SQL table for type: " + type);
final GridCloseableIterator<IgniteBiTuple<K,V>> i = idx.query(space, sqlQry, F.asList(params), typeDesc,
- idx.backupFilter(null));
+ idx.backupFilter(null, null, null));
if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
ctx.event().record(new CacheQueryExecutedEvent<>(
@@ -670,7 +670,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
String sql = qry.getSql();
Object[] args = qry.getArgs();
- GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter(null));
+ GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter(null, null, null));
if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
ctx.event().record(new CacheQueryExecutedEvent<>(
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/20ec22b8/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
index 2d53944..74b4392 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
@@ -54,7 +54,11 @@ public class GridQueryRequest implements Message {
/** */
@GridDirectCollection(String.class)
- private Collection<String> extraSpaces;
+ private List<String> extraSpaces;
+
+ /** */
+ @GridDirectCollection(int[].class)
+ private List<int[]> parts;
/**
* Default constructor.
@@ -70,6 +74,7 @@ public class GridQueryRequest implements Message {
* @param qrys Queries.
* @param topVer Topology version.
* @param extraSpaces All space names participating in query other than {@code space}.
+ * @param parts Optional partitions for unstable topology.
*/
public GridQueryRequest(
long reqId,
@@ -77,7 +82,8 @@ public class GridQueryRequest implements Message {
String space,
Collection<GridCacheSqlQuery> qrys,
AffinityTopologyVersion topVer,
- List<String> extraSpaces) {
+ List<String> extraSpaces,
+ List<int[]> parts) {
this.reqId = reqId;
this.pageSize = pageSize;
this.space = space;
@@ -85,12 +91,20 @@ public class GridQueryRequest implements Message {
this.qrys = qrys;
this.topVer = topVer;
this.extraSpaces = extraSpaces;
+ this.parts = parts;
+ }
+
+ /**
+ * @return All the needed partitions for {@link #space()} and {@link #extraSpaces()}.
+ */
+ public List<int[]> partitions() {
+ return parts;
}
/**
* @return All extra space names participating in query other than {@link #space()}.
*/
- public Collection<String> extraSpaces() {
+ public List<String> extraSpaces() {
return extraSpaces;
}
@@ -181,6 +195,12 @@ public class GridQueryRequest implements Message {
return false;
writer.incrementState();
+
+ case 6:
+ if (!writer.writeCollection("partitions", parts, MessageCollectionItemType.INT_ARR))
+ return false;
+
+ writer.incrementState();
}
return true;
@@ -241,6 +261,14 @@ public class GridQueryRequest implements Message {
return false;
reader.incrementState();
+
+ case 6:
+ parts = reader.readCollection("partitions", MessageCollectionItemType.INT_ARR);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
}
return true;
@@ -253,6 +281,6 @@ public class GridQueryRequest implements Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 6;
+ return 7;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/20ec22b8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 1cfc314..0ee9876 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1354,21 +1354,56 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/** {@inheritDoc} */
- @Override public IndexingQueryFilter backupFilter(AffinityTopologyVersion topVer) {
+ @Override public IndexingQueryFilter backupFilter(
+ @Nullable final List<String> caches,
+ @Nullable final AffinityTopologyVersion topVer,
+ @Nullable final List<int[]> parts
+ ) {
final AffinityTopologyVersion topVer0 = topVer != null ? topVer : AffinityTopologyVersion.NONE;
return new IndexingQueryFilter() {
@Nullable @Override public <K, V> IgniteBiPredicate<K, V> forSpace(String spaceName) {
final GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(spaceName);
- if (cache.context().isReplicated() || cache.configuration().getBackups() == 0)
+ if (cache.context().isReplicated() || (cache.configuration().getBackups() == 0 && parts == null))
return null;
+ final GridCacheAffinityManager aff = cache.context().affinity();
+
+ if (parts != null) {
+ int idx = caches.indexOf(spaceName);
+
+ final int[] parts0 = parts.get(idx);
+
+ if (parts0.length < 64) {
+ return new IgniteBiPredicate<K,V>() {
+ @Override public boolean apply(K k, V v) {
+ int p = aff.partition(k);
+
+ for (int p0 : parts0) {
+ if (p0 == p)
+ return true;
+ }
+
+ return false;
+ }
+ };
+ }
+
+ return new IgniteBiPredicate<K,V>() {
+ @Override public boolean apply(K k, V v) {
+ int p = aff.partition(k);
+
+ return Arrays.binarySearch(parts0, p) >= 0;
+ }
+ };
+ }
+
final ClusterNode locNode = ctx.discovery().localNode();
return new IgniteBiPredicate<K, V>() {
@Override public boolean apply(K k, V v) {
- return cache.context().affinity().primary(locNode, k, topVer0);
+ return aff.primary(locNode, k, topVer0);
}
};
}
@@ -1392,7 +1427,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @return Current topology version.
*/
public AffinityTopologyVersion topologyVersion() {
- return ctx.discovery().topologyVersionEx();
+ return ctx.cache().context().exchange().readyAffinityVersion();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/20ec22b8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 112949f..06bad76 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -217,16 +217,22 @@ public class GridMapQueryExecutor {
/**
* @param cacheNames Cache names.
* @param topVer Topology version.
+ * @param parts Explicit partitions.
* @param reserved Reserved list.
* @return {@code true} If all the needed partitions successfully reserved.
* @throws IgniteCheckedException If failed.
*/
- private boolean reservePartitions(Collection<String> cacheNames, AffinityTopologyVersion topVer,
+ private boolean reservePartitions(Collection<String> cacheNames, AffinityTopologyVersion topVer, List<int[]> parts,
List<GridDhtLocalPartition> reserved) throws IgniteCheckedException {
+ assert parts == null || parts.size() == cacheNames.size();
+
+ int i = 0;
+
for (String cacheName : cacheNames) {
GridCacheContext<?,?> cctx = cacheContext(cacheName, topVer);
- Set<Integer> partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer);
+ Collection<Integer> partIds = parts != null ? wrap(parts.get(i++)) :
+ cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer);
for (int partId : partIds) {
GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false);
@@ -250,6 +256,37 @@ public class GridMapQueryExecutor {
}
/**
+ * @param ints Integers.
+ * @return Collection wrapper.
+ */
+ private static Collection<Integer> wrap(final int[] ints) {
+ return new AbstractCollection<Integer>() {
+ @Override public Iterator<Integer> iterator() {
+ return new Iterator<Integer>() {
+ /** */
+ private int i = 0;
+
+ @Override public boolean hasNext() {
+ return i < ints.length;
+ }
+
+ @Override public Integer next() {
+ return ints[i++];
+ }
+
+ @Override public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ @Override public int size() {
+ return ints.length;
+ }
+ };
+ }
+
+ /**
* Executing queries locally.
*
* @param node Node.
@@ -280,6 +317,8 @@ public class GridMapQueryExecutor {
throw new CacheException(e);
}
+ List<String> caches = (List<String>)F.concat(true, req.space(), req.extraSpaces());
+
// Topology version can be null in rolling restart with previous version!
final AffinityTopologyVersion topVer = req.topologyVersion();
@@ -288,7 +327,7 @@ public class GridMapQueryExecutor {
h2.awaitForCacheAffinity(topVer);
// Reserve primary partitions.
- if (!reservePartitions(F.concat(true, req.space(), req.extraSpaces()), topVer, reserved)) {
+ if (!reservePartitions(caches, topVer, req.partitions(), reserved)) {
sendRetry(node, req.requestId());
return;
@@ -303,7 +342,7 @@ public class GridMapQueryExecutor {
if (nodeRess.put(req.requestId(), qr) != null)
throw new IllegalStateException();
- h2.setFilters(h2.backupFilter(topVer));
+ h2.setFilters(h2.backupFilter(caches, topVer, req.partitions()));
// TODO Prepare snapshots for all the needed tables before the run.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/20ec22b8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index eb6db88..0836a75 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -279,6 +279,21 @@ public class GridReduceQueryExecutor {
}
/**
+ * @param set Set.
+ * @return Array.
+ */
+ private static int[] unbox(Set<Integer> set) {
+ int[] arr = new int[set.size()];
+
+ int i = 0;
+
+ for (int x : set)
+ arr[i++] = x;
+
+ return arr;
+ }
+
+ /**
* @param cctx Cache context.
* @param qry Query.
* @return Cursor.
@@ -304,12 +319,27 @@ public class GridReduceQueryExecutor {
if (F.isEmpty(nodes))
throw new CacheException("No data nodes found for cache: " + space);
+ List<String> extraSpaces = extraSpaces(space, qry.spaces());
+
+ List<int[]> parts = null;
+
if (cctx.isReplicated() || qry.explain()) {
assert qry.explain() || !nodes.contains(ctx.cluster().get().localNode()) : "We must be on a client node.";
// Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
nodes = Collections.singleton(F.rand(nodes));
}
+ else if (ctx.cache().context().exchange().hasPendingExchange()) { // TODO isActive ??
+ parts = new ArrayList<>(extraSpaces == null ? 1 : extraSpaces.size() + 1);
+
+ parts.add(unbox(cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer)));
+
+ if (extraSpaces != null) {
+ for (String extraSpace : extraSpaces)
+ parts.add(unbox(ctx.cache().internalCache(extraSpace).context()
+ .affinity().primaryPartitions(ctx.localNodeId(), topVer)));
+ }
+ }
for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
GridMergeTable tbl;
@@ -355,8 +385,7 @@ public class GridReduceQueryExecutor {
boolean ok = false;
try {
- send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer,
- extraSpaces(space, qry.spaces())));
+ send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, parts));
ok = true;
}