You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2016/12/28 14:04:47 UTC
[21/50] [abbrv] ignite git commit: IGNITE-4439 - Attribute based node
filter
IGNITE-4439 - Attribute based node filter
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2591c160
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2591c160
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2591c160
Branch: refs/heads/ignite-comm-balance-master
Commit: 2591c160efc4251cb33854955970c93ec20d6b24
Parents: 7094c0f
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Thu Dec 22 13:05:35 2016 -0800
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Thu Dec 22 13:05:35 2016 -0800
----------------------------------------------------------------------
.../apache/ignite/util/AttributeNodeFilter.java | 105 +++++++++++
.../ignite/testsuites/IgniteBasicTestSuite.java | 3 +
.../util/AttributeNodeFilterSelfTest.java | 184 +++++++++++++++++++
3 files changed, 292 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2591c160/modules/core/src/main/java/org/apache/ignite/util/AttributeNodeFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/util/AttributeNodeFilter.java b/modules/core/src/main/java/org/apache/ignite/util/AttributeNodeFilter.java
new file mode 100644
index 0000000..e2b972b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/util/AttributeNodeFilter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.util;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Implementation of {@code IgnitePredicate<ClusterNode>} based on
+ * {@link IgniteConfiguration#getUserAttributes() user attributes}.
+ * This filter can be used in methods like {@link ClusterGroup#forPredicate(IgnitePredicate)},
+ * {@link CacheConfiguration#setNodeFilter(IgnitePredicate)},
+ * {@link ServiceConfiguration#setNodeFilter(IgnitePredicate)}, etc.
+ * <p>
+ * The filter will evaluate to true if a node has <b>all</b> provided attributes set to
+ * corresponding values. Here is an example of how you can configure node filter for a
+ * cache or a service so that it's deployed only on nodes that have {@code group}
+ * attribute set to value {@code data}:
+ * <pre name="code" class="xml">
+ * <property name="nodeFilter">
+ * <bean class="org.apache.ignite.util.AttributeNodeFilter">
+ * <constructor-arg value="group"/>
+ * <constructor-arg value="data"/>
+ * </bean>
+ * </property>
+ * </pre>
+ * You can also specify multiple attributes for the filter:
+ * <pre name="code" class="xml">
+ * <property name="nodeFilter">
+ * <bean class="org.apache.ignite.util.AttributeNodeFilter">
+ * <constructor-arg>
+ * <map>
+ * <entry key="cpu-group" value="high"/>
+ * <entry key="memory-group" value="high"/>
+ * </map>
+ * </constructor-arg>
+ * </bean>
+ * </property>
+ * </pre>
+ * With this configuration a cache or a service will deploy only on nodes that have both
+ * {@code cpu-group} and {@code memory-group} attributes set to value {@code high}.
+ */
+public class AttributeNodeFilter implements IgnitePredicate<ClusterNode> {
+ /** Expected attribute values keyed by attribute name; every entry must match for a node to pass. */
+ private final Map<String, Object> attrs;
+
+ /**
+ * Creates a new node filter that checks a single attribute.
+ *
+ * @param attrName Attribute name; validated with {@code A.notNull}.
+ * @param attrVal Expected value; {@code null} matches nodes where the attribute is absent or {@code null}.
+ */
+ public AttributeNodeFilter(String attrName, @Nullable Object attrVal) {
+ A.notNull(attrName, "attrName");
+
+ attrs = Collections.singletonMap(attrName, attrVal);
+ }
+
+ /**
+ * Creates a new node filter that checks a set of attributes, all of which must match.
+ *
+ * @param attrs Expected values by attribute name; stored by reference (not copied), so later changes affect the filter.
+ */
+ public AttributeNodeFilter(Map<String, Object> attrs) {
+ A.notNull(attrs, "attrs");
+
+ this.attrs = attrs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(ClusterNode node) {
+ Map<String, Object> nodeAttrs = node.attributes();
+
+ for (Map.Entry<String, Object> attr : attrs.entrySet()) {
+ if (!F.eq(nodeAttrs.get(attr.getKey()), attr.getValue()))
+ return false;
+ }
+
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2591c160/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index c6281df..8ccec34 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
import java.util.Set;
import junit.framework.TestSuite;
import org.apache.ignite.GridSuppressedExceptionSelfTest;
+import org.apache.ignite.util.AttributeNodeFilterSelfTest;
import org.apache.ignite.internal.ClusterGroupHostsSelfTest;
import org.apache.ignite.internal.ClusterGroupSelfTest;
import org.apache.ignite.internal.GridFailFastNodeFailureDetectionSelfTest;
@@ -149,6 +150,8 @@ public class IgniteBasicTestSuite extends TestSuite {
suite.addTestSuite(SecurityPermissionSetBuilderTest.class);
+ suite.addTestSuite(AttributeNodeFilterSelfTest.class);
+
return suite;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2591c160/modules/core/src/test/java/org/apache/ignite/util/AttributeNodeFilterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/util/AttributeNodeFilterSelfTest.java b/modules/core/src/test/java/org/apache/ignite/util/AttributeNodeFilterSelfTest.java
new file mode 100644
index 0000000..ac3800f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/util/AttributeNodeFilterSelfTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.util;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests for {@link AttributeNodeFilter}: direct checks on proxied nodes, plus cluster group and cache filter checks.
+ */
+public class AttributeNodeFilterSelfTest extends GridCommonAbstractTest {
+ /** IP finder handed to the discovery SPI of every node configured by this test. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** User attributes applied to node configurations created next; {@code null} means none are set. */
+ private Map<String, ?> attrs;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+
+ if (attrs != null)
+ cfg.setUserAttributes(attrs);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ attrs = null;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSingleAttribute() throws Exception {
+ IgnitePredicate<ClusterNode> filter = new AttributeNodeFilter("attr", "value");
+
+ assertTrue(filter.apply(nodeProxy(F.asMap("attr", "value"))));
+ assertFalse(filter.apply(nodeProxy(F.asMap("attr", "wrong"))));
+ assertFalse(filter.apply(nodeProxy(F.asMap("attr", null))));
+ assertFalse(filter.apply(nodeProxy(Collections.<String, Object>emptyMap())));
+ assertFalse(filter.apply(nodeProxy(F.asMap("wrong", "value"))));
+ assertFalse(filter.apply(nodeProxy(F.asMap("null", "value"))));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSingleAttributeNullValue() throws Exception {
+ IgnitePredicate<ClusterNode> filter = new AttributeNodeFilter("attr", null);
+
+ assertTrue(filter.apply(nodeProxy(F.asMap("attr", null))));
+ assertTrue(filter.apply(nodeProxy(Collections.<String, Object>emptyMap())));
+ assertTrue(filter.apply(nodeProxy(F.asMap("wrong", "value"))));
+ assertTrue(filter.apply(nodeProxy(F.asMap("wrong", null))));
+ assertFalse(filter.apply(nodeProxy(F.asMap("attr", "value"))));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMultipleAttributes() throws Exception {
+ IgnitePredicate<ClusterNode> filter =
+ new AttributeNodeFilter(F.<String, Object>asMap("attr1", "value1", "attr2", "value2"));
+
+ assertTrue(filter.apply(nodeProxy(F.asMap("attr1", "value1", "attr2", "value2"))));
+ assertFalse(filter.apply(nodeProxy(F.asMap("attr1", "wrong", "attr2", "value2"))));
+ assertFalse(filter.apply(nodeProxy(F.asMap("attr1", "value1", "attr2", "wrong"))));
+ assertFalse(filter.apply(nodeProxy(F.asMap("attr1", "wrong", "attr2", "wrong"))));
+ assertFalse(filter.apply(nodeProxy(F.asMap("attr1", "value1"))));
+ assertFalse(filter.apply(nodeProxy(F.asMap("attr2", "value2"))));
+ assertFalse(filter.apply(nodeProxy(Collections.<String, Object>emptyMap())));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMultipleAttributesNullValues() throws Exception {
+ IgnitePredicate<ClusterNode> filter = new AttributeNodeFilter(F.asMap("attr1", null, "attr2", null));
+
+ assertTrue(filter.apply(nodeProxy(F.asMap("attr1", null, "attr2", null))));
+ assertTrue(filter.apply(nodeProxy(F.asMap("attr1", null))));
+ assertTrue(filter.apply(nodeProxy(F.asMap("attr2", null))));
+ assertTrue(filter.apply(nodeProxy(Collections.<String, Object>emptyMap())));
+ assertFalse(filter.apply(nodeProxy(F.asMap("attr1", "value1"))));
+ assertFalse(filter.apply(nodeProxy(F.asMap("attr2", "value2"))));
+ assertFalse(filter.apply(nodeProxy(F.asMap("attr1", "value1", "attr2", "value2"))));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClusterGroup() throws Exception {
+ Ignite group1 = startGridsMultiThreaded(3);
+
+ attrs = F.asMap("group", "data");
+
+ Ignite group2 = startGridsMultiThreaded(3, 2);
+
+ assertEquals(2, group1.cluster().forPredicate(new AttributeNodeFilter("group", "data")).nodes().size());
+ assertEquals(2, group2.cluster().forPredicate(new AttributeNodeFilter("group", "data")).nodes().size());
+
+ assertEquals(3, group1.cluster().forPredicate(new AttributeNodeFilter("group", null)).nodes().size());
+ assertEquals(3, group2.cluster().forPredicate(new AttributeNodeFilter("group", null)).nodes().size());
+
+ assertEquals(0, group1.cluster().forPredicate(new AttributeNodeFilter("group", "wrong")).nodes().size());
+ assertEquals(0, group2.cluster().forPredicate(new AttributeNodeFilter("group", "wrong")).nodes().size());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCacheFilter() throws Exception {
+ Ignite group1 = startGridsMultiThreaded(3);
+
+ attrs = F.asMap("group", "data");
+
+ Ignite group2 = startGridsMultiThreaded(3, 2);
+
+ group1.createCache(new CacheConfiguration<>("test-cache").
+ setNodeFilter(new AttributeNodeFilter("group", "data")));
+
+ assertEquals(2, group1.cluster().forDataNodes("test-cache").nodes().size());
+ assertEquals(2, group2.cluster().forDataNodes("test-cache").nodes().size());
+
+ assertEquals(0, group1.cluster().forDataNodes("wrong").nodes().size());
+ assertEquals(0, group2.cluster().forDataNodes("wrong").nodes().size());
+ }
+
+ /**
+ * @param attrs Map the proxy returns from its {@code attributes()} method.
+ * @return Proxy that answers only {@code attributes()}; any other method throws {@link UnsupportedOperationException}.
+ */
+ private static ClusterNode nodeProxy(final Map<String, ?> attrs) {
+ return (ClusterNode)Proxy.newProxyInstance(
+ ClusterNode.class.getClassLoader(),
+ new Class[] { ClusterNode.class },
+ new InvocationHandler() {
+ @SuppressWarnings("SuspiciousMethodCalls")
+ @Override public Object invoke(Object proxy, Method mtd, Object[] args) throws Throwable {
+ if ("attributes".equals(mtd.getName()))
+ return attrs;
+
+ throw new UnsupportedOperationException();
+ }
+ });
+ }
+}