You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/02/02 11:52:41 UTC

[2/2] incubator-ignite git commit: IGNITE-158 implement ClusterGroup#forDataNodes() and ClusterGroup#forClientNodes().

IGNITE-158 implement ClusterGroup#forDataNodes() and ClusterGroup#forClientNodes().


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1b9629d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1b9629d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1b9629d0

Branch: refs/heads/ignite-158
Commit: 1b9629d07ef7f2a7dd7a43ae41296e79a9a9612b
Parents: f04e0ec
Author: sevdokimov <se...@gridgain.com>
Authored: Mon Feb 2 13:51:34 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Mon Feb 2 13:51:34 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/cluster/ClusterGroup.java | 23 ++++++++
 .../ignite/internal/ClusterGroupAdapter.java    | 55 ++++++++++++++++++--
 .../ignite/internal/IgniteClusterAsyncImpl.java | 10 ++++
 .../GridProjectionForCachesSelfTest.java        | 44 ++++++++++++++--
 ...idHadoopDefaultMapReducePlannerSelfTest.java | 10 ++++
 5 files changed, 134 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b9629d0/modules/core/src/main/java/org/apache/ignite/cluster/ClusterGroup.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterGroup.java b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterGroup.java
index 9f2f435..6267dbe 100644
--- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterGroup.java
+++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterGroup.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.cluster;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
 import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 
@@ -148,6 +149,28 @@ public interface ClusterGroup {
     public ClusterGroup forCacheNodes(String cacheName, @Nullable String... cacheNames);
 
     /**
+     * Creates projection for all nodes that have cache with specified name running and cache distribution mode is
+     * {@link CacheDistributionMode#PARTITIONED_ONLY} or {@link CacheDistributionMode#NEAR_PARTITIONED}.
+     *
+     * @param cacheName Cache name.
+     * @param cacheNames Optional additional cache names to include into projection.
+     * @return Projection over nodes that have specified cache running in one of the data distribution modes above.
+     * @see CacheConfiguration#getDistributionMode()
+     */
+    public ClusterGroup forDataNodes(String cacheName, @Nullable String... cacheNames);
+
+    /**
+     * Creates projection for all nodes that have cache with specified name running and cache distribution mode is
+     * {@link CacheDistributionMode#CLIENT_ONLY} or {@link CacheDistributionMode#NEAR_ONLY}.
+     *
+     * @param cacheName Cache name.
+     * @param cacheNames Optional additional cache names to include into projection.
+     * @return Projection over nodes that have specified cache running in one of the client distribution modes above.
+     * @see CacheConfiguration#getDistributionMode()
+     */
+    public ClusterGroup forClientNodes(String cacheName, @Nullable String... cacheNames);
+
+    /**
      * Creates projection for all nodes that have streamer with specified name running.
      *
      * @param streamerName Streamer name.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b9629d0/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupAdapter.java
index 7dff797..fafb23c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupAdapter.java
@@ -18,7 +18,9 @@
 package org.apache.ignite.internal;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.internal.executor.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -525,7 +527,17 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
 
     /** {@inheritDoc} */
     @Override public final ClusterGroup forCacheNodes(@Nullable String cacheName, @Nullable String... cacheNames) {
-        return forPredicate(new CachesFilter(cacheName, cacheNames));
+        return forPredicate(new CachesFilter(cacheName, cacheNames, null));
+    }
+
+    /** {@inheritDoc} */
+    @Override public final ClusterGroup forDataNodes(@Nullable String cacheName, @Nullable String... cacheNames) {
+        return forPredicate(new CachesFilter(cacheName, cacheNames, CachesFilter.DATA_MODES));
+    }
+
+    /** {@inheritDoc} */
+    @Override public final ClusterGroup forClientNodes(@Nullable String cacheName, @Nullable String... cacheNames) {
+        return forPredicate(new CachesFilter(cacheName, cacheNames, CachesFilter.CLIENT_MODES));
     }
 
     /** {@inheritDoc} */
@@ -652,6 +664,14 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
      */
     private static class CachesFilter implements IgnitePredicate<ClusterNode> {
         /** */
+        private static final Set<CacheDistributionMode> DATA_MODES = EnumSet.of(CacheDistributionMode.NEAR_PARTITIONED,
+            CacheDistributionMode.PARTITIONED_ONLY);
+
+        /** */
+        private static final Set<CacheDistributionMode> CLIENT_MODES = EnumSet.of(CacheDistributionMode.CLIENT_ONLY,
+            CacheDistributionMode.NEAR_ONLY);
+
+        /** */
         private static final long serialVersionUID = 0L;
 
         /** Cache name. */
@@ -660,27 +680,54 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
         /** Cache names. */
         private final String[] cacheNames;
 
+        /** */
+        private final Set<CacheDistributionMode> distributionMode;
+
         /**
          * @param cacheName Cache name.
          * @param cacheNames Cache names.
+         * @param distributionMode Filter by {@link CacheConfiguration#getDistributionMode()}.
          */
-        private CachesFilter(@Nullable String cacheName, @Nullable String[] cacheNames) {
+        private CachesFilter(@Nullable String cacheName, @Nullable String[] cacheNames,
+            @Nullable Set<CacheDistributionMode> distributionMode) {
             this.cacheName = cacheName;
             this.cacheNames = cacheNames;
+            this.distributionMode = distributionMode;
         }
 
         /** {@inheritDoc} */
         @Override public boolean apply(ClusterNode n) {
-            if (!U.hasCache(n, cacheName))
+            GridCacheAttributes[] caches = n.attribute(ATTR_CACHE);
+
+            if (caches == null)
+                return false;
+
+            if (!hasCache(caches, cacheName, distributionMode))
                 return false;
 
             if (!F.isEmpty(cacheNames))
                 for (String cn : cacheNames)
-                    if (!U.hasCache(n, cn))
+                    if (!hasCache(caches, cn, distributionMode))
                         return false;
 
             return true;
         }
+
+        /**
+         * @param cacheName Cache name to check.
+         * @param distributionMode Filter by {@link CacheConfiguration#getDistributionMode()}.
+         * @return {@code true} if {@code caches} has an entry named {@code cacheName} that passes the optional mode filter.
+         */
+        public static boolean hasCache(GridCacheAttributes[] caches, @Nullable String cacheName,
+            @Nullable Collection<CacheDistributionMode> distributionMode) {
+            for (GridCacheAttributes attrs : caches) {
+                if (Objects.equals(cacheName, attrs.cacheName())
+                    && (distributionMode == null || distributionMode.contains(attrs.partitionedTaxonomy())))
+                    return true;
+            }
+
+            return false;
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b9629d0/modules/core/src/main/java/org/apache/ignite/internal/IgniteClusterAsyncImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteClusterAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteClusterAsyncImpl.java
index ea148ff..7e2eaf1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteClusterAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteClusterAsyncImpl.java
@@ -172,6 +172,16 @@ public class IgniteClusterAsyncImpl extends IgniteAsyncSupportAdapter<IgniteClus
     }
 
     /** {@inheritDoc} */
+    @Override public ClusterGroup forDataNodes(String cacheName, @Nullable String... cacheNames) {
+        return grid.forDataNodes(cacheName, cacheNames);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterGroup forClientNodes(String cacheName, @Nullable String... cacheNames) {
+        return grid.forClientNodes(cacheName, cacheNames);
+    }
+
+    /** {@inheritDoc} */
     @Override public ClusterGroup forStreamer(String streamerName, @Nullable String... streamerNames) {
         return grid.forStreamer(streamerName, streamerNames);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b9629d0/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionForCachesSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionForCachesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionForCachesSelfTest.java
index 35c8afb..8bcceb1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionForCachesSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionForCachesSelfTest.java
@@ -53,11 +53,12 @@ public class GridProjectionForCachesSelfTest extends GridCommonAbstractTest {
         cfg.setDiscoverySpi(discoverySpi());
 
         if (gridName.equals(getTestGridName(0)))
-            cfg.setCacheConfiguration(cacheConfiguration(null));
+            cfg.setCacheConfiguration(cacheConfiguration(null, CacheDistributionMode.PARTITIONED_ONLY));
         else if (gridName.equals(getTestGridName(1)))
-            cfg.setCacheConfiguration(cacheConfiguration(CACHE_NAME));
+            cfg.setCacheConfiguration(cacheConfiguration(CACHE_NAME, CacheDistributionMode.NEAR_ONLY));
         else if (gridName.equals(getTestGridName(2)) || gridName.equals(getTestGridName(3)))
-            cfg.setCacheConfiguration(cacheConfiguration(null), cacheConfiguration(CACHE_NAME));
+            cfg.setCacheConfiguration(cacheConfiguration(null, CacheDistributionMode.CLIENT_ONLY),
+                cacheConfiguration(CACHE_NAME, CacheDistributionMode.NEAR_PARTITIONED));
         else
             cfg.setCacheConfiguration();
 
@@ -79,11 +80,14 @@ public class GridProjectionForCachesSelfTest extends GridCommonAbstractTest {
      * @param cacheName Cache name.
      * @return Cache configuration.
      */
-    private CacheConfiguration cacheConfiguration(@Nullable String cacheName) {
+    private CacheConfiguration cacheConfiguration(@Nullable String cacheName, CacheDistributionMode distributionMode) {
         CacheConfiguration cfg = defaultCacheConfiguration();
 
         cfg.setName(cacheName);
         cfg.setCacheMode(PARTITIONED);
+
+        cfg.setDistributionMode(distributionMode);
+
         cfg.setBackups(1);
 
         return cfg;
@@ -153,6 +157,38 @@ public class GridProjectionForCachesSelfTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testProjectionForDataCaches() throws Exception {
+        ClusterGroup prj = ignite.cluster().forDataNodes(null);
+
+        assert prj != null;
+        assert prj.nodes().size() == 1;
+        assert prj.nodes().contains(grid(0).localNode());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testProjectionForClientCaches() throws Exception {
+        ClusterGroup prj = ignite.cluster().forClientNodes(CACHE_NAME);
+
+        assert prj != null;
+        assert prj.nodes().size() == 1;
+        assert prj.nodes().contains(grid(1).localNode());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testProjectionForClientBothCaches() throws Exception {
+        ClusterGroup prj = ignite.cluster().forClientNodes(null, CACHE_NAME);
+
+        assert prj != null;
+        assert prj.nodes().isEmpty();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testProjectionForWrongCacheName() throws Exception {
         ClusterGroup prj = ignite.cluster().forCacheNodes("wrong");
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b9629d0/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java
index 014f7c5..ca927ec 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java
@@ -1145,6 +1145,16 @@ public class GridHadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstrac
         }
 
         /** {@inheritDoc} */
+        @Override public ClusterGroup forDataNodes(String cacheName, @Nullable String... cacheNames) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClusterGroup forClientNodes(String cacheName, @Nullable String... cacheNames) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
         @Override public ClusterGroup forStreamer(String streamerName, @Nullable String... streamerNames) {
             return null;
         }