You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/11 12:31:22 UTC

[06/50] incubator-ignite git commit: ignite-389 Change fallback nodes order: local first

ignite-389 Change fallback nodes order: local first


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d151244e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d151244e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d151244e

Branch: refs/heads/ignite-929
Commit: d151244ee95ad2bba986136561e2326a434c3b5b
Parents: 5a7dd02
Author: agura <ag...@gridgain.com>
Authored: Wed May 27 22:16:28 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Wed May 27 22:16:28 2015 +0300

----------------------------------------------------------------------
 .../cache/query/GridCacheQueryAdapter.java      | 132 ++++++++++---------
 ...CacheScanPartitionQueryFallbackSelfTest.java |   2 +-
 2 files changed, 72 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d151244e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 1f7b736..6574f0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -41,6 +41,13 @@ import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryTy
  * Query adapter.
  */
 public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
+    /** Is local node predicate. */
+    private static final IgnitePredicate<ClusterNode> IS_LOC_NODE = new IgnitePredicate<ClusterNode>() {
+        @Override public boolean apply(ClusterNode n) {
+            return n.isLocal();
+        }
+    };
+
     /** */
     private final GridCacheContext<?, ?> cctx;
 
@@ -449,24 +456,10 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         if (type == SQL_FIELDS || type == SPI)
             return (CacheQueryFuture<R>)(loc ? qryMgr.queryFieldsLocal(bean) :
                 qryMgr.queryFieldsDistributed(bean, nodes));
-        else {
-            final CacheQueryFuture<R> fut =
-                (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes));
-
-            if (type == SCAN && part != null) {
-                assert nodes.size() == 1;
-
-                final Queue<ClusterNode> backups = new LinkedList<>(
-                    cctx.affinity().backups(part, cctx.affinity().affinityTopologyVersion()));
-
-                if (F.isEmpty(backups))
-                    return fut;
-
-                return new CacheQueryFallbackFuture<>(backups, bean, qryMgr, fut);
-            }
-
-            return fut;
-        }
+        else if (type == SCAN && part != null && nodes.size() > 1)
+            return new CacheQueryFallbackFuture(nodes, bean, qryMgr);
+        else
+            return (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes));
     }
 
     /**
@@ -484,7 +477,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
                 return Collections.singletonList(cctx.localNode());
 
             case REPLICATED:
-                if (prj != null)
+                if (prj != null || partition() != null)
                     return nodes(cctx, prj, partition());
 
                 return cctx.affinityNode() ?
@@ -508,12 +501,13 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         @Nullable final ClusterGroup prj, @Nullable final Integer part) {
         assert cctx != null;
 
-        final List<ClusterNode> owners = part == null ? null :
-            cctx.topology().owners(part, cctx.affinity().affinityTopologyVersion());
+        final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
+
+        final Set<ClusterNode> owners =
+            part == null ? Collections.<ClusterNode>emptySet() : new HashSet<>(cctx.topology().owners(part, topVer));
 
         return F.view(CU.allNodes(cctx), new P1<ClusterNode>() {
             @Override public boolean apply(ClusterNode n) {
-                AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
 
                 return cctx.discovery().cacheAffinityNode(n, cctx.name()) &&
                     (prj == null || prj.node(n.id()) != null) &&
@@ -532,10 +526,10 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
      */
     private static class CacheQueryFallbackFuture<R> extends GridCacheQueryFutureAdapter<Object, Object, R> {
         /** Target. */
-        private GridCacheQueryFutureAdapter<?, ?, R> target;
+        private GridCacheQueryFutureAdapter<?, ?, R> fut;
 
         /** Backups. */
-        private final Queue<ClusterNode> backups;
+        private final Queue<ClusterNode> nodes;
 
         /** Bean. */
         private final GridCacheQueryBean bean;
@@ -544,41 +538,57 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         private final GridCacheQueryManager qryMgr;
 
         /**
-         * @param backups Backups.
+         * @param nodes Backups.
          * @param bean Bean.
          * @param qryMgr Query manager.
-         * @param fut Future.
          */
-        public CacheQueryFallbackFuture(Queue<ClusterNode> backups, GridCacheQueryBean bean,
-            GridCacheQueryManager qryMgr, CacheQueryFuture<R> fut) {
-            this.backups = backups;
+        public CacheQueryFallbackFuture(Collection<ClusterNode> nodes, GridCacheQueryBean bean,
+            GridCacheQueryManager qryMgr) {
+            this.nodes = fallbacks(nodes);
             this.bean = bean;
             this.qryMgr = qryMgr;
-            this.target = (GridCacheQueryFutureAdapter<?, ?, R>)fut;
 
             init();
         }
 
         /**
+         * @param nodes Nodes.
+         */
+        private Queue<ClusterNode> fallbacks(Collection<ClusterNode> nodes) {
+            Queue<ClusterNode> fallbacks = new LinkedList<>();
+
+            ClusterNode node = F.first(F.view(nodes, IS_LOC_NODE));
+
+            if (node != null) {
+                fallbacks.add(node);
+
+                fallbacks.addAll(F.view(nodes, F.not(IS_LOC_NODE)));
+            }
+            else
+                fallbacks.addAll(nodes);
+
+            return fallbacks;
+        }
+
+        /**
          *
          */
         private void init() {
-            target.listen(new IgniteInClosure<IgniteInternalFuture<Collection<R>>>() {
+            ClusterNode node = nodes.poll();
+
+            fut = (GridCacheQueryFutureAdapter<?, ?, R>)(node.isLocal() ? qryMgr.queryLocal(bean) :
+                qryMgr.queryDistributed(bean, Collections.singleton(node)));
+
+            fut.listen(new IgniteInClosure<IgniteInternalFuture<Collection<R>>>() {
                 @Override public void apply(IgniteInternalFuture<Collection<R>> fut) {
                     try {
                         onDone(fut.get());
                     }
                     catch (IgniteCheckedException e) {
-                        if (F.isEmpty(backups))
+                        if (F.isEmpty(nodes))
                             onDone(e);
-                        else {
-                            Set<ClusterNode> backup = Collections.singleton(backups.poll());
-
-                            target =
-                                (GridCacheQueryFutureAdapter<?, ?, R>)qryMgr.queryDistributed(bean, backup);
-
+                        else
                             init();
-                        }
                     }
                 }
             });
@@ -586,113 +596,113 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
 
         /** {@inheritDoc} */
         @Override protected boolean onPage(UUID nodeId, boolean last) {
-            return target.onPage(nodeId, last);
+            return fut.onPage(nodeId, last);
         }
 
         /** {@inheritDoc} */
         @Override protected void loadPage() {
-            target.loadPage();
+            fut.loadPage();
         }
 
         /** {@inheritDoc} */
         @Override protected void loadAllPages() throws IgniteInterruptedCheckedException {
-            target.loadAllPages();
+            fut.loadAllPages();
         }
 
         /** {@inheritDoc} */
         @Override protected void cancelQuery() throws IgniteCheckedException {
-            target.cancelQuery();
+            fut.cancelQuery();
         }
 
         /** {@inheritDoc} */
         @Override public int available() {
-            return target.available();
+            return fut.available();
         }
 
         /** {@inheritDoc} */
         @Override public boolean cancel() throws IgniteCheckedException {
-            return target.cancel();
+            return fut.cancel();
         }
 
         /** {@inheritDoc} */
         @Override void clear() {
-            target.clear();
+            fut.clear();
         }
 
         /** {@inheritDoc} */
         @Override public long endTime() {
-            return target.endTime();
+            return fut.endTime();
         }
 
         /** {@inheritDoc} */
         @Override protected void enqueue(Collection<?> col) {
-            target.enqueue(col);
+            fut.enqueue(col);
         }
 
         /** {@inheritDoc} */
         @Override boolean fields() {
-            return target.fields();
+            return fut.fields();
         }
 
         /** {@inheritDoc} */
         @Override public Collection<R> get() throws IgniteCheckedException {
-            return target.get();
+            return fut.get();
         }
 
         /** {@inheritDoc} */
         @Override public Collection<R> get(long timeout, TimeUnit unit) throws IgniteCheckedException {
-            return target.get(timeout, unit);
+            return fut.get(timeout, unit);
         }
 
         /** {@inheritDoc} */
         @Override public R next() {
-            return target.next();
+            return fut.next();
         }
 
         /** {@inheritDoc} */
         @Override public Collection<R> nextPage() throws IgniteCheckedException {
-            return target.nextPage();
+            return fut.nextPage();
         }
 
         /** {@inheritDoc} */
         @Override public boolean onDone(Collection<R> res, Throwable err) {
-            return target.onDone(res, err);
+            return fut.onDone(res, err);
         }
 
         /** {@inheritDoc} */
         @Override public Collection<R> nextPage(long timeout) throws IgniteCheckedException {
-            return target.nextPage(timeout);
+            return fut.nextPage(timeout);
         }
 
         /** {@inheritDoc} */
         @Override protected void onNodeLeft(UUID evtNodeId) {
-            target.onNodeLeft(evtNodeId);
+            fut.onNodeLeft(evtNodeId);
         }
 
         /** {@inheritDoc} */
         @Override public void onPage(@Nullable UUID nodeId, @Nullable Collection<?> data,
             @Nullable Throwable err, boolean finished) {
-            target.onPage(nodeId, data, err, finished);
+            fut.onPage(nodeId, data, err, finished);
         }
 
         /** {@inheritDoc} */
         @Override public void onTimeout() {
-            target.onTimeout();
+            fut.onTimeout();
         }
 
         /** {@inheritDoc} */
         @Override public void printMemoryStats() {
-            target.printMemoryStats();
+            fut.printMemoryStats();
         }
 
         /** {@inheritDoc} */
         @Override public GridCacheQueryBean query() {
-            return target.query();
+            return fut.query();
         }
 
         /** {@inheritDoc} */
         @Override public IgniteUuid timeoutId() {
-            return target.timeoutId();
+            return fut.timeoutId();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d151244e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
index 3b1b842..31336e6 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
@@ -41,7 +41,7 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
     private static final int GRID_CNT = 5;
 
     /** Kys count. */
-    private static final int KEYS_CNT = 1;
+    private static final int KEYS_CNT = 5000;
 
     /** Backups. */
     private int backups;