You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/11/14 13:02:33 UTC

[1/3] ignite git commit: zk

Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 8ab3b5667 -> 73f5af60c


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3736abe2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3736abe2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3736abe2

Branch: refs/heads/ignite-zk
Commit: 3736abe27a080ac7962dd8ede9e7b7414c4a82ae
Parents: 8ab3b56
Author: sboikov <sb...@gridgain.com>
Authored: Tue Nov 14 14:14:25 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Nov 14 14:14:25 2017 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         |  5 ++-
 .../GridDhtPartitionsExchangeFuture.java        |  3 +-
 .../spi/discovery/zk/ZookeeperClusterNode.java  | 15 ++++++++-
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 32 ++++++++++++++------
 .../zk/ZookeeperDiscoverySpiBasicTest.java      |  6 +++-
 5 files changed, 45 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3736abe2/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 022dc97..a6737dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -2522,9 +2522,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
             switch (type) {
                 case EVT_NODE_JOINED: {
-// TODO ZK
-//                    assert !discoOrdered || topVer.topologyVersion() == node.order() : "Invalid topology version [topVer=" + topVer +
-//                        ", node=" + node + ']';
+                    assert !discoOrdered || topVer.topologyVersion() == node.order() : "Invalid topology version [topVer=" + topVer +
+                        ", node=" + node + ']';
 
                     try {
                         checkAttributes(F.asList(node));

http://git-wip-us.apache.org/repos/asf/ignite/blob/3736abe2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 5d9186b..d29293e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1598,8 +1598,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 mergedJoinExchMsgs = new LinkedHashMap<>();
 
             if (msg != null) {
-                // TODO ZK
-                // assert msg.exchangeId().topologyVersion().equals(new AffinityTopologyVersion(node.order()));
+                assert msg.exchangeId().topologyVersion().equals(new AffinityTopologyVersion(node.order()));
 
                 if (log.isInfoEnabled()) {
                     log.info("Merge server join exchange, message received [curFut=" + initialVersion() +

http://git-wip-us.apache.org/repos/asf/ignite/blob/3736abe2/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperClusterNode.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperClusterNode.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperClusterNode.java
index dc0f374..ae638b8 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperClusterNode.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperClusterNode.java
@@ -47,6 +47,9 @@ public class ZookeeperClusterNode implements ClusterNode, Serializable {
     private Serializable consistentId;
 
     /** */
+    private long internalOrder;
+
+    /** */
     private long order;
 
     /** */
@@ -154,9 +157,19 @@ public class ZookeeperClusterNode implements ClusterNode, Serializable {
         return order;
     }
 
+    public long internalOrder() {
+        return internalOrder;
+    }
+
     /**
-     * @param order Order of the node.
+     * @param internalOrder Order of the node.
      */
+    public void internalOrder(long internalOrder) {
+        assert internalOrder > 0 : internalOrder;
+
+        this.internalOrder = internalOrder;
+    }
+
     public void order(long order) {
         assert order > 0 : order;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3736abe2/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index ce04d33..bae183d 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -380,12 +380,18 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
 
                 zkCurator.getCuratorListenable().addListener(new CuratorListener() {
                     @Override public void eventReceived(CuratorFramework client, CuratorEvent evt) throws Exception {
-                        log.info("Curator event: " + evt.getType());
+                        IgniteLogger log = ZookeeperDiscoverySpi.this.log;
+
+                        if (log != null)
+                            log.info("Curator event: " + evt.getType());
                     }
                 });
                 zkCurator.getConnectionStateListenable().addListener(new ConnectionStateListener() {
                     @Override public void stateChanged(CuratorFramework client, ConnectionState newState) {
-                        log.info("Curator event, connection: " + newState);
+                        IgniteLogger log = ZookeeperDiscoverySpi.this.log;
+
+                        if (log != null)
+                            log.info("Curator event, connection: " + newState);
                     }
                 });
 
@@ -644,7 +650,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
 
         assert joinData != null && joinData.node != null && joinData.joiningNodeData != null : joinData;
 
-        joinData.node.order(data.order);
+        joinData.node.internalOrder(data.order);
 
         data.joinData = joinData;
     }
@@ -823,7 +829,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
                 ZKJoiningNodeData joinData = joined.joinData;
 
                 synchronized (curTop) {
-                    curTop.put(joinData.node.order(), joinData.node);
+                    joinData.node.order(v);
+
+                    curTop.put(joinData.node.internalOrder(), joinData.node);
 
                     ZKDiscoveryEvent joinEvt = new ZKDiscoveryEvent(EventType.EVT_NODE_JOINED,
                         v,
@@ -963,6 +971,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
                         locJoin = e.evtType == EventType.EVT_NODE_JOINED && e.node.id().equals(locNode.id());
 
                         if (locJoin) {
+                            assert e.node.order() > 0 && e.node.internalOrder() > 0 : e.node;
+
+                            locNode.internalOrder(e.node.internalOrder());
                             locNode.order(e.node.order());
 
                             fireEvt = true;
@@ -980,9 +991,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
                             synchronized (curTop) {
                                 if (locJoin) {
                                     for (ZookeeperClusterNode node : e.allNodes) {
-                                        assert node.order() > 0 : node;
+                                        assert node.internalOrder() > 0 : node;
 
-                                        Object old = curTop.put(node.order(), node);
+                                        Object old = curTop.put(node.internalOrder(), node);
 
                                         assert old == null : node;
                                     }
@@ -1006,7 +1017,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
 
                                             exchange.onExchange(dataBag);
 
-                                            Object old = curTop.put(node.order(), node);
+                                            Object old = curTop.put(node.internalOrder(), node);
 
                                             assert old == null : node;
 
@@ -1016,7 +1027,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
                                         case EventType.EVT_NODE_FAILED: {
                                             ZookeeperClusterNode node = e.node;
 
-                                            Object failedNode = curTop.remove(node.order());
+                                            Object failedNode = curTop.remove(node.internalOrder());
 
                                             assert failedNode != null : node;
 
@@ -1160,7 +1171,10 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
     private class ZookeeperWatcher implements Watcher {
         /** {@inheritDoc} */
         @Override public void process(WatchedEvent event) {
-            log.info("Process event [type=" + event.getType() + ", state=" + event.getState() + ", path=" + event.getPath() + ']');
+            IgniteLogger log = ZookeeperDiscoverySpi.this.log;
+
+            if (log != null)
+                log.info("Process event [type=" + event.getType() + ", state=" + event.getState() + ", path=" + event.getPath() + ']');
 
             if (event.getType() == Event.EventType.NodeChildrenChanged) {
                 zk.getChildren(event.getPath(), this, nodesUpdateCallback, null);

http://git-wip-us.apache.org/repos/asf/ignite/blob/3736abe2/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
index dfb2199..b4db065 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
@@ -78,6 +78,9 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     private boolean testSockNio;
 
     /** */
+    private int sesTimeout;
+
+    /** */
     private ConcurrentHashMap<String, ZookeeperDiscoverySpi> spis = new ConcurrentHashMap<>();
 
     /** {@inheritDoc} */
@@ -91,7 +94,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
 
         ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi();
 
-        zkSpi.setSessionTimeout(10_000);
+        zkSpi.setSessionTimeout(sesTimeout > 0 ? sesTimeout : 10_000);
 
         spis.put(igniteInstanceName, zkSpi);
 
@@ -386,6 +389,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     private void connectionRestore_Coordinator(int initNodes, int startNodes, int failCnt) throws Exception {
+        sesTimeout = 30_000;
         testSockNio = true;
 
         Ignite node0 = startGrids(initNodes);


[2/3] ignite git commit: zk

Posted by sb...@apache.org.
zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1842bb4c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1842bb4c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1842bb4c

Branch: refs/heads/ignite-zk
Commit: 1842bb4c48ce245a5b69b669087590351de686fa
Parents: 3736abe
Author: sboikov <sb...@gridgain.com>
Authored: Tue Nov 14 15:07:55 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Nov 14 15:10:16 2017 +0300

----------------------------------------------------------------------
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 33 +++++++++++---------
 1 file changed, 19 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1842bb4c/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index bae183d..80d563e 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -97,6 +97,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
     private static final String ALIVE_NODES_PATH = CLUSTER_PATH + "/alive";
 
     /** */
+    private static final String CUSTOM_EVTS_PATH = CLUSTER_PATH + "/customEvts";
+
+    /** */
     private static final byte[] EMPTY_BYTES = new byte[0];
 
     /** */
@@ -127,7 +130,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
     private final JdkMarshaller marsh = new JdkMarshaller();
 
     /** */
-    private final NodesUpdateCallback nodesUpdateCallback;
+    private final ZKChildrenUpdateCallback zkChildrenUpdateCallback;
 
     /** */
     private final DataUpdateCallback dataUpdateCallback;
@@ -166,7 +169,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
     public ZookeeperDiscoverySpi() {
         zkWatcher = new ZookeeperWatcher();
 
-        nodesUpdateCallback = new NodesUpdateCallback();
+        zkChildrenUpdateCallback = new ZKChildrenUpdateCallback();
         dataUpdateCallback = new DataUpdateCallback();
     }
 
@@ -307,7 +310,12 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
     /** {@inheritDoc} */
     @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
         // TODO ZK
-        //throw new UnsupportedOperationException();
+        try {
+            zkCurator.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(CUSTOM_EVTS_PATH, marshal(msg));
+        }
+        catch (Exception e) {
+            throw new IgniteSpiException(e);
+        }
     }
 
     /** {@inheritDoc} */
@@ -318,8 +326,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
 
     /** {@inheritDoc} */
     @Override public boolean isClientMode() throws IllegalStateException {
-        // TODO ZK
-        return false;
+        return locNode.isClient();
     }
 
     /**
@@ -354,11 +361,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
     }
 
     private boolean igniteClusterStarted() throws Exception {
-        boolean started = zkCurator.checkExists().forPath(IGNITE_PATH) != null &&
+        return zkCurator.checkExists().forPath(IGNITE_PATH) != null &&
             zkCurator.checkExists().forPath(ALIVE_NODES_PATH) != null &&
             !zk.getChildren(ALIVE_NODES_PATH, false).isEmpty();
-
-        return started;
     }
 
     /** {@inheritDoc} */
@@ -454,8 +459,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
             gridStartTime = clusterData.gridStartTime;
 
             zk.getData(EVENTS_PATH, zkWatcher, dataUpdateCallback, null);
-            zk.getChildren(JOIN_HIST_PATH, zkWatcher, nodesUpdateCallback, null);
-            zk.getChildren(ALIVE_NODES_PATH, zkWatcher, nodesUpdateCallback, null);
+            zk.getChildren(JOIN_HIST_PATH, zkWatcher, zkChildrenUpdateCallback, null);
+            zk.getChildren(ALIVE_NODES_PATH, zkWatcher, zkChildrenUpdateCallback, null);
 
             List<Op> joinOps = new ArrayList<>();
 
@@ -693,7 +698,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
     /**
      *
      */
-    class NodesUpdateCallback implements AsyncCallback.Children2Callback {
+    class ZKChildrenUpdateCallback implements AsyncCallback.Children2Callback {
         @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
             try {
                 if (children == null || children.isEmpty())
@@ -756,8 +761,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
     private final TreeMap<Long, ZookeeperClusterNode> curTop = new TreeMap<>();
 
     /**
-     * @param oldNodes
-     * @param newNodes
+     * @param oldNodes Previous processed state.
+     * @param newNodes Current state.
      */
     private void generateEvents(ZKAliveNodes oldNodes, ZKAliveNodes newNodes) {
         assert newNodes != null;
@@ -1177,7 +1182,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
                 log.info("Process event [type=" + event.getType() + ", state=" + event.getState() + ", path=" + event.getPath() + ']');
 
             if (event.getType() == Event.EventType.NodeChildrenChanged) {
-                zk.getChildren(event.getPath(), this, nodesUpdateCallback, null);
+                zk.getChildren(event.getPath(), this, zkChildrenUpdateCallback, null);
             } else if (event.getType() == Event.EventType.NodeDataChanged) {
                 zk.getData(event.getPath(), this, dataUpdateCallback, null);
             }


[3/3] ignite git commit: zk

Posted by sb...@apache.org.
zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/73f5af60
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/73f5af60
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/73f5af60

Branch: refs/heads/ignite-zk
Commit: 73f5af60cc6701a88712735f94f14f4fe0cdd92c
Parents: 1842bb4
Author: sboikov <sb...@gridgain.com>
Authored: Tue Nov 14 15:41:47 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Nov 14 16:01:20 2017 +0300

----------------------------------------------------------------------
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 76 ++++++++++++++------
 1 file changed, 56 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/73f5af60/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index 80d563e..cc0e6a4 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -100,6 +101,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
     private static final String CUSTOM_EVTS_PATH = CLUSTER_PATH + "/customEvts";
 
     /** */
+    private static final String DISCO_EVTS_HIST_PATH = CLUSTER_PATH + "/evtsHist";
+
+    /** */
     private static final byte[] EMPTY_BYTES = new byte[0];
 
     /** */
@@ -439,6 +443,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
                                 initOps.add(Op.create(JOIN_HIST_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
                                 initOps.add(Op.create(ALIVE_NODES_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
                                 initOps.add(Op.create(EVENTS_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+                                initOps.add(Op.create(DISCO_EVTS_HIST_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+                                initOps.add(Op.create(CUSTOM_EVTS_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
 
                                 zk.multi(initOps);
                             }
@@ -569,17 +575,13 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
         final UUID nodeId;
 
         /** */
-        final String zkPath;
-
-        /** */
         transient ZKJoiningNodeData joinData;
 
         /**
          * @param order Node order.
          * @param nodeId Node ID.
          */
-        ZKNodeData(String zkPath, long order, UUID nodeId) {
-            this.zkPath = zkPath;
+        ZKNodeData(long order, UUID nodeId) {
             this.order = order;
             this.nodeId = nodeId;
         }
@@ -602,9 +604,6 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
         @GridToStringInclude
         final TreeMap<Long, ZKNodeData> nodesByOrder;
 
-        /** */
-        final TreeMap<UUID, ZKNodeData> nodesById;
-
         /**
          * @param ver
          * @param nodesByOrder
@@ -612,11 +611,15 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
         ZKAliveNodes(int ver, TreeMap<Long, ZKNodeData> nodesByOrder) {
             this.ver = ver;
             this.nodesByOrder = nodesByOrder;
+        }
 
-            nodesById = new TreeMap<>();
+        ZKNodeData nodeById(UUID nodeId) {
+            for (ZKNodeData nodeData : nodesByOrder.values()) {
+                if (nodeId.equals(nodeData.nodeId))
+                    return nodeData;
+            }
 
-            for (ZKNodeData nodeData : nodesByOrder.values())
-                nodesById.put(nodeData.nodeId, nodeData);
+            return null;
         }
 
         /** {@inheritDoc} */
@@ -636,7 +639,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
 
         int nodeOrder = Integer.parseInt(path.substring(ID_LEN + 1)) + 1;
 
-        return new ZKNodeData(path, nodeOrder, nodeId);
+        return new ZKNodeData(nodeOrder, nodeId);
     }
 
     /** */
@@ -767,7 +770,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
     private void generateEvents(ZKAliveNodes oldNodes, ZKAliveNodes newNodes) {
         assert newNodes != null;
 
-        ZKNodeData locNode = newNodes.nodesById.get(this.locNode.id());
+        ZKNodeData locNode = newNodes.nodeById(this.locNode.id());
 
         if (locNode == null)
             return;
@@ -912,15 +915,22 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
         if (curCrdEvts == null) {
             expVer = 0;
 
-            newEvents = new ZKDiscoveryEvents(nextJoinOrder, newNodes, evts);
+            newEvents = new ZKDiscoveryEvents(nextJoinOrder, newNodes, new TreeSet<>(evts.keySet()));
         }
         else {
-            TreeMap<Integer, ZKDiscoveryEvent> evts0 = new TreeMap<>(curCrdEvts.evts);
+//            TreeMap<Integer, ZKDiscoveryEvent> evts0 = new TreeMap<>(curCrdEvts.evts);
+//
+//            for (ZKDiscoveryEvent e : evts.values()) {
+//                assert !evts0.containsKey(e.topVer) : "[newEvt=" + e + ", oldEvt=" + evts0.get(e.topVer) + ']';
+//
+//                evts0.put(e.topVer, e);
+//            }
+            TreeSet<Integer> evts0 = new TreeSet<>(curCrdEvts.evts);
 
             for (ZKDiscoveryEvent e : evts.values()) {
-                assert !evts0.containsKey(e.topVer) : "[newEvt=" + e + ", oldEvt=" + evts0.get(e.topVer) + ']';
+                assert !evts0.contains(e.topVer) : "[newEvt=" + e + ", oldEvts=" + evts0 + ']';
 
-                evts0.put(e.topVer, e);
+                evts0.add(e.topVer);
             }
 
             newEvents = new ZKDiscoveryEvents(nextJoinOrder, newNodes, evts0);
@@ -928,6 +938,14 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
             expVer = curCrdEvts.ver;
         }
 
+        try {
+            for (ZKDiscoveryEvent evt : evts.values())
+                zkCurator.create().withMode(CreateMode.PERSISTENT).forPath(DISCO_EVTS_HIST_PATH + "/" + evt.topVer, marshal(evt));
+        }
+        catch (Exception e) {
+            U.error(log, "Failed to create discovery event node: " + e, e);
+        }
+
         newEvents.ver = expVer + 1;
 
         try {
@@ -951,6 +969,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
     /** */
     private ZKDiscoveryEvent lastEvt;
 
+    private int lastProcessed = -1;
+
     /**
      *
      */
@@ -968,10 +988,26 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
 
                 newEvts.ver = stat.getVersion();
 
-                for (ZKDiscoveryEvent e : newEvts.evts.values()) {
+                for (Integer evtVer : newEvts.evts) {
+                    if (evtVer > lastProcessed)
+                        lastProcessed = evtVer;
+                    else
+                        continue;
+
                     boolean fireEvt;
                     boolean locJoin = false;
 
+                    ZKDiscoveryEvent e;
+
+                    try {
+                        e = unmarshal(zkCurator.getData().forPath(DISCO_EVTS_HIST_PATH + "/" + evtVer));
+                    }
+                    catch (Exception err) {
+                        U.error(log, "Failed to read discovery event: " + err, err);
+
+                        continue;
+                    }
+
                     if (lastEvt == null) {
                         locJoin = e.evtType == EventType.EVT_NODE_JOINED && e.node.id().equals(locNode.id());
 
@@ -1095,7 +1131,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
 
         /** */
         @GridToStringInclude
-        final TreeMap<Integer, ZKDiscoveryEvent> evts;
+        final TreeSet<Integer> evts;
 
         /** */
         final long nextJoinOrder;
@@ -1104,7 +1140,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
          * @param aliveNodes
          * @param evts
          */
-        ZKDiscoveryEvents(long nextJoinOrder, ZKAliveNodes aliveNodes, TreeMap<Integer, ZKDiscoveryEvent> evts) {
+        ZKDiscoveryEvents(long nextJoinOrder, ZKAliveNodes aliveNodes, TreeSet<Integer> evts) {
             this.nextJoinOrder = nextJoinOrder;
             this.aliveNodes = aliveNodes;
             this.evts = evts;