You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2017/02/13 16:03:29 UTC
[7/7] ignite git commit: IGNITE-4664 - Added lifecycle and injection
support for TopologyValidator. Fixes #1514
IGNITE-4664 - Added lifecycle and injection support for TopologyValidator. Fixes #1514
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3ef7a0e0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3ef7a0e0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3ef7a0e0
Branch: refs/heads/ignite-1.9
Commit: 3ef7a0e03b9cb36fb4037cb075512adac95cc3f7
Parents: 262a341
Author: Aleksei Scherbakov <al...@gmail.com>
Authored: Mon Feb 13 14:19:27 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Feb 13 14:19:27 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheProcessor.java | 5 +
.../cache/GridCacheLifecycleAwareSelfTest.java | 33 ++
...niteTopologyValidatorGridSplitCacheTest.java | 334 +++++++++++++++++++
.../IgniteTopologyValidatorTestSuit.java | 1 +
4 files changed, 373 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ef7a0e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index b0a78f4..7093403 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -528,6 +528,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
prepare(cfg, cfg.getAffinityMapper(), false);
prepare(cfg, cfg.getEvictionFilter(), false);
prepare(cfg, cfg.getInterceptor(), false);
+ prepare(cfg, cfg.getTopologyValidator(), false);
NearCacheConfiguration nearCfg = cfg.getNearConfiguration();
@@ -563,6 +564,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
cleanup(cfg, cfg.getEvictionPolicy(), false);
cleanup(cfg, cfg.getAffinity(), false);
cleanup(cfg, cfg.getAffinityMapper(), false);
+ cleanup(cfg, cfg.getEvictionFilter(), false);
+ cleanup(cfg, cfg.getInterceptor(), false);
+ cleanup(cfg, cfg.getTopologyValidator(), false);
cleanup(cfg, cctx.store().configuredStore(), false);
if (!CU.isUtilityCache(cfg.getName()) && !CU.isSystemCache(cfg.getName())) {
@@ -3561,6 +3565,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ret.add(ccfg.getEvictionFilter());
ret.add(ccfg.getEvictionPolicy());
ret.add(ccfg.getInterceptor());
+ ret.add(ccfg.getTopologyValidator());
NearCacheConfiguration nearCfg = ccfg.getNearConfiguration();
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ef7a0e0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLifecycleAwareSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLifecycleAwareSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLifecycleAwareSelfTest.java
index 81a6433..aa31ff9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLifecycleAwareSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLifecycleAwareSelfTest.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.UUID;
import javax.cache.Cache;
import javax.cache.integration.CacheLoaderException;
+import org.apache.ignite.Ignite;
import org.apache.ignite.cache.CacheInterceptor;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.AffinityFunction;
@@ -39,10 +40,12 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.configuration.TopologyValidator;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.resources.CacheNameResource;
+import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.junits.common.GridAbstractLifecycleAwareSelfTest;
import org.jetbrains.annotations.Nullable;
@@ -256,6 +259,30 @@ public class GridCacheLifecycleAwareSelfTest extends GridAbstractLifecycleAwareS
}
}
+ /**
+ */
+ private static class TestTopologyValidator extends TestLifecycleAware implements TopologyValidator {
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /**
+ */
+ public TestTopologyValidator() {
+ super(CACHE_NAME);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean validate(Collection<ClusterNode> nodes) {
+ return false;
+ }
+
+ @Override public void start() {
+ super.start();
+
+ assertNotNull(ignite);
+ }
+ }
+
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override protected final IgniteConfiguration getConfiguration(String gridName) throws Exception {
@@ -324,6 +351,12 @@ public class GridCacheLifecycleAwareSelfTest extends GridAbstractLifecycleAwareS
ccfg.setInterceptor(interceptor);
+ TestTopologyValidator topValidator = new TestTopologyValidator();
+
+ lifecycleAwares.add(topValidator);
+
+ ccfg.setTopologyValidator(topValidator);
+
cfg.setCacheConfiguration(ccfg);
return cfg;
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ef7a0e0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
new file mode 100644
index 0000000..3593ad6
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.TopologyValidator;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lifecycle.LifecycleAware;
+import org.apache.ignite.resources.CacheNameResource;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ * Tests complex scenario with topology validator.
+ * Grid is split between to data centers, defined by attribute {@link #DC_NODE_ATTR}.
+ * If only nodes from single DC are left in topology, grid is moved into inoperative state until special
+ * activator node'll enter a topology, enabling grid operations.
+ */
+public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstractTest {
+ /** */
+ private static final String DC_NODE_ATTR = "dc";
+
+ /** */
+ private static final String ACTIVATOR_NODE_ATTR = "split.resolved";
+
+ /** */
+ private static final int GRID_CNT = 4;
+
+ /** */
+ private static final int CACHES_CNT = 10;
+
+ /** */
+ private static final int RESOLVER_GRID_IDX = GRID_CNT;
+
+ /** */
+ private static final int CONFIGLESS_GRID_IDX = GRID_CNT + 1;
+
+ /** */
+ private static CountDownLatch initLatch = new CountDownLatch(GRID_CNT);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ int idx = getTestGridIndex(gridName);
+
+ cfg.setUserAttributes(F.asMap(DC_NODE_ATTR, idx % 2));
+
+ if (idx != CONFIGLESS_GRID_IDX) {
+ if (idx == RESOLVER_GRID_IDX) {
+ cfg.setClientMode(true);
+
+ cfg.setUserAttributes(F.asMap(ACTIVATOR_NODE_ATTR, "true"));
+ }
+ else {
+ CacheConfiguration[] ccfgs = new CacheConfiguration[CACHES_CNT];
+
+ for (int cnt = 0; cnt < CACHES_CNT; cnt++) {
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setName(testCacheName(cnt));
+ ccfg.setCacheMode(PARTITIONED);
+ ccfg.setBackups(0);
+ ccfg.setTopologyValidator(new SplitAwareTopologyValidator());
+
+ ccfgs[cnt] = ccfg;
+ }
+
+ cfg.setCacheConfiguration(ccfgs);
+ }
+ }
+
+ return cfg;
+ }
+
+ /**
+ * @param idx Index.
+ */
+ private String testCacheName(int idx) {
+ return "test" + idx;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGridsMultiThreaded(GRID_CNT);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /**
+ * Tests topology split scenario.
+ * @throws Exception
+ */
+ public void testTopologyValidator() throws Exception {
+ assertTrue(initLatch.await(10, TimeUnit.SECONDS));
+
+ // Tests what each node is able to do puts.
+ tryPut(0, 1, 2, 3);
+
+ clearAll();
+
+ stopGrid(1);
+
+ stopGrid(3);
+
+ awaitPartitionMapExchange();
+
+ try {
+ tryPut(0, 2);
+
+ fail();
+ }
+ catch (Exception e) {
+ // No-op.
+ }
+
+ resolveSplit();
+
+ tryPut(0, 2);
+
+ clearAll();
+
+ startGrid(CONFIGLESS_GRID_IDX);
+
+ awaitPartitionMapExchange();
+
+ tryPut(CONFIGLESS_GRID_IDX);
+
+ stopGrid(CONFIGLESS_GRID_IDX);
+
+ awaitPartitionMapExchange();
+
+ try {
+ tryPut(0, 2);
+
+ fail();
+ }
+ catch (Exception e) {
+ // No-op.
+ }
+
+ resolveSplit();
+
+ tryPut(0, 2);
+
+ clearAll();
+
+ startGrid(1);
+
+ awaitPartitionMapExchange();
+
+ tryPut(0, 1, 2);
+ }
+
+ /** */
+ private void clearAll() {
+ for (int i = 0; i < CACHES_CNT; i++)
+ grid(0).cache(testCacheName(i)).clear();
+ }
+
+ /**
+ * Resolves split by client node join.
+ */
+ private void resolveSplit() throws Exception {
+ startGrid(RESOLVER_GRID_IDX);
+
+ stopGrid(RESOLVER_GRID_IDX);
+ }
+
+ /**
+ * @param grids Grids to test.
+ */
+ private void tryPut(int... grids) {
+ for (int i = 0; i < grids.length; i++) {
+ IgniteEx g = grid(grids[i]);
+
+ for (int cnt = 0; cnt < CACHES_CNT; cnt++) {
+ String cacheName = testCacheName(cnt);
+
+ for (int k = 0; k < 100; k++) {
+ if (g.affinity(cacheName).isPrimary(g.localNode(), k)) {
+ log().info("Put " + k + " to node " + g.localNode().id().toString());
+
+ IgniteCache<Object, Object> cache = g.cache(cacheName);
+
+ cache.put(k, k);
+
+ assertEquals(1, cache.localSize());
+
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Prevents cache from performing any operation if only nodes from single data center are left in topology.
+ */
+ private static class SplitAwareTopologyValidator implements TopologyValidator, LifecycleAware {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ @CacheNameResource
+ private String cacheName;
+
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** */
+ private transient volatile long activatorTopVer;
+
+ /** {@inheritDoc} */
+ @Override public boolean validate(Collection<ClusterNode> nodes) {
+ if (!F.view(nodes, new IgnitePredicate<ClusterNode>() {
+ @Override public boolean apply(ClusterNode node) {
+ return !node.isClient() && node.attribute(DC_NODE_ATTR) == null;
+ }
+ }).isEmpty())
+ return false;
+
+ IgniteKernal kernal = (IgniteKernal)ignite.cache(cacheName).unwrap(Ignite.class);
+
+ GridDhtCacheAdapter<Object, Object> dht = kernal.context().cache().internalCache(cacheName).context().dht();
+
+ long cacheTopVer = dht.topology().topologyVersionFuture().topologyVersion().topologyVersion();
+
+ if (hasSplit(nodes)) {
+ boolean resolved = activatorTopVer != 0 && cacheTopVer >= activatorTopVer;
+
+ if (!resolved)
+ log.info("Grid segmentation is detected, switching to inoperative state.");
+
+ return resolved;
+ }
+ else
+ activatorTopVer = 0;
+
+ return true;
+ }
+
+ /** */
+ private boolean hasSplit(Collection<ClusterNode> nodes) {
+ ClusterNode prev = null;
+
+ for (ClusterNode node : nodes) {
+ if (node.isClient())
+ continue;
+
+ if (prev != null &&
+ !prev.attribute(DC_NODE_ATTR).equals(node.attribute(DC_NODE_ATTR)))
+ return false;
+
+ prev = node;
+ }
+
+ return true;
+ }
+
+ @Override public void start() throws IgniteException {
+ if (ignite.cluster().localNode().isClient())
+ return;
+
+ initLatch.countDown();
+
+ ignite.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
+ ClusterNode node = discoEvt.eventNode();
+
+ if (isMarkerNode(node))
+ activatorTopVer = discoEvt.topologyVersion();
+
+ return true;
+ }
+ }, EventType.EVT_NODE_LEFT);
+ }
+
+ /**
+ * @param node Node.
+ */
+ private boolean isMarkerNode(ClusterNode node) {
+ return node.isClient() && node.attribute(ACTIVATOR_NODE_ATTR) != null;
+ }
+
+ @Override public void stop() throws IgniteException {
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ef7a0e0/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteTopologyValidatorTestSuit.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteTopologyValidatorTestSuit.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteTopologyValidatorTestSuit.java
index b100127..8c4cd11 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteTopologyValidatorTestSuit.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteTopologyValidatorTestSuit.java
@@ -37,6 +37,7 @@ public class IgniteTopologyValidatorTestSuit extends TestSuite {
suite.addTest(new TestSuite(IgniteTopologyValidatorPartitionedTxCacheTest.class));
suite.addTest(new TestSuite(IgniteTopologyValidatorReplicatedAtomicCacheTest.class));
suite.addTest(new TestSuite(IgniteTopologyValidatorReplicatedTxCacheTest.class));
+ suite.addTest(new TestSuite(IgniteTopologyValidatorGridSplitCacheTest.class));
return suite;
}