You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/23 04:14:28 UTC

[45/50] [abbrv] incubator-ignite git commit: GG-9580 - Fix for queries in replicated cache

GG-9580 - Fix for queries in replicated cache


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1483feb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1483feb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1483feb0

Branch: refs/heads/ignite-32
Commit: 1483feb0e2eed3263ea664115385f9bbe48f927a
Parents: 1812040
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Mon Dec 22 16:00:22 2014 -0800
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Mon Dec 22 16:00:22 2014 -0800

----------------------------------------------------------------------
 .../cache/query/GridCacheQueryAdapter.java      | 46 ++++++++++++++++----
 .../java/org/gridgain/grid/util/GridUtils.java  | 19 ++++++++
 .../cache/GridCacheAbstractQuerySelfTest.java   |  3 +-
 .../GridCacheReplicatedQuerySelfTest.java       | 39 ++++++++++++++++-
 4 files changed, 96 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1483feb0/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryAdapter.java
index d15d77c..e22b420 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryAdapter.java
@@ -13,7 +13,6 @@ import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.security.*;
-import org.gridgain.grid.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.cache.query.*;
 import org.gridgain.grid.kernal.processors.cache.*;
@@ -23,6 +22,7 @@ import org.jetbrains.annotations.*;
 
 import java.util.*;
 
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
 import static org.gridgain.grid.kernal.processors.cache.query.GridCacheQueryType.*;
 
 /**
@@ -447,18 +447,48 @@ public class GridCacheQueryAdapter<T> implements GridCacheQuery<T> {
      * @return Nodes to execute on.
      */
     private Collection<ClusterNode> nodes() {
-        Collection<ClusterNode> nodes = CU.allNodes(cctx);
+        GridCacheMode cacheMode = cctx.config().getCacheMode();
+
+        switch (cacheMode) { // Which nodes are queried depends on the cache mode.
+            case LOCAL: // Only the local node is queried; a projection, if set, is ignored with a warning.
+                if (prj != null)
+                    U.warn(log, "Ignoring query projection because it's executed over LOCAL cache " +
+                        "(only local node will be queried): " + this);
 
-        if (prj == null) {
-            if (cctx.isReplicated())
                 return Collections.singletonList(cctx.localNode());
 
-            return nodes;
+            case REPLICATED: // With a projection, query its data nodes; otherwise a single node is enough.
+                if (prj != null)
+                    return nodes(cctx, prj);
+
+                GridCacheDistributionMode mode = cctx.config().getDistributionMode(); // From this cache's configuration.
+
+                return mode == PARTITIONED_ONLY || mode == NEAR_PARTITIONED ? // Local node counts as a data node in these modes.
+                    Collections.singletonList(cctx.localNode()) :
+                    Collections.singletonList(F.rand(nodes(cctx, null))); // NOTE(review): assumes a data node exists - confirm F.rand() on empty input.
+
+            case PARTITIONED: // Query all data nodes, restricted to the projection when one is set.
+                return nodes(cctx, prj);
+
+            default:
+                throw new IllegalStateException("Unknown cache mode: " + cacheMode);
         }
+    }
+
+    /**
+     * @param cctx Cache context (must not be {@code null}; checked with an assertion).
+     * @param prj Projection (optional); when {@code null} no projection filtering is applied.
+     * @return View of cache nodes whose distribution mode is PARTITIONED_ONLY or NEAR_PARTITIONED and that are in the projection (if any).
+     */
+    private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx, @Nullable final ClusterGroup prj) {
+        assert cctx != null;
+
+        return F.view(CU.allNodes(cctx), new P1<ClusterNode>() {
+            @Override public boolean apply(ClusterNode n) {
+                GridCacheDistributionMode mode = U.distributionMode(n, cctx.name()); // Null if the cache is absent on n; such nodes are filtered out below.
 
-        return F.view(nodes, new P1<ClusterNode>() {
-            @Override public boolean apply(ClusterNode e) {
-                return prj.node(e.id()) != null;
+                return (mode == PARTITIONED_ONLY || mode == NEAR_PARTITIONED) &&
+                    (prj == null || prj.node(n.id()) != null); // Without a projection every data node passes.
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1483feb0/modules/core/src/main/java/org/gridgain/grid/util/GridUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/GridUtils.java b/modules/core/src/main/java/org/gridgain/grid/util/GridUtils.java
index 803badd..fb7955d 100644
--- a/modules/core/src/main/java/org/gridgain/grid/util/GridUtils.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/GridUtils.java
@@ -7194,6 +7194,25 @@ public abstract class GridUtils {
     }
 
     /**
+     * Gets cache distribution mode on given node or {@code null} if cache is not
+     * present on given node.
+     *
+     * @param n Node to check.
+     * @param cacheName Cache to check.
+     * @return Cache distribution mode or {@code null} if cache is not found.
+     */
+    @Nullable public static GridCacheDistributionMode distributionMode(ClusterNode n, String cacheName) {
+        GridCacheAttributes[] caches = n.attribute(ATTR_CACHE); // Cache attributes stored on the node under ATTR_CACHE.
+
+        if (caches != null) // A node without the attribute falls through to the null result.
+            for (GridCacheAttributes attrs : caches)
+                if (F.eq(cacheName, attrs.cacheName())) // NOTE(review): F.eq presumably null-safe (default cache name is null) - confirm.
+                    return attrs.partitionedTaxonomy(); // Returned as the distribution mode of the matching cache.
+
+        return null; // Cache is not found on the given node.
+    }
+
+    /**
      * Checks if given node has near cache enabled for the specified
      * partitioned cache.
      *

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1483feb0/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractQuerySelfTest.java
index 2883215..4987a8c 100644
--- a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractQuerySelfTest.java
@@ -18,7 +18,6 @@ import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.apache.ignite.spi.swapspace.file.*;
-import org.gridgain.grid.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.cache.query.*;
 import org.gridgain.grid.cache.store.*;
@@ -117,7 +116,7 @@ public abstract class GridCacheAbstractQuerySelfTest extends GridCommonAbstractT
 
             cc.setCacheMode(cacheMode());
             cc.setAtomicityMode(atomicityMode());
-            cc.setDistributionMode(distributionMode());
+            cc.setDistributionMode(gridName.startsWith("client") ? CLIENT_ONLY :distributionMode());
             cc.setWriteSynchronizationMode(FULL_SYNC);
             cc.setStore(store);
             cc.setPreloadMode(SYNC);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1483feb0/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheReplicatedQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheReplicatedQuerySelfTest.java b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheReplicatedQuerySelfTest.java
index d72d004..c0c3306 100644
--- a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheReplicatedQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheReplicatedQuerySelfTest.java
@@ -13,7 +13,6 @@ import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.lang.*;
-import org.gridgain.grid.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.cache.query.*;
 import org.gridgain.grid.kernal.*;
@@ -98,6 +97,44 @@ public class GridCacheReplicatedQuerySelfTest extends GridCacheAbstractQuerySelf
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testClientOnlyNode() throws Exception {
+        try {
+            Ignite g = startGrid("client"); // The abstract test configures grid names starting with "client" as CLIENT_ONLY.
+
+            GridCache<Integer, Integer> c = g.cache(null);
+
+            for (int i = 0; i < 10; i++)
+                c.putx(i, i);
+
+            // Client cache should be empty: the entries put above are not expected to be held by the client node.
+            assertEquals(0, c.size());
+
+            Collection<Map.Entry<Integer, Integer>> res =
+                c.queries().createSqlQuery(Integer.class, "_key >= 5 order by _key").execute().get();
+
+            assertEquals(5, res.size()); // Keys 5..9 of the ten entries match the predicate.
+
+            Iterator<Map.Entry<Integer, Integer>> it = res.iterator();
+
+            int i = 5; // Expected key and value of the next entry, given the "order by _key" ordering.
+
+            while (it.hasNext()) {
+                Map.Entry<Integer, Integer> e  = it.next();
+
+                assertEquals(i, e.getKey().intValue());
+                assertEquals(i, e.getValue().intValue());
+
+                i++;
+            }
+        }
+        finally {
+            stopGrid("client"); // Always stop the extra client grid, even if an assertion failed.
+        }
+    }
+
+    /**
      * JUnit.
      *
      * @throws Exception If failed.