You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/28 17:30:28 UTC

incubator-ignite git commit: ignite-389 Avoid backups filtering in case of partition scan query

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-389 f00a9e998 -> 5d6bb532c


ignite-389 Avoid backups filtering in case of partition scan query


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5d6bb532
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5d6bb532
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5d6bb532

Branch: refs/heads/ignite-389
Commit: 5d6bb532c7de35cfea7674b5fc1446e72a5fa985
Parents: f00a9e9
Author: agura <ag...@gridgain.com>
Authored: Thu May 28 18:30:08 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Thu May 28 18:30:08 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/cache/query/ScanQuery.java    |  12 +-
 .../cache/query/GridCacheQueryAdapter.java      | 122 +++----------------
 .../cache/query/GridCacheQueryManager.java      |   9 +-
 3 files changed, 28 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d6bb532/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java
index f56b0c7..e6b69bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java
@@ -46,6 +46,11 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> {
         this(null, null);
     }
 
+    /**
+     * Creates partition scan query returning all entries for given partition.
+     *
+     * @param part Partition.
+     */
     public ScanQuery(int part) {
         this(part, null);
     }
@@ -62,9 +67,10 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> {
     /**
      * Create scan query with filter.
      *
+     * @param part Partition.
      * @param filter Filter. If {@code null} then all entries will be returned.
      */
-    public ScanQuery(Integer part, @Nullable IgniteBiPredicate<K, V> filter) {
+    public ScanQuery(@Nullable Integer part, @Nullable IgniteBiPredicate<K, V> filter) {
         setPartition(part);
         setFilter(filter);
     }
@@ -96,7 +102,7 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> {
      *
      * @return Partition number or {@code null}.
      */
-    public Integer getPartition() {
+    @Nullable public Integer getPartition() {
         return part;
     }
 
@@ -106,7 +112,7 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> {
      *
      * @param part Partition number over which this query should iterate.
      */
-    public void setPartition(Integer part) {
+    public void setPartition(@Nullable Integer part) {
         this.part = part;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d6bb532/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 6574f0a..2f32faa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -26,14 +26,15 @@ import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.query.*;
+import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.security.*;
+
 import org.jetbrains.annotations.*;
 
 import java.util.*;
-import java.util.concurrent.*;
 
 import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.*;
 
@@ -457,7 +458,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
             return (CacheQueryFuture<R>)(loc ? qryMgr.queryFieldsLocal(bean) :
                 qryMgr.queryFieldsDistributed(bean, nodes));
         else if (type == SCAN && part != null && nodes.size() > 1)
-            return new CacheQueryFallbackFuture(nodes, bean, qryMgr);
+            return new CacheQueryFallbackFuture<>(nodes, bean, qryMgr);
         else
             return (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes));
     }
@@ -524,9 +525,10 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
     /**
      * Wrapper for queries with fallback.
      */
-    private static class CacheQueryFallbackFuture<R> extends GridCacheQueryFutureAdapter<Object, Object, R> {
-        /** Target. */
-        private GridCacheQueryFutureAdapter<?, ?, R> fut;
+    private static class CacheQueryFallbackFuture<R> extends GridFutureAdapter<Collection<R>>
+        implements CacheQueryFuture<R> {
+        /** Query future. */
+        private volatile GridCacheQueryFutureAdapter<?, ?, R> fut;
 
         /** Backups. */
         private final Queue<ClusterNode> nodes;
@@ -559,13 +561,10 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
 
             ClusterNode node = F.first(F.view(nodes, IS_LOC_NODE));
 
-            if (node != null) {
+            if (node != null)
                 fallbacks.add(node);
 
-                fallbacks.addAll(F.view(nodes, F.not(IS_LOC_NODE)));
-            }
-            else
-                fallbacks.addAll(nodes);
+            fallbacks.addAll(node != null ? F.view(nodes, F.not(IS_LOC_NODE)) : nodes);
 
             return fallbacks;
         }
@@ -576,10 +575,11 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         private void init() {
             ClusterNode node = nodes.poll();
 
-            fut = (GridCacheQueryFutureAdapter<?, ?, R>)(node.isLocal() ? qryMgr.queryLocal(bean) :
-                qryMgr.queryDistributed(bean, Collections.singleton(node)));
+            GridCacheQueryFutureAdapter<?, ?, R> fut0 =
+                (GridCacheQueryFutureAdapter<?, ?, R>)(node.isLocal() ? qryMgr.queryLocal(bean) :
+                    qryMgr.queryDistributed(bean, Collections.singleton(node)));
 
-            fut.listen(new IgniteInClosure<IgniteInternalFuture<Collection<R>>>() {
+            fut0.listen(new IgniteInClosure<IgniteInternalFuture<Collection<R>>>() {
                 @Override public void apply(IgniteInternalFuture<Collection<R>> fut) {
                     try {
                         onDone(fut.get());
@@ -592,26 +592,8 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
                     }
                 }
             });
-        }
-
-        /** {@inheritDoc} */
-        @Override protected boolean onPage(UUID nodeId, boolean last) {
-            return fut.onPage(nodeId, last);
-        }
 
-        /** {@inheritDoc} */
-        @Override protected void loadPage() {
-            fut.loadPage();
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void loadAllPages() throws IgniteInterruptedCheckedException {
-            fut.loadAllPages();
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void cancelQuery() throws IgniteCheckedException {
-            fut.cancelQuery();
+            fut = fut0;
         }
 
         /** {@inheritDoc} */
@@ -625,84 +607,8 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         }
 
         /** {@inheritDoc} */
-        @Override void clear() {
-            fut.clear();
-        }
-
-        /** {@inheritDoc} */
-        @Override public long endTime() {
-            return fut.endTime();
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void enqueue(Collection<?> col) {
-            fut.enqueue(col);
-        }
-
-        /** {@inheritDoc} */
-        @Override boolean fields() {
-            return fut.fields();
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<R> get() throws IgniteCheckedException {
-            return fut.get();
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<R> get(long timeout, TimeUnit unit) throws IgniteCheckedException {
-            return fut.get(timeout, unit);
-        }
-
-        /** {@inheritDoc} */
         @Override public R next() {
             return fut.next();
         }
-
-        /** {@inheritDoc} */
-        @Override public Collection<R> nextPage() throws IgniteCheckedException {
-            return fut.nextPage();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean onDone(Collection<R> res, Throwable err) {
-            return fut.onDone(res, err);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<R> nextPage(long timeout) throws IgniteCheckedException {
-            return fut.nextPage(timeout);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void onNodeLeft(UUID evtNodeId) {
-            fut.onNodeLeft(evtNodeId);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onPage(@Nullable UUID nodeId, @Nullable Collection<?> data,
-            @Nullable Throwable err, boolean finished) {
-            fut.onPage(nodeId, data, err, finished);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onTimeout() {
-            fut.onTimeout();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void printMemoryStats() {
-            fut.printMemoryStats();
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridCacheQueryBean query() {
-            return fut.query();
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteUuid timeoutId() {
-            return fut.timeoutId();
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d6bb532/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index fac3d8f..652d62e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -795,7 +795,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                             !locPart.reserve())
                             throw new GridDhtInvalidPartitionException(part, "Partition can't be reserved");
 
-
                         iter = new Iterator<K>() {
                             private Iterator<KeyCacheObject> iter0 = locPart.keySet().iterator();
 
@@ -1329,9 +1328,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
                     K key = row.getKey();
 
-                    // Filter backups for SCAN queries. Other types are filtered in indexing manager.
-                    if (!cctx.isReplicated() && cctx.config().getCacheMode() != LOCAL && qry.type() == SCAN &&
-                        !incBackups && !cctx.affinity().primary(cctx.localNode(), key, topVer)) {
+                    // Filter backups for SCAN queries, if it isn't partition scan.
+                    // Other types are filtered in indexing manager.
+                    if (!cctx.isReplicated() && qry.type() == SCAN && qry.partition() == null &&
+                        cctx.config().getCacheMode() != LOCAL && !incBackups &&
+                        !cctx.affinity().primary(cctx.localNode(), key, topVer)) {
                         if (log.isDebugEnabled())
                             log.debug("Ignoring backup element [row=" + row +
                                 ", cacheMode=" + cctx.config().getCacheMode() + ", incBackups=" + incBackups +