You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/08/21 11:05:18 UTC
[33/44] incubator-ignite git commit: ignite-1203 - Zookeeper IP
finder. Closes #16.
ignite-1203 - Zookeeper IP finder. Closes #16.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d69e7785
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d69e7785
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d69e7785
Branch: refs/heads/ignite-gg-9615-1
Commit: d69e7785afe15fe99599e992f37c742b8fa57ba8
Parents: 452af6a
Author: Raul Kripalani <ra...@apache.org>
Authored: Thu Aug 20 18:09:27 2015 -0700
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Thu Aug 20 18:09:27 2015 -0700
----------------------------------------------------------------------
modules/zookeeper/pom.xml | 89 +++++
.../zk/TcpDiscoveryZookeeperIpFinder.java | 350 +++++++++++++++++
.../tcp/ipfinder/zk/ZookeeperIpFinderTest.java | 390 +++++++++++++++++++
pom.xml | 1 +
4 files changed, 830 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d69e7785/modules/zookeeper/pom.xml
----------------------------------------------------------------------
diff --git a/modules/zookeeper/pom.xml b/modules/zookeeper/pom.xml
new file mode 100644
index 0000000..bf0c578
--- /dev/null
+++ b/modules/zookeeper/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-parent</artifactId>
+ <version>1</version>
+ <relativePath>../../parent</relativePath>
+ </parent>
+
+ <artifactId>ignite-zookeeper</artifactId>
+ <version>1.4.1-SNAPSHOT</version>
+
+ <properties>
+ <curator.version>2.8.0</curator.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>${curator.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-x-discovery</artifactId>
+ <version>${curator.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <version>${curator.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-log4j</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-spring</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d69e7785/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java
new file mode 100644
index 0000000..dd1a4a7
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.ipfinder.zk;
+
+import org.apache.curator.*;
+import org.apache.curator.framework.*;
+import org.apache.curator.framework.imps.*;
+import org.apache.curator.retry.*;
+import org.apache.curator.x.discovery.*;
+import org.apache.curator.x.discovery.details.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+
+import com.google.common.collect.*;
+import org.codehaus.jackson.map.annotate.*;
+
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * This TCP Discovery IP Finder uses Apache ZooKeeper (ZK) to locate peer nodes when bootstrapping in order to join
+ * the cluster. It uses the Apache Curator library to interact with ZooKeeper in a simple manner. Specifically,
+ * it uses the {@link ServiceDiscovery} recipe, which makes use of ephemeral nodes in ZK to register services.
+ *
+ * <p>
+ * There are several ways to instantiate the TcpDiscoveryZookeeperIpFinder:
+ * <ul>
+ * <li>By providing an instance of {@link CuratorFramework} directly, in which case no ZK Connection String
+ * is required.</li>
+ * <li>By providing a ZK Connection String through {@link #setZkConnectionString(String)}, and optionally
+ * a {@link RetryPolicy} through the setter. If the latter is not provided, a default
+ * {@link ExponentialBackoffRetry} policy is used, with a base sleep time of 1000ms and 10 retries.</li>
+ * <li>By providing a ZK Connection String through system property {@link #PROP_ZK_CONNECTION_STRING}. If this
+ * property is set, it overrides the ZK Connection String passed in as a property, but it does not override
+ * the {@link CuratorFramework} if provided.</li>
+ * </ul>
+ *
+ * You may customise the base path for services, as well as the service name. By default {@link #BASE_PATH} and
+ * {@link #SERVICE_NAME} are used respectively. You can also choose to enable or disable duplicate registrations. See
+ * {@link #setAllowDuplicateRegistrations(boolean)} for more details.
+ *
+ * @see <a href="http://zookeeper.apache.org">Apache ZooKeeper</a>
+ * @see <a href="http://curator.apache.org">Apache Curator</a>
+ *
+ * @author Raul Kripalani
+ */
+public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
+
+ /** System property name to provide the ZK Connection String. */
+ public static final String PROP_ZK_CONNECTION_STRING = "IGNITE_ZK_CONNECTION_STRING";
+
+ /** Default base path for service registrations. */
+ private static final String BASE_PATH = "/services";
+
+ /** Default service name for service registrations. */
+ private static final String SERVICE_NAME = "ignite";
+
+ /** Default URI Spec to use with the {@link ServiceDiscoveryBuilder}. */
+ private static final UriSpec URI_SPEC = new UriSpec("{address}:{port}");
+
+ /** Init guard. */
+ @GridToStringExclude
+ private final AtomicBoolean initGuard = new AtomicBoolean();
+
+ /** Logger. */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** The Curator framework in use, either injected or constructed by this component. */
+ private CuratorFramework curator;
+
+ /** The ZK Connection String if provided by the user. */
+ private String zkConnectionString;
+
+ /** Retry policy to use. */
+ private RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
+
+ /** Base path to use, by default {@link #BASE_PATH}. */
+ private String basePath = BASE_PATH;
+
+ /** Service name to use, by default {@link #SERVICE_NAME}. */
+ private String serviceName = SERVICE_NAME;
+
+ /** Whether to allow or not duplicate registrations. See setter doc. */
+ private boolean allowDuplicateRegistrations = false;
+
+ /** The Service Discovery recipe. */
+ private ServiceDiscovery<IgniteInstanceDetails> discovery;
+
+ /** Map of the {@link ServiceInstance}s we have registered. */
+ private Map<InetSocketAddress, ServiceInstance<IgniteInstanceDetails>> ourInstances = new ConcurrentHashMap<>();
+
+ /** Constructor. */
+ public TcpDiscoveryZookeeperIpFinder() {
+ setShared(true);
+ }
+
+ /** Initializes this IP Finder by creating the appropriate Curator objects. Does nothing if already initialized. */
+ private void init() {
+ if (!initGuard.compareAndSet(false, true))
+ return;
+ // NOTE(review): the guard is set before the work below completes, so a failed init is not retried - confirm
+ String sysPropZkConnString = System.getProperty(PROP_ZK_CONNECTION_STRING);
+ // a non-blank system property overrides the connection string set on this object
+ if (sysPropZkConnString != null && sysPropZkConnString.trim().length() > 0)
+ zkConnectionString = sysPropZkConnString;
+
+ log.info("Initializing ZooKeeper IP Finder.");
+ // a client is only built from the connection string when none was injected through setCurator
+ if (curator == null) {
+ A.notNullOrEmpty(zkConnectionString, String.format("ZooKeeper URL (or system property %s) cannot be null " +
+ "or empty if a CuratorFramework object is not provided explicitly", PROP_ZK_CONNECTION_STRING));
+ curator = CuratorFrameworkFactory.newClient(zkConnectionString, retryPolicy);
+ }
+ // an injected client may already have been started by its owner
+ if (curator.getState() != CuratorFrameworkState.STARTED)
+ curator.start();
+
+ discovery = ServiceDiscoveryBuilder.builder(IgniteInstanceDetails.class)
+ .client(curator)
+ .basePath(basePath)
+ .serializer(new JsonInstanceSerializer<>(IgniteInstanceDetails.class))
+ .build();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSpiContextDestroyed() {
+ if (!initGuard.compareAndSet(true, false))
+ return;
+ // NOTE(review): the guard is reset here but the closed curator is kept, so a later init() would reuse it - confirm
+ log.info("Destroying ZooKeeper IP Finder.");
+
+ super.onSpiContextDestroyed();
+ // the client is closed whether it was created by init() or injected through setCurator
+ if (curator != null)
+ curator.close();
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<InetSocketAddress> getRegisteredAddresses() throws IgniteSpiException {
+ init();
+
+ if (log.isDebugEnabled())
+ log.debug("Getting registered addresses from ZooKeeper IP Finder.");
+
+ Collection<ServiceInstance<IgniteInstanceDetails>> serviceInstances;
+ // best effort: a failed query is logged and reported as an empty collection instead of being rethrown
+ try {
+ serviceInstances = discovery.queryForInstances(serviceName);
+ } catch (Exception e) {
+ log.warning("Error while getting registered addresses from ZooKeeper IP Finder.", e);
+ return Collections.emptyList();
+ }
+ // collect into a set so that instances sharing the same address and port yield a single entry
+ Set<InetSocketAddress> answer = new HashSet<>();
+
+ for (ServiceInstance<IgniteInstanceDetails> si : serviceInstances)
+ answer.add(new InetSocketAddress(si.getAddress(), si.getPort()));
+
+ log.info("ZooKeeper IP Finder resolved addresses: " + answer);
+
+ return answer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void registerAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException {
+ init();
+
+ log.info("Registering addresses with ZooKeeper IP Finder: " + addrs);
+ // addresses already present in ZooKeeper are skipped unless duplicate registrations are allowed
+ Set<InetSocketAddress> registrationsToIgnore = new HashSet<>();
+ if (!allowDuplicateRegistrations) {
+ try {
+ for (ServiceInstance<IgniteInstanceDetails> sd : discovery.queryForInstances(serviceName))
+ registrationsToIgnore.add(new InetSocketAddress(sd.getAddress(), sd.getPort()));
+ }
+ catch (Exception e) {
+ log.warning("Error while finding currently registered services to avoid duplicate registrations", e);
+ throw new IgniteSpiException(e);
+ }
+ }
+
+ for (InetSocketAddress addr : addrs) {
+ if (registrationsToIgnore.contains(addr))
+ continue;
+
+ try {
+ ServiceInstance<IgniteInstanceDetails> si = ServiceInstance.<IgniteInstanceDetails>builder()
+ .name(serviceName)
+ .uriSpec(URI_SPEC)
+ .address(addr.getAddress().getHostAddress())
+ .port(addr.getPort())
+ .build();
+
+ discovery.registerService(si);
+
+ // remember the instance only after the registration succeeded, so that a failure leaves no stale entry
+ ourInstances.put(addr, si);
+ } catch (Exception e) {
+ log.warning(String.format("Error while registering an address from ZooKeeper IP Finder " +
+ "[message=%s,addresses=%s]", e.getMessage(), addr), e);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unregisterAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException {
+
+ // if curator was never created or is not STARTED, we have nothing to unregister, because we are using ephemeral
+ // nodes, which means that our addresses will only be registered in ZK as long as our connection is alive
+ if (curator == null || curator.getState() != CuratorFrameworkState.STARTED)
+ return;
+
+ log.info("Unregistering addresses with ZooKeeper IP Finder: " + addrs);
+
+ for (InetSocketAddress addr : addrs) {
+ ServiceInstance<IgniteInstanceDetails> si = ourInstances.get(addr);
+ if (si == null) {
+ log.warning("Asked to unregister address from ZooKeeper IP Finder, but no match was found in local " +
+ "instance map for: " + addr);
+ continue;
+ }
+
+ try {
+ discovery.unregisterService(si);
+ } catch (Exception e) {
+ log.warning("Error while unregistering an address from ZooKeeper IP Finder: " + addr, e);
+ }
+ }
+ }
+
+ /**
+ * @param curator A {@link CuratorFramework} instance to use. It can already be in <tt>STARTED</tt> state.
+ */
+ public void setCurator(CuratorFramework curator) {
+ this.curator = curator;
+ }
+
+ /**
+ * @return The ZooKeeper connection string, only if set explicitly. Else, it returns null.
+ */
+ public String getZkConnectionString() {
+ return zkConnectionString;
+ }
+
+ /**
+ * @param zkConnectionString ZooKeeper connection string in case a {@link CuratorFramework} is not being set explicitly.
+ */
+ public void setZkConnectionString(String zkConnectionString) {
+ this.zkConnectionString = zkConnectionString;
+ }
+
+ /**
+ * @return Retry policy in use. Unless set explicitly, this is the default {@link ExponentialBackoffRetry}.
+ */
+ public RetryPolicy getRetryPolicy() {
+ return retryPolicy;
+ }
+
+ /**
+ * @param retryPolicy {@link RetryPolicy} to use in case a ZK Connection String is being injected, or if
+ * using a system property.
+ */
+ public void setRetryPolicy(RetryPolicy retryPolicy) {
+ this.retryPolicy = retryPolicy;
+ }
+
+ /**
+ * @return Base path for service registration in ZK. Default value: {@link #BASE_PATH}.
+ */
+ public String getBasePath() {
+ return basePath;
+ }
+
+ /**
+ * @param basePath Base path for service registration in ZK. If not passed, {@link #BASE_PATH} will be used.
+ */
+ public void setBasePath(String basePath) {
+ this.basePath = basePath;
+ }
+
+ /**
+ * @return Service name being used, in Curator terms. See {@link #setServiceName(String)} for more information.
+ */
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ /**
+ * @param serviceName Service name to use, as defined by Curator's {@link ServiceDiscovery} recipe. In physical
+ * ZK terms, it represents the node under {@link #basePath}, under which services will be
+ * registered.
+ */
+ public void setServiceName(String serviceName) {
+ this.serviceName = serviceName;
+ }
+
+ /**
+ * @return The value of this flag. See {@link #setAllowDuplicateRegistrations(boolean)} for more details.
+ */
+ public boolean isAllowDuplicateRegistrations() {
+ return allowDuplicateRegistrations;
+ }
+
+ /**
+ * @param allowDuplicateRegistrations Whether to register each node only once, or if duplicate registrations
+ * are allowed. Nodes will attempt to register themselves, plus those they
+ * know about. By default, duplicate registrations are not allowed, but you
+ * might want to set this property to <tt>true</tt> if you have multiple
+ * network interfaces or if you are facing troubles.
+ */
+ public void setAllowDuplicateRegistrations(boolean allowDuplicateRegistrations) {
+ this.allowDuplicateRegistrations = allowDuplicateRegistrations;
+ }
+
+ /**
+ * Empty DTO for storing service instances details. Currently acting as a placeholder because Curator requires
+ * a payload type when registering and discovering nodes. May be enhanced in the future with further information
+ * to assist discovery.
+ *
+ * @author Raul Kripalani
+ */
+ @JsonRootName("ignite_instance_details")
+ private static class IgniteInstanceDetails {
+ // static nested class: holds no reference to the enclosing finder and can be instantiated on its own
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d69e7785/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
new file mode 100644
index 0000000..ce059a3
--- /dev/null
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.ipfinder.zk;
+
+import org.apache.curator.framework.*;
+import org.apache.curator.retry.*;
+import org.apache.curator.test.*;
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Test for {@link TcpDiscoveryZookeeperIpFinder}.
+ *
+ * @author Raul Kripalani
+ */
+public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
+
+ /** ZK Cluster size. */
+ private static final int ZK_CLUSTER_SIZE = 3;
+
+ /** ZK path under which the Ignite service instances are registered. */
+ private static final String SERVICES_IGNITE_ZK_PATH = "/services/ignite";
+
+ /** The ZK cluster instance, from curator-test. */
+ private TestingCluster zkCluster;
+
+ /** A Curator client to perform assertions on the embedded ZK instances. */
+ private CuratorFramework zkCurator;
+
+ /** Whether to allow duplicate registrations for the current test method or not. */
+ private boolean allowDuplicateRegistrations = false;
+
+ /** Constructor that does not start any grids. */
+ public ZookeeperIpFinderTest() {
+ super(false);
+ }
+
+ /**
+ * Before test.
+ * @throws Exception
+ */
+ @Override public void beforeTest() throws Exception {
+ super.beforeTest();
+
+ // remove stale system properties
+ System.getProperties().remove(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING);
+
+ // start the ZK cluster
+ zkCluster = new TestingCluster(ZK_CLUSTER_SIZE);
+ zkCluster.start();
+
+ // start the Curator client so we can perform assertions on the ZK state later
+ zkCurator = CuratorFrameworkFactory.newClient(zkCluster.getConnectString(), new RetryNTimes(10, 1000));
+ zkCurator.start();
+
+ }
+
+ /**
+ * After test.
+ * @throws Exception
+ */
+ @Override public void afterTest() throws Exception {
+ super.afterTest();
+
+ // stop the grids first, while ZooKeeper is still reachable, so that they can unregister their addresses
+ stopAllGrids();
+
+ if (zkCurator != null)
+ zkCurator.close();
+
+ if (zkCluster != null) {
+ zkCluster.stop();
+ zkCluster.close();
+ }
+ }
+
+ /**
+ * Enhances the default configuration with the {@link TcpDiscoveryZookeeperIpFinder}.
+ *
+ * @param gridName Grid name.
+ * @return Configuration whose TCP discovery SPI uses the ZooKeeper IP finder.
+ * @throws Exception
+ */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration configuration = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi tcpDisco = (TcpDiscoverySpi) configuration.getDiscoverySpi();
+ TcpDiscoveryZookeeperIpFinder zkIpFinder = new TcpDiscoveryZookeeperIpFinder();
+ zkIpFinder.setAllowDuplicateRegistrations(isAllowDuplicateRegistrations());
+
+ // first node => configure with zkUrl; second node => configure with CuratorFramework; third and subsequent
+ // shall be configured through system property
+ if (gridName.equals(getTestGridName(0))) {
+ zkIpFinder.setZkConnectionString(zkCluster.getConnectString());
+ }
+ else if (gridName.equals(getTestGridName(1))) {
+ zkIpFinder.setCurator(CuratorFrameworkFactory.newClient(zkCluster.getConnectString(),
+ new ExponentialBackoffRetry(100, 5)));
+ }
+
+ tcpDisco.setIpFinder(zkIpFinder);
+ return configuration;
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void testOneIgniteNodeIsAlone() throws Exception {
+ startGrid(0);
+
+ assertEquals(1, grid(0).cluster().metrics().getTotalNodes());
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void testTwoIgniteNodesFindEachOther() throws Exception {
+ // start one node
+ startGrid(0);
+
+ // set up an event listener to expect one NODE_JOINED event
+ CountDownLatch latch = expectJoinEvents(grid(0), 1);
+
+ // start the other node
+ startGrid(1);
+
+ // assert the nodes see each other
+ assertEquals(2, grid(0).cluster().metrics().getTotalNodes());
+ assertEquals(2, grid(1).cluster().metrics().getTotalNodes());
+
+ // assert the event listener got as many events as expected; await() returns false on timeout
+ assertTrue(latch.await(1, TimeUnit.SECONDS));
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void testThreeNodesWithThreeDifferentConfigMethods() throws Exception {
+ // start one node
+ startGrid(0);
+
+ // set up an event listener to expect two NODE_JOINED events
+ CountDownLatch latch = expectJoinEvents(grid(0), 2);
+
+ // start the 2nd node
+ startGrid(1);
+
+ // start the 3rd node, first setting the system property
+ System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString());
+ startGrid(2);
+
+ // wait until all grids are started
+ waitForRemoteNodes(grid(0), 2);
+
+ // assert the nodes see each other
+ assertEquals(3, grid(0).cluster().metrics().getTotalNodes());
+ assertEquals(3, grid(1).cluster().metrics().getTotalNodes());
+ assertEquals(3, grid(2).cluster().metrics().getTotalNodes());
+
+ // assert the event listener got as many events as expected; await() returns false on timeout
+ assertTrue(latch.await(1, TimeUnit.SECONDS));
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void testFourNodesStartingAndStopping() throws Exception {
+ // start one node
+ startGrid(0);
+
+ // set up an event listener to expect three NODE_JOINED events
+ CountDownLatch latch = expectJoinEvents(grid(0), 3);
+
+ // start the 2nd node
+ startGrid(1);
+
+ // start the 3rd & 4th nodes, first setting the system property
+ System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString());
+ startGrid(2);
+ startGrid(3);
+
+ // wait until all grids are started
+ waitForRemoteNodes(grid(0), 3);
+
+ // assert the nodes see each other
+ assertEquals(4, grid(0).cluster().metrics().getTotalNodes());
+ assertEquals(4, grid(1).cluster().metrics().getTotalNodes());
+ assertEquals(4, grid(2).cluster().metrics().getTotalNodes());
+ assertEquals(4, grid(3).cluster().metrics().getTotalNodes());
+
+ // assert the event listener got as many events as expected; await() returns false on timeout
+ assertTrue(latch.await(1, TimeUnit.SECONDS));
+
+ // stop the first grid
+ stopGrid(0);
+
+ // make sure that nodes were synchronized; they should only see 3 now
+ assertEquals(3, grid(1).cluster().metrics().getTotalNodes());
+ assertEquals(3, grid(2).cluster().metrics().getTotalNodes());
+ assertEquals(3, grid(3).cluster().metrics().getTotalNodes());
+
+ // stop all remaining grids
+ stopGrid(1);
+ stopGrid(2);
+ stopGrid(3);
+
+ // check that the nodes are gone in ZK
+ assertEquals(0, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void testFourNodesWithDuplicateRegistrations() throws Exception {
+ setAllowDuplicateRegistrations(true);
+
+ // start 4 nodes
+ System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString());
+ startGrids(4);
+
+ // wait until all grids are started
+ waitForRemoteNodes(grid(0), 3);
+
+ // each node will register itself + the node that it connected to to join the cluster
+ assertEquals(7, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
+
+ // stop all grids
+ stopAllGrids();
+
+ // check that all nodes are gone in ZK
+ assertEquals(0, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void testFourNodesWithNoDuplicateRegistrations() throws Exception {
+ setAllowDuplicateRegistrations(false);
+
+ // start 4 nodes
+ System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString());
+ startGrids(4);
+
+ // wait until all grids are started
+ waitForRemoteNodes(grid(0), 3);
+
+ // each node will only register itself
+ assertEquals(4, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
+
+ // stop all grids
+ stopAllGrids();
+
+ // check that all nodes are gone in ZK
+ assertEquals(0, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void testFourNodesRestartLastSeveralTimes() throws Exception {
+ setAllowDuplicateRegistrations(false);
+
+ // start 4 nodes
+ System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString());
+ startGrids(4);
+
+ // wait until all grids are started
+ waitForRemoteNodes(grid(0), 3);
+
+ // each node will only register itself
+ assertEquals(4, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
+
+ // repeat 5 times
+ for (int i = 0; i < 5; i++) {
+ // stop the grid with index 2 (NOTE(review): the test name says "last", which would be index 3 - confirm)
+ stopGrid(2);
+
+ // check that the stopped node has unregistered itself, leaving three registrations
+ assertEquals(3, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
+
+ // start the node again
+ startGrid(2);
+
+ // check that the node is back in ZK
+ assertEquals(4, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
+ }
+
+ stopAllGrids();
+
+ assertEquals(0, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
+
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void testFourNodesKillRestartZookeeper() throws Exception {
+ setAllowDuplicateRegistrations(false);
+
+ // start 4 nodes
+ System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString());
+ startGrids(4);
+
+ // wait until all grids are started
+ waitForRemoteNodes(grid(0), 3);
+
+ // each node will only register itself
+ assertEquals(4, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
+
+ // remember ZK server configuration and stop the cluster
+ Collection<InstanceSpec> instances = zkCluster.getInstances();
+ zkCluster.stop();
+ Thread.sleep(1000);
+
+ // start the cluster with the previous configuration
+ zkCluster = new TestingCluster(instances);
+ zkCluster.start();
+
+ // block the client until connected
+ zkCurator.blockUntilConnected();
+
+ // check that the nodes have registered again
+ assertEquals(4, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
+
+ // stop all grids
+ stopAllGrids();
+ Thread.sleep(2000);
+
+ // check that all nodes are gone in ZK
+ assertEquals(0, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
+ }
+
+ /**
+ * Registers a listener through the given node that counts down the returned latch on each NODE_JOINED event.
+ */
+ private CountDownLatch expectJoinEvents(Ignite ignite, int joinEventCount) {
+ final CountDownLatch latch = new CountDownLatch(joinEventCount);
+
+ ignite.events().remoteListen(new IgniteBiPredicate<UUID, Event>() {
+ @Override public boolean apply(UUID uuid, Event event) {
+ latch.countDown();
+ return true;
+ }
+ }, null, EventType.EVT_NODE_JOINED);
+
+ return latch;
+ }
+
+ /**
+ * @param allowDuplicateRegistrations Value passed to the IP finder of every grid configured afterwards.
+ */
+ public void setAllowDuplicateRegistrations(boolean allowDuplicateRegistrations) {
+ this.allowDuplicateRegistrations = allowDuplicateRegistrations;
+ }
+
+ /**
+ * @return Whether the IP finders configured by this test allow duplicate registrations.
+ */
+ public boolean isAllowDuplicateRegistrations() {
+ return allowDuplicateRegistrations;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d69e7785/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f3a5d65..fa3eaa4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,6 +75,7 @@
<module>modules/kafka</module>
<module>modules/yarn</module>
<module>modules/jms11</module>
+ <module>modules/zookeeper</module>
</modules>
<profiles>