You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/02/02 11:52:41 UTC
[2/2] incubator-ignite git commit: IGNITE-158 implement
ClusterGroup#forDataNodes() and ClusterGroup#forClientNodes().
IGNITE-158 implement ClusterGroup#forDataNodes() and ClusterGroup#forClientNodes().
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1b9629d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1b9629d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1b9629d0
Branch: refs/heads/ignite-158
Commit: 1b9629d07ef7f2a7dd7a43ae41296e79a9a9612b
Parents: f04e0ec
Author: sevdokimov <se...@gridgain.com>
Authored: Mon Feb 2 13:51:34 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Mon Feb 2 13:51:34 2015 +0300
----------------------------------------------------------------------
.../org/apache/ignite/cluster/ClusterGroup.java | 23 ++++++++
.../ignite/internal/ClusterGroupAdapter.java | 55 ++++++++++++++++++--
.../ignite/internal/IgniteClusterAsyncImpl.java | 10 ++++
.../GridProjectionForCachesSelfTest.java | 44 ++++++++++++++--
...idHadoopDefaultMapReducePlannerSelfTest.java | 10 ++++
5 files changed, 134 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b9629d0/modules/core/src/main/java/org/apache/ignite/cluster/ClusterGroup.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterGroup.java b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterGroup.java
index 9f2f435..6267dbe 100644
--- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterGroup.java
+++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterGroup.java
@@ -18,6 +18,7 @@
package org.apache.ignite.cluster;
import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;
@@ -148,6 +149,28 @@ public interface ClusterGroup {
public ClusterGroup forCacheNodes(String cacheName, @Nullable String... cacheNames);
/**
+ * Creates projection for all nodes that have cache with specified name running and cache distribution mode is
+ * {@link CacheDistributionMode#PARTITIONED_ONLY} or {@link CacheDistributionMode#NEAR_PARTITIONED}.
+ *
+ * @param cacheName Cache name.
+ * @param cacheNames Optional additional cache names to include into projection.
+ * @return Projection over nodes that have specified cache running.
+ * @see CacheConfiguration#getDistributionMode()
+ */
+ public ClusterGroup forDataNodes(String cacheName, @Nullable String... cacheNames);
+
+ /**
+ * Creates projection for all nodes that have cache with specified name running and cache distribution mode is
+ * {@link CacheDistributionMode#CLIENT_ONLY} or {@link CacheDistributionMode#NEAR_ONLY}.
+ *
+ * @param cacheName Cache name.
+ * @param cacheNames Optional additional cache names to include into projection.
+ * @return Projection over nodes that have specified cache running.
+ * @see CacheConfiguration#getDistributionMode()
+ */
+ public ClusterGroup forClientNodes(String cacheName, @Nullable String... cacheNames);
+
+ /**
* Creates projection for all nodes that have streamer with specified name running.
*
* @param streamerName Streamer name.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b9629d0/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupAdapter.java
index 7dff797..fafb23c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupAdapter.java
@@ -18,7 +18,9 @@
package org.apache.ignite.internal;
import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.internal.executor.*;
import org.apache.ignite.internal.util.typedef.*;
@@ -525,7 +527,17 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
/** {@inheritDoc} */
@Override public final ClusterGroup forCacheNodes(@Nullable String cacheName, @Nullable String... cacheNames) {
- return forPredicate(new CachesFilter(cacheName, cacheNames));
+ return forPredicate(new CachesFilter(cacheName, cacheNames, null));
+ }
+
+ /** {@inheritDoc} */
+ @Override public final ClusterGroup forDataNodes(@Nullable String cacheName, @Nullable String... cacheNames) {
+ return forPredicate(new CachesFilter(cacheName, cacheNames, CachesFilter.DATA_MODES));
+ }
+
+ /** {@inheritDoc} */
+ @Override public final ClusterGroup forClientNodes(@Nullable String cacheName, @Nullable String... cacheNames) {
+ return forPredicate(new CachesFilter(cacheName, cacheNames, CachesFilter.CLIENT_MODES));
}
/** {@inheritDoc} */
@@ -652,6 +664,14 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
*/
private static class CachesFilter implements IgnitePredicate<ClusterNode> {
/** */
+ private static final Set<CacheDistributionMode> DATA_MODES = EnumSet.of(CacheDistributionMode.NEAR_PARTITIONED,
+ CacheDistributionMode.PARTITIONED_ONLY);
+
+ /** */
+ private static final Set<CacheDistributionMode> CLIENT_MODES = EnumSet.of(CacheDistributionMode.CLIENT_ONLY,
+ CacheDistributionMode.NEAR_ONLY);
+
+ /** */
private static final long serialVersionUID = 0L;
/** Cache name. */
@@ -660,27 +680,54 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
/** Cache names. */
private final String[] cacheNames;
+ /** */
+ private final Set<CacheDistributionMode> distributionMode;
+
/**
* @param cacheName Cache name.
* @param cacheNames Cache names.
+ * @param distributionMode Filter by {@link CacheConfiguration#getDistributionMode()}.
*/
- private CachesFilter(@Nullable String cacheName, @Nullable String[] cacheNames) {
+ private CachesFilter(@Nullable String cacheName, @Nullable String[] cacheNames,
+ @Nullable Set<CacheDistributionMode> distributionMode) {
this.cacheName = cacheName;
this.cacheNames = cacheNames;
+ this.distributionMode = distributionMode;
}
/** {@inheritDoc} */
@Override public boolean apply(ClusterNode n) {
- if (!U.hasCache(n, cacheName))
+ GridCacheAttributes[] caches = n.attribute(ATTR_CACHE);
+
+ if (caches == null)
+ return false;
+
+ if (!hasCache(caches, cacheName, distributionMode))
return false;
if (!F.isEmpty(cacheNames))
for (String cn : cacheNames)
- if (!U.hasCache(n, cn))
+ if (!hasCache(caches, cn, distributionMode))
return false;
return true;
}
+
+ /**
+ * @param cacheName Cache name to check.
+ * @param distributionMode Filter by {@link CacheConfiguration#getDistributionMode()}.
+ * @return {@code true} if {@code caches} contains the named cache in an accepted distribution mode.
+ */
+ public static boolean hasCache(GridCacheAttributes[] caches, @Nullable String cacheName,
+ @Nullable Collection<CacheDistributionMode> distributionMode) {
+ for (GridCacheAttributes attrs : caches) {
+ if (Objects.equals(cacheName, attrs.cacheName())
+ && (distributionMode == null || distributionMode.contains(attrs.partitionedTaxonomy())))
+ return true;
+ }
+
+ return false;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b9629d0/modules/core/src/main/java/org/apache/ignite/internal/IgniteClusterAsyncImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteClusterAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteClusterAsyncImpl.java
index ea148ff..7e2eaf1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteClusterAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteClusterAsyncImpl.java
@@ -172,6 +172,16 @@ public class IgniteClusterAsyncImpl extends IgniteAsyncSupportAdapter<IgniteClus
}
/** {@inheritDoc} */
+ @Override public ClusterGroup forDataNodes(String cacheName, @Nullable String... cacheNames) {
+ return grid.forDataNodes(cacheName, cacheNames);
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClusterGroup forClientNodes(String cacheName, @Nullable String... cacheNames) {
+ return grid.forClientNodes(cacheName, cacheNames);
+ }
+
+ /** {@inheritDoc} */
@Override public ClusterGroup forStreamer(String streamerName, @Nullable String... streamerNames) {
return grid.forStreamer(streamerName, streamerNames);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b9629d0/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionForCachesSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionForCachesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionForCachesSelfTest.java
index 35c8afb..8bcceb1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionForCachesSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionForCachesSelfTest.java
@@ -53,11 +53,12 @@ public class GridProjectionForCachesSelfTest extends GridCommonAbstractTest {
cfg.setDiscoverySpi(discoverySpi());
if (gridName.equals(getTestGridName(0)))
- cfg.setCacheConfiguration(cacheConfiguration(null));
+ cfg.setCacheConfiguration(cacheConfiguration(null, CacheDistributionMode.PARTITIONED_ONLY));
else if (gridName.equals(getTestGridName(1)))
- cfg.setCacheConfiguration(cacheConfiguration(CACHE_NAME));
+ cfg.setCacheConfiguration(cacheConfiguration(CACHE_NAME, CacheDistributionMode.NEAR_ONLY));
else if (gridName.equals(getTestGridName(2)) || gridName.equals(getTestGridName(3)))
- cfg.setCacheConfiguration(cacheConfiguration(null), cacheConfiguration(CACHE_NAME));
+ cfg.setCacheConfiguration(cacheConfiguration(null, CacheDistributionMode.CLIENT_ONLY),
+ cacheConfiguration(CACHE_NAME, CacheDistributionMode.NEAR_PARTITIONED));
else
cfg.setCacheConfiguration();
@@ -79,11 +80,14 @@ public class GridProjectionForCachesSelfTest extends GridCommonAbstractTest {
* @param cacheName Cache name.
* @return Cache configuration.
*/
- private CacheConfiguration cacheConfiguration(@Nullable String cacheName) {
+ private CacheConfiguration cacheConfiguration(@Nullable String cacheName, CacheDistributionMode distributionMode) {
CacheConfiguration cfg = defaultCacheConfiguration();
cfg.setName(cacheName);
cfg.setCacheMode(PARTITIONED);
+
+ cfg.setDistributionMode(distributionMode);
+
cfg.setBackups(1);
return cfg;
@@ -153,6 +157,38 @@ public class GridProjectionForCachesSelfTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testProjectionForDataCaches() throws Exception {
+ ClusterGroup prj = ignite.cluster().forDataNodes(null);
+
+ assert prj != null;
+ assert prj.nodes().size() == 1;
+ assert prj.nodes().contains(grid(0).localNode());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testProjectionForClientCaches() throws Exception {
+ ClusterGroup prj = ignite.cluster().forClientNodes(CACHE_NAME);
+
+ assert prj != null;
+ assert prj.nodes().size() == 1;
+ assert prj.nodes().contains(grid(1).localNode());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testProjectionForClientBothCaches() throws Exception {
+ ClusterGroup prj = ignite.cluster().forClientNodes(null, CACHE_NAME);
+
+ assert prj != null;
+ assert prj.nodes().isEmpty();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testProjectionForWrongCacheName() throws Exception {
ClusterGroup prj = ignite.cluster().forCacheNodes("wrong");
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b9629d0/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java
index 014f7c5..ca927ec 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java
@@ -1145,6 +1145,16 @@ public class GridHadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstrac
}
/** {@inheritDoc} */
+ @Override public ClusterGroup forDataNodes(String cacheName, @Nullable String... cacheNames) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClusterGroup forClientNodes(String cacheName, @Nullable String... cacheNames) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
@Override public ClusterGroup forStreamer(String streamerName, @Nullable String... streamerNames) {
return null;
}