You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/11/22 14:41:23 UTC

[3/5] ignite git commit: zk

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
new file mode 100644
index 0000000..8b3a117
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -0,0 +1,994 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.curator.test.TestingCluster;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
+import org.apache.ignite.spi.discovery.zk.internal.ZookeeperClient;
+import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.zookeeper.ZkTestClientCnxnSocketNIO;
+import org.apache.zookeeper.ZooKeeper;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl.IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD;
+import static org.apache.zookeeper.ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET;
+
+/**
+ *
+ */
+public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
+    /** */
+    private static TestingCluster zkCluster;
+
+    /** */
+    private static final boolean USE_TEST_CLUSTER = true;
+
+    /** */
+    private boolean client;
+
+    /** */
+    private static ConcurrentHashMap<UUID, Map<Long, DiscoveryEvent>> evts = new ConcurrentHashMap<>();
+
+    /** */
+    private static volatile boolean err;
+
+    /** */
+    private boolean testSockNio;
+
+    /** */
+    private int sesTimeout;
+
+    /** */
+    private ConcurrentHashMap<String, ZookeeperDiscoverySpi> spis = new ConcurrentHashMap<>();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        if (testSockNio)
+            System.setProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET, ZkTestClientCnxnSocketNIO.class.getName());
+
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setConsistentId(igniteInstanceName);
+
+        ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi();
+
+        zkSpi.setSessionTimeout(sesTimeout > 0 ? sesTimeout : 10_000);
+
+        spis.put(igniteInstanceName, zkSpi);
+
+        if (USE_TEST_CLUSTER) {
+            assert zkCluster != null;
+
+            zkSpi.setZkConnectionString(zkCluster.getConnectString());
+        }
+        else
+            zkSpi.setZkConnectionString("localhost:2181");
+
+        cfg.setDiscoverySpi(zkSpi);
+
+        CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+
+        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        // cfg.setMarshaller(new JdkMarshaller());
+
+        cfg.setClientMode(client);
+
+        Map<IgnitePredicate<? extends Event>, int[]> lsnrs = new HashMap<>();
+
+        lsnrs.put(new IgnitePredicate<Event>() {
+            /** */
+            @IgniteInstanceResource
+            private Ignite ignite;
+
+            @Override public boolean apply(Event evt) {
+                try {
+                    DiscoveryEvent discoveryEvt = (DiscoveryEvent)evt;
+
+                    UUID locId = ignite.cluster().localNode().id();
+
+                    Map<Long, DiscoveryEvent> nodeEvts = evts.get(locId);
+
+                    if (nodeEvts == null) {
+                        Object old = evts.put(locId, nodeEvts = new TreeMap<>());
+
+                        assertNull(old);
+
+                        synchronized (nodeEvts) {
+                            DiscoveryLocalJoinData locJoin = ((IgniteKernal)ignite).context().discovery().localJoin();
+
+                            nodeEvts.put(locJoin.event().topologyVersion(), locJoin.event());
+                        }
+                    }
+
+                    synchronized (nodeEvts) {
+                        DiscoveryEvent old = nodeEvts.put(discoveryEvt.topologyVersion(), discoveryEvt);
+
+                        assertNull(old);
+                    }
+                }
+                catch (Throwable e) {
+                    err = true;
+
+                    info("Unexpected error: " + e);
+                }
+
+                return true;
+            }
+        }, new int[]{EVT_NODE_JOINED, EVT_NODE_FAILED, EVT_NODE_LEFT});
+
+        cfg.setLocalEventListeners(lsnrs);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        if (USE_TEST_CLUSTER) {
+            zkCluster = new TestingCluster(3);
+            zkCluster.start();
+        }
+
+        System.setProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD, "1");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        System.clearProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD);
+
+        if (zkCluster != null) {
+            try {
+                zkCluster.close();
+            }
+            catch (Exception e) {
+                U.error(log, "Failed to stop Zookeeper client: " + e, e);
+            }
+
+            zkCluster = null;
+        }
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        reset();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        try {
+            assertFalse("Unexpected error, see log for details", err);
+
+            checkEventsConsistency();
+        }
+        finally {
+            reset();
+
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientNodesStatus() throws Exception {
+        startGrid(0);
+
+        for (Ignite node : G.allGrids()) {
+            assertEquals(0, node.cluster().forClients().nodes().size());
+            assertEquals(1, node.cluster().forServers().nodes().size());
+        }
+
+        client = true;
+
+        startGrid(1);
+
+        for (Ignite node : G.allGrids()) {
+            assertEquals(1, node.cluster().forClients().nodes().size());
+            assertEquals(1, node.cluster().forServers().nodes().size());
+        }
+
+        client = false;
+
+        startGrid(2);
+
+        client = true;
+
+        startGrid(3);
+
+        for (Ignite node : G.allGrids()) {
+            assertEquals(2, node.cluster().forClients().nodes().size());
+            assertEquals(2, node.cluster().forServers().nodes().size());
+        }
+
+        stopGrid(1);
+
+        waitForTopology(3);
+
+        for (Ignite node : G.allGrids()) {
+            assertEquals(1, node.cluster().forClients().nodes().size());
+            assertEquals(2, node.cluster().forServers().nodes().size());
+        }
+
+        stopGrid(2);
+
+        waitForTopology(2);
+
+        for (Ignite node : G.allGrids()) {
+            assertEquals(1, node.cluster().forClients().nodes().size());
+            assertEquals(1, node.cluster().forServers().nodes().size());
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStopNode_1() throws Exception {
+        startGrids(5);
+
+        waitForTopology(5);
+
+        stopGrid(3);
+
+        waitForTopology(4);
+
+        startGrid(3);
+
+        waitForTopology(5);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCustomEventsSimple1_SingleNode() throws Exception {
+        Ignite srv0 = startGrid(0);
+
+        srv0.createCache(new CacheConfiguration<>("c1"));
+
+        waitForEventsAcks(srv0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCustomEventsSimple1_5_Nodes() throws Exception {
+        Ignite srv0 = startGrids(5);
+
+        srv0.createCache(new CacheConfiguration<>("c1"));
+
+        awaitPartitionMapExchange();
+
+        waitForEventsAcks(srv0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSegmentation1() throws Exception {
+        sesTimeout = 1000;
+        testSockNio = true;
+
+        Ignite node0 = startGrid(0);
+
+        final CountDownLatch l = new CountDownLatch(1);
+
+        node0.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event event) {
+                l.countDown();
+
+                return false;
+            }
+        }, EventType.EVT_NODE_SEGMENTED);
+
+        ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0);
+
+        c0.closeSocket(true);
+
+        for (int i = 0; i < 10; i++) {
+            Thread.sleep(1_000);
+
+            if (l.getCount() == 0)
+                break;
+        }
+
+        info("Allow connect");
+
+        c0.allowConnect();
+
+        assertTrue(l.await(10, TimeUnit.SECONDS));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore1() throws Exception {
+        testSockNio = true;
+
+        Ignite node0 = startGrid(0);
+
+        ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0);
+
+        c0.closeSocket(false);
+
+        startGrid(1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore2() throws Exception {
+        testSockNio = true;
+
+        Ignite node0 = startGrid(0);
+
+        ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0);
+
+        c0.closeSocket(false);
+
+        startGridsMultiThreaded(1, 5);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore_NonCoordinator1() throws Exception {
+        connectionRestore_NonCoordinator(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore_NonCoordinator2() throws Exception {
+        connectionRestore_NonCoordinator(true);
+    }
+
+    /**
+     * @param failWhenDisconnected {@code True} if fail node while another node is disconnected.
+     * @throws Exception If failed.
+     */
+    private void connectionRestore_NonCoordinator(boolean failWhenDisconnected) throws Exception {
+        testSockNio = true;
+
+        Ignite node0 = startGrid(0);
+        Ignite node1 = startGrid(1);
+
+        ZkTestClientCnxnSocketNIO c1 = ZkTestClientCnxnSocketNIO.forNode(node1);
+
+        c1.closeSocket(true);
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                try {
+                    startGrid(2);
+                }
+                catch (Exception e) {
+                    info("Start error: " + e);
+                }
+
+                return null;
+            }
+        }, "start-node");
+
+        checkEvents(node0, joinEvent(3));
+
+        if (failWhenDisconnected) {
+            ZookeeperDiscoverySpi spi = spis.get(getTestIgniteInstanceName(2));
+
+            closeZkClient(spi);
+
+            checkEvents(node0, failEvent(4));
+        }
+
+        c1.allowConnect();
+
+        checkEvents(ignite(1), joinEvent(3));
+
+        if (failWhenDisconnected) {
+            checkEvents(ignite(1), failEvent(4));
+
+            IgnitionEx.stop(getTestIgniteInstanceName(2), true, true);
+        }
+
+        fut.get();
+
+        waitForTopology(failWhenDisconnected ? 2 : 3);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore_Coordinator1() throws Exception {
+        connectionRestore_Coordinator(1, 1, 0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore_Coordinator1_1() throws Exception {
+        connectionRestore_Coordinator(1, 1, 1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore_Coordinator2() throws Exception {
+        connectionRestore_Coordinator(1, 3, 0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore_Coordinator3() throws Exception {
+        connectionRestore_Coordinator(3, 3, 0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore_Coordinator4() throws Exception {
+        connectionRestore_Coordinator(3, 3, 1);
+    }
+
+    /**
+     * @param initNodes Number of initially started nodes.
+     * @param startNodes Number of nodes to start after coordinator loose connection.
+     * @param failCnt Number of nodes to stop after coordinator loose connection.
+     * @throws Exception If failed.
+     */
+    private void connectionRestore_Coordinator(int initNodes, int startNodes, int failCnt) throws Exception {
+        sesTimeout = 30_000;
+        testSockNio = true;
+
+        Ignite node0 = startGrids(initNodes);
+
+        ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0);
+
+        c0.closeSocket(true);
+
+        final AtomicInteger nodeIdx = new AtomicInteger(initNodes);
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                try {
+                    startGrid(nodeIdx.getAndIncrement());
+                }
+                catch (Exception e) {
+                    error("Start failed: " + e);
+                }
+
+                return null;
+            }
+        }, startNodes, "start-node");
+
+        int cnt = 0;
+
+        DiscoveryEvent[] expEvts = new DiscoveryEvent[startNodes - failCnt];
+
+        int expEvtCnt = 0;
+
+        sesTimeout = 1000;
+
+        List<ZkTestClientCnxnSocketNIO> blockedC = new ArrayList<>();
+
+        final List<String> failedZkNodes = new ArrayList<>(failCnt);
+
+        for (int i = initNodes; i < initNodes + startNodes; i++) {
+            ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(i));
+
+            ZookeeperDiscoveryImpl impl = GridTestUtils.getFieldValue(spi, "impl");
+
+            impl.waitConnectStart();
+
+            if (cnt++ < failCnt) {
+                ZkTestClientCnxnSocketNIO c = ZkTestClientCnxnSocketNIO.forNode(getTestIgniteInstanceName(i));
+
+                c.closeSocket(true);
+
+                blockedC.add(c);
+
+                failedZkNodes.add((String)GridTestUtils.getFieldValue(impl, "locNodeZkPath"));
+            }
+            else {
+                expEvts[expEvtCnt] = joinEvent(initNodes + expEvtCnt + 1);
+
+                expEvtCnt++;
+            }
+        }
+
+        final ZookeeperClient zkClient = new ZookeeperClient(log, zkCluster.getConnectString(), 10_000, null);
+
+        try {
+            assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    try {
+                        List<String> c = zkClient.getChildren("/apacheIgnite/default/alive");
+
+                        for (String failedZkNode : failedZkNodes) {
+                            if (c.contains(failedZkNode))
+                                return false;
+                        }
+
+                        return true;
+                    }
+                    catch (Exception e) {
+                        fail();
+
+                        return true;
+                    }
+                }
+            }, 10_000));
+        }
+        finally {
+            zkClient.close();
+        }
+
+        c0.allowConnect();
+
+        for (ZkTestClientCnxnSocketNIO c : blockedC)
+            c.allowConnect();
+
+        if (expEvts.length > 0) {
+            for (int i = 0; i < initNodes; i++)
+                checkEvents(ignite(i), expEvts);
+        }
+
+        fut.get();
+
+        waitForTopology(initNodes + startNodes - failCnt);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClusterRestart() throws Exception {
+        startGridsMultiThreaded(3, false);
+
+        stopAllGrids();
+
+        evts.clear();
+
+        startGridsMultiThreaded(3, false);
+
+        waitForTopology(3);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore4() throws Exception {
+        testSockNio = true;
+
+        Ignite node0 = startGrid(0);
+
+        ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0);
+
+        c0.closeSocket(false);
+
+        startGrid(1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartStop_1_Node() throws Exception {
+        startGrid(0);
+
+        waitForTopology(1);
+
+        stopGrid(0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestarts_2_Nodes() throws Exception {
+        startGrid(0);
+
+        for (int i = 0; i < 10; i++) {
+            info("Iteration: " + i);
+
+            startGrid(1);
+
+            waitForTopology(2);
+
+            stopGrid(1);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartStop_2_Nodes_WithCache() throws Exception {
+        startGrids(2);
+
+        for (Ignite node : G.allGrids()) {
+            IgniteCache cache = node.cache(DEFAULT_CACHE_NAME);
+
+            assertNotNull(cache);
+
+            for (int i = 0; i < 100; i++) {
+                cache.put(i, node.name());
+
+                assertEquals(node.name(), cache.get(i));
+            }
+        }
+
+        awaitPartitionMapExchange();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartStop_2_Nodes() throws Exception {
+        startGrid(0);
+
+        waitForTopology(1);
+
+        startGrid(1);
+
+        waitForTopology(2);
+
+        for (Ignite node : G.allGrids())
+            node.compute().broadcast(new DummyCallable(null));
+
+        awaitPartitionMapExchange();
+
+        waitForEventsAcks(ignite(0));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartStop1() throws Exception {
+        startGridsMultiThreaded(5, false);
+
+        waitForTopology(5);
+
+        awaitPartitionMapExchange();
+
+        waitForEventsAcks(ignite(0));
+
+        stopGrid(0);
+
+        waitForTopology(4);
+
+        for (Ignite node : G.allGrids())
+            node.compute().broadcast(new DummyCallable(null));
+
+        startGrid(0);
+
+        waitForTopology(5);
+
+        awaitPartitionMapExchange();
+
+        waitForEventsAcks(grid(CU.oldest(ignite(1).cluster().nodes())));
+    }
+
+    /**
+     * @param node Node.
+     * @throws Exception If failed.
+     */
+    private void waitForEventsAcks(final Ignite node) throws Exception {
+        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                Map<Object, Object> evts = GridTestUtils.getFieldValue(node.configuration().getDiscoverySpi(),
+                    "impl", "evtsData", "evts");
+
+                if (!evts.isEmpty()) {
+                    info("Unacked events: " + evts);
+
+                    return false;
+                }
+
+                return true;
+            }
+        }, 10_000));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartStop2() throws Exception {
+        startGridsMultiThreaded(10, false);
+
+        GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
+            @Override public void apply(Integer idx) {
+                stopGrid(idx);
+            }
+        }, 3, "stop-node-thread");
+
+        waitForTopology(7);
+
+        startGridsMultiThreaded(0, 3);
+
+        waitForTopology(10);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartStopWithClients() throws Exception {
+        final int SRVS = 3;
+
+        startGrids(SRVS);
+
+        client = true;
+
+        final int THREADS = 30;
+
+        for (int i = 0; i < 5; i++) {
+            info("Iteration: " + i);
+
+            startGridsMultiThreaded(SRVS, THREADS);
+
+            waitForTopology(SRVS + THREADS);
+
+            GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
+                @Override public void apply(Integer idx) {
+                    stopGrid(idx + SRVS);
+                }
+            }, THREADS, "stop-node");
+
+            waitForTopology(SRVS);
+
+            checkEventsConsistency();
+        }
+    }
+
+    /**
+     *
+     */
+    private void reset() {
+        System.clearProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
+
+        ZkTestClientCnxnSocketNIO.reset();
+
+        System.clearProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
+
+        err = false;
+
+        evts.clear();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkEventsConsistency() throws Exception {
+        for (Map.Entry<UUID, Map<Long, DiscoveryEvent>> nodeEvtEntry : evts.entrySet()) {
+            UUID nodeId = nodeEvtEntry.getKey();
+            Map<Long, DiscoveryEvent> nodeEvts = nodeEvtEntry.getValue();
+
+            for (Map.Entry<UUID, Map<Long, DiscoveryEvent>> nodeEvtEntry0 : evts.entrySet()) {
+                if (!nodeId.equals(nodeEvtEntry0.getKey())) {
+                    Map<Long, DiscoveryEvent> nodeEvts0 = nodeEvtEntry0.getValue();
+
+                    synchronized (nodeEvts) {
+                        synchronized (nodeEvts0) {
+                            checkEventsConsistency(nodeEvts, nodeEvts0);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * @param evts1 Received events.
+     * @param evts2 Received events.
+     */
+    private void checkEventsConsistency(Map<Long, DiscoveryEvent> evts1, Map<Long, DiscoveryEvent> evts2) {
+        for (Map.Entry<Long, DiscoveryEvent> e1 : evts1.entrySet()) {
+            DiscoveryEvent evt1 = e1.getValue();
+            DiscoveryEvent evt2 = evts2.get(e1.getKey());
+
+            if (evt2 != null) {
+                assertEquals(evt1.topologyVersion(), evt2.topologyVersion());
+                assertEquals(evt1.eventNode(), evt2.eventNode());
+                assertEquals(evt1.topologyNodes(), evt2.topologyNodes());
+            }
+        }
+    }
+
+    /**
+     * @param nodeName Node name.
+     * @return Node's discovery SPI.
+     * @throws Exception If failed.
+     */
+    private ZookeeperDiscoverySpi waitSpi(final String nodeName) throws Exception {
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return spis.contains(nodeName);
+            }
+        }, 5000);
+
+        ZookeeperDiscoverySpi spi = spis.get(nodeName);
+
+        assertNotNull("Failed to get SPI for node: " + nodeName, spi);
+
+        return spi;
+    }
+
+    private static DiscoveryEvent joinEvent(long topVer) {
+        DiscoveryEvent expEvt = new DiscoveryEvent(null, null, EventType.EVT_NODE_JOINED, null);
+
+        expEvt.topologySnapshot(topVer, null);
+
+        return expEvt;
+    }
+
+    private static DiscoveryEvent failEvent(long topVer) {
+        DiscoveryEvent expEvt = new DiscoveryEvent(null, null, EventType.EVT_NODE_FAILED, null);
+
+        expEvt.topologySnapshot(topVer, null);
+
+        return expEvt;
+    }
+
+    /**
+     * @param node Node.
+     * @param expEvts Expected events.
+     * @throws Exception If fialed.
+     */
+    private void checkEvents(final Ignite node, final DiscoveryEvent...expEvts) throws Exception {
+        checkEvents(node.cluster().localNode().id(), expEvts);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param expEvts Expected events.
+     * @throws Exception If failed.
+     */
+    private void checkEvents(final UUID nodeId, final DiscoveryEvent...expEvts) throws Exception {
+        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                Map<Long, DiscoveryEvent> nodeEvts = evts.get(nodeId);
+
+                if (nodeEvts == null) {
+                    info("No events for node: " + nodeId);
+
+                    return false;
+                }
+
+                synchronized (nodeEvts) {
+                    for (DiscoveryEvent expEvt : expEvts) {
+                        DiscoveryEvent evt0 = nodeEvts.get(expEvt.topologyVersion());
+
+                        if (evt0 == null) {
+                            info("No event for version: " + expEvt.topologyVersion());
+
+                            return false;
+                        }
+
+                        assertEquals(expEvt.type(), evt0.type());
+                    }
+                }
+
+                return true;
+            }
+        }, 10000));
+    }
+
+    /**
+     * @param spi Spi instance.
+     */
+    private void closeZkClient(ZookeeperDiscoverySpi spi) {
+        ZooKeeper zk = GridTestUtils.getFieldValue(spi, "impl", "zkClient", "zk");
+
+        try {
+            zk.close();
+        }
+        catch (Exception e) {
+            fail("Unexpected error: " + e);
+        }
+    }
+
+    /**
+     * @param expSize Expected nodes number.
+     * @throws Exception If failed.
+     */
+    private void waitForTopology(final int expSize) throws Exception {
+        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                List<Ignite> nodes = G.allGrids();
+
+                if (nodes.size() != expSize) {
+                    info("Wait all nodes [size=" + nodes.size() + ", exp=" + expSize + ']');
+
+                    return false;
+                }
+
+                for (Ignite node: nodes) {
+                    int sizeOnNode = node.cluster().nodes().size();
+
+                    if (sizeOnNode != expSize) {
+                        info("Wait for size on node [node=" + node.name() + ", size=" + sizeOnNode + ", exp=" + expSize + ']');
+
+                        return false;
+                    }
+                }
+
+                return true;
+            }
+        }, 5000));
+    }
+
+    /**
+     *
+     */
+    private static class DummyCallable implements IgniteCallable<Object> {
+        /** */
+        private byte[] data;
+
+        /**
+         * @param data Data.
+         */
+        DummyCallable(byte[] data) {
+            this.data = data;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object call() throws Exception {
+            return data;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 4965d16..de8d0ad 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.configuration.Factory;
 import javax.cache.configuration.FactoryBuilder;
 import junit.framework.TestCase;
+import org.apache.curator.test.TestingCluster;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
@@ -96,6 +97,7 @@ import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
 import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.config.GridTestProperties;
@@ -831,6 +833,21 @@ public abstract class GridAbstractTest extends TestCase {
     protected Ignite startGrid(String igniteInstanceName, GridSpringResourceContext ctx) throws Exception {
         return startGrid(igniteInstanceName, optimize(getConfiguration(igniteInstanceName)), ctx);
     }
+
+    /** */
+    private static TestingCluster zkCluster;
+
+    static {
+        zkCluster = new TestingCluster(1);
+
+        try {
+            zkCluster.start();
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
     /**
      * Starts new grid with given name.
      *
@@ -845,12 +862,19 @@ public abstract class GridAbstractTest extends TestCase {
             startingIgniteInstanceName.set(igniteInstanceName);
 
             try {
+                ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi();
+
+                zkSpi.setZkConnectionString(zkCluster.getConnectString());
+
+                cfg.setDiscoverySpi(zkSpi);
+
                 Ignite node = IgnitionEx.start(cfg, ctx);
 
                 IgniteConfiguration nodeCfg = node.configuration();
 
                 log.info("Node started with the following configuration [id=" + node.cluster().localNode().id()
                     + ", marshaller=" + nodeCfg.getMarshaller()
+                    + ", discovery=" + nodeCfg.getDiscoverySpi()
                     + ", binaryCfg=" + nodeCfg.getBinaryConfiguration()
                     + ", lateAff=" + nodeCfg.isLateAffinityAssignment() + "]");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/core/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java b/modules/core/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java
new file mode 100644
index 0000000..c8886af
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.SelectionKey;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.logger.java.JavaLogger;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ *
+ */
+public class ZkTestClientCnxnSocketNIO extends ClientCnxnSocketNIO {
+    /** */
+    public static final IgniteLogger log = new JavaLogger().getLogger(ZkTestClientCnxnSocketNIO.class);
+
+    /** */
+    public volatile CountDownLatch blockConnectLatch;
+
+    /** */
+    public static ConcurrentHashMap<String, ZkTestClientCnxnSocketNIO> clients = new ConcurrentHashMap<>();
+
+    /** */
+    private final String nodeName;
+
+    /**
+     *
+     */
+    public static void reset() {
+        clients.clear();
+    }
+
+    /**
+     * @param node Node.
+     * @return ZK client.
+     */
+    public static ZkTestClientCnxnSocketNIO forNode(Ignite node) {
+        return clients.get(node.name());
+    }
+
+    /**
+     * @param instanceName Ignite instance name.
+     * @return ZK client.
+     */
+    public static ZkTestClientCnxnSocketNIO forNode(String instanceName) {
+        return clients.get(instanceName);
+    }
+
+    /**
+     * @throws IOException If failed.
+     */
+    public ZkTestClientCnxnSocketNIO() throws IOException {
+        super();
+
+        String threadName = Thread.currentThread().getName();
+
+        nodeName = threadName.substring(threadName.indexOf('-') + 1);
+
+        log.info("ZkTestClientCnxnSocketNIO created for node: " + nodeName);
+    }
+
+    /** {@inheritDoc} */
+    @Override void connect(InetSocketAddress addr) throws IOException {
+        CountDownLatch blockConnect = this.blockConnectLatch;
+
+        log.info("ZkTestClientCnxnSocketNIO connect [node=" + nodeName + ", addr=" + addr + ']');
+
+        if (blockConnect != null && blockConnect.getCount() > 0) {
+            try {
+                log.info("ZkTestClientCnxnSocketNIO block connect");
+
+                blockConnect.await();
+
+                log.info("ZkTestClientCnxnSocketNIO finish block connect");
+            }
+            catch (Exception e) {
+                log.error("Error in ZkTestClientCnxnSocketNIO: " + e, e);
+            }
+        }
+
+        super.connect(addr);
+
+        clients.put(nodeName, this);
+    }
+
+    /**
+     *
+     */
+    public void allowConnect() {
+        assert blockConnectLatch != null && blockConnectLatch.getCount() == 1;
+
+        log.info("ZkTestClientCnxnSocketNIO allowConnect [node=" + nodeName + ']');
+
+        blockConnectLatch.countDown();
+    }
+
+    /**
+     * @param blockConnect {@code True} to block client reconnect.
+     * @throws Exception If failed.
+     */
+    public void closeSocket(boolean blockConnect) throws Exception {
+        if (blockConnect)
+            blockConnectLatch = new CountDownLatch(1);
+
+        log.info("ZkTestClientCnxnSocketNIO closeSocket [node=" + nodeName + ", block=" + blockConnect + ']');
+
+        SelectionKey k = GridTestUtils.getFieldValue(this, ClientCnxnSocketNIO.class, "sockKey");
+
+        k.channel().close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
deleted file mode 100644
index ee0209b..0000000
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.managers.discovery.JoiningNodesAware;
-import org.apache.ignite.lang.IgniteProductVersion;
-import org.apache.ignite.resources.LoggerResource;
-import org.apache.ignite.spi.IgniteSpiAdapter;
-import org.apache.ignite.spi.IgniteSpiContext;
-import org.apache.ignite.spi.IgniteSpiException;
-import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
-import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
-import org.apache.ignite.spi.discovery.DiscoverySpi;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
-import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
-import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
-import org.apache.ignite.spi.discovery.DiscoverySpiListener;
-import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
-import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
-import org.apache.ignite.spi.discovery.zk.internal.ZookeeperClusterNode;
-import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl;
-import org.jetbrains.annotations.Nullable;
-
-/**
- *
- */
-@IgniteSpiMultipleInstancesSupport(true)
-@DiscoverySpiOrderSupport(true)
-@DiscoverySpiHistorySupport(true)
-public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, JoiningNodesAware {
-    /** */
-    private String zkConnectionString;
-
-    /** */
-    private int sesTimeout = 5000;
-
-    /** */
-    private String basePath = "/apacheIgnite";
-
-    /** */
-    private String clusterName = "default";
-
-    /** */
-    private DiscoverySpiListener lsnr;
-
-    /** */
-    private DiscoverySpiDataExchange exchange;
-
-    /** */
-    private DiscoverySpiNodeAuthenticator auth;
-
-    /** */
-    private DiscoveryMetricsProvider metricsProvider;
-
-    /** */
-    private ZookeeperDiscoveryImpl impl;
-
-    /** */
-    private Map<String, Object> locNodeAttrs;
-
-    /** */
-    private IgniteProductVersion locNodeVer;
-
-    /** */
-    private Serializable consistentId;
-
-    /** */
-    @LoggerResource
-    private IgniteLogger log;
-
-    public String getBasePath() {
-        return basePath;
-    }
-
-    public ZookeeperDiscoverySpi setBasePath(String basePath) {
-        this.basePath = basePath;
-
-        return this;
-    }
-
-    public String getClusterName() {
-        return clusterName;
-    }
-
-    public ZookeeperDiscoverySpi setClusterName(String clusterName) {
-        this.clusterName = clusterName;
-
-        return this;
-    }
-
-    public int getSessionTimeout() {
-        return sesTimeout;
-    }
-
-    public ZookeeperDiscoverySpi setSessionTimeout(int sesTimeout) {
-        this.sesTimeout = sesTimeout;
-
-        return this;
-    }
-
-    public String getZkConnectionString() {
-        return zkConnectionString;
-    }
-
-    public ZookeeperDiscoverySpi setZkConnectionString(String zkConnectionString) {
-        this.zkConnectionString = zkConnectionString;
-
-        return this;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean knownNode(UUID nodeId) {
-        return impl.knownNode(nodeId);
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public Serializable consistentId() throws IgniteSpiException {
-        return consistentId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<ClusterNode> getRemoteNodes() {
-        return impl.remoteNodes();
-    }
-
-    /** {@inheritDoc} */
-    @Override public ClusterNode getLocalNode() {
-        return impl != null ? impl.localNode() : null;
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public ClusterNode getNode(UUID nodeId) {
-        return impl.node(nodeId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean pingNode(UUID nodeId) {
-        return impl.pingNode(nodeId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) {
-        assert locNodeAttrs == null;
-        assert locNodeVer == null;
-
-        if (log.isDebugEnabled()) {
-            log.debug("Node attributes to set: " + attrs);
-            log.debug("Node version to set: " + ver);
-        }
-
-        locNodeAttrs = attrs;
-        locNodeVer = ver;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
-        this.lsnr = lsnr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setDataExchange(DiscoverySpiDataExchange exchange) {
-        this.exchange = exchange;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setMetricsProvider(DiscoveryMetricsProvider metricsProvider) {
-        this.metricsProvider = metricsProvider;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void disconnect() throws IgniteSpiException {
-        // TODO ZK
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator auth) {
-        // TODO ZK
-        this.auth = auth;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getGridStartTime() {
-        return impl.gridStartTime();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) {
-        impl.sendCustomMessage(msg);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void failNode(UUID nodeId, @Nullable String warning) {
-        // TODO ZK
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isClientMode() throws IllegalStateException {
-        return impl.localNode().isClient();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
-        super.onContextInitialized0(spiCtx);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void spiStart(@Nullable String igniteInstanceName) throws IgniteSpiException {
-        ZookeeperClusterNode locNode = initLocalNode();
-
-        log.info("Start Zookeeper discovery [zkConnectionString=" + zkConnectionString +
-            ", sesTimeout=" + sesTimeout +
-            ", basePath=" + basePath +
-            ", clusterName=" + clusterName + ']');
-
-        impl = new ZookeeperDiscoveryImpl(log,
-            basePath,
-            clusterName,
-            locNode,
-            lsnr,
-            exchange);
-
-        try {
-            impl.joinTopology(igniteInstanceName, zkConnectionString, sesTimeout);
-        }
-        catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
-            throw new IgniteSpiException("Failed to join cluster, thread was interrupted", e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void spiStop() throws IgniteSpiException {
-        if (impl != null)
-            impl.stop();
-    }
-
-    /**
-     * @return Local node instance.
-     */
-    private ZookeeperClusterNode initLocalNode() {
-        assert ignite != null;
-
-        consistentId = ignite.configuration().getConsistentId();
-
-        UUID nodeId = ignite.configuration().getNodeId();
-
-        // TODO ZK
-        if (consistentId == null)
-            consistentId = nodeId;
-
-        ZookeeperClusterNode locNode = new ZookeeperClusterNode(nodeId,
-            locNodeVer,
-            locNodeAttrs,
-            consistentId,
-            ignite.configuration().isClientMode());
-
-        locNode.local(true);
-
-        DiscoverySpiListener lsnr = this.lsnr;
-
-        if (lsnr != null)
-            lsnr.onLocalNodeInitialized(locNode);
-
-        if (log.isDebugEnabled())
-            log.debug("Local node initialized: " + locNode);
-
-        return locNode;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java
deleted file mode 100644
index 45f453f..0000000
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.io.Serializable;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- *
- */
-public class ZkAliveNodeData implements Serializable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    long lastProcEvt = -1;
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(ZkAliveNodeData.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java
deleted file mode 100644
index e3e5f8b..0000000
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import org.apache.ignite.cluster.ClusterNode;
-
-/**
- *
- */
-public class ZkClusterNodes {
-    /** */
-    final ConcurrentSkipListMap<Long, ZookeeperClusterNode> nodesByOrder = new ConcurrentSkipListMap<>();
-
-    /** */
-    final ConcurrentSkipListMap<Integer, ZookeeperClusterNode> nodesByInternalId = new ConcurrentSkipListMap<>();
-
-    /** */
-    final ConcurrentHashMap<UUID, ZookeeperClusterNode> nodesById = new ConcurrentHashMap<>();
-
-    /**
-     * @return Remote nodes.
-     */
-    public Collection<ClusterNode> remoteNodes() {
-        // TODO ZK
-        List<ClusterNode> nodes = new ArrayList<>();
-
-        for (ClusterNode node : nodesById.values()) {
-            if (!node.isLocal())
-                nodes.add(node);
-        }
-
-        return nodes;
-    }
-
-    /**
-     * @param node New node.
-     */
-    void addNode(ZookeeperClusterNode node) {
-        assert node.id() != null : node;
-        assert node.order() > 0 : node;
-
-        ZookeeperClusterNode old = nodesById.put(node.id(), node);
-
-        assert old == null : old;
-
-        old = nodesByOrder.put(node.order(), node);
-
-        assert old == null : old;
-
-        old = nodesByInternalId.put(node.internalId(), node);
-
-        assert old == null : old;
-    }
-
-    ZookeeperClusterNode removeNode(int internalId) {
-        ZookeeperClusterNode node = nodesByInternalId.remove(internalId);
-
-        assert node != null : internalId;
-        assert node.order() > 0 : node;
-
-        Object rvmd = nodesByOrder.remove(node.order());
-
-        assert rvmd != null;
-
-        rvmd = nodesById.remove(node.id());
-
-        assert rvmd != null;
-
-        return node;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
deleted file mode 100644
index 2e50831..0000000
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.util.UUID;
-import org.apache.ignite.internal.events.DiscoveryCustomEvent;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
-
-/**
- *
- */
-class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData {
-    /** */
-    private static final int CUSTOM_MSG_ACK_FLAG = 1;
-
-    /** */
-    final UUID sndNodeId;
-
-    /** */
-    final String evtPath;
-
-    /** */
-    transient DiscoverySpiCustomMessage msg;
-
-    /**
-     * @param evtId Event ID.
-     * @param topVer Topology version.
-     * @param sndNodeId Sender node ID.
-     * @param evtPath Event path.
-     * @param ack Acknowledge event flag.
-     */
-    ZkDiscoveryCustomEventData(long evtId, long topVer, UUID sndNodeId, String evtPath, boolean ack) {
-        super(evtId, DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, topVer);
-
-        assert sndNodeId != null;
-        assert !F.isEmpty(evtPath);
-
-        this.sndNodeId = sndNodeId;
-        this.evtPath = evtPath;
-
-        if (ack)
-            flags |= CUSTOM_MSG_ACK_FLAG;
-    }
-
-    /**
-     * @return {@code True} for custom event ack message.
-     */
-    boolean ackEvent() {
-        return flagSet(CUSTOM_MSG_ACK_FLAG);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return "ZkDiscoveryCustomEventData [topVer=" + topologyVersion() + ", evtId=" + eventId() + ", sndNode=" + sndNodeId + ']';
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
deleted file mode 100644
index 00330e4..0000000
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Set;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
-import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
-import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
-
-/**
- *
- */
-abstract class ZkDiscoveryEventData implements Serializable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private final long evtId;
-
-    /** */
-    private final int evtType;
-
-    /** */
-    private final long topVer;
-
-    /** */
-    private transient Set<Integer> remainingAcks;
-
-    /** */
-    int flags;
-
-    /**
-     * @param evtType Event type.
-     * @param topVer Topology version.
-     */
-    ZkDiscoveryEventData(long evtId, int evtType, long topVer) {
-        assert evtType == EVT_NODE_JOINED || evtType == EVT_NODE_FAILED || evtType == EVT_DISCOVERY_CUSTOM_EVT : evtType;
-
-        this.evtId = evtId;
-        this.evtType = evtType;
-        this.topVer = topVer;
-    }
-
-    void remainingAcks(Collection<ZookeeperClusterNode> nodes) {
-        assert remainingAcks == null : this;
-
-        remainingAcks = U.newHashSet(nodes.size());
-
-        for (ZookeeperClusterNode node : nodes) {
-            if (!node.isLocal() && node.order() <= topVer)
-                remainingAcks.add(node.internalId());
-        }
-    }
-
-    boolean allAcksReceived() {
-        return remainingAcks.isEmpty();
-    }
-
-    boolean onAckReceived(Integer nodeInternalId, long ackEvtId) {
-        assert remainingAcks != null;
-
-        if (ackEvtId >= evtId)
-            remainingAcks.remove(nodeInternalId);
-
-        return remainingAcks.isEmpty();
-    }
-
-    boolean onNodeFail(ZookeeperClusterNode node) {
-        assert remainingAcks != null : this;
-
-        remainingAcks.remove(node.internalId());
-
-        return remainingAcks.isEmpty();
-    }
-
-    boolean flagSet(int flag) {
-        return (flags & flag) == flag;
-    }
-
-    long eventId() {
-        return evtId;
-    }
-
-    int eventType() {
-        return evtType;
-    }
-
-    long topologyVersion() {
-        return topVer;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
deleted file mode 100644
index ce21a06..0000000
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.TreeMap;
-
-/**
- *
- */
-class ZkDiscoveryEventsData implements Serializable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    int procCustEvt = -1;
-
-    /** */
-    long evtIdGen;
-
-    /** */
-    long topVer;
-
-    /** */
-    long gridStartTime;
-
-    /** */
-    TreeMap<Long, ZkDiscoveryEventData> evts;
-
-    /**
-     * @param topVer
-     * @param gridStartTime
-     * @param evts
-     */
-    ZkDiscoveryEventsData(long gridStartTime, long topVer, TreeMap<Long, ZkDiscoveryEventData> evts) {
-        this.gridStartTime = gridStartTime;
-        this.topVer = topVer;
-        this.evts = evts;
-    }
-
-    /**
-     * @param evt Event.
-     */
-    void addEvent(Collection<ZookeeperClusterNode> nodes, ZkDiscoveryEventData evt) {
-        Object old = evts.put(evt.eventId(), evt);
-
-        assert old == null : old;
-
-        evt.remainingAcks(nodes);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
deleted file mode 100644
index 227bb94..0000000
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import org.apache.ignite.events.EventType;
-
-/**
- *
- */
-class ZkDiscoveryNodeFailEventData extends ZkDiscoveryEventData {
-    /** */
-    private int failedNodeInternalId;
-
-    /**
-     * @param evtId Event ID.
-     * @param topVer Topology version.
-     * @param failedNodeInternalId Failed node ID.
-     */
-    ZkDiscoveryNodeFailEventData(long evtId, long topVer, int failedNodeInternalId) {
-        super(evtId, EventType.EVT_NODE_FAILED, topVer);
-
-        this.failedNodeInternalId = failedNodeInternalId;
-    }
-
-    /**
-     * @return Failed node ID.
-     */
-    int failedNodeInternalId() {
-        return failedNodeInternalId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return "NodeFailEventData [topVer=" + topologyVersion() + ", nodeId=" + failedNodeInternalId + ']';
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
deleted file mode 100644
index 5a828dc..0000000
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.util.UUID;
-import org.apache.ignite.events.EventType;
-
-/**
- *
- */
-class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData {
-    /** */
-    final int joinedInternalId;
-
-    /** */
-    final UUID nodeId;
-
-    /** */
-    transient ZkJoiningNodeData joiningNodeData;
-
-    /**
-     * @param evtId Event ID.
-     * @param topVer Topology version.
-     * @param nodeId Joined node ID.
-     * @param joinedInternalId Joined node internal ID.
-     */
-    ZkDiscoveryNodeJoinEventData(long evtId, long topVer, UUID nodeId, int joinedInternalId) {
-        super(evtId, EventType.EVT_NODE_JOINED, topVer);
-
-        this.nodeId = nodeId;
-        this.joinedInternalId = joinedInternalId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return "NodeJoinEventData [topVer=" + topologyVersion() + ", node=" + nodeId + ']';
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java
deleted file mode 100644
index c89b586..0000000
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-import org.jetbrains.annotations.Nullable;
-
-/**
- *
- */
-public class ZkEventAckFuture extends GridFutureAdapter<Void> implements Watcher, AsyncCallback.Children2Callback {
-    /** */
-    private final IgniteLogger log;
-
-    /** */
-    private final ZookeeperDiscoveryImpl impl;
-
-    /** */
-    private final Long evtId;
-
-    /** */
-    private final String evtPath;
-
-    /** */
-    private final int expAcks;
-
-    /** */
-    private final Set<Integer> remaininAcks;
-
-    /**
-     * @param impl
-     * @param evtPath
-     * @param evtId
-     */
-    ZkEventAckFuture(ZookeeperDiscoveryImpl impl, String evtPath, Long evtId) {
-        this.impl = impl;
-        this.log = impl.log();
-        this.evtPath = evtPath;
-        this.evtId = evtId;
-
-        ZkClusterNodes top = impl.nodes();
-
-        remaininAcks = U.newHashSet(top.nodesById.size());
-
-        for (ZookeeperClusterNode node : top.nodesByInternalId.values()) {
-            if (!node.isLocal())
-                remaininAcks.add(node.internalId());
-        }
-
-        expAcks = remaininAcks.size();
-
-        if (expAcks == 0)
-            onDone();
-        else
-            impl.zkClient().getChildrenAsync(evtPath, this, this);
-    }
-
-    /**
-     * @return Event ID.
-     */
-    Long eventId() {
-        return evtId;
-    }
-
-    /**
-     * @param node Failed node.
-     */
-    void onNodeFail(ZookeeperClusterNode node) {
-        assert !remaininAcks.isEmpty();
-
-        if (remaininAcks.remove(node.internalId()) && remaininAcks.isEmpty())
-            onDone();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
-        if (super.onDone(res, err)) {
-            return true;
-        }
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void process(WatchedEvent evt) {
-        if (isDone())
-            return;
-
-        if (evt.getType() == Event.EventType.NodeChildrenChanged) {
-            if (evtPath.equals(evt.getPath()))
-                impl.zkClient().getChildrenAsync(evtPath, this, this);
-            else
-                U.warn(log, "Received event for unknown path: " + evt.getPath());
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
-        assert rc == 0 : rc;
-
-        if (isDone())
-            return;
-
-        if (expAcks == stat.getCversion()) {
-            log.info("Received expected number of acks [expCnt=" + expAcks + ", cVer=" + stat.getCversion() + ']');
-
-            onDone();
-        }
-        else {
-            for (int i = 0; i < children.size(); i++) {
-                Integer nodeInternalId = Integer.parseInt(children.get(i));
-
-                if (remaininAcks.remove(nodeInternalId) && remaininAcks.size() == 0)
-                    onDone();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
deleted file mode 100644
index f0fcaca..0000000
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.util.UUID;
-
-/**
- *
- */
-class ZkIgnitePaths {
-    /** */
-    private static final int UUID_LEN = 36;
-
-    /** */
-    private static final String JOIN_DATA_DIR = "joinData";
-
-    /** */
-    private static final String CUSTOM_EVTS_DIR = "customEvts";
-
-    /** */
-    private static final String CUSTOM_EVTS_ACKS_DIR = "customEvtsAcks";
-
-    /** */
-    private static final String ALIVE_NODES_DIR = "alive";
-
-    /** */
-    private static final String DISCO_EVENTS_PATH = "events";
-
-    /** */
-    final String basePath;
-
-    /** */
-    private final String clusterName;
-
-    /** */
-    final String clusterDir;
-
-    /** */
-    final String aliveNodesDir;
-
-    /** */
-    final String joinDataDir;
-
-    /** */
-    final String evtsPath;
-
-    /** */
-    final String customEvtsDir;
-
-    /** */
-    final String customEvtsAcksDir;
-
-    /**
-     * @param basePath Base directory.
-     * @param clusterName Cluster name.
-     */
-    ZkIgnitePaths(String basePath, String clusterName) {
-        this.basePath = basePath;
-        this.clusterName = clusterName;
-
-        clusterDir = basePath + "/" + clusterName;
-
-        aliveNodesDir = zkPath(ALIVE_NODES_DIR);
-        joinDataDir = zkPath(JOIN_DATA_DIR);
-        evtsPath = zkPath(DISCO_EVENTS_PATH);
-        customEvtsDir = zkPath(CUSTOM_EVTS_DIR);
-        customEvtsAcksDir = zkPath(CUSTOM_EVTS_ACKS_DIR);
-    }
-
-    /**
-     * @param path Relative path.
-     * @return Full path.
-     */
-    String zkPath(String path) {
-        return basePath + "/" + clusterName + "/" + path;
-    }
-
-    static int aliveInternalId(String path) {
-        int idx = path.lastIndexOf('|');
-
-        return Integer.parseInt(path.substring(idx + 1));
-    }
-
-    static UUID aliveNodeId(String path) {
-        String idStr = path.substring(0, ZkIgnitePaths.UUID_LEN);
-
-        return UUID.fromString(idStr);
-    }
-
-    static int aliveJoinSequence(String path) {
-        int idx1 = path.indexOf('|');
-        int idx2 = path.lastIndexOf('|');
-
-        return Integer.parseInt(path.substring(idx1 + 1, idx2));
-    }
-
-    static int customEventSequence(String path) {
-        int idx = path.lastIndexOf('|');
-
-        return Integer.parseInt(path.substring(idx + 1));
-    }
-
-    static UUID customEventSendNodeId(String path) {
-        String idStr = path.substring(0, ZkIgnitePaths.UUID_LEN);
-
-        return UUID.fromString(idStr);
-    }
-
-    String joinEventDataPath(long evtId) {
-        return evtsPath + "/" + evtId;
-    }
-
-    String joinEventDataPathForJoined(long evtId) {
-        return evtsPath + "/joined-" + evtId;
-    }
-
-    String customEventDataPath(boolean ack, String child) {
-        String baseDir = ack ? customEvtsAcksDir : customEvtsDir;
-
-        return baseDir + "/" + child;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
deleted file mode 100644
index cdbfdc0..0000000
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-/**
- *
- */
-class ZkJoinEventDataForJoined implements Serializable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private final List<ZookeeperClusterNode> top;
-
-    /** */
-    private final Map<Integer, Serializable> discoData;
-
-    /**
-     * @param top Topology.
-     * @param discoData Discovery data.
-     */
-    ZkJoinEventDataForJoined(List<ZookeeperClusterNode> top, Map<Integer, Serializable> discoData) {
-        this.top = top;
-        this.discoData = discoData;
-    }
-
-    List<ZookeeperClusterNode> topology() {
-        return top;
-    }
-
-    Map<Integer, Serializable> discoveryData() {
-        return discoData;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
deleted file mode 100644
index 1947b6b..0000000
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.io.Serializable;
-import java.util.Map;
-
-/**
- *
- */
-class ZkJoiningNodeData implements Serializable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private final ZookeeperClusterNode node;
-
-    /** */
-    private final Map<Integer, Serializable> discoData;
-
-    /**
-     * @param node Node.
-     * @param discoData Discovery data.
-     */
-    ZkJoiningNodeData(ZookeeperClusterNode node, Map<Integer, Serializable> discoData) {
-        assert node != null && node.id() != null : node;
-        assert discoData != null;
-
-        this.node = node;
-        this.discoData = discoData;
-    }
-
-    /**
-     * @return Node.
-     */
-    ZookeeperClusterNode node() {
-        return node;
-    }
-
-    /**
-     * @return Discovery data.
-     */
-    Map<Integer, Serializable> discoveryData() {
-        return discoData;
-    }
-}