You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/04/10 08:59:34 UTC

[06/12] ignite git commit: IGNITE-7222 Added ZooKeeper discovery SPI

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
new file mode 100644
index 0000000..0c79c36
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Zk Communication Error Resolve Start Message.
+ */
+public class ZkCommunicationErrorResolveStartMessage implements DiscoverySpiCustomMessage, ZkInternalMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    final UUID id;
+
+    /**
+     * @param id Unique ID.
+     */
+    ZkCommunicationErrorResolveStartMessage(UUID id) {
+        this.id = id;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ZkCommunicationErrorResolveStartMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java
new file mode 100644
index 0000000..d27b717
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.CommunicationFailureContext;
+import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+class ZkCommunicationFailureContext implements CommunicationFailureContext {
+    /** */
+    private static final Comparator<ClusterNode> NODE_ORDER_CMP = new Comparator<ClusterNode>() {
+        @Override public int compare(ClusterNode node1, ClusterNode node2) {
+            return Long.compare(node1.order(), node2.order());
+        }
+    };
+
+    /** */
+    private Set<ClusterNode> killedNodes = new HashSet<>();
+
+    /** */
+    private final Map<UUID, BitSet> nodesState;
+
+    /** */
+    private final List<ClusterNode> initialNodes;
+
+    /** */
+    private final List<ClusterNode> curNodes;
+
+    /** */
+    private final GridCacheSharedContext<?, ?> ctx;
+
+    /**
+     * @param ctx Context.
+     * @param curNodes Current topology snapshot.
+     * @param initialNodes Topology snapshot when communication error resolve started.
+     * @param nodesState Nodes communication state.
+     */
+    ZkCommunicationFailureContext(
+        GridCacheSharedContext<?, ?> ctx,
+        List<ClusterNode> curNodes,
+        List<ClusterNode> initialNodes,
+        Map<UUID, BitSet> nodesState)
+    {
+        this.ctx = ctx;
+        this.curNodes = Collections.unmodifiableList(curNodes);
+        this.initialNodes = initialNodes;
+        this.nodesState = nodesState;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<ClusterNode> topologySnapshot() {
+        return curNodes;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean connectionAvailable(ClusterNode node1, ClusterNode node2) {
+        BitSet nodeState = nodesState.get(node1.id());
+
+        if (nodeState == null)
+            throw new IllegalArgumentException("Invalid node: " + node1);
+
+        int nodeIdx = Collections.binarySearch(initialNodes, node2, NODE_ORDER_CMP);
+
+        if (nodeIdx < 0)
+            throw new IllegalArgumentException("Invalid node: " + node2);
+
+        assert nodeIdx < nodeState.size() : nodeIdx;
+
+        return nodeState.get(nodeIdx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<String, CacheConfiguration<?, ?>> startedCaches() {
+        Map<Integer, DynamicCacheDescriptor> cachesMap = ctx.affinity().caches();
+
+        Map<String, CacheConfiguration<?, ?>> res = U.newHashMap(cachesMap.size());
+
+        for (DynamicCacheDescriptor desc : cachesMap.values()) {
+            if (desc.cacheType().userCache())
+                res.put(desc.cacheName(), desc.cacheConfiguration());
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<List<ClusterNode>> cacheAffinity(String cacheName) {
+        if (cacheName == null)
+            throw new NullPointerException("Null cache name.");
+
+        DynamicCacheDescriptor cacheDesc = ctx.affinity().caches().get(CU.cacheId(cacheName));
+
+        if (cacheDesc == null)
+            throw new IllegalArgumentException("Invalid cache name: " + cacheName);
+
+        GridAffinityAssignmentCache aff = ctx.affinity().groupAffinity(cacheDesc.groupId());
+
+        assert aff != null : cacheName;
+
+        return aff.readyAssignments(aff.lastVersion());
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<List<ClusterNode>> cachePartitionOwners(String cacheName) {
+        if (cacheName == null)
+            throw new NullPointerException("Null cache name.");
+
+        DynamicCacheDescriptor cacheDesc = ctx.affinity().caches().get(CU.cacheId(cacheName));
+
+        if (cacheDesc == null)
+            throw new IllegalArgumentException("Invalid cache name: " + cacheName);
+
+        if (cacheDesc.cacheConfiguration().getCacheMode() == CacheMode.LOCAL)
+            return Collections.emptyList();
+
+        CacheGroupContext grp = ctx.cache().cacheGroup(cacheDesc.groupId());
+
+        GridDhtPartitionTopology top;
+
+        if (grp == null) {
+            top = ctx.exchange().clientTopologyIfExists(cacheDesc.groupId());
+
+            assert top != null : cacheName;
+        }
+        else
+            top = grp.topology();
+
+        return top.allOwners();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void killNode(ClusterNode node) {
+        if (node == null)
+            throw new NullPointerException();
+
+        if (Collections.binarySearch(curNodes, node, NODE_ORDER_CMP) < 0)
+            throw new IllegalArgumentException("Invalid node: " + node);
+
+        killedNodes.add(node);
+    }
+
+    /**
+     * @return Nodes to fail.
+     */
+    Set<ClusterNode> killedNodes() {
+        return killedNodes;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "ZkCommunicationFailureContext []";
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
new file mode 100644
index 0000000..21dfe62
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+
+/**
+ *
+ */
+class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    final long origEvtId;
+
+    /** */
+    final UUID sndNodeId;
+
+    /** */
+    final String evtPath;
+
+    /** Message instance (can be marshalled as part of ZkDiscoveryCustomEventData or stored in separate znode. */
+    DiscoverySpiCustomMessage msg;
+
+    /** Unmarshalled message. */
+    transient DiscoverySpiCustomMessage resolvedMsg;
+
+    /**
+     * @param evtId Event ID.
+     * @param origEvtId For acknowledge events ID of original event.
+     * @param topVer Topology version.
+     * @param sndNodeId Sender node ID.
+     * @param msg Message instance.
+     * @param evtPath Event path.
+     */
+    ZkDiscoveryCustomEventData(
+        long evtId,
+        long origEvtId,
+        long topVer,
+        UUID sndNodeId,
+        DiscoverySpiCustomMessage msg,
+        String evtPath)
+    {
+        super(evtId, ZK_EVT_CUSTOM_EVT, topVer);
+
+        assert sndNodeId != null;
+        assert msg != null || origEvtId != 0 || !F.isEmpty(evtPath);
+
+        this.origEvtId = origEvtId;
+        this.msg = msg;
+        this.sndNodeId = sndNodeId;
+        this.evtPath = evtPath;
+    }
+
+    /**
+     * @return {@code True} for custom event ack message.
+     */
+    boolean ackEvent() {
+        return origEvtId != 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "ZkDiscoveryCustomEventData [" +
+            "evtId=" + eventId() +
+            ", topVer=" + topologyVersion() +
+            ", sndNode=" + sndNodeId +
+            ", ack=" + ackEvent() +
+            ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
new file mode 100644
index 0000000..d667a17
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Set;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+abstract class ZkDiscoveryEventData implements Serializable {
+    /** */
+    static final byte ZK_EVT_NODE_JOIN = 1;
+
+    /** */
+    static final byte ZK_EVT_NODE_FAILED = 2;
+
+    /** */
+    static final byte ZK_EVT_CUSTOM_EVT = 3;
+
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final long evtId;
+
+    /** */
+    private final byte evtType;
+
+    /** */
+    private final long topVer;
+
+    /** */
+    private transient Set<Long> remainingAcks;
+
+    /** */
+    int flags;
+
+    /**
+     * @param evtId Event ID.
+     * @param evtType Event type.
+     * @param topVer Topology version.
+     */
+    ZkDiscoveryEventData(long evtId, byte evtType, long topVer) {
+        assert evtType == ZK_EVT_NODE_JOIN || evtType == ZK_EVT_NODE_FAILED || evtType == ZK_EVT_CUSTOM_EVT : evtType;
+
+        this.evtId = evtId;
+        this.evtType = evtType;
+        this.topVer = topVer;
+    }
+
+    /**
+     * @param nodes Current nodes in topology.
+     */
+    void initRemainingAcks(Collection<ZookeeperClusterNode> nodes) {
+        assert remainingAcks == null : this;
+
+        remainingAcks = U.newHashSet(nodes.size());
+
+        for (ZookeeperClusterNode node : nodes) {
+            if (!node.isLocal() && node.order() <= topVer) {
+                boolean add = remainingAcks.add(node.internalId());
+
+                assert add : node;
+            }
+        }
+    }
+
+    /**
+     * @param node Node.
+     */
+    void addRemainingAck(ZookeeperClusterNode node) {
+        assert node.order() <= topVer : node;
+
+        boolean add = remainingAcks.add(node.internalId());
+
+        assert add : node;
+    }
+
+    /**
+     * @return {@code True} if all nodes processed event.
+     */
+    boolean allAcksReceived() {
+        return remainingAcks.isEmpty();
+    }
+
+    /**
+     * @return Remaining acks.
+     */
+    Set<Long> remainingAcks() {
+        return remainingAcks;
+    }
+
+    /**
+     * @param nodeInternalId Node ID.
+     * @param ackEvtId Last event ID processed on node.
+     * @return {@code True} if all nodes processed event.
+     */
+    boolean onAckReceived(Long nodeInternalId, long ackEvtId) {
+        assert remainingAcks != null;
+
+        if (ackEvtId >= evtId)
+            remainingAcks.remove(nodeInternalId);
+
+        return remainingAcks.isEmpty();
+    }
+
+    /**
+     * @param node Failed node.
+     * @return {@code True} if all nodes processed event.
+     */
+    boolean onNodeFail(ZookeeperClusterNode node) {
+        assert remainingAcks != null : this;
+
+        remainingAcks.remove(node.internalId());
+
+        return remainingAcks.isEmpty();
+    }
+
+    /**
+     * @param flag Flag mask.
+     * @return {@code True} if flag set.
+     */
+    boolean flagSet(int flag) {
+        return (flags & flag) == flag;
+    }
+
+    /**
+     * @return Event ID.
+     */
+    long eventId() {
+        return evtId;
+    }
+
+    /**
+     * @return Event type.
+     */
+    byte eventType() {
+        return evtType;
+    }
+
+    /**
+     * @return Event topology version.
+     */
+    long topologyVersion() {
+        return topVer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
new file mode 100644
index 0000000..dce861b
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.TreeMap;
+import java.util.UUID;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+class ZkDiscoveryEventsData implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Unique cluster ID (generated when first node in cluster starts). */
+    final UUID clusterId;
+
+    /** Internal order of last processed custom event. */
+    long procCustEvt = -1;
+
+    /** Event ID counter. */
+    long evtIdGen;
+
+    /** Current topology version. */
+    long topVer;
+
+    /** Max node internal order in cluster. */
+    long maxInternalOrder;
+
+    /** Cluster start time (recorded when first node in cluster starts). */
+    final long clusterStartTime;
+
+    /** Events to process. */
+    final TreeMap<Long, ZkDiscoveryEventData> evts;
+
+    /** ID of current active communication error resolve process. */
+    private UUID commErrFutId;
+
+    /**
+     * @param clusterStartTime Start time of first node in cluster.
+     * @return Events.
+     */
+    static ZkDiscoveryEventsData createForNewCluster(long clusterStartTime) {
+        return new ZkDiscoveryEventsData(
+            UUID.randomUUID(),
+            clusterStartTime,
+            1L,
+            new TreeMap<Long, ZkDiscoveryEventData>()
+        );
+    }
+
+    /**
+     * @param clusterId Cluster ID.
+     * @param topVer Current topology version.
+     * @param clusterStartTime Cluster start time.
+     * @param evts Events history.
+     */
+    private ZkDiscoveryEventsData(
+        UUID clusterId,
+        long clusterStartTime,
+        long topVer,
+        TreeMap<Long, ZkDiscoveryEventData> evts)
+    {
+        this.clusterId = clusterId;
+        this.clusterStartTime = clusterStartTime;
+        this.topVer = topVer;
+        this.evts = evts;
+    }
+
+    /**
+     * @param node Joined node.
+     */
+    void onNodeJoin(ZookeeperClusterNode node) {
+        if (node.internalId() > maxInternalOrder)
+            maxInternalOrder = node.internalId();
+    }
+
+    /**
+     * @return Future ID.
+     */
+    @Nullable UUID communicationErrorResolveFutureId() {
+        return commErrFutId;
+    }
+
+    /**
+     * @param id Future ID.
+     */
+     void communicationErrorResolveFutureId(@Nullable UUID id) {
+        commErrFutId = id;
+    }
+
+    /**
+     * @param nodes Current nodes in topology (these nodes should ack that event processed).
+     * @param evt Event.
+     */
+    void addEvent(Collection<ZookeeperClusterNode> nodes, ZkDiscoveryEventData evt) {
+        Object old = evts.put(evt.eventId(), evt);
+
+        assert old == null : old;
+
+        evt.initRemainingAcks(nodes);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
new file mode 100644
index 0000000..c76158f
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+/**
+ *
+ */
+class ZkDiscoveryNodeFailEventData extends ZkDiscoveryEventData {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private long failedNodeInternalId;
+
+    /**
+     * @param evtId Event ID.
+     * @param topVer Topology version.
+     * @param failedNodeInternalId Failed node ID.
+     */
+    ZkDiscoveryNodeFailEventData(long evtId, long topVer, long failedNodeInternalId) {
+        super(evtId, ZK_EVT_NODE_FAILED, topVer);
+
+        this.failedNodeInternalId = failedNodeInternalId;
+    }
+
+    /**
+     * @return Failed node ID.
+     */
+    long failedNodeInternalId() {
+        return failedNodeInternalId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "ZkDiscoveryNodeFailEventData [" +
+            "evtId=" + eventId() +
+            ", topVer=" + topologyVersion() +
+            ", nodeId=" + failedNodeInternalId + ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
new file mode 100644
index 0000000..e46d52d
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.List;
+
+/**
+ *
+ */
+class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    final List<ZkJoinedNodeEvtData> joinedNodes;
+
+    /** */
+    final int dataForJoinedPartCnt;
+
+    /**
+     * @param evtId Event ID.
+     * @param topVer Topology version.
+     * @param joinedNodes Joined nodes data.
+     * @param dataForJoinedPartCnt Data for joined part count.
+     */
+    ZkDiscoveryNodeJoinEventData(
+        long evtId,
+        long topVer,
+        List<ZkJoinedNodeEvtData> joinedNodes,
+        int dataForJoinedPartCnt)
+    {
+        super(evtId, ZK_EVT_NODE_JOIN, topVer);
+
+        this.joinedNodes = joinedNodes;
+        this.dataForJoinedPartCnt = dataForJoinedPartCnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "ZkDiscoveryNodeJoinEventData [" +
+            "evtId=" + eventId() +
+            ", topVer=" + topologyVersion() +
+            ", nodes=" + joinedNodes + ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
new file mode 100644
index 0000000..174d698
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ *
+ */
+class ZkDistributedCollectDataFuture extends GridFutureAdapter<Void> {
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    private final String futPath;
+
+    /** */
+    private final Set<Long> remainingNodes;
+
+    /** */
+    private final Callable<Void> lsnr;
+
+    /**
+     * @param impl Disovery impl
+     * @param rtState Runtime state.
+     * @param futPath Future path.
+     * @param lsnr Future listener.
+     * @throws Exception If listener call failed.
+     */
+    ZkDistributedCollectDataFuture(
+        ZookeeperDiscoveryImpl impl,
+        ZkRuntimeState rtState,
+        String futPath,
+        Callable<Void> lsnr)
+        throws Exception
+    {
+        this.log = impl.log();
+        this.futPath = futPath;
+        this.lsnr = lsnr;
+
+        ZkClusterNodes top = rtState.top;
+
+        // Assume new nodes can not join while future is in progress.
+
+        remainingNodes = U.newHashSet(top.nodesByOrder.size());
+
+        for (ZookeeperClusterNode node : top.nodesByInternalId.values())
+            remainingNodes.add(node.order());
+
+        NodeResultsWatcher watcher = new NodeResultsWatcher(rtState, impl);
+
+        if (remainingNodes.isEmpty())
+            completeAndNotifyListener();
+        else {
+            if (log.isInfoEnabled()) {
+                log.info("Initialize data collect future [futPath=" + futPath + ", " +
+                    "remainingNodes=" + remainingNodes.size() + ']');
+            }
+
+            rtState.zkClient.getChildrenAsync(futPath, watcher, watcher);
+        }
+    }
+
+    /**
+     * @throws Exception If listener call failed.
+     */
+    private void completeAndNotifyListener() throws Exception {
+        if (super.onDone())
+            lsnr.call();
+    }
+
+    /**
+     * @param futPath
+     * @param client
+     * @param nodeOrder
+     * @param data
+     * @throws Exception If failed.
+     */
+    static void saveNodeResult(String futPath, ZookeeperClient client, long nodeOrder, byte[] data) throws Exception {
+        client.createIfNeeded(futPath + "/" + nodeOrder, data, CreateMode.PERSISTENT);
+    }
+
+    /**
+     * @param futPath
+     * @param client
+     * @param nodeOrder
+     * @return Node result data.
+     * @throws Exception If fai.ed
+     */
+    static byte[] readNodeResult(String futPath, ZookeeperClient client, long nodeOrder) throws Exception {
+        return client.getData(futPath + "/" + nodeOrder);
+    }
+
+    /**
+     * @param futResPath Result path.
+     * @param client Client.
+     * @param data Result data.
+     * @throws Exception If failed.
+     */
+    static void saveResult(String futResPath, ZookeeperClient client, byte[] data) throws Exception {
+        client.createIfNeeded(futResPath, data, CreateMode.PERSISTENT);
+    }
+
+    static byte[] readResult(ZookeeperClient client, ZkIgnitePaths paths, UUID futId) throws Exception {
+        return client.getData(paths.distributedFutureResultPath(futId));
+    }
+
+    /**
+     * @param client Client.
+     * @param paths Paths utils.
+     * @param futId Future ID.
+     * @param log Ignite Logger.
+     * @throws Exception If failed.
+     */
+    static void deleteFutureData(ZookeeperClient client,
+        ZkIgnitePaths paths,
+        UUID futId,
+        IgniteLogger log
+    ) throws Exception {
+        // TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8189
+        String evtDir = paths.distributedFutureBasePath(futId);
+
+        try {
+            client.deleteAll(evtDir,
+                client.getChildren(evtDir),
+                -1);
+        }
+        catch (KeeperException.NoNodeException e) {
+            U.log(log, "Node for deletion was not found: " + e.getPath());
+
+            // TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8189
+        }
+
+        client.deleteIfExists(evtDir, -1);
+
+        client.deleteIfExists(paths.distributedFutureResultPath(futId), -1);
+    }
+
+    /**
+     * @param top Current topology.
+     * @throws Exception If listener call failed.
+     */
+    void onTopologyChange(ZkClusterNodes top) throws Exception {
+        if (remainingNodes.isEmpty())
+            return;
+
+        for (Iterator<Long> it = remainingNodes.iterator(); it.hasNext();) {
+            Long nodeOrder = it.next();
+
+            if (!top.nodesByOrder.containsKey(nodeOrder)) {
+                it.remove();
+
+                int remaining = remainingNodes.size();
+
+                if (log.isInfoEnabled()) {
+                    log.info("ZkDistributedCollectDataFuture removed remaining failed node [node=" + nodeOrder +
+                        ", remaining=" + remaining +
+                        ", futPath=" + futPath + ']');
+                }
+
+                if (remaining == 0) {
+                    completeAndNotifyListener();
+
+                    break;
+                }
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    class NodeResultsWatcher extends ZkAbstractWatcher implements AsyncCallback.Children2Callback {
+        /**
+         * @param rtState Runtime state.
+         * @param impl Discovery impl.
+         */
+        NodeResultsWatcher(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) {
+            super(rtState, impl);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void process0(WatchedEvent evt) {
+            if (evt.getType() == Watcher.Event.EventType.NodeChildrenChanged)
+                rtState.zkClient.getChildrenAsync(evt.getPath(), this, this);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
+            if (!onProcessStart())
+                return;
+
+            try {
+                if (!isDone()) {
+                    assert rc == 0 : KeeperException.Code.get(rc);
+
+                    for (int i = 0; i < children.size(); i++) {
+                        Long nodeOrder = Long.parseLong(children.get(i));
+
+                        if (remainingNodes.remove(nodeOrder)) {
+                            int remaining = remainingNodes.size();
+
+                            if (log.isInfoEnabled()) {
+                                log.info("ZkDistributedCollectDataFuture added new result [node=" + nodeOrder +
+                                    ", remaining=" + remaining +
+                                    ", futPath=" + path + ']');
+                            }
+
+                            if (remaining == 0)
+                                completeAndNotifyListener();
+                        }
+                    }
+                }
+
+                onProcessEnd();
+            }
+            catch (Throwable e) {
+                onProcessError(e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
new file mode 100644
index 0000000..de7291c
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Zk Force Node Fail Message.
+ */
+public class ZkForceNodeFailMessage implements DiscoverySpiCustomMessage, ZkInternalMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    final long nodeInternalId;
+
+    /** */
+    final String warning;
+
+    /**
+     * @param nodeInternalId Node ID.
+     * @param warning Warning to be displayed on all nodes.
+     */
+    ZkForceNodeFailMessage(long nodeInternalId, String warning) {
+        this.nodeInternalId = nodeInternalId;
+        this.warning = warning;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ZkForceNodeFailMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
new file mode 100644
index 0000000..9caf00f
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.UUID;
+
+/**
+ *
+ */
+class ZkIgnitePaths {
+    /** */
+    static final String PATH_SEPARATOR = "/";
+
+    /** */
+    private static final byte CLIENT_NODE_FLAG_MASK = 0x01;
+
+    /** */
+    private static final int UUID_LEN = 36;
+
+    /** Directory to store joined node data. */
+    private static final String JOIN_DATA_DIR = "jd";
+
+    /** Directory to store new custom events. */
+    private static final String CUSTOM_EVTS_DIR = "ce";
+
+    /** Directory to store parts of multi-parts custom events. */
+    private static final String CUSTOM_EVTS_PARTS_DIR = "cp";
+
+    /** Directory to store acknowledge messages for custom events. */
+    private static final String CUSTOM_EVTS_ACKS_DIR = "ca";
+
+    /** Directory to store EPHEMERAL znodes for alive cluster nodes. */
+    static final String ALIVE_NODES_DIR = "n";
+
+    /** Path to store discovery events {@link ZkDiscoveryEventsData}. */
+    private static final String DISCO_EVENTS_PATH = "e";
+
+    /** */
+    final String clusterDir;
+
+    /** */
+    final String aliveNodesDir;
+
+    /** */
+    final String joinDataDir;
+
+    /** */
+    final String evtsPath;
+
+    /** */
+    final String customEvtsDir;
+
+    /** */
+    final String customEvtsPartsDir;
+
+    /** */
+    final String customEvtsAcksDir;
+
+    /**
+     * @param zkRootPath Base Zookeeper directory for all Ignite nodes.
+     */
+    ZkIgnitePaths(String zkRootPath) {
+        clusterDir = zkRootPath;
+
+        aliveNodesDir = zkPath(ALIVE_NODES_DIR);
+        joinDataDir = zkPath(JOIN_DATA_DIR);
+        evtsPath = zkPath(DISCO_EVENTS_PATH);
+        customEvtsDir = zkPath(CUSTOM_EVTS_DIR);
+        customEvtsPartsDir = zkPath(CUSTOM_EVTS_PARTS_DIR);
+        customEvtsAcksDir = zkPath(CUSTOM_EVTS_ACKS_DIR);
+    }
+
+    /**
+     * @param path Relative path.
+     * @return Full path.
+     */
+    private String zkPath(String path) {
+        return clusterDir + "/" + path;
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param prefixId Unique prefix ID.
+     * @return Path.
+     */
+    String joiningNodeDataPath(UUID nodeId, UUID prefixId) {
+        return joinDataDir + '/' + prefixId + ":" + nodeId.toString();
+    }
+
+    /**
+     * @param path Alive node zk path.
+     * @return Node internal ID.
+     */
+    static long aliveInternalId(String path) {
+        int idx = path.lastIndexOf('|');
+
+        return Integer.parseInt(path.substring(idx + 1));
+    }
+
+    /**
+     * @param prefix Node unique path prefix.
+     * @param node Node.
+     * @return Path.
+     */
+    String aliveNodePathForCreate(String prefix, ZookeeperClusterNode node) {
+        byte flags = 0;
+
+        if (node.isClient())
+            flags |= CLIENT_NODE_FLAG_MASK;
+
+        return aliveNodesDir + "/" + prefix + ":" + node.id() + ":" + encodeFlags(flags) + "|";
+    }
+
+    /**
+     * @param path Alive node zk path.
+     * @return {@code True} if node is client.
+     */
+    static boolean aliveNodeClientFlag(String path) {
+        return (aliveFlags(path) & CLIENT_NODE_FLAG_MASK) != 0;
+    }
+
+    /**
+     * @param path Alive node zk path.
+     * @return Node ID.
+     */
+    static UUID aliveNodePrefixId(String path) {
+        return UUID.fromString(path.substring(0, ZkIgnitePaths.UUID_LEN));
+    }
+
+    /**
+     * @param path Alive node zk path.
+     * @return Node ID.
+     */
+    static UUID aliveNodeId(String path) {
+        // <uuid prefix>:<node id>:<flags>|<alive seq>
+        int startIdx = ZkIgnitePaths.UUID_LEN + 1;
+
+        String idStr = path.substring(startIdx, startIdx + ZkIgnitePaths.UUID_LEN);
+
+        return UUID.fromString(idStr);
+    }
+
+    /**
+     * @param path Event zk path.
+     * @return Event sequence number.
+     */
+    static int customEventSequence(String path) {
+        int idx = path.lastIndexOf('|');
+
+        return Integer.parseInt(path.substring(idx + 1));
+    }
+
+    /**
+     * @param path Custom event zl path.
+     * @return Event node ID.
+     */
+    static UUID customEventSendNodeId(String path) {
+        // <uuid prefix>:<node id>:<partCnt>|<seq>
+        int startIdx = ZkIgnitePaths.UUID_LEN + 1;
+
+        String idStr = path.substring(startIdx, startIdx + ZkIgnitePaths.UUID_LEN);
+
+        return UUID.fromString(idStr);
+    }
+
+    /**
+     * @param path Event path.
+     * @return Event unique prefix.
+     */
+    static String customEventPrefix(String path) {
+        // <uuid prefix>:<node id>:<partCnt>|<seq>
+
+        return path.substring(0, ZkIgnitePaths.UUID_LEN);
+    }
+
+    /**
+     * @param path Custom event zl path.
+     * @return Event node ID.
+     */
+    static int customEventPartsCount(String path) {
+        // <uuid prefix>:<node id>:<partCnt>|<seq>
+        int startIdx = 2 * ZkIgnitePaths.UUID_LEN + 2;
+
+        String cntStr = path.substring(startIdx, startIdx + 4);
+
+        int partCnt = Integer.parseInt(cntStr);
+
+        assert partCnt >= 1 : partCnt;
+
+        return partCnt;
+    }
+
+    /**
+     * @param prefix Prefix.
+     * @param nodeId Node ID.
+     * @param partCnt Parts count.
+     * @return Path.
+     */
+    String createCustomEventPath(String prefix, UUID nodeId, int partCnt) {
+        return customEvtsDir + "/" + prefix + ":" + nodeId + ":" + String.format("%04d", partCnt) + '|';
+    }
+
+    /**
+     * @param prefix Prefix.
+     * @param nodeId Node ID.
+     * @return Path.
+     */
+    String customEventPartsBasePath(String prefix, UUID nodeId) {
+        return customEvtsPartsDir + "/" + prefix + ":" + nodeId + ":";
+    }
+
+    /**
+     * @param prefix Prefix.
+     * @param nodeId Node ID.
+     * @param part Part number.
+     * @return Path.
+     */
+    String customEventPartPath(String prefix, UUID nodeId, int part) {
+        return customEventPartsBasePath(prefix, nodeId) + String.format("%04d", part);
+    }
+
+    /**
+     * @param evtId Event ID.
+     * @return Event zk path.
+     */
+    String joinEventDataPathForJoined(long evtId) {
+        return evtsPath + "/fj-" + evtId;
+    }
+
+    /**
+     * @param topVer Event topology version.
+     * @return Event zk path.
+     */
+    String joinEventSecuritySubjectPath(long topVer) {
+        return evtsPath + "/s-" + topVer;
+    }
+
+    /**
+     * @param origEvtId ID of original custom event.
+     * @return Path for custom event ack.
+     */
+    String ackEventDataPath(long origEvtId) {
+        assert origEvtId != 0;
+
+        return customEvtsAcksDir + "/" + String.valueOf(origEvtId);
+    }
+
+    /**
+     * @param id Future ID.
+     * @return Future path.
+     */
+    String distributedFutureBasePath(UUID id) {
+        return evtsPath + "/f-" + id;
+    }
+
+    /**
+     * @param id Future ID.
+     * @return Future path.
+     */
+    String distributedFutureResultPath(UUID id) {
+        return evtsPath + "/fr-" + id;
+    }
+
+    /**
+     * @param flags Flags.
+     * @return Flags string.
+     */
+    private static String encodeFlags(byte flags) {
+        int intVal = flags + 128;
+
+        String str = Integer.toString(intVal, 16);
+
+        if (str.length() == 1)
+            str = '0' + str;
+
+        assert str.length() == 2  : str;
+
+        return str;
+    }
+
+    /**
+     * @param path Alive node zk path.
+     * @return Flags.
+     */
+    private static byte aliveFlags(String path) {
+        int startIdx = path.lastIndexOf(':') + 1;
+
+        String flagsStr = path.substring(startIdx, startIdx + 2);
+
+        return (byte)(Integer.parseInt(flagsStr, 16) - 128);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
new file mode 100644
index 0000000..a73312c
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+/**
+ *
+ */
+class ZkInternalJoinErrorMessage implements ZkInternalMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    transient boolean notifyNode = true;
+
+    /** */
+    final long nodeInternalId;
+
+    /** */
+    final String err;
+
+    /**
+     * @param nodeInternalId Joining node internal ID.
+     * @param err Error message.
+     */
+    ZkInternalJoinErrorMessage(long nodeInternalId, String err) {
+        this.nodeInternalId = nodeInternalId;
+        this.err = err;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java
new file mode 100644
index 0000000..c1d56f0
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+
+/**
+ *
+ */
+interface ZkInternalMessage extends Serializable {
+    // No-op.
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
new file mode 100644
index 0000000..e4ae4ba0
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+class ZkJoinEventDataForJoined implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final List<ZookeeperClusterNode> top;
+
+    /** */
+    private final Map<Long, byte[]> discoData;
+
+    /** */
+    private final Map<Long, Long> dupDiscoData;
+
+    /**
+     * @param top Topology.
+     * @param discoData Discovery data.
+     */
+    ZkJoinEventDataForJoined(List<ZookeeperClusterNode> top, Map<Long, byte[]> discoData, @Nullable Map<Long, Long> dupDiscoData) {
+        assert top != null;
+        assert discoData != null && !discoData.isEmpty();
+
+        this.top = top;
+        this.discoData = discoData;
+        this.dupDiscoData = dupDiscoData;
+    }
+
+    byte[] discoveryDataForNode(long nodeOrder) {
+        assert discoData != null;
+
+        byte[] dataBytes = discoData.get(nodeOrder);
+
+        if (dataBytes != null)
+            return dataBytes;
+
+        assert dupDiscoData != null;
+
+        Long dupDataNode = dupDiscoData.get(nodeOrder);
+
+        assert dupDataNode != null;
+
+        dataBytes = discoData.get(dupDataNode);
+
+        assert dataBytes != null;
+
+        return dataBytes;
+    }
+
+    /**
+     * @return Current topology.
+     */
+    List<ZookeeperClusterNode> topology() {
+        assert top != null;
+
+        return top;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java
new file mode 100644
index 0000000..3c367cf
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.UUID;
+
+/**
+ * Zk Joined Node Evt Data.
+ */
+public class ZkJoinedNodeEvtData implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    final long topVer;
+
+    /** */
+    final long joinedInternalId;
+
+    /** */
+    final UUID nodeId;
+
+    /** */
+    final int joinDataPartCnt;
+
+    /** */
+    final int secSubjPartCnt;
+
+    /** */
+    final UUID joinDataPrefixId;
+
+    /** */
+    transient ZkJoiningNodeData joiningNodeData;
+
+    /**
+     * @param topVer Topology version for node join event.
+     * @param nodeId Joined node ID.
+     * @param joinedInternalId Joined node internal ID.
+     * @param joinDataPrefixId Join data unique prefix.
+     * @param joinDataPartCnt Join data part count.
+     * @param secSubjPartCnt Security subject part count.
+     */
+    ZkJoinedNodeEvtData(
+        long topVer,
+        UUID nodeId,
+        long joinedInternalId,
+        UUID joinDataPrefixId,
+        int joinDataPartCnt,
+        int secSubjPartCnt)
+    {
+        this.topVer = topVer;
+        this.nodeId = nodeId;
+        this.joinedInternalId = joinedInternalId;
+        this.joinDataPrefixId = joinDataPrefixId;
+        this.joinDataPartCnt = joinDataPartCnt;
+        this.secSubjPartCnt = secSubjPartCnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "ZkJoinedNodeData [id=" + nodeId + ", order=" + topVer + ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
new file mode 100644
index 0000000..ff8311d
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ *
+ */
+class ZkJoiningNodeData implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private int partCnt;
+
+    /** */
+    @GridToStringInclude
+    private ZookeeperClusterNode node;
+
+    /** */
+    @GridToStringInclude
+    private Map<Integer, Serializable> discoData;
+
+    /**
+     * @param partCnt Number of parts in multi-parts message.
+     */
+    ZkJoiningNodeData(int partCnt) {
+        this.partCnt = partCnt;
+    }
+
+    /**
+     * @param node Node.
+     * @param discoData Discovery data.
+     */
+    ZkJoiningNodeData(ZookeeperClusterNode node, Map<Integer, Serializable> discoData) {
+        assert node != null && node.id() != null : node;
+        assert discoData != null;
+
+        this.node = node;
+        this.discoData = discoData;
+    }
+
+    /**
+     * @return Number of parts in multi-parts message.
+     */
+    int partCount() {
+        return partCnt;
+    }
+
+    /**
+     * @return Node.
+     */
+    ZookeeperClusterNode node() {
+        return node;
+    }
+
+    /**
+     * @return Discovery data.
+     */
+    Map<Integer, Serializable> discoveryData() {
+        return discoData;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ZkJoiningNodeData.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
new file mode 100644
index 0000000..626fe74
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+class ZkNoServersMessage implements DiscoverySpiCustomMessage, ZkInternalMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ZkNoServersMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java
new file mode 100644
index 0000000..2abfee3
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+/**
+ *
+ */
+class ZkNodeValidateResult {
+    /** */
+    String err;
+
+    /** */
+    byte[] secSubjZipBytes;
+
+    /**
+     * @param err Error.
+     */
+    ZkNodeValidateResult(String err) {
+        this.err = err;
+    }
+
+    /**
+     * @param secSubjZipBytes Marshalled security subject.
+     */
+    ZkNodeValidateResult(byte[] secSubjZipBytes) {
+        this.secSubjZipBytes = secSubjZipBytes;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java
new file mode 100644
index 0000000..965bdc0
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+/**
+ * Zk Runnable.
+ */
+public abstract class ZkRunnable extends ZkAbstractCallabck implements Runnable {
+    /**
+     * @param rtState Runtime state.
+     * @param impl Discovery impl.
+     */
+    ZkRunnable(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) {
+        super(rtState, impl);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void run() {
+        if (!onProcessStart())
+            return;
+
+        try {
+            run0();
+
+            onProcessEnd();
+        }
+        catch (Throwable e) {
+            onProcessError(e);
+        }
+    }
+
+    /**
+     *
+     */
+    protected abstract void run0() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
new file mode 100644
index 0000000..cb04ac3
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.List;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.spi.IgniteSpiTimeoutObject;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.Watcher;
+
+/**
+ *
+ */
+class ZkRuntimeState {
+    /** */
+    ZkWatcher watcher;
+
+    /** */
+    ZkAliveNodeDataWatcher aliveNodeDataWatcher;
+
+    /** */
+    volatile Exception errForClose;
+
+    /** */
+    final boolean prevJoined;
+
+    /** */
+    ZookeeperClient zkClient;
+
+    /** */
+    long internalOrder;
+
+    /** */
+    int joinDataPartCnt;
+
+    /** */
+    long gridStartTime;
+
+    /** */
+    volatile boolean joined;
+
+    /** */
+    ZkDiscoveryEventsData evtsData;
+
+    /** */
+    boolean crd;
+
+    /** */
+    String locNodeZkPath;
+
+    /** */
+    final ZkAliveNodeData locNodeInfo = new ZkAliveNodeData();
+
+    /** */
+    int procEvtCnt;
+
+    /** */
+    final ZkClusterNodes top = new ZkClusterNodes();
+
+    /** */
+    List<ClusterNode> commErrProcNodes;
+
+    /** Timeout callback registering watcher for join error
+     * (set this watcher after timeout as a minor optimization).
+     */
+    ZkTimeoutObject joinErrTo;
+
+    /** Timeout callback set to wait for join timeout. */
+    ZkTimeoutObject joinTo;
+
+    /** Timeout callback to update processed events counter. */
+    ZkTimeoutObject procEvtsUpdateTo;
+
+    /** */
+    boolean updateAlives;
+
+    /**
+     * @param prevJoined {@code True} if joined topology before reconnect attempt.
+     */
+    ZkRuntimeState(boolean prevJoined) {
+        this.prevJoined = prevJoined;
+    }
+
+    /**
+     * @param watcher Watcher.
+     * @param aliveNodeDataWatcher Alive nodes data watcher.
+     */
+    void init(ZkWatcher watcher, ZkAliveNodeDataWatcher aliveNodeDataWatcher) {
+        this.watcher = watcher;
+        this.aliveNodeDataWatcher = aliveNodeDataWatcher;
+    }
+
+    /**
+     * @param err Error.
+     */
+    void onCloseStart(Exception err) {
+        assert err != null;
+
+        errForClose = err;
+
+        ZookeeperClient zkClient = this.zkClient;
+
+        if (zkClient != null)
+            zkClient.onCloseStart();
+    }
+
+    /**
+     *
+     */
+    interface ZkWatcher extends Watcher, AsyncCallback.Children2Callback, AsyncCallback.DataCallback {
+        // No-op.
+    }
+
+    /**
+     *
+     */
+    interface ZkAliveNodeDataWatcher extends Watcher, AsyncCallback.DataCallback {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkTimeoutObject.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkTimeoutObject.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkTimeoutObject.java
new file mode 100644
index 0000000..4d3d5b4
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkTimeoutObject.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.IgniteSpiTimeoutObject;
+
+/**
+ *
+ */
+abstract class ZkTimeoutObject implements IgniteSpiTimeoutObject {
+    /** */
+    private final IgniteUuid id = IgniteUuid.randomUuid();
+
+    /** */
+    private final long endTime;
+
+    /** */
+    volatile boolean cancelled;
+
+    /**
+     * @param timeout Timeout.
+     */
+    ZkTimeoutObject(long timeout) {
+        long endTime = timeout >= 0 ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
+
+        this.endTime = endTime >= 0 ? endTime : Long.MAX_VALUE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public final IgniteUuid id() {
+        return id;
+    }
+
+    /** {@inheritDoc} */
+    @Override public final long endTime() {
+        return endTime;
+    }
+}