You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/08/21 11:05:18 UTC

[33/44] incubator-ignite git commit: ignite-1203 - Zookeeper IP finder. Closes #16.

ignite-1203 - Zookeeper IP finder. Closes #16.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d69e7785
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d69e7785
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d69e7785

Branch: refs/heads/ignite-gg-9615-1
Commit: d69e7785afe15fe99599e992f37c742b8fa57ba8
Parents: 452af6a
Author: Raul Kripalani <ra...@apache.org>
Authored: Thu Aug 20 18:09:27 2015 -0700
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Thu Aug 20 18:09:27 2015 -0700

----------------------------------------------------------------------
 modules/zookeeper/pom.xml                       |  89 +++++
 .../zk/TcpDiscoveryZookeeperIpFinder.java       | 350 +++++++++++++++++
 .../tcp/ipfinder/zk/ZookeeperIpFinderTest.java  | 390 +++++++++++++++++++
 pom.xml                                         |   1 +
 4 files changed, 830 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d69e7785/modules/zookeeper/pom.xml
----------------------------------------------------------------------
diff --git a/modules/zookeeper/pom.xml b/modules/zookeeper/pom.xml
new file mode 100644
index 0000000..bf0c578
--- /dev/null
+++ b/modules/zookeeper/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+    POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.ignite</groupId>
+        <artifactId>ignite-parent</artifactId>
+        <version>1</version>
+        <relativePath>../../parent</relativePath>
+    </parent>
+
+    <artifactId>ignite-zookeeper</artifactId>
+    <version>1.4.1-SNAPSHOT</version>
+
+    <properties>
+        <curator.version>2.8.0</curator.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <version>${curator.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-x-discovery</artifactId>
+            <version>${curator.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-test</artifactId>
+            <version>${curator.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-log4j</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-spring</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d69e7785/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java
new file mode 100644
index 0000000..dd1a4a7
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.ipfinder.zk;
+
+import org.apache.curator.*;
+import org.apache.curator.framework.*;
+import org.apache.curator.framework.imps.*;
+import org.apache.curator.retry.*;
+import org.apache.curator.x.discovery.*;
+import org.apache.curator.x.discovery.details.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+
+import com.google.common.collect.*;
+import org.codehaus.jackson.map.annotate.*;
+
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * This TCP Discovery IP Finder uses Apache ZooKeeper (ZK) to locate peer nodes when bootstrapping in order to join
+ * the cluster. It uses the Apache Curator library to interact with ZooKeeper in a simple manner. Specifically,
+ * it uses the {@link ServiceDiscovery} recipe, which makes use of ephemeral nodes in ZK to register services.
+ *
+ * <p>
+ * There are several ways to instantiate the TcpDiscoveryZookeeperIpFinder:
+ * <ul>
+ *     <li>By providing an instance of {@link CuratorFramework} directly, in which case no ZK Connection String
+ *     is required.</li>
+ *     <li>By providing a ZK Connection String through {@link #setZkConnectionString(String)}, and optionally
+ *     a {@link RetryPolicy} through the setter. If the latter is not provided, a default
+ *     {@link ExponentialBackoffRetry} policy is used, with a base sleep time of 1000ms and 10 retries.</li>
+ *     <li>By providing a ZK Connection String through system property {@link #PROP_ZK_CONNECTION_STRING}. If this
+ *     property is set, it overrides the ZK Connection String passed in as a property, but it does not override
+ *     the {@link CuratorFramework} if provided.</li>
+ * </ul>
+ *
+ * You may customise the base path for services, as well as the service name. By default {@link #BASE_PATH} and
+ * {@link #SERVICE_NAME} are used respectively. You can also choose to enable or disable duplicate registrations. See
+ * {@link #setAllowDuplicateRegistrations(boolean)} for more details.
+ *
+ * @see <a href="http://zookeeper.apache.org">Apache ZooKeeper</a>
+ * @see <a href="http://curator.apache.org">Apache Curator</a>
+ *
+ * @author Raul Kripalani
+ */
+public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
+
+    /** System property name to provide the ZK Connection String. */
+    public static final String PROP_ZK_CONNECTION_STRING = "IGNITE_ZK_CONNECTION_STRING";
+
+    /** Default base path for service registrations. */
+    private static final String BASE_PATH = "/services";
+
+    /** Default service name for service registrations. */
+    private static final String SERVICE_NAME = "ignite";
+
+    /** Default URI Spec to use with the {@link ServiceDiscoveryBuilder}. */
+    private static final UriSpec URI_SPEC = new UriSpec("{address}:{port}");
+
+    /** Init guard. */
+    @GridToStringExclude
+    private final AtomicBoolean initGuard = new AtomicBoolean();
+
+    /** Logger. */
+    @LoggerResource
+    private IgniteLogger log;
+
+    /** The Curator framework in use, either injected or constructed by this component. */
+    private CuratorFramework curator;
+
+    /** The ZK Connection String if provided by the user. */
+    private String zkConnectionString;
+
+    /** Retry policy to use. */
+    private RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
+
+    /** Base path to use, by default {@link #BASE_PATH}. */
+    private String basePath = BASE_PATH;
+
+    /** Service name to use, by default {@link #SERVICE_NAME}. */
+    private String serviceName = SERVICE_NAME;
+
+    /** Whether to allow or not duplicate registrations. See setter doc. */
+    private boolean allowDuplicateRegistrations = false;
+
+    /** The Service Discovery recipe. */
+    private ServiceDiscovery<IgniteInstanceDetails> discovery;
+
+    /** Map of the {@link ServiceInstance}s we have registered. */
+    private Map<InetSocketAddress, ServiceInstance<IgniteInstanceDetails>> ourInstances = new ConcurrentHashMap<>();
+
+    /** Constructor. */
+    public TcpDiscoveryZookeeperIpFinder() {
+        setShared(true);
+    }
+
+    /** Initializes this IP Finder by creating the appropriate Curator objects. */
+    private void init() {
+        if (!initGuard.compareAndSet(false, true))
+            return;
+
+        String sysPropZkConnString = System.getProperty(PROP_ZK_CONNECTION_STRING);
+
+        if (sysPropZkConnString != null && sysPropZkConnString.trim().length() > 0)
+            zkConnectionString = sysPropZkConnString;
+
+        log.info("Initializing ZooKeeper IP Finder.");
+
+        if (curator == null) {
+            A.notNullOrEmpty(zkConnectionString, String.format("ZooKeeper URL (or system property %s) cannot be null " +
+                "or empty if a CuratorFramework object is not provided explicitly", PROP_ZK_CONNECTION_STRING));
+            curator = CuratorFrameworkFactory.newClient(zkConnectionString, retryPolicy);
+        }
+
+        if (curator.getState() != CuratorFrameworkState.STARTED)
+            curator.start();
+
+        discovery = ServiceDiscoveryBuilder.builder(IgniteInstanceDetails.class)
+            .client(curator)
+            .basePath(basePath)
+            .serializer(new JsonInstanceSerializer<>(IgniteInstanceDetails.class))
+            .build();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSpiContextDestroyed() {
+        if (!initGuard.compareAndSet(true, false))
+            return;
+
+        log.info("Destroying ZooKeeper IP Finder.");
+
+        super.onSpiContextDestroyed();
+
+        if (curator != null)
+            curator.close();
+
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<InetSocketAddress> getRegisteredAddresses() throws IgniteSpiException {
+        init();
+
+        if (log.isDebugEnabled())
+            log.debug("Getting registered addresses from ZooKeeper IP Finder.");
+
+        Collection<ServiceInstance<IgniteInstanceDetails>> serviceInstances;
+
+        try {
+            serviceInstances = discovery.queryForInstances(serviceName);
+        } catch (Exception e) {
+            log.warning("Error while getting registered addresses from ZooKeeper IP Finder.", e);
+            return Collections.emptyList();
+        }
+
+        Set<InetSocketAddress> answer = new HashSet<>();
+
+        for (ServiceInstance<IgniteInstanceDetails> si : serviceInstances)
+            answer.add(new InetSocketAddress(si.getAddress(), si.getPort()));
+
+        log.info("ZooKeeper IP Finder resolved addresses: " + answer);
+
+        return answer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void registerAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException {
+        init();
+
+        log.info("Registering addresses with ZooKeeper IP Finder: " + addrs);
+
+        Set<InetSocketAddress> registrationsToIgnore = new HashSet<>();
+        if (!allowDuplicateRegistrations) {
+            try {
+                for (ServiceInstance<IgniteInstanceDetails> sd : discovery.queryForInstances(serviceName))
+                    registrationsToIgnore.add(new InetSocketAddress(sd.getAddress(), sd.getPort()));
+            }
+            catch (Exception e) {
+                log.warning("Error while finding currently registered services to avoid duplicate registrations", e);
+                throw new IgniteSpiException(e);
+            }
+        }
+
+        for (InetSocketAddress addr : addrs) {
+            if (registrationsToIgnore.contains(addr))
+                continue;
+
+            try {
+                ServiceInstance<IgniteInstanceDetails> si = ServiceInstance.<IgniteInstanceDetails>builder()
+                        .name(serviceName)
+                        .uriSpec(URI_SPEC)
+                        .address(addr.getAddress().getHostAddress())
+                        .port(addr.getPort())
+                        .build();
+
+                // track the instance only after registerService succeeded, so a failed registration is never kept
+                discovery.registerService(si);
+                ourInstances.put(addr, si);
+
+            } catch (Exception e) {
+                log.warning(String.format("Error while registering an address from ZooKeeper IP Finder " +
+                    "[message=%s,addresses=%s]", e.getMessage(), addr), e);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void unregisterAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException {
+        // A null curator means init() never ran, so nothing was registered. If curator is not STARTED there is
+        // nothing to unregister either: we use ephemeral nodes, which only live while our ZK connection is alive.
+        if (curator == null || curator.getState() != CuratorFrameworkState.STARTED)
+            return;
+
+        log.info("Unregistering addresses with ZooKeeper IP Finder: " + addrs);
+
+        for (InetSocketAddress addr : addrs) {
+            ServiceInstance<IgniteInstanceDetails> si = ourInstances.get(addr);
+            if (si == null) {
+                log.warning("Asked to unregister address from ZooKeeper IP Finder, but no match was found in local " +
+                        "instance map for: " + addr);
+                continue;
+            }
+
+            try {
+                discovery.unregisterService(si);
+                ourInstances.remove(addr);
+            } catch (Exception e) {
+                log.warning("Error while unregistering an address from ZooKeeper IP Finder: " + addr, e);
+            }
+        }
+    }
+
+    /**
+     * @param curator A {@link CuratorFramework} instance to use. It can already be in <tt>STARTED</tt> state.
+     */
+    public void setCurator(CuratorFramework curator) {
+        this.curator = curator;
+    }
+
+    /**
+     * @return The ZooKeeper connection string, only if set explicitly. Else, it returns null.
+     */
+    public String getZkConnectionString() {
+        return zkConnectionString;
+    }
+
+    /**
+     * @param zkConnectionString ZooKeeper connection string in case a {@link CuratorFramework} is not being set explicitly.
+     */
+    public void setZkConnectionString(String zkConnectionString) {
+        this.zkConnectionString = zkConnectionString;
+    }
+
+    /**
+     * @return Retry policy in use. Unless set explicitly, this is the default {@link ExponentialBackoffRetry}.
+     */
+    public RetryPolicy getRetryPolicy() {
+        return retryPolicy;
+    }
+
+    /**
+     * @param retryPolicy {@link RetryPolicy} to use in case a ZK Connection String is being injected, or if
+     *                    using a system property.
+     */
+    public void setRetryPolicy(RetryPolicy retryPolicy) {
+        this.retryPolicy = retryPolicy;
+    }
+
+    /**
+     * @return Base path for service registration in ZK. Default value: {@link #BASE_PATH}.
+     */
+    public String getBasePath() {
+        return basePath;
+    }
+
+    /**
+     * @param basePath Base path for service registration in ZK. If not passed, {@link #BASE_PATH} will be used.
+     */
+    public void setBasePath(String basePath) {
+        this.basePath = basePath;
+    }
+
+    /**
+     * @return Service name being used, in Curator terms. See {@link #setServiceName(String)} for more information.
+     */
+    public String getServiceName() {
+        return serviceName;
+    }
+
+    /**
+     * @param serviceName Service name to use, as defined by Curator's {@link ServiceDiscovery} recipe. In physical
+     *                    ZK terms, it represents the node under {@link #basePath}, under which services will be
+     *                    registered.
+     */
+    public void setServiceName(String serviceName) {
+        this.serviceName = serviceName;
+    }
+
+    /**
+     * @return The value of this flag. See {@link #setAllowDuplicateRegistrations(boolean)} for more details.
+     */
+    public boolean isAllowDuplicateRegistrations() {
+        return allowDuplicateRegistrations;
+    }
+
+    /**
+     * @param allowDuplicateRegistrations Whether to register each node only once, or if duplicate registrations
+     *                                    are allowed. Nodes will attempt to register themselves, plus those they
+     *                                    know about. By default, duplicate registrations are not allowed, but you
+     *                                    might want to set this property to <tt>true</tt> if you have multiple
+     *                                    network interfaces or if you are facing troubles.
+     */
+    public void setAllowDuplicateRegistrations(boolean allowDuplicateRegistrations) {
+        this.allowDuplicateRegistrations = allowDuplicateRegistrations;
+    }
+
+    /**
+     * Empty DTO for storing service instance details. Currently acting as a placeholder because Curator requires
+     * a payload type when registering and discovering nodes. May be enhanced in the future with further information
+     * to assist discovery. Declared static so that it holds no reference to the enclosing IP finder.
+     *
+     * @author Raul Kripalani
+     */
+    @JsonRootName("ignite_instance_details")
+    private static class IgniteInstanceDetails {
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d69e7785/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
new file mode 100644
index 0000000..ce059a3
--- /dev/null
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.ipfinder.zk;
+
+import org.apache.curator.framework.*;
+import org.apache.curator.retry.*;
+import org.apache.curator.test.*;
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Test for {@link TcpDiscoveryZookeeperIpFinder}.
+ *
+ * @author Raul Kripalani
+ */
+public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
+
+    /** ZK Cluster size. */
+    private static final int ZK_CLUSTER_SIZE = 3;
+
+    /** ZK path under which the Ignite service instances are registered. */
+    private static final String SERVICES_IGNITE_ZK_PATH = "/services/ignite";
+
+    /** The ZK cluster instance, from curator-test. */
+    private TestingCluster zkCluster;
+
+    /** A Curator client to perform assertions on the embedded ZK instances. */
+    private CuratorFramework zkCurator;
+
+    /** Whether to allow duplicate registrations for the current test method or not. */
+    private boolean allowDuplicateRegistrations = false;
+
+    /** Constructor that does not start any grids. */
+    public ZookeeperIpFinderTest() {
+        super(false);
+    }
+
+    /**
+     * Before test.
+     * @throws Exception
+     */
+    @Override public void beforeTest() throws Exception {
+        super.beforeTest();
+
+        // remove stale system properties
+        System.getProperties().remove(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING);
+
+        // start the ZK cluster
+        zkCluster = new TestingCluster(ZK_CLUSTER_SIZE);
+        zkCluster.start();
+
+        // start the Curator client so we can perform assertions on the ZK state later
+        zkCurator = CuratorFrameworkFactory.newClient(zkCluster.getConnectString(), new RetryNTimes(10, 1000));
+        zkCurator.start();
+
+    }
+
+    /**
+     * After test: stops any grids still running, then shuts down the Curator client and the ZK cluster.
+     * @throws Exception If tear-down fails.
+     */
+    @Override public void afterTest() throws Exception {
+        super.afterTest();
+
+        // stop leftover grids first, while ZK is still up, so their IP finders can still reach it on shutdown
+        stopAllGrids();
+
+        if (zkCurator != null)
+            zkCurator.close();
+
+        if (zkCluster != null) {
+            zkCluster.stop();
+            zkCluster.close();
+        }
+    }
+
+    /**
+     * Enhances the default configuration with the {@link TcpDiscoveryZookeeperIpFinder}.
+     *
+     * @param gridName Grid name.
+     * @return
+     * @throws Exception
+     */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration configuration = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi tcpDisco = (TcpDiscoverySpi) configuration.getDiscoverySpi();
+        TcpDiscoveryZookeeperIpFinder zkIpFinder = new TcpDiscoveryZookeeperIpFinder();
+        zkIpFinder.setAllowDuplicateRegistrations(isAllowDuplicateRegistrations());
+
+        // first node => configure with zkUrl; second node => configure with CuratorFramework; third and subsequent
+        // shall be configured through system property
+        if (gridName.equals(getTestGridName(0))) {
+            zkIpFinder.setZkConnectionString(zkCluster.getConnectString());
+        }
+        else if (gridName.equals(getTestGridName(1))) {
+            zkIpFinder.setCurator(CuratorFrameworkFactory.newClient(zkCluster.getConnectString(),
+                new ExponentialBackoffRetry(100, 5)));
+        }
+
+        tcpDisco.setIpFinder(zkIpFinder);
+        return configuration;
+    }
+
+    /**
+     * @throws Exception
+     */
+    public void testOneIgniteNodeIsAlone() throws Exception {
+        startGrid(0);
+
+        assertEquals(1, grid(0).cluster().metrics().getTotalNodes());
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If the nodes fail to start or to discover each other through ZooKeeper.
+     */
+    public void testTwoIgniteNodesFindEachOther() throws Exception {
+        // start one node
+        startGrid(0);
+
+        // set up an event listener to expect one NODE_JOINED event
+        CountDownLatch latch = expectJoinEvents(grid(0), 1);
+
+        // start the other node
+        startGrid(1);
+
+        // assert the nodes see each other
+        assertEquals(2, grid(0).cluster().metrics().getTotalNodes());
+        assertEquals(2, grid(1).cluster().metrics().getTotalNodes());
+
+        // assert the event listener got as many events as expected (await returns false on timeout)
+        assertTrue("Timed out waiting for NODE_JOINED events", latch.await(1, TimeUnit.SECONDS));
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If the nodes fail to start or to discover each other through ZooKeeper.
+     */
+    public void testThreeNodesWithThreeDifferentConfigMethods() throws Exception {
+        // start one node
+        startGrid(0);
+
+        // set up an event listener to expect two NODE_JOINED events
+        CountDownLatch latch = expectJoinEvents(grid(0), 2);
+
+        // start the 2nd node
+        startGrid(1);
+
+        // start the 3rd node, first setting the system property
+        System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString());
+        startGrid(2);
+
+        // wait until all grids are started
+        waitForRemoteNodes(grid(0), 2);
+
+        // assert the nodes see each other
+        assertEquals(3, grid(0).cluster().metrics().getTotalNodes());
+        assertEquals(3, grid(1).cluster().metrics().getTotalNodes());
+        assertEquals(3, grid(2).cluster().metrics().getTotalNodes());
+
+        // assert the event listener got as many events as expected (await returns false on timeout)
+        assertTrue("Timed out waiting for NODE_JOINED events", latch.await(1, TimeUnit.SECONDS));
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If the nodes fail to start, to discover each other, or to unregister from ZooKeeper.
+     */
+    public void testFourNodesStartingAndStopping() throws Exception {
+        // start one node
+        startGrid(0);
+
+        // set up an event listener to expect three NODE_JOINED events
+        CountDownLatch latch = expectJoinEvents(grid(0), 3);
+
+        // start the 2nd node
+        startGrid(1);
+
+        // start the 3rd & 4th nodes, first setting the system property
+        System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString());
+        startGrid(2);
+        startGrid(3);
+
+        // wait until all grids are started
+        waitForRemoteNodes(grid(0), 3);
+
+        // assert the nodes see each other
+        assertEquals(4, grid(0).cluster().metrics().getTotalNodes());
+        assertEquals(4, grid(1).cluster().metrics().getTotalNodes());
+        assertEquals(4, grid(2).cluster().metrics().getTotalNodes());
+        assertEquals(4, grid(3).cluster().metrics().getTotalNodes());
+
+        // assert the event listener got as many events as expected (await returns false on timeout)
+        assertTrue("Timed out waiting for NODE_JOINED events", latch.await(1, TimeUnit.SECONDS));
+
+        // stop the first grid
+        stopGrid(0);
+
+        // make sure that nodes were synchronized; they should only see 3 now
+        assertEquals(3, grid(1).cluster().metrics().getTotalNodes());
+        assertEquals(3, grid(2).cluster().metrics().getTotalNodes());
+        assertEquals(3, grid(3).cluster().metrics().getTotalNodes());
+
+        // stop all remaining grids
+        stopGrid(1);
+        stopGrid(2);
+        stopGrid(3);
+
+        // check that the nodes are gone in ZK
+        assertEquals(0, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
+    }
+
+    /**
+     * @throws Exception
+     */
+    public void testFourNodesWithDuplicateRegistrations() throws Exception {
+        setAllowDuplicateRegistrations(true);
+
+        // start 4 nodes
+        System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString());
+        startGrids(4);
+
+        // wait until all grids are started
+        waitForRemoteNodes(grid(0), 3);
+
+        // each node will register itself + the node that it connected to to join the cluster
+        assertEquals(7, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
+
+        // stop all grids
+        stopAllGrids();
+
+        // check that all nodes are gone in ZK
+        assertEquals(0, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
+    }
+
+    /**
+     * @throws Exception
+     */
+    public void testFourNodesWithNoDuplicateRegistrations() throws Exception {
+        setAllowDuplicateRegistrations(false);
+
+        // start 4 nodes
+        System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString());
+        startGrids(4);
+
+        // wait until all grids are started
+        waitForRemoteNodes(grid(0), 3);
+
+        // each node will only register itself
+        assertEquals(4, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
+
+        // stop all grids
+        stopAllGrids();
+
+        // check that all nodes are gone in ZK
+        assertEquals(0, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
+    }
+
+    /**
+     * @throws Exception
+     */
+    public void testFourNodesRestartLastSeveralTimes() throws Exception {
+        setAllowDuplicateRegistrations(false);
+
+        // start 4 nodes
+        System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString());
+        startGrids(4);
+
+        // wait until all grids are started
+        waitForRemoteNodes(grid(0), 3);
+
+        // each node will only register itself
+        assertEquals(4, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
+
+        // repeat 5 times
+        for (int i = 0; i < 5; i++) {
+            // stop last grid
+            stopGrid(2);
+
+            // check that the node has unregistered itself and its party
+            assertEquals(3, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
+
+            // start the node again
+            startGrid(2);
+
+            // check that the node back in ZK
+            assertEquals(4, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
+        }
+
+        stopAllGrids();
+
+        assertEquals(0, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
+
+    }
+
+    /**
+     * @throws Exception
+     */
+    public void testFourNodesKillRestartZookeeper() throws Exception {
+        setAllowDuplicateRegistrations(false);
+
+        // start 4 nodes
+        System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString());
+        startGrids(4);
+
+        // wait until all grids are started
+        waitForRemoteNodes(grid(0), 3);
+
+        // each node will only register itself
+        assertEquals(4, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
+
+        // remember ZK server configuration and stop the cluster
+        Collection<InstanceSpec> instances = zkCluster.getInstances();
+        zkCluster.stop();
+        Thread.sleep(1000);
+
+        // start the cluster with the previous configuration
+        zkCluster = new TestingCluster(instances);
+        zkCluster.start();
+
+        // block the client until connected
+        zkCurator.blockUntilConnected();
+
+        // check that the nodes have registered again
+        assertEquals(4, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
+
+        // stop all grids
+        stopAllGrids();
+        Thread.sleep(2000);
+
+        // check that all nodes are gone in ZK
+        assertEquals(0, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
+    }
+
+    /**
+     * Registers a listener through {@code ignite} that counts down the returned latch on each NODE_JOINED event.
+     */
+    private CountDownLatch expectJoinEvents(Ignite ignite, int joinEventCount) {
+        final CountDownLatch latch = new CountDownLatch(joinEventCount);
+
+        ignite.events().remoteListen(new IgniteBiPredicate<UUID, Event>() {
+            @Override public boolean apply(UUID uuid, Event event) {
+                latch.countDown();
+                return true;
+            }
+        }, null, EventType.EVT_NODE_JOINED);
+
+        return latch;
+    }
+
+    /**
+     * @param allowDuplicateRegistrations Whether IP finders created by this test allow duplicate registrations.
+     */
+    public void setAllowDuplicateRegistrations(boolean allowDuplicateRegistrations) {
+        this.allowDuplicateRegistrations = allowDuplicateRegistrations;
+    }
+
+    /**
+     * @return Whether IP finders created by this test allow duplicate registrations.
+     */
+    public boolean isAllowDuplicateRegistrations() {
+        return allowDuplicateRegistrations;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d69e7785/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f3a5d65..fa3eaa4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,6 +75,7 @@
         <module>modules/kafka</module>
         <module>modules/yarn</module>
         <module>modules/jms11</module>
+        <module>modules/zookeeper</module>
     </modules>
 
     <profiles>