You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/29 09:27:18 UTC
[3/4] ignite git commit: zk
http://git-wip-us.apache.org/repos/asf/ignite/blob/26ffa0db/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
deleted file mode 100644
index 7c2f642..0000000
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ /dev/null
@@ -1,4247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.io.File;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.curator.test.InstanceSpec;
-import org.apache.curator.test.TestingCluster;
-import org.apache.curator.test.TestingZooKeeperServer;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.CommunicationProblemContext;
-import org.apache.ignite.configuration.CommunicationProblemResolver;
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.WALMode;
-import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
-import org.apache.ignite.events.EventType;
-import org.apache.ignite.internal.DiscoverySpiTestListener;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.IgnitionEx;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.managers.discovery.CustomEventListener;
-import org.apache.ignite.internal.managers.discovery.DiscoCache;
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
-import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
-import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
-import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
-import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfTest;
-import org.apache.ignite.internal.processors.security.SecurityContext;
-import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
-import org.apache.ignite.internal.util.lang.GridAbsPredicate;
-import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.internal.util.typedef.T3;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteCallable;
-import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.lang.IgniteOutClosure;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.logger.java.JavaLogger;
-import org.apache.ignite.marshaller.jdk.JdkMarshaller;
-import org.apache.ignite.plugin.security.SecurityCredentials;
-import org.apache.ignite.plugin.security.SecurityPermission;
-import org.apache.ignite.plugin.security.SecuritySubject;
-import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.resources.LoggerResource;
-import org.apache.ignite.spi.IgniteSpiException;
-import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.spi.discovery.DiscoverySpi;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
-import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
-import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.apache.zookeeper.ZKUtil;
-import org.apache.zookeeper.ZkTestClientCnxnSocketNIO;
-import org.apache.zookeeper.ZooKeeper;
-import org.jetbrains.annotations.Nullable;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
-import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED;
-import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
-import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
-import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SECURITY_SUBJECT_V2;
-import static org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl.IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD;
-import static org.apache.zookeeper.ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET;
-
-/**
- * TODO ZK: test with max client connections limit error.
- */
-@SuppressWarnings("deprecation")
-public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
- /** Root path constant, taken from {@link ZookeeperDiscoverySpi#DFLT_ROOT_PATH}. */
- private static final String IGNITE_ZK_ROOT = ZookeeperDiscoverySpi.DFLT_ROOT_PATH;
-
- /** Number of servers in the test ZooKeeper cluster created in {@code beforeTest()}. */
- private static final int ZK_SRVS = 3;
-
- /** Test ZooKeeper cluster; created lazily in {@code beforeTest()} and closed in {@code afterTestsStopped()}. */
- private static TestingCluster zkCluster;
-
- /** If {@code true} nodes connect to {@link #zkCluster}, otherwise to {@code localhost:2181}. */
- private static final boolean USE_TEST_CLUSTER = true;
-
- /** Client mode flag for started nodes; used when no per-thread value is set in {@link #clientThreadLoc}. */
- private boolean client;
-
- /** Per-thread client mode flag; when set it takes precedence over {@link #client}. */
- private static ThreadLocal<Boolean> clientThreadLoc = new ThreadLocal<>();
-
- /** Discovery events recorded by local listeners: local node ID -> (topology version -> event). */
- private static ConcurrentHashMap<UUID, Map<Long, DiscoveryEvent>> evts = new ConcurrentHashMap<>();
-
- /** Set to {@code true} when a local event listener hits an unexpected error; asserted in {@code afterTest()}. */
- private static volatile boolean err;
-
- /** If {@code true}, {@link ZkTestClientCnxnSocketNIO} is installed as ZooKeeper client socket class. */
- private boolean testSockNio;
-
- /** If {@code true}, nodes are configured with {@code ZkTestCommunicationSpi}. */
- private boolean testCommSpi;
-
- /** Session timeout passed to the discovery SPI; {@code 10_000} is used when the value is not positive. */
- private long sesTimeout;
-
- /** Join timeout passed to the discovery SPI when non-zero. */
- private long joinTimeout;
-
- /** Value passed to {@code ZookeeperDiscoverySpi.setClientReconnectDisabled}. */
- private boolean clientReconnectDisabled;
-
- /** Discovery SPI instances created in {@code getConfiguration}, by Ignite instance name. */
- private ConcurrentHashMap<String, ZookeeperDiscoverySpi> spis = new ConcurrentHashMap<>();
-
- /** User attributes set on node configuration when non-null. */
- private Map<String, Object> userAttrs;
-
- /** If {@code false}, consistent ID is explicitly set to the Ignite instance name; otherwise it is not set. */
- private boolean dfltConsistenId;
-
- /** Node ID set on node configuration when non-null. */
- private UUID nodeId;
-
- /** If {@code true}, data storage configuration with persistence enabled is set on started nodes. */
- private boolean persistence;
-
- /** Factory of communication problem resolver; its result is set on node configuration when non-null. */
- private IgniteOutClosure<CommunicationProblemResolver> commProblemRslvr;
-
- /** Factory of node authenticator; its result is set on the discovery SPI when non-null. */
- private IgniteOutClosure<DiscoverySpiNodeAuthenticator> auth;
-
- /** Custom ZooKeeper root path; applied only when the test cluster is used and the value is non-null. */
- private String zkRootPath;
-
- /**
- * {@inheritDoc}
- * <p>
- * Builds node configuration from the flag fields of this test: ZooKeeper discovery SPI settings,
- * a default FULL_SYNC cache, client mode, a local listener recording discovery events and
- * optional persistence, communication SPI and communication problem resolver.
- */
- @Override protected IgniteConfiguration getConfiguration(final String igniteInstanceName) throws Exception {
- // System property, so it affects the whole JVM and not only this node.
- if (testSockNio)
- System.setProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET, ZkTestClientCnxnSocketNIO.class.getName());
-
- IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
-
- if (nodeId != null)
- cfg.setNodeId(nodeId);
-
- if (!dfltConsistenId)
- cfg.setConsistentId(igniteInstanceName);
-
- ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi();
-
- if (joinTimeout != 0)
- zkSpi.setJoinTimeout(joinTimeout);
-
- // Fall back to 10_000 when no positive session timeout was requested by the test.
- zkSpi.setSessionTimeout(sesTimeout > 0 ? sesTimeout : 10_000);
-
- zkSpi.setClientReconnectDisabled(clientReconnectDisabled);
-
- // Set authenticator for basic sanity tests.
- if (auth != null) {
- zkSpi.setAuthenticator(auth.apply());
-
- zkSpi.setInternalListener(new IgniteDiscoverySpiInternalListener() {
- @Override public void beforeJoin(ClusterNode locNode, IgniteLogger log) {
- ZookeeperClusterNode locNode0 = (ZookeeperClusterNode)locNode;
-
- // Copy attributes and add credentials carrying the instance name as the third constructor argument.
- Map<String, Object> attrs = new HashMap<>(locNode0.getAttributes());
-
- attrs.put(ATTR_SECURITY_CREDENTIALS, new SecurityCredentials(null, null, igniteInstanceName));
-
- locNode0.setAttributes(attrs);
- }
-
- @Override public boolean beforeSendCustomEvent(DiscoverySpi spi, IgniteLogger log, DiscoverySpiCustomMessage msg) {
- return false;
- }
- });
- }
-
- // Remember the SPI so tests can look it up by instance name.
- spis.put(igniteInstanceName, zkSpi);
-
- if (USE_TEST_CLUSTER) {
- assert zkCluster != null;
-
- zkSpi.setZkConnectionString(zkCluster.getConnectString());
-
- if (zkRootPath != null)
- zkSpi.setZkRootPath(zkRootPath);
- }
- else
- zkSpi.setZkConnectionString("localhost:2181");
-
- cfg.setDiscoverySpi(zkSpi);
-
- CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
-
- ccfg.setWriteSynchronizationMode(FULL_SYNC);
-
- cfg.setCacheConfiguration(ccfg);
-
- // Per-thread client mode (if set) overrides the test-wide flag.
- Boolean clientMode = clientThreadLoc.get();
-
- if (clientMode != null)
- cfg.setClientMode(clientMode);
- else
- cfg.setClientMode(client);
-
- if (userAttrs != null)
- cfg.setUserAttributes(userAttrs);
-
- Map<IgnitePredicate<? extends Event>, int[]> lsnrs = new HashMap<>();
-
- // Listener storing every join/fail/left event of this node in 'evts', keyed by topology version.
- lsnrs.put(new IgnitePredicate<Event>() {
- /** */
- @IgniteInstanceResource
- private Ignite ignite;
-
- @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
- @Override public boolean apply(Event evt) {
- try {
- DiscoveryEvent discoveryEvt = (DiscoveryEvent)evt;
-
- UUID locId = ((IgniteKernal)ignite).context().localNodeId();
-
- Map<Long, DiscoveryEvent> nodeEvts = evts.get(locId);
-
- // First event seen for this node: create its map and seed it with the local join event.
- if (nodeEvts == null) {
- Object old = evts.put(locId, nodeEvts = new TreeMap<>());
-
- assertNull(old);
-
- synchronized (nodeEvts) {
- DiscoveryLocalJoinData locJoin = ((IgniteKernal)ignite).context().discovery().localJoin();
-
- nodeEvts.put(locJoin.event().topologyVersion(), locJoin.event());
- }
- }
-
- synchronized (nodeEvts) {
- DiscoveryEvent old = nodeEvts.put(discoveryEvt.topologyVersion(), discoveryEvt);
-
- // At most one event is expected per topology version.
- assertNull(old);
- }
- }
- catch (Throwable e) {
- error("Unexpected error [evt=" + evt + ", err=" + e + ']', e);
-
- // Flag is asserted in afterTest(), since a failure here does not fail the test by itself.
- err = true;
- }
-
- return true;
- }
- }, new int[]{EVT_NODE_JOINED, EVT_NODE_FAILED, EVT_NODE_LEFT});
-
- cfg.setLocalEventListeners(lsnrs);
-
- if (persistence) {
- DataStorageConfiguration memCfg = new DataStorageConfiguration()
- .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setMaxSize(100 * 1024 * 1024).
- setPersistenceEnabled(true))
- .setPageSize(1024)
- .setWalMode(WALMode.LOG_ONLY);
-
- cfg.setDataStorageConfiguration(memCfg);
- }
-
- if (testCommSpi)
- cfg.setCommunicationSpi(new ZkTestCommunicationSpi());
-
- if (commProblemRslvr != null)
- cfg.setCommunicationProblemResolver(commProblemRslvr.apply());
-
- return cfg;
- }
-
- /**
-  * Selects the mode (client or server) for nodes started after this call, unless a
-  * per-thread value is set via {@link #clientModeThreadLocal(boolean)}.
-  *
-  * @param clientMode Client mode flag for started nodes.
-  */
- private void clientMode(boolean clientMode) {
-     this.client = clientMode;
- }
-
- /**
-  * Selects the mode (client or server) for nodes started from the calling thread only.
-  *
-  * @param clientMode Client mode flag for nodes started from current thread.
-  */
- private void clientModeThreadLocal(boolean clientMode) {
-     Boolean flag = Boolean.valueOf(clientMode);
-
-     clientThreadLoc.set(flag);
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
-
- // Switch the IgnitionEx.TEST_ZK flag off for this suite.
- // NOTE(review): presumably the flag makes IgnitionEx substitute the discovery SPI by itself - confirm.
- IgnitionEx.TEST_ZK = false;
- }
-
- /**
-  * Creates (but does not start) a test ZooKeeper cluster. Each server gets its own directory
-  * {@code apacheIgniteTestZk-<index>} under {@code java.io.tmpdir}; an already existing
-  * directory is cleaned, a missing one is created.
-  *
-  * @param instances Number of instances.
-  * @return Cluster.
-  */
- private static TestingCluster createTestingCluster(int instances) {
-     String tmpRoot = System.getProperty("java.io.tmpdir");
-
-     List<InstanceSpec> instSpecs = new ArrayList<>();
-
-     for (int idx = 0; idx < instances; idx++) {
-         File dataDir = new File(tmpRoot, "apacheIgniteTestZk-" + idx);
-
-         if (dataDir.isDirectory())
-             deleteRecursively0(dataDir);
-         else if (!dataDir.mkdirs())
-             throw new IgniteException("Failed to create directory for test Zookeeper server: " + dataDir.getAbsolutePath());
-
-         instSpecs.add(new InstanceSpec(dataDir, -1, -1, -1, true, -1, 1000, -1));
-     }
-
-     return new TestingCluster(instSpecs);
- }
-
- /**
-  * Removes everything inside the given directory: regular files as well as nested directories
-  * (each nested directory is removed once it has been emptied). The directory itself is kept,
-  * so the caller can keep using it.
-  *
-  * @param file Directory to clean. Nothing is done if its content cannot be listed.
-  * @throws IgniteException If a file or a nested directory could not be deleted.
-  */
- private static void deleteRecursively0(File file) {
-     File[] files = file.listFiles();
-
-     // listFiles() returns null when the path is not a directory or an I/O error occurs.
-     if (files == null)
-         return;
-
-     for (File f : files) {
-         if (f.isDirectory())
-             deleteRecursively0(f);
-
-         // A directory can be deleted only when empty, hence the delete after the recursive call.
-         if (!f.delete())
-             throw new IgniteException("Failed to delete file: " + f.getAbsolutePath());
-     }
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- // Close the shared test ZooKeeper cluster before the framework cleanup runs.
- stopZkCluster();
-
- super.afterTestsStopped();
- }
-
- /**
-  * Closes the test ZooKeeper cluster, if there is one, and forgets the reference.
-  * A failure to close is only logged.
-  */
- private void stopZkCluster() {
-     if (zkCluster == null)
-         return;
-
-     try {
-         zkCluster.close();
-     }
-     catch (Exception e) {
-         U.error(log, "Failed to stop Zookeeper client: " + e, e);
-     }
-
-     zkCluster = null;
- }
-
- /**
- * Sets {@code IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD} system property to {@code "1"}.
- * NOTE(review): judging by the method name, this makes the SPI acknowledge every event - confirm.
- */
- private static void ackEveryEventSystemProperty() {
- System.setProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD, "1");
- }
-
- /**
-  * Removes the {@code IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD} system property set by
-  * {@link #ackEveryEventSystemProperty()}, so the setting does not leak into tests that
-  * do not ask for it.
-  */
- private void clearAckEveryEventSystemProperty() {
-     System.clearProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD);
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- super.beforeTest();
-
- // Lazily create and start the test ZooKeeper cluster; it is reused until stopped or replaced by a test.
- if (USE_TEST_CLUSTER && zkCluster == null) {
- zkCluster = createTestingCluster(ZK_SRVS);
-
- zkCluster.start();
- }
-
- reset();
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- super.afterTest();
-
- clearAckEveryEventSystemProperty();
-
- try {
- // Fails the test if any local event listener reported an unexpected error.
- assertFalse("Unexpected error, see log for details", err);
-
- checkEventsConsistency();
-
- checkInternalStructuresCleanup();
- }
- finally {
- // Always reset test state and stop nodes, even when one of the checks above failed.
- reset();
-
- stopAllGrids();
- }
- }
-
- /**
- * For every started node waits up to 30 seconds until the reference read from field path
- * {@code impl.commErrProcFut} of the node's discovery SPI becomes {@code null}, then asserts it.
- *
- * @throws Exception If failed.
- */
- private void checkInternalStructuresCleanup() throws Exception {
- for (Ignite node : G.allGrids()) {
- // Read the SPI internal field reflectively.
- final AtomicReference<?> res = GridTestUtils.getFieldValue(spi(node), "impl", "commErrProcFut");
-
- GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- return res.get() == null;
- }
- }, 30_000);
-
- // The wait result is not checked above; this assertion reports the failure.
- assertNull(res.get());
- }
- }
-
- /**
- * Starts and stops a 5-node cluster three times with ZooKeeper root path {@code /a/b/c}
- * and checks events consistency after each stop.
- *
- * @throws Exception If failed.
- */
- public void testZkRootNotExists() throws Exception {
- zkRootPath = "/a/b/c";
-
- for (int i = 0; i < 3; i++) {
- reset();
-
- startGridsMultiThreaded(5);
-
- waitForTopology(5);
-
- stopAllGrids();
-
- checkEventsConsistency();
- }
- }
-
- /**
- * Marshals {@code C1} and {@code C2} instances with the marshaller of node 0
- * concurrently from 64 threads.
- *
- * @throws Exception If failed.
- */
- public void testMetadataUpdate() throws Exception {
- startGrid(0);
-
- GridTestUtils.runMultiThreaded(new Callable<Void>() {
- @Override public Void call() throws Exception {
- ignite(0).configuration().getMarshaller().marshal(new C1());
- ignite(0).configuration().getMarshaller().marshal(new C2());
-
- return null;
- }
- }, 64, "marshal");
- }
-
- /**
-  * Starts 3 servers and 3 clients and checks that every node reports non-empty addresses
-  * and host names, both for its local node and for every node of its topology.
-  *
-  * @throws Exception If failed.
-  */
- public void testNodeAddresses() throws Exception {
-     startGridsMultiThreaded(3);
-
-     clientMode(true);
-
-     startGridsMultiThreaded(3, 3);
-
-     waitForTopology(6);
-
-     for (Ignite ignite : G.allGrids()) {
-         ClusterNode loc = ignite.cluster().localNode();
-
-         assertFalse(loc.addresses().isEmpty());
-         assertFalse(loc.hostNames().isEmpty());
-
-         for (ClusterNode topNode : ignite.cluster().nodes()) {
-             assertFalse(topNode.addresses().isEmpty());
-             assertFalse(topNode.hostNames().isEmpty());
-         }
-     }
- }
-
- /**
-  * Starts 3 servers and 3 clients and checks that consistent ID of every node, as seen
-  * from every node, equals that node's Ignite instance name attribute.
-  *
-  * @throws Exception If failed.
-  */
- public void testSetConsistentId() throws Exception {
-     startGridsMultiThreaded(3);
-
-     clientMode(true);
-
-     startGridsMultiThreaded(3, 3);
-
-     waitForTopology(6);
-
-     for (Ignite ignite : G.allGrids()) {
-         ClusterNode loc = ignite.cluster().localNode();
-
-         assertEquals(loc.attribute(ATTR_IGNITE_INSTANCE_NAME), loc.consistentId());
-
-         for (ClusterNode topNode : ignite.cluster().nodes())
-             assertEquals(topNode.attribute(ATTR_IGNITE_INSTANCE_NAME), topNode.consistentId());
-     }
- }
-
- /**
-  * Starts 3 servers and 3 clients without setting consistent ID explicitly and checks
-  * that every node, as seen from every node, still has a non-null consistent ID.
-  *
-  * @throws Exception If failed.
-  */
- public void testDefaultConsistentId() throws Exception {
-     dfltConsistenId = true;
-
-     startGridsMultiThreaded(3);
-
-     clientMode(true);
-
-     startGridsMultiThreaded(3, 3);
-
-     waitForTopology(6);
-
-     for (Ignite ignite : G.allGrids()) {
-         ClusterNode loc = ignite.cluster().localNode();
-
-         assertNotNull(loc.consistentId());
-
-         for (ClusterNode topNode : ignite.cluster().nodes())
-             assertNotNull(topNode.consistentId());
-     }
- }
-
- /**
- * Starts and stops server and client nodes one by one and checks, on every running node,
- * the number of nodes in {@code forClients()} and {@code forServers()} projections.
- *
- * @throws Exception If failed.
- */
- public void testClientNodesStatus() throws Exception {
- // Single server.
- startGrid(0);
-
- for (Ignite node : G.allGrids()) {
- assertEquals(0, node.cluster().forClients().nodes().size());
- assertEquals(1, node.cluster().forServers().nodes().size());
- }
-
- clientMode(true);
-
- // One server, one client.
- startGrid(1);
-
- for (Ignite node : G.allGrids()) {
- assertEquals(1, node.cluster().forClients().nodes().size());
- assertEquals(1, node.cluster().forServers().nodes().size());
- }
-
- clientMode(false);
-
- startGrid(2);
-
- clientMode(true);
-
- // Two servers, two clients.
- startGrid(3);
-
- for (Ignite node : G.allGrids()) {
- assertEquals(2, node.cluster().forClients().nodes().size());
- assertEquals(2, node.cluster().forServers().nodes().size());
- }
-
- // Stop the first client.
- stopGrid(1);
-
- waitForTopology(3);
-
- for (Ignite node : G.allGrids()) {
- assertEquals(1, node.cluster().forClients().nodes().size());
- assertEquals(2, node.cluster().forServers().nodes().size());
- }
-
- // Stop the second server.
- stopGrid(2);
-
- waitForTopology(2);
-
- for (Ignite node : G.allGrids()) {
- assertEquals(1, node.cluster().forClients().nodes().size());
- assertEquals(1, node.cluster().forServers().nodes().size());
- }
- }
-
- /**
- * Checks that start of node 0 fails with "Authentication failed for local node" when the test
- * authenticator is created for its instance name, while nodes 1 and 2 start normally.
- * NOTE(review): the factory argument is presumably the list of instance names to reject - confirm.
- *
- * @throws Exception If failed.
- */
- public void testLocalAuthenticationFails() throws Exception {
- auth = ZkTestNodeAuthenticator.factory(getTestIgniteInstanceName(0));
-
- Throwable err = GridTestUtils.assertThrows(log, new Callable<Void>() {
- @Override public Void call() throws Exception {
- startGrid(0);
-
- return null;
- }
- }, IgniteCheckedException.class, null);
-
- // The SPI exception is expected somewhere in the cause chain.
- IgniteSpiException spiErr = X.cause(err, IgniteSpiException.class);
-
- assertNotNull(spiErr);
- assertTrue(spiErr.getMessage().contains("Authentication failed for local node"));
-
- startGrid(1);
- startGrid(2);
-
- checkTestSecuritySubject(2);
- }
-
- /**
- * Runs a start/stop scenario with the test authenticator created for instance names of
- * nodes 1 and 5: these nodes are expected to fail to start (as server and as client), all
- * other nodes are expected to join and to expose the test security subject.
- * NOTE(review): the factory arguments are presumably the instance names to reject - confirm.
- *
- * @throws Exception If failed.
- */
- public void testAuthentication() throws Exception {
- auth = ZkTestNodeAuthenticator.factory(getTestIgniteInstanceName(1),
- getTestIgniteInstanceName(5));
-
- startGrid(0);
-
- checkTestSecuritySubject(1);
-
- // Node 1 must fail to start both as server and as client.
- {
- clientMode(false);
- checkStartFail(1);
-
- clientMode(true);
- checkStartFail(1);
-
- clientMode(false);
- }
-
- startGrid(2);
-
- checkTestSecuritySubject(2);
-
- stopGrid(2);
-
- checkTestSecuritySubject(1);
-
- startGrid(2);
-
- checkTestSecuritySubject(2);
-
- // Stop the node that was started first and repeat the check with the remaining one.
- stopGrid(0);
-
- checkTestSecuritySubject(1);
-
- checkStartFail(1);
-
- clientMode(false);
-
- startGrid(3);
-
- clientMode(true);
-
- startGrid(4);
-
- clientMode(false);
-
- startGrid(0);
-
- // Running nodes now: 0, 2, 3 (servers) and 4 (client).
- checkTestSecuritySubject(4);
-
- checkStartFail(1);
- checkStartFail(5);
-
- clientMode(true);
-
- checkStartFail(1);
- checkStartFail(5);
- }
-
- /**
-  * Asserts that start of the given node fails with {@link IgniteCheckedException} caused by
-  * an {@link IgniteSpiException} whose message contains "Authentication failed".
-  *
-  * @param nodeIdx Node index.
-  */
- private void checkStartFail(final int nodeIdx) {
-     Callable<Void> startNode = new Callable<Void>() {
-         @Override public Void call() throws Exception {
-             startGrid(nodeIdx);
-
-             return null;
-         }
-     };
-
-     Throwable startErr = GridTestUtils.assertThrows(log, startNode, IgniteCheckedException.class, null);
-
-     IgniteSpiException authErr = X.cause(startErr, IgniteSpiException.class);
-
-     assertNotNull(authErr);
-     assertTrue(authErr.getMessage().contains("Authentication failed"));
- }
-
- /**
-  * Waits for the expected topology size, then checks on every started node that each topology
-  * node carries a marshalled test security context whose node name equals the node's
-  * Ignite instance name attribute.
-  *
-  * @param expNodes Expected nodes number.
-  * @throws Exception If failed.
-  */
- private void checkTestSecuritySubject(int expNodes) throws Exception {
-     waitForTopology(expNodes);
-
-     List<Ignite> started = G.allGrids();
-
-     JdkMarshaller jdkMarsh = new JdkMarshaller();
-
-     for (Ignite ignite : started) {
-         Collection<ClusterNode> topNodes = ignite.cluster().nodes();
-
-         assertEquals(started.size(), topNodes.size());
-
-         for (ClusterNode topNode : topNodes) {
-             byte[] subjBytes = topNode.attribute(ATTR_SECURITY_SUBJECT_V2);
-
-             assertNotNull(subjBytes);
-
-             ZkTestNodeAuthenticator.TestSecurityContext ctx = jdkMarsh.unmarshal(subjBytes, null);
-
-             assertEquals(topNode.attribute(ATTR_IGNITE_INSTANCE_NAME), ctx.nodeName);
-         }
-     }
- }
-
- /**
- * Starts 5 nodes, stops node 3, starts it again and waits for the expected topology
- * size after each step.
- *
- * @throws Exception If failed.
- */
- public void testStopNode_1() throws Exception {
- startGrids(5);
-
- waitForTopology(5);
-
- stopGrid(3);
-
- waitForTopology(4);
-
- startGrid(3);
-
- waitForTopology(5);
- }
-
- /**
- * On a single node, with ack threshold system property set to 1, creates cache {@code c1}
- * and waits for events acks.
- *
- * @throws Exception If failed.
- */
- public void testCustomEventsSimple1_SingleNode() throws Exception {
- ackEveryEventSystemProperty();
-
- Ignite srv0 = startGrid(0);
-
- srv0.createCache(new CacheConfiguration<>("c1"));
-
- waitForEventsAcks(srv0);
- }
-
- /**
- * On 5 nodes, with ack threshold system property set to 1, creates cache {@code c1},
- * awaits partition map exchange and waits for events acks on the first node.
- *
- * @throws Exception If failed.
- */
- public void testCustomEventsSimple1_5_Nodes() throws Exception {
- ackEveryEventSystemProperty();
-
- Ignite srv0 = startGrids(5);
-
- srv0.createCache(new CacheConfiguration<>("c1"));
-
- awaitPartitionMapExchange();
-
- waitForEventsAcks(srv0);
- }
-
- /**
- * Runs the fast-stop-process custom events scenario with 1 server and no clients.
- *
- * @throws Exception If failed.
- */
- public void testCustomEvents_FastStopProcess_1() throws Exception {
- customEvents_FastStopProcess(1, 0);
- }
-
- /**
- * Runs the fast-stop-process custom events scenario with 5 servers and 5 clients.
- *
- * @throws Exception If failed.
- */
- public void testCustomEvents_FastStopProcess_2() throws Exception {
- customEvents_FastStopProcess(5, 5);
- }
-
- /**
- * From every node sends two {@link TestFastStopProcessCustomMessage}s (both report
- * {@code stopProcess() == true}): one without and one with an ack message. Expects that
- * the message itself is received only by the node started first (called coordinator here),
- * while the ack message, sent on behalf of that node, is received by all nodes.
- *
- * @param srvs Servers number.
- * @param clients Clients number.
- * @throws Exception If failed.
- */
- private void customEvents_FastStopProcess(int srvs, int clients) throws Exception {
- ackEveryEventSystemProperty();
-
- // Receiver node ID -> messages received by this node, in order.
- Map<UUID, List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>>> rcvdMsgs =
- new ConcurrentHashMap<>();
-
- // Node started first; the messages are expected to be processed by it.
- Ignite crd = startGrid(0);
-
- UUID crdId = crd.cluster().localNode().id();
-
- if (srvs > 1)
- startGridsMultiThreaded(1, srvs - 1);
-
- if (clients > 0) {
- client = true;
-
- startGridsMultiThreaded(srvs, clients);
- }
-
- awaitPartitionMapExchange();
-
- List<Ignite> nodes = G.allGrids();
-
- assertEquals(srvs + clients, nodes.size());
-
- for (Ignite node : nodes)
- registerTestEventListeners(node, rcvdMsgs);
-
- int payload = 0;
-
- // Topology does not change below, so this version is expected for all received messages.
- AffinityTopologyVersion topVer = ((IgniteKernal)crd).context().discovery().topologyVersionEx();
-
- for (Ignite node : nodes) {
- UUID sndId = node.cluster().localNode().id();
-
- info("Send from node: " + sndId);
-
- GridDiscoveryManager discoveryMgr = ((IgniteKernal)node).context().discovery();
-
- // Message without ack: only the first node is expected to receive it.
- {
- List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> expCrdMsgs = new ArrayList<>();
- List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> expNodesMsgs = Collections.emptyList();
-
- TestFastStopProcessCustomMessage msg = new TestFastStopProcessCustomMessage(false, payload++);
-
- expCrdMsgs.add(new T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>(topVer, sndId, msg));
-
- discoveryMgr.sendCustomEvent(msg);
-
- doSleep(200); // Wait some time to check extra messages are not received.
-
- checkEvents(crd, rcvdMsgs, expCrdMsgs);
-
- for (Ignite node0 : nodes) {
- if (node0 != crd)
- checkEvents(node0, rcvdMsgs, expNodesMsgs);
- }
-
- rcvdMsgs.clear();
- }
- // Message with ack: the first node receives message and ack, other nodes only the ack.
- {
- List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> expCrdMsgs = new ArrayList<>();
- List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> expNodesMsgs = new ArrayList<>();
-
- TestFastStopProcessCustomMessage msg = new TestFastStopProcessCustomMessage(true, payload++);
-
- expCrdMsgs.add(new T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>(topVer, sndId, msg));
-
- discoveryMgr.sendCustomEvent(msg);
-
- // Expected ack equals the real one since ack equality is based on payload only.
- TestFastStopProcessCustomMessageAck ackMsg = new TestFastStopProcessCustomMessageAck(msg.payload);
-
- expCrdMsgs.add(new T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>(topVer, crdId, ackMsg));
- expNodesMsgs.add(new T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>(topVer, crdId, ackMsg));
-
- doSleep(200); // Wait some time to check extra messages are not received.
-
- checkEvents(crd, rcvdMsgs, expCrdMsgs);
-
- for (Ignite node0 : nodes) {
- if (node0 != crd)
- checkEvents(node0, rcvdMsgs, expNodesMsgs);
- }
-
- rcvdMsgs.clear();
- }
-
- waitForEventsAcks(crd);
- }
- }
-
- /**
-  * Waits up to 5 seconds until the node has received at least as many messages as expected,
-  * then asserts that received messages are equal to the expected ones.
-  *
-  * @param node Node to check.
-  * @param rcvdMsgs Received messages.
-  * @param expMsgs Expected messages.
-  * @throws Exception If failed.
-  */
- private void checkEvents(
-     Ignite node,
-     final Map<UUID, List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>>> rcvdMsgs,
-     final List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> expMsgs) throws Exception {
-     final UUID locId = node.cluster().localNode().id();
-
-     boolean enoughRcvd = GridTestUtils.waitForCondition(new GridAbsPredicate() {
-         @Override public boolean apply() {
-             List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> cur = rcvdMsgs.get(locId);
-
-             if (cur == null)
-                 return 0 >= expMsgs.size();
-
-             return cur.size() >= expMsgs.size();
-         }
-     }, 5000);
-
-     assertTrue(enoughRcvd);
-
-     List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> actual = rcvdMsgs.get(locId);
-
-     // Nothing received is treated as an empty list.
-     if (actual == null)
-         actual = Collections.emptyList();
-
-     assertEqualsCollections(expMsgs, actual);
- }
-
- /**
-  * Registers listeners on the node which store every received
-  * {@link TestFastStopProcessCustomMessage} and {@link TestFastStopProcessCustomMessageAck}
-  * into the given map under the node's ID.
-  *
-  * @param node Node.
-  * @param rcvdMsgs Map to store received events.
-  */
- private void registerTestEventListeners(Ignite node,
-     final Map<UUID, List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>>> rcvdMsgs) {
-     final UUID locId = node.cluster().localNode().id();
-
-     GridDiscoveryManager disco = ((IgniteKernal)node).context().discovery();
-
-     disco.setCustomEventListener(TestFastStopProcessCustomMessage.class,
-         new CustomEventListener<TestFastStopProcessCustomMessage>() {
-             @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, TestFastStopProcessCustomMessage msg) {
-                 recordRcvdCustomMsg(rcvdMsgs, locId, topVer, snd, msg);
-             }
-         }
-     );
-
-     disco.setCustomEventListener(TestFastStopProcessCustomMessageAck.class,
-         new CustomEventListener<TestFastStopProcessCustomMessageAck>() {
-             @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, TestFastStopProcessCustomMessageAck msg) {
-                 recordRcvdCustomMsg(rcvdMsgs, locId, topVer, snd, msg);
-             }
-         }
-     );
- }
-
- /**
-  * Appends a received custom message to the list kept for the receiver node, creating the list on first use.
-  *
-  * @param rcvdMsgs Map to store received events.
-  * @param rcvId Receiver node ID.
-  * @param topVer Topology version passed to the listener.
-  * @param snd Sender node passed to the listener.
-  * @param msg Received message.
-  */
- private static void recordRcvdCustomMsg(
-     Map<UUID, List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>>> rcvdMsgs,
-     UUID rcvId,
-     AffinityTopologyVersion topVer,
-     ClusterNode snd,
-     DiscoveryCustomMessage msg) {
-     List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> nodeMsgs = rcvdMsgs.get(rcvId);
-
-     if (nodeMsgs == null) {
-         nodeMsgs = new ArrayList<>();
-
-         rcvdMsgs.put(rcvId, nodeMsgs);
-     }
-
-     nodeMsgs.add(new T3<>(topVer, snd.id(), msg));
- }
-
- /**
-  * Test custom discovery message whose {@link #stopProcess()} always returns {@code true}.
-  * Optionally produces a {@link TestFastStopProcessCustomMessageAck} with the same payload.
-  * Equality is based on the ack flag and payload only, the message ID is ignored.
-  */
- static class TestFastStopProcessCustomMessage implements DiscoveryCustomMessage {
-     /** */
-     private static final long serialVersionUID = 0L;
-
-     /** Message ID. */
-     private final IgniteUuid id = IgniteUuid.randomUuid();
-
-     /** Create ack message flag. */
-     private final boolean createAck;
-
-     /** Payload. */
-     private final int payload;
-
-     /**
-      * @param createAck Create ack message flag.
-      * @param payload Payload.
-      */
-     TestFastStopProcessCustomMessage(boolean createAck, int payload) {
-         this.createAck = createAck;
-         this.payload = payload;
-     }
-
-     /** {@inheritDoc} */
-     @Override public IgniteUuid id() {
-         return id;
-     }
-
-     /** {@inheritDoc} */
-     @Nullable @Override public DiscoveryCustomMessage ackMessage() {
-         if (!createAck)
-             return null;
-
-         return new TestFastStopProcessCustomMessageAck(payload);
-     }
-
-     /** {@inheritDoc} */
-     @Override public boolean isMutable() {
-         return false;
-     }
-
-     /** {@inheritDoc} */
-     @Override public boolean stopProcess() {
-         return true;
-     }
-
-     /** {@inheritDoc} */
-     @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
-         AffinityTopologyVersion topVer,
-         DiscoCache discoCache) {
-         return null;
-     }
-
-     /** {@inheritDoc} */
-     @Override public boolean equals(Object o) {
-         if (o == this)
-             return true;
-
-         if (o == null || o.getClass() != getClass())
-             return false;
-
-         TestFastStopProcessCustomMessage other = (TestFastStopProcessCustomMessage)o;
-
-         return payload == other.payload && createAck == other.createAck;
-     }
-
-     /** {@inheritDoc} */
-     @Override public int hashCode() {
-         return Objects.hash(createAck, payload);
-     }
-
-     /** {@inheritDoc} */
-     @Override public String toString() {
-         return S.toString(TestFastStopProcessCustomMessage.class, this);
-     }
- }
-
- /**
-  * Ack message for {@link TestFastStopProcessCustomMessage}; its {@link #stopProcess()} also
-  * returns {@code true}. Equality is based on payload only, the message ID is ignored.
-  */
- static class TestFastStopProcessCustomMessageAck implements DiscoveryCustomMessage {
-     /** */
-     private static final long serialVersionUID = 0L;
-
-     /** Message ID. */
-     private final IgniteUuid id = IgniteUuid.randomUuid();
-
-     /** Payload. */
-     private final int payload;
-
-     /**
-      * @param payload Payload.
-      */
-     TestFastStopProcessCustomMessageAck(int payload) {
-         this.payload = payload;
-     }
-
-     /** {@inheritDoc} */
-     @Override public IgniteUuid id() {
-         return id;
-     }
-
-     /** {@inheritDoc} */
-     @Nullable @Override public DiscoveryCustomMessage ackMessage() {
-         return null;
-     }
-
-     /** {@inheritDoc} */
-     @Override public boolean isMutable() {
-         return false;
-     }
-
-     /** {@inheritDoc} */
-     @Override public boolean stopProcess() {
-         return true;
-     }
-
-     /** {@inheritDoc} */
-     @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
-         AffinityTopologyVersion topVer,
-         DiscoCache discoCache) {
-         return null;
-     }
-
-     /** {@inheritDoc} */
-     @Override public boolean equals(Object o) {
-         if (o == this)
-             return true;
-
-         if (o == null || o.getClass() != getClass())
-             return false;
-
-         TestFastStopProcessCustomMessageAck other = (TestFastStopProcessCustomMessageAck)o;
-
-         return other.payload == payload;
-     }
-
-     /** {@inheritDoc} */
-     @Override public int hashCode() {
-         return Objects.hash(payload);
-     }
-
-     /** {@inheritDoc} */
-     @Override public String toString() {
-         return S.toString(TestFastStopProcessCustomMessageAck.class, this);
-     }
- }
-
- /**
- * Closes the ZooKeeper client socket of a single node (session timeout 2000) and expects
- * {@code EVT_NODE_SEGMENTED} to be fired on it.
- *
- * @throws Exception If failed.
- */
- public void testSegmentation1() throws Exception {
- sesTimeout = 2000;
- testSockNio = true;
-
- Ignite node0 = startGrid(0);
-
- final CountDownLatch l = new CountDownLatch(1);
-
- node0.events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- l.countDown();
-
- // Returning false: one notification is enough for the test.
- return false;
- }
- }, EventType.EVT_NODE_SEGMENTED);
-
- ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0);
-
- // NOTE(review): 'true' presumably also blocks reconnect until allowConnect() is called - confirm.
- c0.closeSocket(true);
-
- // Wait up to ~10 seconds for the event while the connection is closed.
- for (int i = 0; i < 10; i++) {
- Thread.sleep(1_000);
-
- if (l.getCount() == 0)
- break;
- }
-
- info("Allow connect");
-
- c0.allowConnect();
-
- assertTrue(l.await(10, TimeUnit.SECONDS));
- }
-
- /**
-  * Closes the whole test ZooKeeper cluster while one node is running and expects the local
-  * {@code EVT_NODE_SEGMENTED} listener to be notified within 10 seconds. A new cluster is
-  * created and started afterwards in any case.
-  *
-  * @throws Exception If failed.
-  */
- public void testSegmentation2() throws Exception {
-     sesTimeout = 2000;
-
-     Ignite ignite = startGrid(0);
-
-     final CountDownLatch segmented = new CountDownLatch(1);
-
-     IgnitePredicate<Event> lsnr = new IgnitePredicate<Event>() {
-         @Override public boolean apply(Event evt) {
-             segmented.countDown();
-
-             return false;
-         }
-     };
-
-     ignite.events().localListen(lsnr, EventType.EVT_NODE_SEGMENTED);
-
-     try {
-         zkCluster.close();
-
-         boolean fired = segmented.await(10, TimeUnit.SECONDS);
-
-         assertTrue(fired);
-     }
-     finally {
-         // Replace the closed cluster with a fresh one.
-         zkCluster = createTestingCluster(ZK_SRVS);
-
-         zkCluster.start();
-     }
- }
-
- /**
-  * Stops two of the three test ZooKeeper servers while one node is running and expects the
-  * local {@code EVT_NODE_SEGMENTED} listener to be notified within 20 seconds. The cluster is
-  * closed and re-created afterwards in any case.
-  *
-  * @throws Exception If failed.
-  */
- public void testSegmentation3() throws Exception {
-     sesTimeout = 5000;
-
-     Ignite ignite = startGrid(0);
-
-     final CountDownLatch segmented = new CountDownLatch(1);
-
-     IgnitePredicate<Event> lsnr = new IgnitePredicate<Event>() {
-         @Override public boolean apply(Event evt) {
-             segmented.countDown();
-
-             return false;
-         }
-     };
-
-     ignite.events().localListen(lsnr, EventType.EVT_NODE_SEGMENTED);
-
-     List<TestingZooKeeperServer> zkSrvs = zkCluster.getServers();
-
-     assertEquals(3, zkSrvs.size());
-
-     try {
-         for (int srvIdx = 0; srvIdx < 2; srvIdx++)
-             zkSrvs.get(srvIdx).stop();
-
-         boolean fired = segmented.await(20, TimeUnit.SECONDS);
-
-         assertTrue(fired);
-     }
-     finally {
-         zkCluster.close();
-
-         zkCluster = createTestingCluster(ZK_SRVS);
-
-         zkCluster.start();
-     }
- }
-
- /**
-  * Stops two of the three test ZooKeeper servers, restarts one of them after two seconds
-  * and then checks that one more node can start and a topology of four nodes is reached.
-  * The cluster is closed and re-created afterwards in any case.
-  *
-  * @throws Exception If failed.
-  */
- public void testQuorumRestore() throws Exception {
-     sesTimeout = 15_000;
-
-     startGrids(3);
-
-     waitForTopology(3);
-
-     List<TestingZooKeeperServer> zkSrvs = zkCluster.getServers();
-
-     assertEquals(3, zkSrvs.size());
-
-     try {
-         TestingZooKeeperServer first = zkSrvs.get(0);
-         TestingZooKeeperServer second = zkSrvs.get(1);
-
-         first.stop();
-         second.stop();
-
-         U.sleep(2000);
-
-         second.restart();
-
-         startGrid(4);
-
-         waitForTopology(4);
-     }
-     finally {
-         zkCluster.close();
-
-         zkCluster = createTestingCluster(ZK_SRVS);
-
-         zkCluster.start();
-     }
- }
-
- /**
-  * Closes the ZooKeeper client socket of the only running node via {@code closeSocket(false)}
-  * and then starts a second node.
-  *
-  * @throws Exception If failed.
-  */
- public void testConnectionRestore1() throws Exception {
-     testSockNio = true;
-
-     Ignite first = startGrid(0);
-
-     ZkTestClientCnxnSocketNIO.forNode(first).closeSocket(false);
-
-     startGrid(1);
- }
-
- /**
-  * Closes the ZooKeeper client socket of the only running node via {@code closeSocket(false)}
-  * and then starts further nodes concurrently with {@code startGridsMultiThreaded(1, 5)}.
-  *
-  * @throws Exception If failed.
-  */
- public void testConnectionRestore2() throws Exception {
-     testSockNio = true;
-
-     Ignite first = startGrid(0);
-
-     ZkTestClientCnxnSocketNIO.forNode(first).closeSocket(false);
-
-     startGridsMultiThreaded(1, 5);
- }
-
- /**
-  * Runs {@link #connectionRestore_NonCoordinator(boolean)} without failing the joining node.
-  *
-  * @throws Exception If failed.
-  */
- public void testConnectionRestore_NonCoordinator1() throws Exception {
-     final boolean failWhenDisconnected = false;
-
-     connectionRestore_NonCoordinator(failWhenDisconnected);
- }
-
- /**
-  * Runs {@link #connectionRestore_NonCoordinator(boolean)} with the joining node failed
-  * while another node is disconnected.
-  *
-  * @throws Exception If failed.
-  */
- public void testConnectionRestore_NonCoordinator2() throws Exception {
-     final boolean failWhenDisconnected = true;
-
-     connectionRestore_NonCoordinator(failWhenDisconnected);
- }
-
- /**
- * Starts two nodes, closes the ZooKeeper client socket of the second one, starts a third node
- * in a background thread and verifies the discovery events seen by both initial nodes once the
- * second node is allowed to connect again.
- *
- * @param failWhenDisconnected {@code True} if fail node while another node is disconnected.
- * @throws Exception If failed.
- */
- private void connectionRestore_NonCoordinator(boolean failWhenDisconnected) throws Exception {
- testSockNio = true;
-
- Ignite node0 = startGrid(0);
- Ignite node1 = startGrid(1);
-
- ZkTestClientCnxnSocketNIO c1 = ZkTestClientCnxnSocketNIO.forNode(node1);
-
- // NOTE(review): the 'true' flag presumably keeps the socket from reconnecting until allowConnect() - confirm.
- c1.closeSocket(true);
-
- // Start the third node asynchronously; a start failure is only logged, not rethrown.
- IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
- @Override public Void call() {
- try {
- startGrid(2);
- }
- catch (Exception e) {
- info("Start error: " + e);
- }
-
- return null;
- }
- }, "start-node");
-
- checkEvents(node0, joinEvent(3));
-
- if (failWhenDisconnected) {
- ZookeeperDiscoverySpi spi = spis.get(getTestIgniteInstanceName(2));
-
- // Close the ZooKeeper client of the third node while node1 is still disconnected.
- closeZkClient(spi);
-
- checkEvents(node0, failEvent(4));
- }
-
- c1.allowConnect();
-
- // After reconnect node1 is expected to observe the same events as node0.
- checkEvents(ignite(1), joinEvent(3));
-
- if (failWhenDisconnected) {
- checkEvents(ignite(1), failEvent(4));
-
- IgnitionEx.stop(getTestIgniteInstanceName(2), true, true);
- }
-
- fut.get();
-
- waitForTopology(failWhenDisconnected ? 2 : 3);
- }
-
- /**
-  * Coordinator connection restore: 1 initial node, 1 node started, none failed.
-  *
-  * @throws Exception If failed.
-  */
- public void testConnectionRestore_Coordinator1() throws Exception {
-     final int initNodes = 1;
-     final int startNodes = 1;
-     final int failCnt = 0;
-
-     connectionRestore_Coordinator(initNodes, startNodes, failCnt);
- }
-
- /**
-  * Coordinator connection restore: 1 initial node, 1 node started, 1 failed.
-  *
-  * @throws Exception If failed.
-  */
- public void testConnectionRestore_Coordinator1_1() throws Exception {
-     final int initNodes = 1;
-     final int startNodes = 1;
-     final int failCnt = 1;
-
-     connectionRestore_Coordinator(initNodes, startNodes, failCnt);
- }
-
- /**
-  * Coordinator connection restore: 1 initial node, 3 nodes started, none failed.
-  *
-  * @throws Exception If failed.
-  */
- public void testConnectionRestore_Coordinator2() throws Exception {
-     final int initNodes = 1;
-     final int startNodes = 3;
-     final int failCnt = 0;
-
-     connectionRestore_Coordinator(initNodes, startNodes, failCnt);
- }
-
- /**
-  * Coordinator connection restore: 3 initial nodes, 3 nodes started, none failed.
-  *
-  * @throws Exception If failed.
-  */
- public void testConnectionRestore_Coordinator3() throws Exception {
-     final int initNodes = 3;
-     final int startNodes = 3;
-     final int failCnt = 0;
-
-     connectionRestore_Coordinator(initNodes, startNodes, failCnt);
- }
-
- /**
-  * Coordinator connection restore: 3 initial nodes, 3 nodes started, 1 failed.
-  *
-  * @throws Exception If failed.
-  */
- public void testConnectionRestore_Coordinator4() throws Exception {
-     final int initNodes = 3;
-     final int startNodes = 3;
-     final int failCnt = 1;
-
-     connectionRestore_Coordinator(initNodes, startNodes, failCnt);
- }
-
- /**
- * Closes the ZooKeeper client socket of node 0, starts {@code startNodes} more nodes in
- * background threads, closes the sockets of the first {@code failCnt} of them, waits until
- * their alive znodes disappear, then lets all blocked sockets connect again and checks the
- * join events and the final topology size.
- *
- * @param initNodes Number of initially started nodes.
- * @param startNodes Number of nodes to start after coordinator loses connection.
- * @param failCnt Number of nodes to stop after coordinator loses connection.
- * @throws Exception If failed.
- */
- private void connectionRestore_Coordinator(final int initNodes, int startNodes, int failCnt) throws Exception {
- sesTimeout = 30_000;
- testSockNio = true;
-
- Ignite node0 = startGrids(initNodes);
-
- ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0);
-
- c0.closeSocket(true);
-
- final AtomicInteger nodeIdx = new AtomicInteger(initNodes);
-
- // Each background thread starts one node with the next free index; start errors are only logged.
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
- @Override public Void call() {
- try {
- startGrid(nodeIdx.getAndIncrement());
- }
- catch (Exception e) {
- error("Start failed: " + e);
- }
-
- return null;
- }
- }, startNodes, "start-node");
-
- int cnt = 0;
-
- DiscoveryEvent[] expEvts = new DiscoveryEvent[startNodes - failCnt];
-
- int expEvtCnt = 0;
-
- // NOTE(review): sesTimeout is lowered while the start threads may still be running; presumably
- // intended for nodes configured after this point - confirm which nodes pick up which value.
- sesTimeout = 1000;
-
- List<ZkTestClientCnxnSocketNIO> blockedC = new ArrayList<>();
-
- final List<String> failedZkNodes = new ArrayList<>(failCnt);
-
- for (int i = initNodes; i < initNodes + startNodes; i++) {
- final ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(i));
-
- // Wait (up to 10 seconds) until the SPI's rtState.internalOrder, read reflectively, becomes positive.
- assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- long internalOrder = GridTestUtils.getFieldValue(spi, "impl", "rtState", "internalOrder");
-
- return internalOrder > 0;
- }
- }, 10_000));
-
- // The first failCnt started nodes get their sockets closed; the rest are expected to join.
- if (cnt++ < failCnt) {
- ZkTestClientCnxnSocketNIO c = ZkTestClientCnxnSocketNIO.forNode(getTestIgniteInstanceName(i));
-
- c.closeSocket(true);
-
- blockedC.add(c);
-
- failedZkNodes.add(aliveZkNodePath(spi));
- }
- else {
- expEvts[expEvtCnt] = joinEvent(initNodes + expEvtCnt + 1);
-
- expEvtCnt++;
- }
- }
-
- // Make sure the alive znodes of the blocked nodes are gone before anybody reconnects.
- waitNoAliveZkNodes(log, zkCluster.getConnectString(), failedZkNodes, 10_000);
-
- c0.allowConnect();
-
- for (ZkTestClientCnxnSocketNIO c : blockedC)
- c.allowConnect();
-
- if (expEvts.length > 0) {
- for (int i = 0; i < initNodes; i++)
- checkEvents(ignite(i), expEvts);
- }
-
- fut.get();
-
- waitForTopology(initNodes + startNodes - failCnt);
- }
-
- /**
-  * Resolves the alive znode name for the discovery SPI configured on the given node.
-  *
-  * @param node Node.
-  * @return Corresponding znode.
-  */
- private static String aliveZkNodePath(Ignite node) {
-     DiscoverySpi discoSpi = node.configuration().getDiscoverySpi();
-
-     return aliveZkNodePath(discoSpi);
- }
-
- /**
-  * Reads {@code impl.rtState.locNodeZkPath} from the SPI reflectively and returns the part
-  * after the last {@code '/'}.
-  *
-  * @param spi SPI.
-  * @return Znode related to given SPI.
-  */
- private static String aliveZkNodePath(DiscoverySpi spi) {
-     String fullPath = GridTestUtils.getFieldValue(spi, "impl", "rtState", "locNodeZkPath");
-
-     int nameStart = fullPath.lastIndexOf('/') + 1;
-
-     return fullPath.substring(nameStart);
- }
-
- /**
-  * Opens a separate ZooKeeper client and waits until none of the given znodes is listed
-  * among the children of the alive-nodes directory. The client is closed in any case.
-  *
-  * @param log Logger.
-  * @param connectString Zookeeper connect string.
-  * @param failedZkNodes Znodes which should be removed.
-  * @param timeout Timeout.
-  * @throws Exception If failed.
-  */
- private static void waitNoAliveZkNodes(final IgniteLogger log,
-     String connectString,
-     final List<String> failedZkNodes,
-     long timeout)
-     throws Exception
- {
-     final ZookeeperClient zkClient = new ZookeeperClient(log, connectString, 10_000, null);
-
-     final String aliveDir = IGNITE_ZK_ROOT + "/" + ZkIgnitePaths.ALIVE_NODES_DIR;
-
-     try {
-         GridAbsPredicate allRemoved = new GridAbsPredicate() {
-             @Override public boolean apply() {
-                 try {
-                     List<String> alive = zkClient.getChildren(aliveDir);
-
-                     for (String zkNode : failedZkNodes) {
-                         if (!alive.contains(zkNode))
-                             continue;
-
-                         log.info("Alive node is not removed [node=" + zkNode + ", all=" + alive + ']');
-
-                         return false;
-                     }
-
-                     return true;
-                 }
-                 catch (Exception e) {
-                     e.printStackTrace();
-
-                     fail();
-
-                     // Not reached when fail() throws; required for the method to compile.
-                     return true;
-                 }
-             }
-         };
-
-         assertTrue(GridTestUtils.waitForCondition(allRemoved, timeout));
-     }
-     finally {
-         zkClient.close();
-     }
- }
-
- /**
-  * Three times in a row: starts 20 nodes concurrently with a per-thread client mode flag,
-  * waits for the full topology, stops all nodes and checks events consistency.
-  *
-  * @throws Exception If failed.
-  */
- public void testConcurrentStartWithClient() throws Exception {
-     final int NODES = 20;
-
-     for (int iter = 0; iter < 3; iter++) {
-         info("Iteration: " + iter);
-
-         final int srvIdx = ThreadLocalRandom.current().nextInt(NODES);
-
-         final AtomicInteger nextIdx = new AtomicInteger();
-
-         Callable<Void> starter = new Callable<Void>() {
-             @Override public Void call() throws Exception {
-                 int nodeIdx = nextIdx.getAndIncrement();
-
-                 // NOTE(review): the flag is 'true' for the node named srvIdx - confirm this is the
-                 // intended polarity of clientModeThreadLocal().
-                 boolean flag = nodeIdx == srvIdx || ThreadLocalRandom.current().nextBoolean();
-
-                 clientModeThreadLocal(flag);
-
-                 startGrid(nodeIdx);
-
-                 return null;
-             }
-         };
-
-         GridTestUtils.runMultiThreaded(starter, NODES, "start-node");
-
-         waitForTopology(NODES);
-
-         stopAllGrids();
-
-         checkEventsConsistency();
-
-         evts.clear();
-     }
- }
-
- /**
-  * Runs {@link #concurrentStartStop(int)} with one initially started node.
-  *
-  * @throws Exception If failed.
-  */
- public void testConcurrentStartStop1() throws Exception {
-     final int initNodes = 1;
-
-     concurrentStartStop(initNodes);
- }
-
- /**
-  * Runs {@link #concurrentStartStop(int)} with five initially started nodes.
-  *
-  * @throws Exception If failed.
-  */
- public void testConcurrentStartStop2() throws Exception {
-     final int initNodes = 5;
-
-     concurrentStartStop(initNodes);
- }
-
- /**
-  * Runs {@link #concurrentStartStop(int)} with five initial nodes while the
-  * {@code IGNITE_ZOOKEEPER_DISCOVERY_SPI_MAX_EVTS} system property is set to {@code "1"};
-  * the property is cleared afterwards in any case.
-  *
-  * @throws Exception If failed.
-  */
- public void testConcurrentStartStop2_EventsThrottle() throws Exception {
-     final String maxEvtsProp = ZookeeperDiscoveryImpl.IGNITE_ZOOKEEPER_DISCOVERY_SPI_MAX_EVTS;
-
-     System.setProperty(maxEvtsProp, "1");
-
-     try {
-         concurrentStartStop(5);
-     }
-     finally {
-         System.clearProperty(maxEvtsProp);
-     }
- }
-
- /**
-  * Ten times in a row: starts five extra nodes concurrently, checks the join events seen by
-  * node 0, then stops the extra nodes concurrently (synchronized by a barrier) and checks
-  * events consistency.
-  *
-  * @param initNodes Number of initially started nodes.
-  * @throws Exception If failed.
-  */
- private void concurrentStartStop(final int initNodes) throws Exception {
-     startGrids(initNodes);
-
-     final int NODES = 5;
-
-     long topVer = initNodes;
-
-     for (int iter = 0; iter < 10; iter++) {
-         info("Iteration: " + iter);
-
-         DiscoveryEvent[] expEvts = new DiscoveryEvent[NODES];
-
-         startGridsMultiThreaded(initNodes, NODES);
-
-         int evtIdx = 0;
-
-         while (evtIdx < NODES)
-             expEvts[evtIdx++] = joinEvent(++topVer);
-
-         checkEvents(ignite(0), expEvts);
-
-         checkEventsConsistency();
-
-         final CyclicBarrier stopBarrier = new CyclicBarrier(NODES);
-
-         IgniteInClosure<Integer> stopper = new IgniteInClosure<Integer>() {
-             @Override public void apply(Integer idx) {
-                 try {
-                     stopBarrier.await();
-
-                     stopGrid(initNodes + idx);
-                 }
-                 catch (Exception e) {
-                     e.printStackTrace();
-
-                     fail();
-                 }
-             }
-         };
-
-         GridTestUtils.runMultiThreaded(stopper, NODES, "stop-node");
-
-         // NOTE(review): the fail events are built (which also advances topVer for the next
-         // iteration) but never passed to checkEvents() - confirm whether a check is missing.
-         evtIdx = 0;
-
-         while (evtIdx < NODES)
-             expEvts[evtIdx++] = failEvent(++topVer);
-
-         checkEventsConsistency();
-     }
- }
-
- /**
-  * Starts three nodes concurrently, stops them all, clears the recorded events and starts
-  * three nodes again, waiting for a topology of three.
-  *
-  * @throws Exception If failed.
-  */
- public void testClusterRestart() throws Exception {
-     final int nodes = 3;
-
-     startGridsMultiThreaded(nodes, false);
-
-     stopAllGrids();
-
-     evts.clear();
-
-     startGridsMultiThreaded(nodes, false);
-
-     waitForTopology(nodes);
- }
-
- /**
-  * Closes the ZooKeeper client socket of the only running node via {@code closeSocket(false)}
-  * and then starts a second node.
-  *
-  * @throws Exception If failed.
-  */
- public void testConnectionRestore4() throws Exception {
-     testSockNio = true;
-
-     Ignite first = startGrid(0);
-
-     ZkTestClientCnxnSocketNIO.forNode(first).closeSocket(false);
-
-     startGrid(1);
- }
-
- /**
-  * Starts a single node, waits for a topology of one and stops the node.
-  *
-  * @throws Exception If failed.
-  */
- public void testStartStop_1_Node() throws Exception {
-     final int idx = 0;
-
-     startGrid(idx);
-
-     waitForTopology(1);
-
-     stopGrid(idx);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testRestarts_2_Nodes() throws Exception {
- startGrid(0);
-
- for (int i = 0; i < 10; i++) {
- info("Iteration: " + i);
-
- startGrid(1);
-
- waitForTopology(2);
-
- stopGrid(1);
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testStartStop_2_Nodes_WithCache() throws Exception {
- startGrids(2);
-
- for (Ignite node : G.allGrids()) {
- IgniteCache<Object, Object> cache = node.cache(DEFAULT_CACHE_NAME);
-
- assertNotNull(cache);
-
- for (int i = 0; i < 100; i++) {
- cache.put(i, node.name());
-
- assertEquals(node.name(), cache.get(i));
- }
- }
-
- awaitPartitionMapExchange();
- }
-
- /**
-  * Starts two nodes one after another, broadcasts a {@code DummyCallable} from every node,
-  * waits for partition map exchange and for event acks on node 0.
-  *
-  * @throws Exception If failed.
-  */
- public void testStartStop_2_Nodes() throws Exception {
-     ackEveryEventSystemProperty();
-
-     for (int idx = 0; idx < 2; idx++) {
-         startGrid(idx);
-
-         waitForTopology(idx + 1);
-     }
-
-     for (Ignite ignite : G.allGrids())
-         ignite.compute().broadcast(new DummyCallable(null));
-
-     awaitPartitionMapExchange();
-
-     waitForEventsAcks(ignite(0));
- }
-
- /**
-  * Starts five nodes concurrently, restarts node 0 (broadcasting a {@code DummyCallable} from
-  * the remaining nodes in between) and waits for event acks on the oldest node afterwards.
-  *
-  * @throws Exception If failed.
-  */
- public void testStartStop1() throws Exception {
-     final int nodes = 5;
-
-     ackEveryEventSystemProperty();
-
-     startGridsMultiThreaded(nodes, false);
-
-     waitForTopology(nodes);
-
-     awaitPartitionMapExchange();
-
-     waitForEventsAcks(ignite(0));
-
-     stopGrid(0);
-
-     waitForTopology(nodes - 1);
-
-     for (Ignite ignite : G.allGrids())
-         ignite.compute().broadcast(new DummyCallable(null));
-
-     startGrid(0);
-
-     waitForTopology(nodes);
-
-     awaitPartitionMapExchange();
-
-     // Node 0 has been restarted, so the oldest node is looked up through node 1.
-     waitForEventsAcks(grid(CU.oldest(ignite(1).cluster().nodes())));
- }
-
- /**
-  * Starts four nodes, stops node 0 and starts a node with index 5, waiting for partition
-  * map exchange before the stop and after the start.
-  *
-  * @throws Exception If failed.
-  */
- public void testStartStop3() throws Exception {
-     final int initNodes = 4;
-
-     startGrids(initNodes);
-
-     awaitPartitionMapExchange();
-
-     stopGrid(0);
-
-     startGrid(initNodes + 1);
-
-     awaitPartitionMapExchange();
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testStartStop4() throws Exception {
- startGrids(6);
-
- awaitPartitionMapExchange();
-
- stopGrid(2);
-
- if (ThreadLocalRandom.current().nextBoolean())
- awaitPartitionMapExchange();
-
- stopGrid(1);
-
- if (ThreadLocalRandom.current().nextBoolean())
- awaitPartitionMapExchange();
-
- stopGrid(0);
-
- if (ThreadLocalRandom.current().nextBoolean())
- awaitPartitionMapExchange();
-
- startGrid(7);
-
- awaitPartitionMapExchange();
- }
-
- /**
-  * Starts ten nodes concurrently, stops nodes 0..2 from three threads, waits for a topology
-  * of seven, starts the three nodes again and waits for a topology of ten.
-  *
-  * @throws Exception If failed.
-  */
- public void testStartStop2() throws Exception {
-     final int total = 10;
-     final int restarted = 3;
-
-     startGridsMultiThreaded(total, false);
-
-     IgniteInClosure<Integer> stopper = new IgniteInClosure<Integer>() {
-         @Override public void apply(Integer idx) {
-             stopGrid(idx);
-         }
-     };
-
-     GridTestUtils.runMultiThreaded(stopper, restarted, "stop-node-thread");
-
-     waitForTopology(total - restarted);
-
-     startGridsMultiThreaded(0, restarted);
-
-     waitForTopology(total);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testStartStopWithClients() throws Exception {
- final int SRVS = 3;
-
- startGrids(SRVS);
-
- clientMode(true);
-
- final int THREADS = 30;
-
- for (int i = 0; i < 5; i++) {
- info("Iteration: " + i);
-
- startGridsMultiThreaded(SRVS, THREADS);
-
- waitForTopology(SRVS + THREADS);
-
- GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
- @Override public void apply(Integer idx) {
- stopGrid(idx + SRVS);
- }
- }, THREADS, "stop-node");
-
- waitForTopology(SRVS);
-
- checkEventsConsistency();
- }
- }
-
- /**
-  * Runs {@link #topologyChangeWithRestarts(boolean, boolean)} with both background
-  * activities switched off.
-  *
-  * @throws Exception If failed.
-  */
- public void testTopologyChangeMultithreaded() throws Exception {
-     final boolean restartZk = false;
-     final boolean closeClientSock = false;
-
-     topologyChangeWithRestarts(restartZk, closeClientSock);
- }
-
- /**
-  * Runs {@link #topologyChangeWithRestarts(boolean, boolean)} with background ZooKeeper
-  * server restarts switched on.
-  *
-  * @throws Exception If failed.
-  */
- public void testTopologyChangeMultithreaded_RestartZk() throws Exception {
-     final boolean restartZk = true;
-     final boolean closeClientSock = false;
-
-     topologyChangeWithRestarts(restartZk, closeClientSock);
- }
-
- /**
-  * Runs {@link #topologyChangeWithRestarts(boolean, boolean)} with both background
-  * ZooKeeper server restarts and client socket closing switched on.
-  *
-  * @throws Exception If failed.
-  */
- public void testTopologyChangeMultithreaded_RestartZk_CloseClients() throws Exception {
-     final boolean restartZk = true;
-     final boolean closeClientSock = true;
-
-     topologyChangeWithRestarts(restartZk, closeClientSock);
- }
-
- /**
-  * For about 60 seconds keeps changing the topology: while fewer than 20 nodes are tracked
-  * as started it starts 1..5 new nodes concurrently, otherwise it stops 1..5 randomly chosen
-  * started nodes concurrently. Optional background tasks run in parallel and are signalled
-  * to stop through a shared flag in any case, including a failed initial start.
-  *
-  * @param restartZk If {@code true} in background restarts one of ZK servers.
-  * @param closeClientSock If {@code true} in background closes zk clients' sockets.
-  * @throws Exception If failed.
-  */
- private void topologyChangeWithRestarts(boolean restartZk, boolean closeClientSock) throws Exception {
-     sesTimeout = 30_000;
-
-     if (closeClientSock)
-         testSockNio = true;
-
-     long stopTime = System.currentTimeMillis() + 60_000;
-
-     AtomicBoolean stop = new AtomicBoolean();
-
-     IgniteInternalFuture<?> fut1 = null;
-     IgniteInternalFuture<?> fut2 = null;
-
-     try {
-         fut1 = restartZk ? startRestartZkServers(stopTime, stop) : null;
-         fut2 = closeClientSock ? startCloseZkClientSocket(stopTime, stop) : null;
-
-         final int INIT_NODES = 10;
-
-         startGridsMultiThreaded(INIT_NODES);
-
-         final int MAX_NODES = 20;
-
-         final List<Integer> startedNodes = new ArrayList<>();
-
-         for (int i = 0; i < INIT_NODES; i++)
-             startedNodes.add(i);
-
-         ThreadLocalRandom rnd = ThreadLocalRandom.current();
-
-         final AtomicInteger startIdx = new AtomicInteger(INIT_NODES);
-
-         while (System.currentTimeMillis() < stopTime) {
-             if (startedNodes.size() >= MAX_NODES) {
-                 int stopNodes = rnd.nextInt(5) + 1;
-
-                 log.info("Next, stop nodes: " + stopNodes);
-
-                 final List<Integer> idxs = new ArrayList<>(stopNodes);
-
-                 while (idxs.size() < stopNodes) {
-                     // Compare node indexes, not list positions, so no node is picked twice.
-                     Integer stopNodeIdx = startedNodes.get(rnd.nextInt(startedNodes.size()));
-
-                     if (!idxs.contains(stopNodeIdx))
-                         idxs.add(stopNodeIdx);
-                 }
-
-                 GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
-                     @Override public void apply(Integer threadIdx) {
-                         int stopNodeIdx = idxs.get(threadIdx);
-
-                         info("Stop node: " + stopNodeIdx);
-
-                         stopGrid(stopNodeIdx);
-                     }
-                 }, stopNodes, "stop-node");
-
-                 startedNodes.removeAll(idxs);
-             }
-             else {
-                 int startNodes = rnd.nextInt(5) + 1;
-
-                 log.info("Next, start nodes: " + startNodes);
-
-                 GridTestUtils.runMultiThreaded(new Callable<Void>() {
-                     @Override public Void call() throws Exception {
-                         int idx = startIdx.incrementAndGet();
-
-                         log.info("Start node: " + idx);
-
-                         startGrid(idx);
-
-                         // Start threads run concurrently, so guard the shared list.
-                         synchronized (startedNodes) {
-                             startedNodes.add(idx);
-                         }
-
-                         return null;
-                     }
-                 }, startNodes, "start-node");
-             }
-
-             U.sleep(rnd.nextInt(100) + 1);
-         }
-     }
-     finally {
-         // Always signal the background tasks, even if a start or stop above failed.
-         stop.set(true);
-     }
-
-     if (fut1 != null)
-         fut1.get();
-
-     if (fut2 != null)
-         fut2.get();
- }
-
- /**
-  * Runs {@code randomTopologyChanges} with both flags set to {@code false}.
-  *
-  * @throws Exception If failed.
-  */
- public void testRandomTopologyChanges() throws Exception {
-     final boolean first = false;
-     final boolean second = false;
-
-     randomTopologyChanges(first, second);
- }
-
- /**
-  * Logs every znode found under {@code IGNITE_ZK_ROOT}, using a dedicated ZooKeeper client
-  * that is closed even if listing the subtree fails.
-  *
-  * @throws Exception If failed.
-  */
- private void printZkNodes() throws Exception {
-     ZookeeperClient zkClient = new ZookeeperClient(new JavaLogger(), zkCluster.getConnectString(), 10_000, null);
-
-     try {
-         List<String> children = ZKUtil.listSubTreeBFS(zkClient.zk(), IGNITE_ZK_ROOT);
-
-         info("Zookeeper nodes:");
-
-         for (String s : children)
-             info(s);
-     }
-     finally {
-         // Release the client on every path.
-         zkClient.close();
-     }
- }
-
- /**
-  * Runs {@code randomTopologyChanges} with the first flag set to {@code true} and the
-  * second one to {@code false}.
-  *
-  * @throws Exception If failed.
-  */
- public void testRandomTopologyChanges_RestartZk() throws Exception {
-     final boolean first = true;
-     final boolean second = false;
-
-     randomTopologyChanges(first, second);
- }
-
- /**
-  * Runs {@code randomTopologyChanges} with the first flag set to {@code false} and the
-  * second one to {@code true}.
-  *
-  * @throws Exception If failed.
-  */
- public void testRandomTopologyChanges_CloseClients() throws Exception {
-     final boolean first = false;
-     final boolean second = true;
-
-     randomTopologyChanges(first, second);
- }
-
- /**
-  * Starts three nodes concurrently and deploys a node singleton service named
-  * {@code "test"} from node 0 over its cluster group.
-  *
-  * @throws Exception If failed.
-  */
- public void testDeployService1() throws Exception {
-     startGridsMultiThreaded(3);
-
-     grid(0)
-         .services(grid(0).cluster())
-         .deployNodeSingleton("test", new GridCacheAbstractFullApiSelfTest.DummyServiceImpl());
- }
-
- /**
-  * Starts node 0 with client mode off and node 1 with client mode on, then deploys a node
-  * singleton service named {@code "test"} from node 0 over its cluster group.
-  *
-  * @throws Exception If failed.
-  */
- public void testDeployService2() throws Exception {
-     for (int idx = 0; idx < 2; idx++) {
-         clientMode(idx == 1);
-
-         startGrid(idx);
-     }
-
-     grid(0)
-         .services(grid(0).cluster())
-         .deployNodeSingleton("test", new GridCacheAbstractFullApiSelfTest.DummyServiceImpl());
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testDeployService3() throws Exception {
- IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() {
- @Override public Object call() throws Exception {
- clientModeThreadLocal(true);
-
- startGrid(0);
-
- return null;
- }
- }, "start-node");
-
- clientModeThreadLocal(false);
-
- startGrid(1);
-
- fut.get();
-
- grid(0).services(grid(0).cluster()).deployNodeSingleton("test", new GridCacheAbstractFullApiSelfTest.DummyServiceImpl());
- }
-
- /**
-  * Starts node 0 with a large user attribute and node 1 without user attributes, printing
-  * the znodes after each start, and waits for a topology of two.
-  *
-  * @throws Exception If failed.
-  */
- public void testLargeUserAttribute1() throws Exception {
-     // First node carries the large attribute.
-     initLargeAttribute();
-
-     startGrid(0);
-
-     printZkNodes();
-
-     // Second node starts without user attributes.
-     userAttrs = null;
-
-     startGrid(1);
-
-     waitForEventsAcks(ignite(0));
-
-     printZkNodes();
-
-     waitForTopology(2);
- }
-
- /**
-  * Starts node 0, then initializes a large user attribute and starts node 1; waits for
-  * event acks on node 0 and prints the znodes.
-  *
-  * @throws Exception If failed.
-  */
- public void testLargeUserAttribute2() throws Exception {
-     startGrid(0);
-
-     // Only the second node carries the large attribute.
-     initLargeAttribute();
-
-     startGrid(1);
-
-     waitForEventsAcks(ignite(0));
-
-     printZkNodes();
- }
-
- /**
-  * Starts up to 25 nodes one by one (stopping early after about 60 seconds), randomly
-  * choosing for each whether it gets a large user attribute; client mode is switched on for
-  * indexes above 5. Finally waits for a topology of all started nodes.
-  *
-  * @throws Exception If failed.
-  */
- public void testLargeUserAttribute3() throws Exception {
-     ThreadLocalRandom rnd = ThreadLocalRandom.current();
-
-     long deadline = System.currentTimeMillis() + 60_000;
-
-     int started = 0;
-
-     for (int idx = 0; idx < 25; idx++) {
-         info("Iteration: " + idx);
-
-         boolean largeAttr = rnd.nextBoolean();
-
-         if (!largeAttr)
-             userAttrs = null;
-         else
-             initLargeAttribute();
-
-         clientMode(idx > 5);
-
-         startGrid(idx);
-
-         started++;
-
-         if (System.currentTimeMillis() >= deadline)
-             break;
-     }
-
-     waitForTopology(started);
- }
-
- /**
-  * Replaces {@code userAttrs} with a new map holding a single {@code "testAttr"} entry: an
-  * {@code int} array of {@code 1024 * 1024} plus up to 1023 random extra elements, where
-  * each element equals its index.
-  */
- private void initLargeAttribute() {
-     int len = 1024 * 1024 + ThreadLocalRandom.current().nextInt(1024);
-
-     int[] largeVal = new int[len];
-
-     int pos = 0;
-
-     while (pos < len) {
-         largeVal[pos] = pos;
-
-         pos++;
-     }
-
-     userAttrs = new HashMap<>();
-
-     userAttrs.put("testAttr", largeVal);
- }
-
- /**
-  * Creates cache {@code "c1"} from a large cache configuration twice: first with a single
-  * node in the topology, then (after destroying it) with four nodes, putting 100 entries
-  * each time.
-  *
-  * @throws Exception If failed.
-  */
- public void testLargeCustomEvent() throws Exception {
-     final String cacheName = "c1";
-
-     Ignite srv0 = startGrid(0);
-
-     // Send large message, single node in topology.
-     IgniteCache<Object, Object> cache = srv0.createCache(largeCacheConfiguration(cacheName));
-
-     for (int key = 0; key < 100; key++)
-         cache.put(key, key);
-
-     assertEquals(1, cache.get(1));
-
-     waitForEventsAcks(ignite(0));
-
-     startGridsMultiThreaded(1, 3);
-
-     srv0.destroyCache(cacheName);
-
-     // Send large message, multiple nodes in topology.
-     cache = srv0.createCache(largeCacheConfiguration(cacheName));
-
-     for (int key = 0; key < 100; key++)
-         cache.put(key, key);
-
-     printZkNodes();
-
-     waitForTopology(4);
- }
-
- /**
-  * Runs {@link #clientReconnectSessionExpire(boolean)} with the mode flag switched off.
-  *
-  * @throws Exception If failed.
-  */
- public void testClientReconnectSessionExpire1_1() throws Exception {
-     final boolean closeSock = false;
-
-     clientReconnectSessionExpire(closeSock);
- }
-
- /**
-  * Runs {@link #clientReconnectSessionExpire(boolean)} with the mode flag switched on.
-  *
-  * @throws Exception If failed.
-  */
- public void testClientReconnectSessionExpire1_2() throws Exception {
-     final boolean closeSock = true;
-
-     clientReconnectSessionExpire(closeSock);
- }
-
- /**
-  * Starts node 0 and a second node in client mode, puts a value into the default cache
-  * through the second node, forces it through {@code reconnectClientNodes} and checks that
-  * the value is still readable and that a compute broadcast works afterwards.
-  *
-  * @param closeSock Test mode flag.
-  * @throws Exception If failed.
-  */
- private void clientReconnectSessionExpire(boolean closeSock) throws Exception {
-     startGrid(0);
-
-     sesTimeout = 2000;
-     clientMode(true);
-     testSockNio = true;
-
-     Ignite clientNode = startGrid(1);
-
-     IgniteCache<Object, Object> before = clientNode.cache(DEFAULT_CACHE_NAME);
-
-     before.put(1, 1);
-
-     reconnectClientNodes(log, Collections.singletonList(clientNode), null, closeSock);
-
-     // The cache proxy is obtained again after reconnect, as in the original flow.
-     IgniteCache<Object, Object> after = clientNode.cache(DEFAULT_CACHE_NAME);
-
-     assertEquals(1, after.get(1));
-
-     clientNode.compute().broadcast(new DummyCallable(null));
- }
-
- /**
-  * Starts three nodes plus one node in client mode and triggers a reconnect of the latter
-  * by calling {@code clientReconnect()} on its own discovery SPI; afterwards waits for a
-  * topology of four.
-  *
-  * @throws Exception If failed.
-  */
- public void testForceClientReconnect() throws Exception {
-     final int SRVS = 3;
-
-     startGrids(SRVS);
-
-     clientMode(true);
-
-     startGrid(SRVS);
-
-     Callable<Void> reconnectTrigger = new Callable<Void>() {
-         @Override public Void call() throws Exception {
-             waitSpi(getTestIgniteInstanceName(SRVS)).clientReconnect();
-
-             return null;
-         }
-     };
-
-     reconnectClientNodes(Collections.singletonList(ignite(SRVS)), reconnectTrigger);
-
-     waitForTopology(SRVS + 1);
- }
-
- /**
-  * Starts three nodes plus one node in client mode and triggers a reconnect of the latter
-  * by calling {@code failNode()} for it on the discovery SPI of node 0; afterwards waits for
-  * a topology of four.
-  *
-  * @throws Exception If failed.
-  */
- public void testForcibleClientFail() throws Exception {
-     final int SRVS = 3;
-
-     startGrids(SRVS);
-
-     clientMode(true);
-
-     startGrid(SRVS);
-
-     Callable<Void> failTrigger = new Callable<Void>() {
-         @Override public Void call() throws Exception {
-             ZookeeperDiscoverySpi srvSpi = waitSpi(getTestIgniteInstanceName(0));
-
-             UUID clientId = ignite(SRVS).cluster().localNode().id();
-
-             srvSpi.failNode(clientId, "Test forcible node fail");
-
-             return null;
-         }
-     };
-
-     reconnectClientNodes(Collections.singletonList(ignite(SRVS)), failTrigger);
-
-     waitForTopology(SRVS + 1);
- }
-
- /**
-  * Five times in a row: expects a node start with an already used node ID to throw
-  * {@code IgniteCheckedException}, then starts the same node index with {@code nodeId} reset
-  * to {@code null} and reuses that node's ID as the duplicate for the next iteration.
-  *
-  * @throws Exception If failed.
-  */
- public void testDuplicatedNodeId() throws Exception {
-     nodeId = UUID.randomUUID();
-
-     UUID usedId = nodeId;
-
-     startGrid(0);
-
-     for (int iter = 0; iter < 5; iter++) {
-         final int idx = 100 + iter;
-
-         nodeId = usedId;
-
-         info("Start node with duplicated ID [iter=" + iter + ", nodeId=" + nodeId + ']');
-
-         Callable<Void> dupStart = new Callable<Void>() {
-             @Override public Void call() throws Exception {
-                 startGrid(idx);
-
-                 return null;
-             }
-         };
-
-         GridTestUtils.assertThrows(log, dupStart, IgniteCheckedException.class, null);
-
-         nodeId = null;
-
-         info("Start node with unique ID [iter=" + iter + ']');
-
-         Ignite started = startGrid(idx);
-
-         usedId = started.cluster().localNode().id();
-
-         waitForTopology(iter + 2);
-     }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testPing() throws Exception {
- sesTimeout = 5000;
-
- startGrids(3);
-
- final ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(1));
-
- final UUID nodeId = ignite(2).cluster().localNode().id();
-
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
- @Override public void run() {
- assertTrue(spi.pingNode(nodeId));
- }
- }, 32, "ping");
-
- fut.get();
-
- fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
- @Override public void run() {
- spi.pingNode(nodeId);
- }
- }, 32, "ping");
-
- U.sleep(100);
-
- stopGrid(2);
-
- fut.get();
-
- fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
- @Override public void run() {
- assertFalse(spi.pingNode(nodeId));
- }
- }, 32, "ping");
-
- fut.get();
- }
-
- /**
-  * Runs {@code startWithPersistence} with the flag set to {@code false}.
-  *
-  * @throws Exception If failed.
-  */
- public void testWithPersistence1() throws Exception {
-     final boolean flag = false;
-
-     startWithPersistence(flag);
- }
-
- /**
-  * Runs {@code startWithPersistence} with the flag set to {@code true}.
-  *
-  * @throws Exception If failed.
-  */
- public void testWithPersistence2() throws Exception {
-     final boolean flag = true;
-
-     startWithPersistence(flag);
- }
-
- /**
-  * Runs {@link #communicationErrorResolve_Simple(int)} with two nodes.
-  *
-  * @throws Exception If failed.
-  */
- public void testNoOpCommunicationErrorResolve_1() throws Exception {
-     final int nodes = 2;
-
-     communicationErrorResolve_Simple(nodes);
- }
-
- /**
-  * Runs {@link #communicationErrorResolve_Simple(int)} with ten nodes.
-  *
-  * @throws Exception If failed.
-  */
- public void testNoOpCommunicationErrorResolve_2() throws Exception {
-     final int nodes = 10;
-
-     communicationErrorResolve_Simple(nodes);
- }
-
- /**
-  * Starts the given number of nodes with the no-op communication problem resolver and three
-  * times asks a random node's discovery SPI to resolve a communication error with another
-  * random node, checking internal structures cleanup after each attempt.
-  *
-  * @param nodes Nodes number.
-  * @throws Exception If failed.
-  */
- private void communicationErrorResolve_Simple(int nodes) throws Exception {
-     assert nodes > 1;
-
-     sesTimeout = 2000;
-     commProblemRslvr = NoOpCommunicationProblemResolver.FACTORY;
-
-     startGridsMultiThreaded(nodes);
-
-     ThreadLocalRandom rnd = ThreadLocalRandom.current();
-
-     for (int iter = 0; iter < 3; iter++) {
-         info("Iteration: " + iter);
-
-         int srcIdx = rnd.nextInt(nodes);
-
-         // Draw the second index until it differs from the first one.
-         int dstIdx = rnd.nextInt(nodes);
-
-         while (dstIdx == srcIdx)
-             dstIdx = rnd.nextInt(nodes);
-
-         ZookeeperDiscoverySpi srcSpi = spi(ignite(srcIdx));
-
-         srcSpi.resolveCommunicationError(ignite(dstIdx).cluster().localNode(), new Exception("test"));
-
-         checkInternalStructuresCleanup();
-     }
- }
-
- /**
-  * Tests case when one node fails before sending communication status.
-  * <p>
-  * Three nodes are started with a 2000 ms session timeout and a fourth one with the test NIO
-  * socket and a 5000 ms session timeout. While a communication error resolve is running in
-  * the background, the fourth node's socket is closed and the node is stopped.
-  *
-  * @throws Exception If failed.
-  */
- public void testNoOpCommunicationErrorResolve_3() throws Exception {
-     sesTimeout = 2000;
-     commProblemRslvr = NoOpCommunicationProblemResolver.FACTORY;
-
-     startGridsMultiThreaded(3);
-
-     // The former 'sesTimeout = 10_000' store was overwritten before any read, so only the
-     // effective value is kept.
-     testSockNio = true;
-     sesTimeout = 5000;
-
-     startGrid(3);
-
-     IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() {
-         @Override public Object call() {
-             ZookeeperDiscoverySpi spi = spi(ignite(0));
-
-             spi.resolveCommunicationError(ignite(1).cluster().localNode(), new Exception("test"));
-
-             return null;
-         }
-     });
-
-     U.sleep(1000);
-
-     ZkTestClientCnxnSocketNIO nio = ZkTestClientCnxnSocketNIO.forNode(ignite(3));
-
-     nio.closeSocket(true);
-
-     try {
-         stopGrid(3);
-
-         fut.get();
-     }
-     finally {
-         // Unblock the socket even if the stop or the resolve failed.
-         nio.allowConnect();
-     }
-
-     waitForTopology(3);
- }
-
- /**
- * Tests case when Coordinator fails while resolve process is in progress.
- *
- * @throws Exception If failed.
- */
- public void testNoOpCommunicationErrorResolve_4() throws Exception {
- testCommSpi = true;
-
- sesTimeout = 2000;
- commProblemRslvr = NoOpCommunicationProblemResolver.FACTORY;
-
- startGrid(0);
-
- startGridsMultiThreaded(1, 3);
-
- ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.spi(ignite(3));
-
- commSpi.pingLatch = new CountDownLatch(1);
-
- IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() {
- @Override public Object call() {
- ZookeeperDiscoverySpi spi = spi(ignite(1));
-
- spi.resolveCommunicationError(ignite(2).cluster().localNode(), new Exception("test"));
-
- return null;
- }
- });
-
- U.sleep(1000);
-
- assertFalse(fut.isDone());
-
- stopGrid(0);
-
- commSpi.pingLatch.countDown();
-
- fut.get();
-
- waitForTopology(3);
- }
-
- /**
- * Tests that nodes join is delayed while resolve is in progress.
- * <p>
- * Two latches are installed into the test communication SPI of node 3: {@code pingStartLatch}
- * is awaited by the test before it starts new nodes, {@code pingLatch} is released by the test
- * to let the resolve process and the pending node starts finish.
- *
- * @throws Exception If failed.
- */
- public void testNoOpCommunicationErrorResolve_5() throws Exception {
- testCommSpi = true;
-
- sesTimeout = 2000;
- commProblemRslvr = NoOpCommunicationProblemResolver.FACTORY;
-
- startGrid(0);
-
- startGridsMultiThreaded(1, 3);
-
- ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.spi(ignite(3));
-
- commSpi.pingStartLatch = new CountDownLatch(1);
- commSpi.pingLatch = new CountDownLatch(1);
-
- // Run the resolve in background; it is expected to stay unfinished until pingLatch is released.
- IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() {
- @Override public Object call() {
- ZookeeperDiscoverySpi spi = spi(ignite(1));
-
- spi.resolveCommunicationError(ignite(2).cluster().localNode(), new Exception("test"));
-
- return null;
- }
- });
-
- assertTrue(commSpi.pingStartLatch.await(10, SECONDS));
-
- try {
- assertFalse(fut.isDone());
-
- final AtomicInteger nodeIdx = new AtomicInteger(3);
-
- // Three threads start nodes with indexes 4, 5 and 6 (incrementAndGet on an initial value of 3).
- IgniteInternalFuture<?> startFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
- @Override public Void call() throws Exception {
- startGrid(nodeIdx.incrementAndGet());
-
- return null;
- }
- }, 3, "start-node");
-
- U.sleep(1000);
-
- // After one second the starts must still be pending and node 0 must still see four nodes.
- assertFalse(startFut.isDone());
-
- assertEquals(4, ignite(0).cluster().nodes().size());
-
- commSpi.pingLatch.countDown();
-
- startFut.get();
- fut.get();
-
- waitForTopology(7);
- }
- finally {
- // Release the latch on every path so a failed assertion does not leave it closed.
- commSpi.pingLatch.countDown();
- }
- }
-
- /**
-  * Runs {@link #communicationErrorResolve_KillNodes(int, Collection)} with 2 nodes,
-  * killing the node with order 2.
-  *
-  * @throws Exception If failed.
-  */
- public void testCommunicationErrorResolve_KillNode_1() throws Exception {
-     Collection<Long> killOrders = Collections.singleton(2L);
-
-     communicationErrorResolve_KillNodes(2, killOrders);
- }
-
- /**
-  * Runs {@link #communicationErrorResolve_KillNodes(int, Collection)} with 3 nodes,
-  * killing the node with order 2.
-  *
-  * @throws Exception If failed.
-  */
- public void testCommunicationErrorResolve_KillNode_2() throws Exception {
-     Collection<Long> killOrders = Collections.singleton(2L);
-
-     communicationErrorResolve_KillNodes(3, killOrders);
- }
-
- /**
-  * Runs {@link #communicationErrorResolve_KillNodes(int, Collection)} with 10 nodes,
-  * killing the nodes with orders 2, 4 and 6.
-  *
-  * @throws Exception If failed.
-  */
- public void testCommunicationErrorResolve_KillNode_3() throws Exception {
-     Collection<Long> killOrders = Arrays.asList(2L, 4L, 6L);
-
-     communicationErrorResolve_KillNodes(10, killOrders);
- }
-
- /**
-  * Runs {@link #communicationErrorResolve_KillNodes(int, Collection)} with 2 nodes,
-  * killing the node with order 1.
-  *
-  * @throws Exception If failed.
-  */
- public void testCommunicationErrorResolve_KillCoordinator_1() throws Exception {
-     Collection<Long> killOrders = Collections.singleton(1L);
-
-     communicationErrorResolve_KillNodes(2, killOrders);
- }
-
- /**
-  * Runs {@link #communicationErrorResolve_KillNodes(int, Collection)} with 3 nodes,
-  * killing the node with order 1.
-  *
-  * @throws Exception If failed.
-  */
- public void testCommunicationErrorResolve_KillCoordinator_2() throws Exception {
-     Collection<Long> killOrders = Collections.singleton(1L);
-
-     communicationErrorResolve_KillNodes(3, killOrders);
- }
-
- /**
-  * Runs {@link #communicationErrorResolve_KillNodes(int, Collection)} with 10 nodes,
-  * killing the nodes with orders 1, 4 and 6.
-  *
-  * @throws Exception If failed.
-  */
- public void testCommunicationErrorResolve_KillCoordinator_3() throws Exception {
-     Collection<Long> killOrders = Arrays.asList(1L, 4L, 6L);
-
-     communicationErrorResolve_KillNodes(10, killOrders);
- }
-
- /**
-  * Runs {@link #communicationErrorResolve_KillNodes(int, Collection)} with 10 nodes,
-  * killing the nodes with orders 1, 2 and 3.
-  *
-  * @throws Exception If failed.
-  */
- public void testCommunicationErrorResolve_KillCoordinator_4() throws Exception {
-     Collection<Long> killOrders = Arrays.asList(1L, 2L, 3L);
-
-     communicationErrorResolve_KillNodes(10, killOrders);
- }
-
- /**
-  * Starts nodes with a resolver built from the given node orders, asks a surviving node's
-  * discovery SPI to resolve a communication error with one of the nodes to be killed and
-  * expects an {@code IgniteSpiException} caused by {@code ClusterTopologyCheckedException}.
-  * Then checks that none of the remaining nodes has one of the given orders and that one
-  * more node can join.
-  *
-  * @param startNodes Number of nodes to start.
-  * @param killNodes Nodes to kill by resolve process.
-  * @throws Exception If failed.
-  */
- private void communicationErrorResolve_KillNodes(int startNodes, Collection<Long> killNodes) throws Exception {
-     testCommSpi = true;
-
-     commProblemRslvr = TestNodeKillCommunicationProblemResolver.factory(killNodes);
-
-     startGrids(startNodes);
-
-     ZkTestCommunicationSpi.spi(ignite(0)).checkRes = new BitSet(startNodes);
-
-     ZookeeperDiscoverySpi survivorSpi = null;
-     UUID victimId = null;
-
-     // The last survivor and the last victim in iteration order win.
-     for (Ignite ignite : G.allGrids()) {
-         ZookeeperDiscoverySpi curSpi = spi(ignite);
-
-         boolean toKill = killNodes.contains(ignite.cluster().localNode().order());
-
-         if (toKill)
-             victimId = ignite.cluster().localNode().id();
-         else
-             survivorSpi = curSpi;
-     }
-
-     assertNotNull(survivorSpi);
-     assertNotNull(victimId);
-
-     try {
-         survivorSpi.resolveCommunicationError(survivorSpi.getNode(victimId), new Exception("test"));
-
-         fail("Exception is not thrown");
-     }
-     catch (IgniteSpiException e) {
-         assertTrue("Unexpected exception: " + e, e.getCause() instanceof ClusterTopologyCheckedException);
-     }
-
-     int expNodes = startNodes - killNodes.size();
-
-     waitForTopology(expNodes);
-
-     for (Ignite ignite : G.allGrids()) {
-         long order = ignite.cluster().localNode().order();
-
-         assertFalse(killNodes.contains(order));
-     }
-
-     startGrid(startNodes);
-
-     waitForTopology(expNodes + 1);
- }
-
- /**
-  * Starts 10 nodes with {@code KillCoordinatorCommunicationProblemResolver} installed and performs
-  * 10 rounds. In round {@code i} node {@code i + 1} is asked to resolve a communication error with
-  * node {@code i}; the topology is expected to shrink to 9 nodes and to return to 10 once a new node
-  * (index {@code 10 + i}) is started.
-  *
-  * @throws Exception If failed.
-  */
- public void testCommunicationErrorResolve_KillCoordinator_5() throws Exception {
-     sesTimeout = 2000;
-
-     testCommSpi = true;
-     commProblemRslvr = KillCoordinatorCommunicationProblemResolver.FACTORY;
-
-     startGrids(10);
-
-     // The index of the node to kill equals the round number, the index of the new node is 10 + round.
-     for (int round = 0; round < 10; round++) {
-         info("Iteration: " + round);
-
-         for (Ignite ignite : G.allGrids())
-             ZkTestCommunicationSpi.spi(ignite).initCheckResult(10);
-
-         UUID crdId = ignite(round).cluster().localNode().id();
-
-         ZookeeperDiscoverySpi nextNodeSpi = spi(ignite(round + 1));
-
-         try {
-             nextNodeSpi.resolveCommunicationError(nextNodeSpi.getNode(crdId), new Exception("test"));
-
-             fail("Exception is not thrown");
-         }
-         catch (IgniteSpiException e) {
-             assertTrue("Unexpected exception: " + e, e.getCause() instanceof ClusterTopologyCheckedException);
-         }
-
-         waitForTopology(9);
-
-         startGrid(10 + round);
-
-         waitForTopology(10);
-     }
- }
-
- /**
-  * Starts 10 nodes, then 5 more after {@code clientMode(true)}, with
-  * {@code KillRandomCommunicationProblemResolver} installed. Then performs 10 rounds: every round
-  * triggers communication error resolution from one of the running nodes and starts one more node
-  * with a randomly chosen client mode flag.
-  *
-  * @throws Exception If failed.
-  */
- public void testCommunicationErrorResolve_KillRandom() throws Exception {
-     sesTimeout = 2000;
-
-     testCommSpi = true;
-     commProblemRslvr = KillRandomCommunicationProblemResolver.FACTORY;
-
-     startGridsMultiThreaded(10);
-
-     clientMode(true);
-
-     startGridsMultiThreaded(10, 5);
-
-     int nodeIdx = 15;
-
-     for (int i = 0; i < 10; i++) {
-         info("Iteration: " + i);
-
-         ZookeeperDiscoverySpi spi = null;
-
-         // Re-initialize the check result on every node; the SPI of the last iterated node is used below.
-         for (Ignite node : G.allGrids()) {
-             ZkTestCommunicationSpi.spi(node).initCheckResult(100);
-
-             spi = spi(node);
-         }
-
-         // Test-framework assertion instead of the 'assert' keyword: it is evaluated even when
-         // JVM assertions are disabled, and matches the other tests of this class.
-         assertNotNull(spi);
-
-         try {
-             spi.resolveCommunicationError(spi.getRemoteNodes().iterator().next(), new Exception("test"));
-         }
-         catch (IgniteSpiException ignore) {
-             // No-op: the test does not check whether this call fails.
-         }
-
-         clientMode(ThreadLocalRandom.current().nextBoolean());
-
-         startGrid(nodeIdx++);
-
-         awaitPartitionMapExchange();
-     }
- }
-
- /**
-  * Starts 3 nodes, configures the test communication SPIs and triggers communication error resolution
-  * from node 0 against node 1. Expects the topology to shrink to 2 nodes and node 2 to be no longer
-  * visible from node 0.
-  *
-  * @throws Exception If failed.
-  */
- public void testDefaultCommunicationErrorResolver1() throws Exception {
-     testCommSpi = true;
-     sesTimeout = 5000;
-
-     startGrids(3);
-
-     // NOTE(review): presumably the first argument is the total node count and the remaining ones are
-     // the node indices reported as reachable - confirm against ZkTestCommunicationSpi.initCheckResult.
-     for (int idx = 0; idx < 2; idx++)
-         ZkTestCommunicationSpi.spi(ignite(idx)).initCheckResult(3, 0, 1);
-
-     ZkTestCommunicationSpi.spi(ignite(2)).initCheckResult(3, 2);
-
-     UUID expKilledId = nodeId(2);
-
-     assertNotNull(ignite(0).cluster().node(expKilledId));
-
-     ZookeeperDiscoverySpi discoSpi = spi(ignite(0));
-
-     UUID secondNodeId = ignite(1).cluster().localNode().id();
-
-     discoSpi.resolveCommunicationError(discoSpi.getNode(secondNodeId), new Exception("test"));
-
-     waitForTopology(2);
-
-     assertNull(ignite(0).cluster().node(expKilledId));
- }
-
- /**
-  * Starts 3 nodes, then 2 more after {@code clientMode(true)}, configures the test communication SPIs
-  * of all 5 nodes and triggers communication error resolution from node 0 against node 1.
-  * Expects the topology to shrink to 2 nodes.
-  *
-  * @throws Exception If failed.
-  */
- public void testDefaultCommunicationErrorResolver2() throws Exception {
-     testCommSpi = true;
-     sesTimeout = 5000;
-
-     startGrids(3);
-
-     clientMode(true);
-
-     startGridsMultiThreaded(3, 2);
-
-     // Nodes 0-1 get one check result, nodes 2-4 another one.
-     // NOTE(review): the exact meaning of the arguments is defined by ZkTestCommunicationSpi - confirm there.
-     for (int idx = 0; idx <= 1; idx++)
-         ZkTestCommunicationSpi.spi(ignite(idx)).initCheckResult(5, 0, 1);
-
-     for (int idx = 2; idx <= 4; idx++)
-         ZkTestCommunicationSpi.spi(ignite(idx)).initCheckResult(5, 2, 3, 4);
-
-     ZookeeperDiscoverySpi discoSpi = spi(ignite(0));
-
-     UUID secondNodeId = ignite(1).cluster().localNode().id();
-
-     discoSpi.resolveCommunicationError(discoSpi.getNode(secondNodeId), new Exception("test"));
-
-     waitForTopology(2);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testDefaultCommunicationErrorResolver3() throws Exception {
- defaultCommunicationErrorResolver_BreakCommunication(3, 1);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testDefaultCommunicationErrorResolver4() throws Exception {
- defaultCommunicationErrorResolver_BreakCommunication(3, 0);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testDefaultCommunicationErrorResolver5() throws Exception {
- defaultCommunicationErrorResolver_BreakCommunication(10, 1, 3, 6);
- }
-
- /**
-  * Starts the given number of nodes, calls {@code simulateNodeFailure()} on the communication SPI of
-  * every listed node (one thread per node) and waits until the topology is reduced by the number of
-  * listed nodes.
-  *
-  * @param startNodes Initial nodes number.
-  * @param breakNodes Node indices where communication server is closed.
-  * @throws Exception If failed.
-  */
- private void defaultCommunicationErrorResolver_BreakCommunication(int startNodes, final int...breakNodes) throws Exception {
-     sesTimeout = 5000;
-
-     startGridsMultiThreaded(startNodes);
-
-     final int threads = breakNodes.length;
-
-     // Every worker waits on the barrier before breaking communication of its node.
-     final CyclicBarrier startBarrier = new CyclicBarrier(threads);
-
-     IgniteInClosure<Integer> breaker = new IgniteInClosure<Integer>() {
-         @Override public void apply(Integer idx) {
-             try {
-                 startBarrier.await();
-
-                 int nodeIdx = breakNodes[idx];
-
-                 info("Close communication: " + nodeIdx);
-
-                 TcpCommunicationSpi commSpi =
-                     (TcpCommunicationSpi)ignite(nodeIdx).configuration().getCommunicationSpi();
-
-                 commSpi.simulateNodeFailure();
-             }
-             catch (Exception e) {
-                 fail("Unexpected error: " + e);
-             }
-         }
-     };
-
-     GridTestUtils.runMultiThreaded(breaker, threads, "break-communication");
-
-     waitForTopology(startNodes - threads);
- }
-
- /**
-  * Starts 5 nodes and verifies on each of them that {@code TcpCommunicationSpi.checkConnection}
-  * called for all cluster nodes returns a bit set whose first 5 bits are set.
-  *
-  * @throws Exception If failed.
-  */
- public void testConnectionCheck() throws Exception {
-     final int nodesCnt = 5;
-
-     startGridsMultiThreaded(nodesCnt);
-
-     for (int idx = 0; idx < nodesCnt; idx++) {
-         Ignite ignite = ignite(idx);
-
-         TcpCommunicationSpi commSpi = (TcpCommunicationSpi)ignite.configuration().getCommunicationSpi();
-
-         List<ClusterNode> allNodes = new ArrayList<>(ignite.cluster().nodes());
-
-         BitSet checkRes = commSpi.checkConnection(allNodes).get();
-
-         for (int bit = 0; bit < nodesCnt; bit++)
-             assertTrue(checkRes.get(bit));
-     }
- }
-
- /**
-  * With {@code clientReconnectDisabled} set, starts node 0 and then node 1 (with the {@code client}
-  * flag set), closes the ZooKeeper socket of node 1 and expects node 1 to fire
-  * {@code EVT_NODE_SEGMENTED} within 10 seconds.
-  *
-  * @throws Exception If failed.
-  */
- public void testReconnectDisabled_ConnectionLost() throws Exception {
-     clientReconnectDisabled = true;
-
-     startGrid(0);
-
-     sesTimeout = 3000;
-     testSockNio = true;
-     client = true;
-
-     Ignite clientNode = startGrid(1);
-
-     final CountDownLatch segmentedLatch = new CountDownLatch(1);
-
-     IgnitePredicate<Event> segmentedLsnr = new IgnitePredicate<Event>() {
-         @Override public boolean apply(Event evt) {
-             segmentedLatch.countDown();
-
-             return false;
-         }
-     };
-
-     clientNode.events().localListen(segmentedLsnr, EventType.EVT_NODE_SEGMENTED);
-
-     ZkTestClientCnxnSocketNIO clientNio = ZkTestClientCnxnSocketNIO.forNode(clientNode);
-
-     clientNio.closeSocket(true);
-
-     try {
-         waitNoAliveZkNodes(log,
-             zkCluster.getConnectString(),
-             Collections.singletonList(aliveZkNodePath(clientNode)),
-             10_000);
-     }
-     finally {
-         // Connections are allowed again whether or not the wait above succeeded.
-         clientNio.allowConnect();
-     }
-
-     assertTrue(segmentedLatch.await(10, SECONDS));
- }
-
- /**
-  * Starts node 0, then 5 more nodes after {@code clientMode(true)} with join timeout 3000, stops
-  * node 0 and expects each of the 5 nodes to fire {@code EVT_NODE_SEGMENTED} within 10 seconds.
-  *
-  * @throws Exception If failed.
-  */
- public void testServersLeft_FailOnTimeout() throws Exception {
-     startGrid(0);
-
-     final int clientsCnt = 5;
-
-     joinTimeout = 3000;
-
-     clientMode(true);
-
-     startGridsMultiThreaded(1, clientsCnt);
-
-     waitForTopology(clientsCnt + 1);
-
-     final CountDownLatch segmentedLatch = new CountDownLatch(clientsCnt);
-
-     // Nodes 1..clientsCnt are the ones started after node 0; each gets its own listener instance.
-     for (int idx = 1; idx <= clientsCnt; idx++) {
-         Ignite ignite = ignite(idx);
-
-         IgnitePredicate<Event> segmentedLsnr = new IgnitePredicate<Event>() {
-             @Override public boolean apply(Event evt) {
-                 segmentedLatch.countDown();
-
-                 return false;
-             }
-         };
-
-         ignite.events().localListen(segmentedLsnr, EventType.EVT_NODE_SEGMENTED);
-     }
-
-     stopGrid(getTestIgniteInstanceName(0), true, false);
-
-     assertTrue(segmentedLatch.await(10, SECONDS));
-
-     evts.clear();
- }
-
- /**
-  * Sets join timeout to 3000 and {@code clientMode(true)}, then checks that starting node 0 fails
-  * with {@code IgniteCheckedException} not earlier than the join timeout elapses, and that the
-  * failure is caused by an {@code IgniteSpiException} with the expected message.
-  */
- public void testStartNoServers_FailOnTimeout() {
-     joinTimeout = 3000;
-
-     clientMode(true);
-
-     long startTs = System.currentTimeMillis();
-
-     Callable<Void> startNode = new Callable<Void>() {
-         @Override public Void call() throws Exception {
-             startGrid(0);
-
-             return null;
-         }
-     };
-
-     Throwable startErr = GridTestUtils.assertThrows(log, startNode, IgniteCheckedException.class, null);
-
-     long finishTs = System.currentTimeMillis();
-
-     assertTrue(finishTs >= startTs + joinTimeout);
-
-     IgniteSpiException spiCause = X.cause(startErr, IgniteSpiException.class);
-
-     assertNotNull(spiCause);
-
-     String msg = spiCause.getMessage();
-
-     assertTrue(msg.contains("Failed to connect to cluster within configured timeout"));
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testStartNoServer_WaitForServers1() throws Exception {
- startNoServer_WaitForServers(0);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testStartNoServer_WaitForServers2() throws Exception {
- startNoServer_WaitForServers(10_000);
- }
-
- /**
-  * Starts node 0 asynchronously after {@code clientModeThreadLocal(true)}, sleeps 3 seconds, waits
-  * for its SPI, then starts node 1 after {@code clientModeThreadLocal(false)} and expects the
-  * asynchronous start to complete and the topology to reach 2 nodes.
-  *
-  * @param joinTimeout Join timeout.
-  * @throws Exception If failed.
-  */
- private void startNoServer_WaitForServers(long joinTimeout) throws Exception {
-     this.joinTimeout = joinTimeout;
-
-     Callable<Void> startClient = new Callable<Void>() {
-         @Override public Void call() throws Exception {
-             clientModeThreadLocal(true);
-
-             startGrid(0);
-
-             return null;
-         }
-     };
-
-     IgniteInternalFuture<?> clientStartFut = GridTestUtils.runAsync(startClient);
-
-     U.sleep(3000);
-
-     waitSpi(getTestIgniteInstanceName(0));
-
-     clientModeThreadLocal(false);
-
-     startGrid(1);
-
-     // Node 0 start is expected to finish once node 1 is running.
-     clientStartFut.get();
-
-     waitForTopology(2);
- }
-
- /**
- * Runs the {@code disconnectOnServersLeft} scenario with arguments {@code (1, 1)}.
- * NOTE(review): the helper is declared outside this excerpt; confirm the meaning of its two arguments there.
- *
- * @throws Exception If failed.
- */
- public void testDisconnectOnServersLeft_1() throws Exception {
- disconnectOnServersLeft(1, 1);
- }
-
- /**
- * @throws Exception If f
<TRUNCATED>