You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/06/11 09:18:17 UTC

[02/50] incubator-ignite git commit: #IGNITE-389 - Partition scan review and fixes.

#IGNITE-389 - Partition scan review and fixes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d72b040c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d72b040c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d72b040c

Branch: refs/heads/ignite-484-1
Commit: d72b040ccc4718852747d42152a448b9653f2c3f
Parents: 381c169
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Tue May 26 12:08:10 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Tue May 26 12:08:10 2015 -0700

----------------------------------------------------------------------
 .../apache/ignite/cache/query/ScanQuery.java    | 39 +++++++++++++++++++-
 .../processors/cache/IgniteCacheProxy.java      |  3 +-
 .../cache/query/GridCacheQueryAdapter.java      |  5 ++-
 .../cache/IgniteCacheAbstractQuerySelfTest.java | 11 +++---
 .../org/apache/ignite/spark/IgniteRDD.scala     |  6 ++-
 5 files changed, 55 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d72b040c/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java
index 688eb2e..f56b0c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java
@@ -36,11 +36,18 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> {
     /** */
     private IgniteBiPredicate<K, V> filter;
 
+    /** */
+    private Integer part;
+
     /**
      * Create scan query returning all entries.
      */
     public ScanQuery() {
-        this(null);
+        this(null, null);
+    }
+
+    public ScanQuery(int part) {
+        this(part, null);
     }
 
     /**
@@ -49,6 +56,16 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> {
      * @param filter Filter. If {@code null} then all entries will be returned.
      */
     public ScanQuery(@Nullable IgniteBiPredicate<K, V> filter) {
+        this(null, filter);
+    }
+
+    /**
+     * Create scan query with partition and filter.
+     *
+     * @param filter Filter. If {@code null} then all entries will be returned.
+     */
+    public ScanQuery(Integer part, @Nullable IgniteBiPredicate<K, V> filter) {
+        setPartition(part);
         setFilter(filter);
     }
 
@@ -73,6 +90,26 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> {
         return this;
     }
 
+    /**
+     * Gets partition number over which this query should iterate. Will return {@code null} if partition was not
+     * set. In this case query will iterate over all partitions in the cache.
+     *
+     * @return Partition number or {@code null}.
+     */
+    public Integer getPartition() {
+        return part;
+    }
+
+    /**
+     * Sets partition number over which this query should iterate. If {@code null}, query will iterate over
+     * all partitions in the cache. Must be in the range [0, N) where N is the number of partitions in the cache.
+     *
+     * @param part Partition number over which this query should iterate.
+     */
+    public void setPartition(Integer part) {
+        this.part = part;
+    }
+
     /** {@inheritDoc} */
     @Override public ScanQuery<K, V> setPageSize(int pageSize) {
         return (ScanQuery<K, V>)super.setPageSize(pageSize);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d72b040c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 0009127..176543b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -353,7 +353,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
         if (filter instanceof ScanQuery) {
             IgniteBiPredicate<K, V> p = ((ScanQuery)filter).getFilter();
 
-            qry = ctx.queries().createScanQuery(p != null ? p : ACCEPT_ALL, null, isKeepPortable);
+            qry = ctx.queries().createScanQuery(p != null ? p : ACCEPT_ALL, ((ScanQuery)filter).getPartition(),
+                isKeepPortable);
 
             if (grp != null)
                 qry.projection(grp);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d72b040c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index d976d2c..9ab8c4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -487,11 +487,14 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         @Nullable final ClusterGroup prj, @Nullable final Integer part) {
         assert cctx != null;
 
+        final List<ClusterNode> owners = part == null ? null :
+            cctx.topology().owners(part, cctx.affinity().affinityTopologyVersion());
+
         return F.view(CU.allNodes(cctx), new P1<ClusterNode>() {
             @Override public boolean apply(ClusterNode n) {
                 return cctx.discovery().cacheAffinityNode(n, cctx.name()) &&
                     (prj == null || prj.node(n.id()) != null) &&
-                    (part == null || cctx.affinity().primary(n , part, cctx.affinity().affinityTopologyVersion()));
+                    (part == null || owners.contains(n));
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d72b040c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
index eb5027c..228526f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
@@ -669,19 +669,20 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
         }
 
         for (int i = 0; i < cctx.affinity().partitions(); i++) {
-            CacheQuery<Map.Entry<Integer, Integer>> qry =
-                ((IgniteCacheProxy<Integer, Integer>)cache).context().queries().createScanQuery(null, i, false);
+            ScanQuery<Integer, Integer> scan = new ScanQuery<>(i);
 
-            CacheQueryFuture<Map.Entry<Integer, Integer>> fut = qry.execute();
+            Collection<Cache.Entry<Integer, Integer>> actual = cache.query(scan).getAll();
 
             Map<Integer, Integer> exp = entries.get(i);
 
-            Collection<Map.Entry<Integer, Integer>> actual = fut.get();
+            int size = exp == null ? 0 : exp.size();
+
+            assertEquals("Failed for partition: " + i, size, actual.size());
 
             if (exp == null)
                 assertTrue(actual.isEmpty());
             else
-                for (Map.Entry<Integer, Integer> entry : actual)
+                for (Cache.Entry<Integer, Integer> entry : actual)
                     assertTrue(entry.getValue().equals(exp.get(entry.getKey())));
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d72b040c/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index 281c483..6a3720c 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -37,7 +37,11 @@ class IgniteRDD[K, V] (
     override def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = {
         val cache = ensureCache()
 
-        val it: java.util.Iterator[Cache.Entry[K, V]] = cache.query(new ScanQuery[K, V]()).iterator()
+        val qry: ScanQuery[K, V] = new ScanQuery[K, V]()
+
+        qry.setPartition(part.index)
+
+        val it: java.util.Iterator[Cache.Entry[K, V]] = cache.query(qry).iterator()
 
         new IgniteQueryIterator[Cache.Entry[K, V], (K, V)](it, entry => {
             (entry.getKey, entry.getValue)