You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/11/30 13:47:42 UTC

[6/6] ignite git commit: zk

zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/827b7085
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/827b7085
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/827b7085

Branch: refs/heads/ignite-zk
Commit: 827b708545cf851f784998c5faef874bda8e0898
Parents: 4797181
Author: sboikov <sb...@gridgain.com>
Authored: Thu Nov 30 15:23:44 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Nov 30 16:47:05 2017 +0300

----------------------------------------------------------------------
 .../binary/CacheObjectBinaryProcessorImpl.java  |   3 +
 .../discovery/zk/internal/ZookeeperClient.java  |  44 ++
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 500 ++++++++-----------
 .../ZookeeperDiscoverySpiBasicTest.java         |  31 ++
 4 files changed, 287 insertions(+), 291 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/827b7085/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index ed4c520..e7f86cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -440,6 +440,9 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
 
             BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0);
 
+            if (oldMeta == mergedMeta)
+                return;
+
             MetadataUpdateResult res = transport.requestMetadataUpdate(mergedMeta).get();
 
             assert res != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/827b7085/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index ea9b289..73547cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -521,6 +521,13 @@ public class ZookeeperClient implements Watcher {
 
     /**
      * @param path Path.
+     */
+    void deleteIfExistsAsync(String path) {
+        new DeleteIfExistsOperation(path).execute(); // Fire-and-forget: the outcome is handled in DeleteIfExistsOperation.processResult.
+    }
+
+    /**
+     * @param path Path.
      * @param watcher Watcher.
      * @param cb Callback.
      */
@@ -842,6 +849,43 @@ public class ZookeeperClient implements Watcher {
     /**
      *
      */
+    class DeleteIfExistsOperation implements AsyncCallback.VoidCallback, ZkAsyncOperation {
+        /** Path of the node to delete. */
+        private final String path;
+
+        /**
+         * @param path Path of the node to delete.
+         */
+        DeleteIfExistsOperation(String path) {
+            this.path = path;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void execute() {
+            zk.delete(path, -1, this, null); // Async delete with this object as callback; version -1 matches any node version (ZooKeeper API).
+        }
+
+        /** {@inheritDoc} */
+        @Override public void processResult(int rc, String path, Object ctx) {
+            if (rc == KeeperException.Code.NONODE.intValue()) // Node does not exist: nothing to delete, treated as success.
+                return;
+
+            if (needRetry(rc)) {
+                U.warn(log, "Failed to execute async operation, connection lost. Will retry after connection restore [" +
+                    "path=" + path + ']');
+
+                retryQ.add(this); // Re-queue this same operation; presumably re-executed once the connection is restored.
+            }
+            else if (rc == KeeperException.Code.SESSIONEXPIRED.intValue())
+                U.warn(log, "Failed to execute async operation, connection lost [path=" + path + ']'); // Only logged, not retried.
+            else
+                assert rc == 0 : rc; // NOTE(review): any other error code is silently ignored when assertions are disabled.
+        }
+    }
+
+    /**
+     *
+     */
     class CreateCallbackWrapper implements AsyncCallback.StringCallback {
         /** */
         final CreateOperation op;

http://git-wip-us.apache.org/repos/asf/ignite/blob/827b7085/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 0ecbaf3..38f9cbf 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -27,9 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
@@ -127,16 +125,16 @@ public class ZookeeperDiscoveryImpl {
     private final int evtsAckThreshold;
 
     /** */
-    private ZkRuntimeState state = new ZkRuntimeState(false);
+    private ZkRuntimeState state;
 
     /** */
     private volatile ConnectionState connState = ConnectionState.STARTED;
 
     /** */
-    private ZkEventWorker evtWorker;
+    private final AtomicBoolean stop = new AtomicBoolean();
 
     /** */
-    private final AtomicBoolean stop = new AtomicBoolean();
+    private final Object stateMux = new Object();
 
     /**
      * @param log Logger.
@@ -213,7 +211,8 @@ public class ZookeeperDiscoveryImpl {
      * @return Ping result.
      */
     public boolean pingNode(UUID nodeId) {
-        checkState();
+        while (!busyLock.enterBusy())
+            checkState();
 
         // TODO ZK
         return node(nodeId) != null;
@@ -225,14 +224,96 @@ public class ZookeeperDiscoveryImpl {
     public void reconnect() {
         assert clientReconnectEnabled;
 
-        evtWorker.onReconnectRequest();
+        synchronized (stateMux) {
+            if (connState == ConnectionState.STARTED) // Only a STARTED node begins a reconnect; any other state returns early.
+                connState = ConnectionState.DISCONNECTED;
+            else
+                return;
+        }
+
+        state.zkClient.onCloseStart();
+
+        busyLock.block(); // presumably waits until in-flight busy sections have left — confirm against busyLock's type.
+
+        busyLock.unblock(); // Released immediately so that later enterBusy() calls can succeed again.
+
+        state.zkClient.close();
+
+        UUID newId = UUID.randomUUID(); // The node rejoins under a fresh random ID.
+
+        U.quietAndWarn(log, "Local node will try to reconnect to cluster with new id due to network problems [" +
+            "newId=" + newId +
+            ", prevId=" + locNode.id() +
+            ", locNode=" + locNode + ']');
+
+        doReconnect(newId);
+    }
+
+    /**
+     * @param newId New ID handed to the local node before it tries to join the topology again.
+     */
+    private void doReconnect(UUID newId) {
+        locNode.onClientDisconnected(newId);
+
+        if (state.joined) { // Listener gets the disconnect event only if the node had joined before.
+            assert state.evtsData != null;
+
+            lsnr.onDiscovery(EVT_CLIENT_NODE_DISCONNECTED,
+                state.evtsData.topVer,
+                locNode,
+                state.top.topologySnapshot(),
+                Collections.<Long, Collection<ClusterNode>>emptyMap(),
+                null);
+        }
+
+        try {
+            joinTopology0(state.joined); // The current 'joined' flag is passed on as the join's prevJoined argument.
+        }
+        catch (Exception e) {
+            U.error(log, "Failed to reconnect: " + e, e);
+
+            onSegmented(e); // A failed rejoin is handled as segmentation.
+        }
+    }
+
+
+    /**
+     * @param e Error; used to complete the join future when the node had not joined yet.
+     */
+    private void onSegmented(Exception e) {
+        if (state.joined) {
+            synchronized (stateMux) {
+                connState = ConnectionState.STOPPED;
+            }
+
+            zkClient().zk().sync(zkPaths.clusterDir, new SegmentedWatcher(), null); // EVT_NODE_SEGMENTED is fired from the sync callback.
+        }
+        else
+            joinFut.onDone(e); // Never joined: fail the pending join instead of firing an event.
+    }
+
+    /**
+     * Callback for the async {@code sync} call issued by {@code onSegmented}; notifies the listener with EVT_NODE_SEGMENTED.
+     */
+    class SegmentedWatcher implements AsyncCallback.VoidCallback {
+        @Override public void processResult(int rc, String path, Object ctx) {
+            assert state.evtsData != null; // NOTE(review): the result code rc is not inspected — the event fires regardless.
+
+            lsnr.onDiscovery(EventType.EVT_NODE_SEGMENTED,
+                state.evtsData.topVer,
+                locNode,
+                state.top.topologySnapshot(),
+                Collections.<Long, Collection<ClusterNode>>emptyMap(),
+                null);
+        }
     }
 
     /**
      * @return Remote nodes.
      */
     public Collection<ClusterNode> remoteNodes() {
-        checkState();
+        while (!busyLock.enterBusy()) // NOTE(review): no leaveBusy() follows in this method — looks like a leaked busy section; confirm.
+            checkState(); // Re-checked on every failed attempt to enter the busy section.
 
         return state.top.remoteNodes();
     }
@@ -258,14 +339,9 @@ public class ZookeeperDiscoveryImpl {
      * @return {@code True} if node joined or joining topology.
      */
     public boolean knownNode(UUID nodeId) {
-        checkState();
-
-        if (!busyLock.enterBusy()) {
+        while (!busyLock.enterBusy())
             checkState();
 
-            throw new IgniteSpiException("Zookeeper client closed.");
-        }
-
         try {
             List<String> children = state.zkClient.getChildren(zkPaths.aliveNodesDir);
 
@@ -295,8 +371,6 @@ public class ZookeeperDiscoveryImpl {
      * @param msg Message.
      */
     public void sendCustomMessage(DiscoverySpiCustomMessage msg) {
-        checkState();
-
         assert msg != null;
 
         byte[] msgBytes;
@@ -308,12 +382,9 @@ public class ZookeeperDiscoveryImpl {
             throw new IgniteSpiException("Failed to marshal custom message: " + msg, e);
         }
 
-        if (!busyLock.enterBusy()) {
+        while (!busyLock.enterBusy())
             checkState();
 
-            throw new IgniteSpiException("Zookeeper client closed.");
-        }
-
         try {
             String prefix = UUID.randomUUID().toString();
 
@@ -367,7 +438,9 @@ public class ZookeeperDiscoveryImpl {
     /**
      * @throws InterruptedException If interrupted.
      */
-    private void joinTopology0(boolean reconnect) throws InterruptedException {
+    private void joinTopology0(boolean prevJoined) throws InterruptedException {
+        state = new ZkRuntimeState(prevJoined);
+
         DiscoveryDataBag discoDataBag = new DiscoveryDataBag(locNode.id());
 
         exchange.collect(discoDataBag);
@@ -394,14 +467,6 @@ public class ZookeeperDiscoveryImpl {
             throw new IgniteSpiException("Failed to create Zookeeper client", e);
         }
 
-        if (!reconnect) {
-            evtWorker = new ZkEventWorker(igniteInstanceName, "zookeeper-disco-evt-worker", log);
-
-            evtWorker.start();
-        }
-        else
-            assert evtWorker != null;
-
         startJoin(joinDataBytes);
     }
 
@@ -822,37 +887,32 @@ public class ZookeeperDiscoveryImpl {
         if (log.isInfoEnabled())
             log.info("Delete join data: " + joinDataPath);
 
-        // TODO ZK async
-        state.zkClient.deleteIfExists(joinDataPath, -1);
+        state.zkClient.deleteIfExistsAsync(joinDataPath);
 
         final List<ClusterNode> topSnapshot = Collections.singletonList((ClusterNode)locNode);
 
-        evtWorker.evtsQ.add(new Runnable() {
-            @Override public void run() {
-                if (connState == ConnectionState.DISCONNECTED)
-                    connState = ConnectionState.STARTED;
-
-                lsnr.onDiscovery(EventType.EVT_NODE_JOINED,
-                    1L,
-                    locNode,
-                    topSnapshot,
-                    Collections.<Long, Collection<ClusterNode>>emptyMap(),
-                    null);
-
-                if (state.prevJoined) {
-                    lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED,
-                        1L,
-                        locNode,
-                        topSnapshot,
-                        Collections.<Long, Collection<ClusterNode>>emptyMap(),
-                        null);
-
-                    U.quietAndWarn(log, "Client node was reconnected after it was already considered failed.");
-                }
+        if (connState == ConnectionState.DISCONNECTED)
+            connState = ConnectionState.STARTED;
 
-                joinFut.onDone();
-            }
-        });
+        lsnr.onDiscovery(EventType.EVT_NODE_JOINED,
+            1L,
+            locNode,
+            topSnapshot,
+            Collections.<Long, Collection<ClusterNode>>emptyMap(),
+            null);
+
+        if (state.prevJoined) {
+            lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED,
+                1L,
+                locNode,
+                topSnapshot,
+                Collections.<Long, Collection<ClusterNode>>emptyMap(),
+                null);
+
+            U.quietAndWarn(log, "Client node was reconnected after it was already considered failed.");
+        }
+
+        joinFut.onDone();
     }
 
     /**
@@ -948,8 +1008,8 @@ public class ZookeeperDiscoveryImpl {
 
                         state.evtsData.addEvent(state.top.nodesByOrder.values(), evtData, null);
 
-                        if (log.isInfoEnabled())
-                            log.info("Generated CUSTOM event [evt=" + evtData + ", msg=" + msg + ']');
+                        if (log.isDebugEnabled())
+                            log.debug("Generated CUSTOM event [evt=" + evtData + ", msg=" + msg + ']');
                     }
                     catch (IgniteCheckedException e) {
                         U.error(log, "Failed to unmarshal custom discovery message: " + e, e);
@@ -958,7 +1018,7 @@ public class ZookeeperDiscoveryImpl {
                 else {
                     U.warn(log, "Ignore custom event from unknown node: " + sndNodeId);
 
-                    state.zkClient.deleteIfExists(evtDataPath, -1);
+                    state.zkClient.deleteIfExistsAsync(evtDataPath);
                 }
 
                 state.evtsData.procCustEvt = evtE.getKey();
@@ -1005,7 +1065,7 @@ public class ZookeeperDiscoveryImpl {
      * @throws Exception If failed.
      */
     @SuppressWarnings("unchecked")
-    private void processNewEvents(ZkDiscoveryEventsData evtsData) throws Exception {
+    private void processNewEvents(final ZkDiscoveryEventsData evtsData) throws Exception {
         TreeMap<Long, ZkDiscoveryEventData> evts = evtsData.evts;
 
         boolean updateNodeInfo = false;
@@ -1026,8 +1086,8 @@ public class ZookeeperDiscoveryImpl {
                     processLocalJoin(evtsData, evtData0);
             }
             else {
-                if (log.isInfoEnabled())
-                    log.info("New discovery event data [evt=" + evtData + ", evtsHist=" + evts.size() + ']');
+                if (log.isDebugEnabled())
+                    log.debug("New discovery event data [evt=" + evtData + ", evtsHist=" + evts.size() + ']');
 
                 switch (evtData.eventType()) {
                     case EventType.EVT_NODE_JOINED: {
@@ -1165,40 +1225,35 @@ public class ZookeeperDiscoveryImpl {
 
         final List<ClusterNode> topSnapshot = state.top.topologySnapshot();
 
-        evtWorker.evtsQ.add(new Runnable() {
-            @Override public void run() {
-                if (connState == ConnectionState.DISCONNECTED)
-                    connState = ConnectionState.STARTED;
-
-                lsnr.onDiscovery(evtData.eventType(),
-                    evtData.topologyVersion(),
-                    locNode,
-                    topSnapshot,
-                    Collections.<Long, Collection<ClusterNode>>emptyMap(),
-                    null);
-
-                if (state.prevJoined) {
-                    lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED,
-                        evtData.topologyVersion(),
-                        locNode,
-                        topSnapshot,
-                        Collections.<Long, Collection<ClusterNode>>emptyMap(),
-                        null);
-
-                    U.quietAndWarn(log, "Client node was reconnected after it was already considered failed.");
-                }
+        if (connState == ConnectionState.DISCONNECTED)
+            connState = ConnectionState.STARTED;
 
-                joinFut.onDone();
-            }
-        });
+        lsnr.onDiscovery(evtData.eventType(),
+            evtData.topologyVersion(),
+            locNode,
+            topSnapshot,
+            Collections.<Long, Collection<ClusterNode>>emptyMap(),
+            null);
+
+        if (state.prevJoined) {
+            lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED,
+                evtData.topologyVersion(),
+                locNode,
+                topSnapshot,
+                Collections.<Long, Collection<ClusterNode>>emptyMap(),
+                null);
+
+            U.quietAndWarn(log, "Client node was reconnected after it was already considered failed.");
+        }
+
+        joinFut.onDone();
 
         state.joined = true;
 
-        if (log.isInfoEnabled())
-            log.info("Delete data for joined: " + path);
+        if (log.isDebugEnabled())
+            log.debug("Delete data for joined: " + path);
 
-        // TODO ZK: async
-        state.zkClient.deleteIfExists(path, -1);
+        state.zkClient.deleteIfExistsAsync(path);
     }
 
     /**
@@ -1207,8 +1262,8 @@ public class ZookeeperDiscoveryImpl {
      */
     @SuppressWarnings("unchecked")
     private void notifyCustomEvent(final ZkDiscoveryCustomEventData evtData, final DiscoverySpiCustomMessage msg) {
-        if (log.isInfoEnabled())
-            log.info(" [topVer=" + evtData.topologyVersion() + ", msg=" + msg + ']');
+        if (log.isDebugEnabled())
+            log.debug(" [topVer=" + evtData.topologyVersion() + ", msg=" + msg + ']');
 
         final ZookeeperClusterNode sndNode = state.top.nodesById.get(evtData.sndNodeId);
 
@@ -1216,16 +1271,12 @@ public class ZookeeperDiscoveryImpl {
 
         final List<ClusterNode> topSnapshot = state.top.topologySnapshot();
 
-        evtWorker.evtsQ.add(new Runnable() {
-            @Override public void run() {
-                lsnr.onDiscovery(evtData.eventType(),
-                    evtData.topologyVersion(),
-                    sndNode,
-                    topSnapshot,
-                    Collections.<Long, Collection<ClusterNode>>emptyMap(),
-                    msg);
-            }
-        });
+        lsnr.onDiscovery(evtData.eventType(),
+            evtData.topologyVersion(),
+            sndNode,
+            topSnapshot,
+            Collections.<Long, Collection<ClusterNode>>emptyMap(),
+            msg);
     }
 
     /**
@@ -1245,16 +1296,12 @@ public class ZookeeperDiscoveryImpl {
 
         final List<ClusterNode> topSnapshot = state.top.topologySnapshot();
 
-        evtWorker.evtsQ.add(new Runnable() {
-            @Override public void run() {
-                lsnr.onDiscovery(evtData.eventType(),
-                    evtData.topologyVersion(),
-                    joinedNode,
-                    topSnapshot,
-                    Collections.<Long, Collection<ClusterNode>>emptyMap(),
-                    null);
-            }
-        });
+        lsnr.onDiscovery(evtData.eventType(),
+            evtData.topologyVersion(),
+            joinedNode,
+            topSnapshot,
+            Collections.<Long, Collection<ClusterNode>>emptyMap(),
+            null);
     }
 
     /**
@@ -1268,16 +1315,12 @@ public class ZookeeperDiscoveryImpl {
 
         final List<ClusterNode> topSnapshot = state.top.topologySnapshot();
 
-        evtWorker.evtsQ.add(new Runnable() {
-            @Override public void run() {
-                lsnr.onDiscovery(evtData.eventType(),
-                    evtData.topologyVersion(),
-                    failedNode,
-                    topSnapshot,
-                    Collections.<Long, Collection<ClusterNode>>emptyMap(),
-                    null);
-            }
-        });
+        lsnr.onDiscovery(evtData.eventType(),
+            evtData.topologyVersion(),
+            failedNode,
+            topSnapshot,
+            Collections.<Long, Collection<ClusterNode>>emptyMap(),
+            null);
     }
 
     /**
@@ -1326,8 +1369,8 @@ public class ZookeeperDiscoveryImpl {
 
                             String path = zkPaths.ackEventDataPath(evtId);
 
-                            if (log.isInfoEnabled())
-                                log.info("Create ack event: " + path);
+                            if (log.isDebugEnabled())
+                                log.debug("Create ack event: " + path);
 
                             // TODO ZK: delete is previous exists?
                             state.zkClient.createIfNeeded(
@@ -1349,9 +1392,10 @@ public class ZookeeperDiscoveryImpl {
 
                             newEvts.add(ackEvtData);
 
-                            if (log.isInfoEnabled()) {
-                                log.info("Generated CUSTOM event ack [baseEvtId=" + evtData.eventId() +
+                            if (log.isDebugEnabled()) {
+                                log.debug("Generated CUSTOM event ack [baseEvtId=" + evtData.eventId() +
                                     ", evt=" + ackEvtData +
+                                    ", evtSize=" + ackBytes.length +
                                     ", msg=" + ack + ']');
                             }
                         }
@@ -1360,8 +1404,8 @@ public class ZookeeperDiscoveryImpl {
                     }
 
                     case EventType.EVT_NODE_FAILED: {
-                        if (log.isInfoEnabled())
-                            log.info("All nodes processed node fail [evtData=" + evtData + ']');
+                        if (log.isDebugEnabled())
+                            log.debug("All nodes processed node fail [evtData=" + evtData + ']');
 
                         break; // Do not need addition cleanup.
                     }
@@ -1417,8 +1461,8 @@ public class ZookeeperDiscoveryImpl {
         if (log.isInfoEnabled())
             log.info("Delete processed event data [path1=" + evtDataPath + ", path2=" + dataForJoinedPath + ']');
 
-        state.zkClient.deleteIfExists(evtDataPath, -1);
-        state.zkClient.deleteIfExists(dataForJoinedPath, -1);
+        state.zkClient.deleteIfExistsAsync(evtDataPath);
+        state.zkClient.deleteIfExistsAsync(dataForJoinedPath);
     }
 
     /**
@@ -1429,15 +1473,16 @@ public class ZookeeperDiscoveryImpl {
     @Nullable private DiscoverySpiCustomMessage handleProcessedCustomEvent(String ctx, ZkDiscoveryCustomEventData evtData)
         throws Exception
     {
-        if (log.isInfoEnabled())
-            log.info("All nodes processed custom event [ctx=" + ctx + ", evtData=" + evtData + ']');
+        if (log.isDebugEnabled())
+            log.debug("All nodes processed custom event [ctx=" + ctx + ", evtData=" + evtData + ']');
 
         if (!evtData.ackEvent()) {
             String path = zkPaths.customEventDataPath(false, evtData.evtPath);
 
-            log.info("Delete path: " + path);
+            if (log.isDebugEnabled())
+                log.debug("Delete path: " + path);
 
-            state.zkClient.deleteIfExists(path, -1);
+            state.zkClient.deleteIfExistsAsync(path);
 
             assert evtData.msg != null || locNode.order() > evtData.topologyVersion() : evtData;
 
@@ -1447,9 +1492,10 @@ public class ZookeeperDiscoveryImpl {
         else {
             String path = zkPaths.ackEventDataPath(evtData.eventId());
 
-            log.info("Delete path: " + path);
+            if (log.isDebugEnabled())
+                log.debug("Delete path: " + path);
 
-            state.zkClient.deleteIfExists(path, -1);
+            state.zkClient.deleteIfExistsAsync(path);
         }
 
         return null;
@@ -1472,7 +1518,9 @@ public class ZookeeperDiscoveryImpl {
 
         log.info("Stop ZookeeperDiscovery [nodeId=" + locNode.id() + ", err=" + e + ']');
 
-        connState = ConnectionState.STOPPED;
+        synchronized (stateMux) {
+            connState = ConnectionState.STOPPED;
+        }
 
         ZookeeperClient zkClient = state.zkClient;
 
@@ -1481,18 +1529,12 @@ public class ZookeeperDiscoveryImpl {
 
         busyLock.block();
 
+        busyLock.unblock();
+
         joinFut.onDone(e);
 
         if (zkClient != null)
             zkClient.close();
-
-        ZkEventWorker evtWorker = this.evtWorker;
-
-        if (evtWorker != null) {
-            evtWorker.interrupt();
-
-            evtWorker.join();
-        }
     }
 
     /**
@@ -1552,165 +1594,27 @@ public class ZookeeperDiscoveryImpl {
     /**
      *
      */
-    private class ZkEventWorker extends IgniteSpiThread {
-        /** */
-        private final Runnable RECONNECT = new Runnable() {@Override public void run() {}};
-
-        /** */
-        private final Runnable CONNECTION_LOST = new Runnable() {@Override public void run() {}};
-
-        /** */
-        private final BlockingQueue<Runnable> evtsQ;
-
-        /**
-         * @param igniteInstanceName Ignite instance name.
-         * @param name Thread name.
-         * @param log Logger.
-         */
-        ZkEventWorker(String igniteInstanceName, String name, IgniteLogger log) {
-            super(igniteInstanceName, name, log);
-
-            evtsQ = new LinkedBlockingQueue<>();
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException {
-            while (!isInterrupted()) {
-                Runnable r = evtsQ.take();
-
-                if (r == RECONNECT)
-                    processReconnect();
-                if (r == CONNECTION_LOST)
-                    processConnectionLost();
-                else {
-                    if (!busyLock.enterBusy())
-                        return;
-
-                    try {
-                        r.run();
-                    }
-                    finally {
-                        busyLock.leaveBusy();
-                    }
-                }
-            }
-        }
-
+    private class ReconnectorThread extends IgniteSpiThread {
         /**
          *
          */
-        void onConnectionLoss() {
-            evtsQ.add(CONNECTION_LOST);
+        ReconnectorThread() {
+            super(ZookeeperDiscoveryImpl.this.igniteInstanceName, "zk-reconnector", log);
         }
 
-        /**
-         *
-         */
-        void onReconnectRequest() {
-            evtsQ.add(RECONNECT);
-        }
-
-        /**
-         *
-         */
-        private void processReconnect() {
-            assert locNode.isClient() : locNode;
-
-            if (connState == ConnectionState.DISCONNECTED)
-                return;
-
-            connState = ConnectionState.DISCONNECTED;
-
-            state.zkClient.onCloseStart();
-
+        @Override protected void body() throws InterruptedException {
             busyLock.block();
 
             busyLock.unblock();
 
-            state.zkClient.close();
-
             UUID newId = UUID.randomUUID();
 
-            U.quietAndWarn(log, "Local node will try to reconnect to cluster with new id due to network problems [" +
+            U.quietAndWarn(log, "Connection to Zookeeper server is lost, local node will try to reconnect with new id [" +
                 "newId=" + newId +
                 ", prevId=" + locNode.id() +
                 ", locNode=" + locNode + ']');
 
-            reconnect(newId);
-        }
-
-        /**
-         *
-         */
-        void processConnectionLost() {
-            if (clientReconnectEnabled) {
-                connState = ConnectionState.DISCONNECTED;
-
-                busyLock.block();
-
-                busyLock.unblock();
-
-                UUID newId = UUID.randomUUID();
-
-                U.quietAndWarn(log, "Connection to Zookeeper server is lost, local node will try to reconnect with new id [" +
-                    "newId=" + newId +
-                    ", prevId=" + locNode.id() +
-                    ", locNode=" + locNode + ']');
-
-                reconnect(newId);
-            }
-            else {
-                U.warn(log, "Connection to Zookeeper server is lost, local node SEGMENTED.");
-
-                onSegmented(new IgniteSpiException("Zookeeper connection loss."));
-            }
-        }
-
-        /**
-         * @param newId New ID.
-         */
-        private void reconnect(UUID newId) {
-            locNode.onClientDisconnected(newId);
-
-            if (state.joined) {
-                assert state.evtsData != null;
-
-                lsnr.onDiscovery(EVT_CLIENT_NODE_DISCONNECTED,
-                    state.evtsData.topVer,
-                    locNode,
-                    state.top.topologySnapshot(),
-                    Collections.<Long, Collection<ClusterNode>>emptyMap(),
-                    null);
-            }
-
-            state = new ZkRuntimeState(state.joined);
-
-            try {
-                joinTopology0(true);
-            }
-            catch (Exception e) {
-                U.error(log, "Failed to reconnect: " + e, e);
-
-                onSegmented(e);
-            }
-        }
-
-        /**
-         * @param e Error.
-         */
-        private void onSegmented(Exception e) {
-            if (state.joined) {
-                assert state.evtsData != null;
-
-                lsnr.onDiscovery(EventType.EVT_NODE_SEGMENTED,
-                    state.evtsData.topVer,
-                    locNode,
-                    state.top.topologySnapshot(),
-                    Collections.<Long, Collection<ClusterNode>>emptyMap(),
-                    null);
-            }
-            else
-                joinFut.onDone(e);
+            doReconnect(newId);
         }
     }
 
@@ -1723,7 +1627,21 @@ public class ZookeeperDiscoveryImpl {
 
         /** {@inheritDoc} */
         @Override public void run() {
-            evtWorker.onConnectionLoss();
+            if (clientReconnectEnabled) {
+                synchronized (stateMux) {
+                    if (connState == ConnectionState.STARTED) // Same STARTED -> DISCONNECTED guard as in reconnect().
+                        connState = ConnectionState.DISCONNECTED;
+                    else
+                        return;
+                }
+
+                new ReconnectorThread().start(); // The reconnect itself runs on a dedicated thread, not in this callback.
+            }
+            else {
+                U.warn(log, "Connection to Zookeeper server is lost, local node SEGMENTED.");
+
+                onSegmented(new IgniteSpiException("Zookeeper connection loss.")); // Reconnect disabled: report segmentation.
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/827b7085/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index 8ae84c1..d50e9b9 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.spi.discovery.zk.internal;
 
 import java.io.File;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -306,6 +307,22 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testMetadataUpdate() throws Exception {
+        startGrid(0);
+
+        GridTestUtils.runMultiThreaded(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                ignite(0).configuration().getMarshaller().marshal(new C1()); // Every worker marshals the same two classes.
+                ignite(0).configuration().getMarshaller().marshal(new C2());
+
+                return null;
+            }
+        }, 64, "marshal"); // presumably 64 concurrent threads named "marshal" — confirm against GridTestUtils.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testClientNodesStatus() throws Exception {
         startGrid(0);
 
@@ -1661,4 +1678,18 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
             return data;
         }
     }
+
+    /**
+     * Empty serializable class marshalled by {@code testMetadataUpdate}.
+     */
+    private static class C1 implements Serializable {
+        // No-op.
+    }
+
+    /**
+     * Second empty serializable class marshalled by {@code testMetadataUpdate}.
+     */
+    private static class C2 implements Serializable {
+        // No-op.
+    }
 }