You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/06/11 07:05:24 UTC
[05/50] incubator-ignite git commit: #IGNITE-389 - Partition scan
review and fixes.
#IGNITE-389 - Partition scan review and fixes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d72b040c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d72b040c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d72b040c
Branch: refs/heads/ignite-sprint-5
Commit: d72b040ccc4718852747d42152a448b9653f2c3f
Parents: 381c169
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Tue May 26 12:08:10 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Tue May 26 12:08:10 2015 -0700
----------------------------------------------------------------------
.../apache/ignite/cache/query/ScanQuery.java | 39 +++++++++++++++++++-
.../processors/cache/IgniteCacheProxy.java | 3 +-
.../cache/query/GridCacheQueryAdapter.java | 5 ++-
.../cache/IgniteCacheAbstractQuerySelfTest.java | 11 +++---
.../org/apache/ignite/spark/IgniteRDD.scala | 6 ++-
5 files changed, 55 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d72b040c/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java
index 688eb2e..f56b0c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java
@@ -36,11 +36,18 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> {
/** */
private IgniteBiPredicate<K, V> filter;
+ /** */
+ private Integer part;
+
/**
* Create scan query returning all entries.
*/
public ScanQuery() {
- this(null);
+ this(null, null);
+ }
+
+ public ScanQuery(int part) {
+ this(part, null);
}
/**
@@ -49,6 +56,16 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> {
* @param filter Filter. If {@code null} then all entries will be returned.
*/
public ScanQuery(@Nullable IgniteBiPredicate<K, V> filter) {
+ this(null, filter);
+ }
+
+ /**
+ * Create scan query with filter over the given partition ({@code null} partition means all partitions).
+ *
+ * @param filter Filter. If {@code null} then all entries will be returned.
+ */
+ public ScanQuery(Integer part, @Nullable IgniteBiPredicate<K, V> filter) {
+ setPartition(part);
setFilter(filter);
}
@@ -73,6 +90,26 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> {
return this;
}
+ /**
+ * Gets partition number over which this query should iterate. Will return {@code null} if partition was not
+ * set. In this case query will iterate over all partitions in the cache.
+ *
+ * @return Partition number or {@code null}.
+ */
+ public Integer getPartition() {
+ return part;
+ }
+
+ /**
+ * Sets partition number over which this query should iterate. If {@code null}, query will iterate over
+ * all partitions in the cache. Must be in the range [0, N) where N is the number of partitions in the cache.
+ *
+ * @param part Partition number over which this query should iterate.
+ */
+ public void setPartition(Integer part) {
+ this.part = part;
+ }
+
/** {@inheritDoc} */
@Override public ScanQuery<K, V> setPageSize(int pageSize) {
return (ScanQuery<K, V>)super.setPageSize(pageSize);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d72b040c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 0009127..176543b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -353,7 +353,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
if (filter instanceof ScanQuery) {
IgniteBiPredicate<K, V> p = ((ScanQuery)filter).getFilter();
- qry = ctx.queries().createScanQuery(p != null ? p : ACCEPT_ALL, null, isKeepPortable);
+ qry = ctx.queries().createScanQuery(p != null ? p : ACCEPT_ALL, ((ScanQuery)filter).getPartition(),
+ isKeepPortable);
if (grp != null)
qry.projection(grp);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d72b040c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index d976d2c..9ab8c4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -487,11 +487,14 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
@Nullable final ClusterGroup prj, @Nullable final Integer part) {
assert cctx != null;
+ final List<ClusterNode> owners = part == null ? null :
+ cctx.topology().owners(part, cctx.affinity().affinityTopologyVersion());
+
return F.view(CU.allNodes(cctx), new P1<ClusterNode>() {
@Override public boolean apply(ClusterNode n) {
return cctx.discovery().cacheAffinityNode(n, cctx.name()) &&
(prj == null || prj.node(n.id()) != null) &&
- (part == null || cctx.affinity().primary(n , part, cctx.affinity().affinityTopologyVersion()));
+ (part == null || owners.contains(n));
}
});
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d72b040c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
index eb5027c..228526f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
@@ -669,19 +669,20 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
}
for (int i = 0; i < cctx.affinity().partitions(); i++) {
- CacheQuery<Map.Entry<Integer, Integer>> qry =
- ((IgniteCacheProxy<Integer, Integer>)cache).context().queries().createScanQuery(null, i, false);
+ ScanQuery<Integer, Integer> scan = new ScanQuery<>(i);
- CacheQueryFuture<Map.Entry<Integer, Integer>> fut = qry.execute();
+ Collection<Cache.Entry<Integer, Integer>> actual = cache.query(scan).getAll();
Map<Integer, Integer> exp = entries.get(i);
- Collection<Map.Entry<Integer, Integer>> actual = fut.get();
+ int size = exp == null ? 0 : exp.size();
+
+ assertEquals("Failed for partition: " + i, size, actual.size());
if (exp == null)
assertTrue(actual.isEmpty());
else
- for (Map.Entry<Integer, Integer> entry : actual)
+ for (Cache.Entry<Integer, Integer> entry : actual)
assertTrue(entry.getValue().equals(exp.get(entry.getKey())));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d72b040c/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index 281c483..6a3720c 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -37,7 +37,11 @@ class IgniteRDD[K, V] (
override def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = {
val cache = ensureCache()
- val it: java.util.Iterator[Cache.Entry[K, V]] = cache.query(new ScanQuery[K, V]()).iterator()
+ val qry: ScanQuery[K, V] = new ScanQuery[K, V]()
+
+ qry.setPartition(part.index)
+
+ val it: java.util.Iterator[Cache.Entry[K, V]] = cache.query(qry).iterator()
new IgniteQueryIterator[Cache.Entry[K, V], (K, V)](it, entry => {
(entry.getKey, entry.getValue)