You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/05 14:57:43 UTC

[2/2] ignite git commit: zk

zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bbd5a889
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bbd5a889
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bbd5a889

Branch: refs/heads/ignite-zk
Commit: bbd5a889fc980e36c8c500ef221a551e451ad854
Parents: 961167a
Author: sboikov <sb...@gridgain.com>
Authored: Tue Dec 5 14:57:04 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Dec 5 17:57:35 2017 +0300

----------------------------------------------------------------------
 .../internal/ZkDiscoveryNodeJoinEventData.java  |  16 +-
 .../discovery/zk/internal/ZkIgnitePaths.java    |  37 +--
 .../zk/internal/ZkInternalJoinErrorMessage.java |   3 +
 .../zk/internal/ZkJoiningNodeData.java          |  15 +-
 .../discovery/zk/internal/ZkRuntimeState.java   |   3 +
 .../discovery/zk/internal/ZookeeperClient.java  |  45 +++
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 289 ++++++++++++++-----
 .../zk/internal/ZookeeperClientTest.java        |  58 ++--
 8 files changed, 328 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd5a889/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
index df4c137..fbf1fc8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
@@ -34,6 +34,12 @@ class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData {
     final UUID nodeId;
 
     /** */
+    final int joinDataPartCnt;
+
+    /** Prefix ID of the joining node's join data path (see ZkIgnitePaths#joiningNodeDataPath). */
+    final UUID joinDataPrefixId;
+
+    /** Joining node data cached by the node which generated the event (transient, never serialized). */
     transient ZkJoiningNodeData joiningNodeData;
 
     /**
@@ -42,11 +48,19 @@ class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData {
      * @param nodeId Joined node ID.
      * @param joinedInternalId Joined node internal ID.
      */
-    ZkDiscoveryNodeJoinEventData(long evtId, long topVer, UUID nodeId, int joinedInternalId) {
+    ZkDiscoveryNodeJoinEventData(long evtId,
+        long topVer,
+        UUID nodeId,
+        int joinedInternalId,
+        UUID joinDataPrefixId, // Prefix ID of the joining node's join data path.
+        int joinDataPartCnt) // Number of join data parts (data is split only if greater than 1).
+    {
         super(evtId, EventType.EVT_NODE_JOINED, topVer);
 
         this.nodeId = nodeId;
         this.joinedInternalId = joinedInternalId;
+        this.joinDataPrefixId = joinDataPrefixId;
+        this.joinDataPartCnt = joinDataPartCnt;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd5a889/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
index 2478979..e52127a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
@@ -151,14 +151,13 @@ class ZkIgnitePaths {
         return clusterDir + "/" + path;
     }
 
-    String joiningNodeDataPath(UUID nodeId, String aliveNodePath) {
-        int joinSeq = ZkIgnitePaths.aliveJoinDataSequence(aliveNodePath);
-
-        return joinDataDir + '/' +
-            ZkIgnitePaths.aliveNodePrefixId(aliveNodePath) + ":" +
-            nodeId.toString() +
-            "|" +
-            String.format("%010d", joinSeq);
+    /**
+     * @param nodeId Joining node ID.
+     * @param prefixId Prefix ID (random UUID generated by the joining node when it starts to join).
+     * @return Joining node data path: {@code <join data dir>/<prefix id>:<node id>}.
+     */
+    String joiningNodeDataPath(UUID nodeId, UUID prefixId) {
+        return joinDataDir + '/' + prefixId + ":" + nodeId.toString();
     }
 
     /**
@@ -175,8 +174,8 @@ class ZkIgnitePaths {
      * @param path Alive node zk path.
-     * @return Node ID.
+     * @return Prefix ID.
      */
-    static String aliveNodePrefixId(String path) {
-        return path.substring(0, ZkIgnitePaths.UUID_LEN);
+    static UUID aliveNodePrefixId(String path) {
+        return UUID.fromString(path.substring(0, ZkIgnitePaths.UUID_LEN));
     }
 
     /**
@@ -184,7 +183,7 @@ class ZkIgnitePaths {
      * @return Node ID.
      */
     static UUID aliveNodeId(String path) {
-        // <uuid prefix>:<node id>|<join data seq>|<alive seq>
+        // <uuid prefix>:<node id>|<alive seq>
         int startIdx = ZkIgnitePaths.UUID_LEN + 1;
 
         String idStr = path.substring(startIdx, startIdx + ZkIgnitePaths.UUID_LEN);
@@ -193,17 +192,6 @@ class ZkIgnitePaths {
     }
 
     /**
-     * @param path Alive node zk path.
-     * @return Joined node sequence.
-     */
-    private static int aliveJoinDataSequence(String path) {
-        int idx2 = path.lastIndexOf('|');
-        int idx1 = path.lastIndexOf('|', idx2 - 1);
-
-        return Integer.parseInt(path.substring(idx1 + 1, idx2));
-    }
-
-    /**
      * @param path Event zk path.
      * @return Event sequence number.
      */
@@ -230,18 +218,14 @@ class ZkIgnitePaths {
      * @param evtId Event ID.
-     * @return Event zk path.
+     * @return Path of the event data for the joined node.
      */
-    String joinEventDataPath(long evtId) {
-        return evtsPath + "/" + evtId;
+    String joinEventDataPathForJoined(long evtId) {
+        return evtsPath + "/joined-" + evtId;
     }
 
     /**
      * @param evtId Event ID.
-     * @return Event zk path.
+     * @return Path for custom event ack.
      */
-    String joinEventDataPathForJoined(long evtId) {
-        return evtsPath + "/joined-" + evtId;
-    }
-
     String ackEventDataPath(long evtId) {
         return customEventDataPath(true, String.valueOf(evtId));
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd5a889/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
index e724673..6040c20 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
@@ -25,6 +25,9 @@ class ZkInternalJoinErrorMessage implements ZkInternalMessage {
     private static final long serialVersionUID = 0L;
 
     /** */
+    transient boolean notifyNode = true; // Presumably false once read back from join data (initializer skipped) - confirm.
+
+    /** Internal ID of the node which failed to join. */
     final int nodeInternalId;
 
     /** */

http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd5a889/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
index 6733ab6..284cbff 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
@@ -30,12 +30,26 @@ class ZkJoiningNodeData implements Serializable {
     private static final long serialVersionUID = 0L;
 
     /** */
+    private int partCnt;
+
+    /** Joining node ({@code null} for a header of join data split into parts). */
     @GridToStringInclude
     private final ZookeeperClusterNode node;
 
     /** */
     @GridToStringInclude
     private final Map<Integer, Serializable> discoData;
+
+    /**
+     * Creates a header for join data which was too large for a single znode and was saved as separate parts.
+     *
+     * @param partCnt Number of parts.
+     */
+    ZkJoiningNodeData(int partCnt) {
+        this.partCnt = partCnt;
+        this.node = null;
+        this.discoData = null;
+    }
 
     /**
      * @param node Node.
@@ -49,6 +63,14 @@ class ZkJoiningNodeData implements Serializable {
         this.discoData = discoData;
     }
 
+    /**
+     * @return Number of parts the join data was split into; a value greater than 1 means this object is only
+     *      a header and the actual join data is stored in separate parts.
+     */
+    int partCount() {
+        return partCnt;
+    }
+
     /**
      * @return Node.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd5a889/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
index 660dc42..4653109 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
@@ -33,6 +33,9 @@ class ZkRuntimeState {
     int internalOrder;
 
     /** */
+    int joinDataPartCnt; // Number of parts local join data was split into (set only when it was split).
+
+    /** */
     IgniteSpiTimeoutObject joinTimeoutObj;
 
     /** */

http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd5a889/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index bc024f1..a806548 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -276,6 +276,75 @@ public class ZookeeperClient implements Watcher {
 
     }
 
+    /**
+     * Maximum size of a single znode create request, estimated as data length plus
+     * {@link #requestOverhead(String)}. The value seems to have been found empirically (largest data plus path
+     * length accepted by a ZooKeeper server, presumably with default {@code jute.maxbuffer}).
+     */
+    private static final int MAX_REQ_SIZE = 1048528;
+
+    /**
+     * Checks whether data is too large to be saved in a single znode and has to be split into parts.
+     *
+     * @param path Path of the znode the data is going to be saved to.
+     * @param data Data.
+     * @param overhead Extra bytes reserved by the caller (e.g. for a suffix appended to part paths).
+     * @return {@code True} if data has to be split with {@link #splitNodeData(String, byte[], int)}.
+     */
+    boolean needSplitNodeData(String path, byte[] data, int overhead) {
+        return requestOverhead(path) + data.length + overhead > MAX_REQ_SIZE;
+    }
+
+    /**
+     * Splits data into parts small enough to be saved in separate znodes.
+     *
+     * @param path Path of the znode the data is going to be saved to.
+     * @param data Data to split (splitting must be needed, see {@link #needSplitNodeData(String, byte[], int)}).
+     * @param overhead Extra bytes reserved by the caller (e.g. for a suffix appended to part paths).
+     * @return Data parts; concatenating them in list order restores the original data.
+     */
+    List<byte[]> splitNodeData(String path, byte[] data, int overhead) {
+        int partSize = MAX_REQ_SIZE - requestOverhead(path) - overhead;
+
+        assert partSize > 0 : partSize;
+
+        int partCnt = data.length / partSize;
+
+        if (data.length % partSize != 0)
+            partCnt++;
+
+        assert partCnt > 1 : "Do not need split";
+
+        List<byte[]> parts = new ArrayList<>(partCnt);
+
+        int remaining = data.length;
+
+        for (int i = 0; i < partCnt; i++) {
+            int partSize0 = Math.min(remaining, partSize);
+
+            byte[] part = new byte[partSize0];
+
+            // Parts are consecutive: every part except possibly the last one has exactly partSize bytes.
+            System.arraycopy(data, i * partSize, part, 0, part.length);
+
+            parts.add(part);
+
+            remaining -= partSize0;
+        }
+
+        assert remaining == 0 : remaining;
+
+        return parts;
+    }
+
+    /**
+     * @param path Znode path.
+     * @return Request overhead for the given path (currently the path length).
+     */
+    private int requestOverhead(String path) {
+        return path.length();
+    }
+
     /**
      * @param path Path.
      * @param data Data.

http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd5a889/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index e246a35..366c162 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -564,6 +564,98 @@ public class ZookeeperDiscoveryImpl {
         }
     }
 
+    /**
+     * Asynchronously deletes data parts saved by {@link #saveMultipleParts(ZookeeperClient, String, List)}.
+     *
+     * @param zkClient Client.
+     * @param basePath Base path of the parts.
+     * @param partCnt Number of parts.
+     */
+    private void deleteMultiplePartsAsync(ZookeeperClient zkClient, String basePath, int partCnt) {
+        for (int i = 0; i < partCnt; i++) {
+            String path = multipartPathName(basePath, i);
+
+            zkClient.deleteIfExistsAsync(path);
+        }
+    }
+
+    /**
+     * Reads data saved as one or several parts and concatenates the parts in order.
+     *
+     * @param zkClient Client.
+     * @param basePath Base path of the parts.
+     * @param partCnt Number of parts (at least 1).
+     * @return Data.
+     * @throws Exception If failed.
+     */
+    private byte[] readMultipleParts(ZookeeperClient zkClient, String basePath, int partCnt)
+        throws Exception {
+        assert partCnt >= 1;
+
+        if (partCnt > 1) {
+            List<byte[]> parts = new ArrayList<>(partCnt);
+
+            int totSize = 0;
+
+            for (int i = 0; i < partCnt; i++) {
+                byte[] part = zkClient.getData(multipartPathName(basePath, i));
+
+                parts.add(part);
+
+                totSize += part.length;
+            }
+
+            byte[] res = new byte[totSize];
+
+            int pos = 0;
+
+            for (byte[] part : parts) {
+                System.arraycopy(part, 0, res, pos, part.length);
+
+                pos += part.length;
+            }
+
+            return res;
+        }
+        else
+            return zkClient.getData(multipartPathName(basePath, 0));
+    }
+
+    /**
+     * Saves data parts to separate persistent znodes: part {@code i} is saved to
+     * {@code multipartPathName(basePath, i)}.
+     *
+     * @param zkClient Client.
+     * @param basePath Base path of the parts.
+     * @param parts Data parts (more than one).
+     * @throws ZookeeperClientFailedException If failed.
+     * @throws InterruptedException If interrupted.
+     */
+    private void saveMultipleParts(ZookeeperClient zkClient, String basePath, List<byte[]> parts)
+        throws ZookeeperClientFailedException, InterruptedException
+    {
+        assert parts.size() > 1;
+
+        for (int i = 0; i < parts.size(); i++) {
+            byte[] part = parts.get(i);
+
+            String path = multipartPathName(basePath, i);
+
+            zkClient.createIfNeeded(path, part, PERSISTENT);
+        }
+    }
+
+    /**
+     * @param basePath Base path.
+     * @param part Part index.
+     * @return Path of the given part: base path followed by the zero-padded 4-digit part index.
+     */
+    private static String multipartPathName(String basePath, int part) {
+        // Root locale: a default locale with non-ASCII digits would produce a path which
+        // nodes running with another default locale can not find.
+        return basePath + String.format(java.util.Locale.ROOT, "%04d", part);
+    }
+
     /**
      * @param joinDataBytes Joining node data.
      * @throws InterruptedException If interrupted.
@@ -577,23 +637,38 @@ public class ZookeeperDiscoveryImpl {
 
             String prefix = UUID.randomUUID().toString();
 
-            // TODO ZK: handle max size.
-
             final ZkRuntimeState rtState = this.rtState;
 
-            String joinDataPath = rtState.zkClient.createSequential(prefix,
-                zkPaths.joinDataDir,
-                prefix + ":" + locNode.id() + "|",
-                joinDataBytes,
-                EPHEMERAL_SEQUENTIAL);
+            ZookeeperClient zkClient = rtState.zkClient;
+
+            final int OVERHEAD = 5; // Part paths append ':' and a 4-digit part index to joinDataPath.
+
+            // TODO ZK: need to clean up join data if node fails before it is able to create alive node.
+            String joinDataPath = zkPaths.joinDataDir + "/" + prefix + ":" + locNode.id();
 
-            // TODO ZK: no need to use sequential
-            int seqNum = Integer.parseInt(joinDataPath.substring(joinDataPath.lastIndexOf('|') + 1));
+            if (zkClient.needSplitNodeData(joinDataPath, joinDataBytes, OVERHEAD)) {
+                List<byte[]> parts = zkClient.splitNodeData(joinDataPath, joinDataBytes, OVERHEAD);
+
+                rtState.joinDataPartCnt = parts.size();
+
+                saveMultipleParts(zkClient, joinDataPath + ":", parts);
+
+                joinDataPath = zkClient.createIfNeeded(
+                    joinDataPath,
+                    marshalZip(new ZkJoiningNodeData(parts.size())), // Header holding only the part count.
+                    PERSISTENT);
+            }
+            else {
+                joinDataPath = zkClient.createIfNeeded(
+                    joinDataPath,
+                    joinDataBytes,
+                    PERSISTENT);
+            }
 
-            rtState.locNodeZkPath = rtState.zkClient.createSequential(
+            rtState.locNodeZkPath = zkClient.createSequential(
                 prefix,
                 zkPaths.aliveNodesDir,
-                prefix + ":" + locNode.id() + "|" + seqNum + "|",
+                prefix + ":" + locNode.id() + "|",
                 null,
                 EPHEMERAL_SEQUENTIAL);
 
@@ -605,21 +680,21 @@ public class ZookeeperDiscoveryImpl {
             rtState.internalOrder = ZkIgnitePaths.aliveInternalId(rtState.locNodeZkPath);
 
             /*
-            If node can not join due to some validation error this error is reported in join data,
-            As a minor optimization do not start watch this immediately, but only if do not receive
-            join event after timeout.
+            If node can not join due to validation error, the error is written to the main join data node
+            (even if join data was split into parts). As a minor optimization do not start watching it
+            immediately, but only if join event was not received after some timeout.
              */
             rtState.joinTimeoutObj = new CheckJoinStateTimeoutObject(
                 joinDataPath,
                 rtState);
 
             spi.getSpiContext().addTimeoutObject(rtState.joinTimeoutObj);
 
-            rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckCoordinatorCallback());
+            zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckCoordinatorCallback());
 
-            rtState.zkClient.getDataAsync(zkPaths.evtsPath, watcher, dataCallback);
+            zkClient.getDataAsync(zkPaths.evtsPath, watcher, dataCallback);
         }
-        catch (ZookeeperClientFailedException e) {
+        catch (IgniteCheckedException | ZookeeperClientFailedException e) {
             throw new IgniteSpiException("Failed to initialize Zookeeper nodes", e);
         }
         finally {
@@ -889,80 +964,154 @@ public class ZookeeperDiscoveryImpl {
             handleProcessedEventsOnNodesFail(failedNodes);
     }
 
-    /**
-     * @param curTop Current nodes.
-     * @param internalId Joined node internal ID.
-     * @param aliveNodePath Joined node path.
-     * @throws Exception If failed.
-     */
-    private boolean processJoinOnCoordinator(TreeMap<Long, ZookeeperClusterNode> curTop,
-        int internalId,
-        String aliveNodePath) throws Exception {
-        UUID nodeId = ZkIgnitePaths.aliveNodeId(aliveNodePath);
+    /**
+     * Reads join data of the given joining node, reassembling it if it was split into parts.
+     *
+     * @param nodeId Joining node ID.
+     * @param prefixId Prefix ID of the joining node's join data path.
+     * @return Joining node data.
+     * @throws Exception If failed.
+     */
+    private ZkJoiningNodeData unmarshalJoinData(UUID nodeId, UUID prefixId) throws Exception {
+        String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, prefixId);
 
-        String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, aliveNodePath);
-        byte[] joinData;
+        byte[] joinData = rtState.zkClient.getData(joinDataPath);
 
-        try {
-            joinData = rtState.zkClient.getData(joinDataPath);
-        }
-        catch (KeeperException.NoNodeException e) {
-            U.warn(log, "Failed to read joining node data, node left before join process finished: " + nodeId);
+        Object dataObj = unmarshalZip(joinData);
 
-            return false;
+        if (!(dataObj instanceof ZkJoiningNodeData))
+            throw new IgniteCheckedException("Invalid joined node data: " + dataObj);
+
+        ZkJoiningNodeData joiningNodeData = (ZkJoiningNodeData)dataObj;
+
+        if (joiningNodeData.partCount() > 1) {
+            joinData = readMultipleParts(rtState.zkClient, joinDataPath + ":", joiningNodeData.partCount());
+
+            joiningNodeData = unmarshalZip(joinData);
         }
 
-        String err = null;
+        return joiningNodeData;
+    }
+
+    /**
+     * Reads join data of the given joining node on the coordinator, reassembling it if it was split into parts.
+     *
+     * @param nodeId Joining node ID.
+     * @param prefixId Prefix ID of the joining node's join data path.
+     * @param aliveNodePath Joining node alive path (used to get its internal ID and for logging).
+     * @return Either {@link ZkJoiningNodeData} or {@link ZkInternalJoinErrorMessage} if join data already
+     *      contains a join error or can not be unmarshalled.
+     * @throws Exception If failed.
+     */
+    private Object unmarshalJoinDataOnCoordinator(UUID nodeId, UUID prefixId, String aliveNodePath) throws Exception {
+        String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, prefixId);
+
+        byte[] joinData = rtState.zkClient.getData(joinDataPath);
 
-        Object dataObj = null;
+        Object dataObj;
 
         try {
             dataObj = unmarshalZip(joinData);
 
-            if (dataObj instanceof ZkInternalJoinErrorMessage) {
-                if (log.isInfoEnabled())
-                    log.info("Ignore join data, node was failed by previous coordinator: " + aliveNodePath);
-
-                zkClient().deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, -1);
-
-                return false;
-            }
+            if (dataObj instanceof ZkInternalJoinErrorMessage)
+                return dataObj;
         }
         catch (Exception e) {
             U.error(log, "Failed to unmarshal joining node data [nodePath=" + aliveNodePath + "']", e);
 
-            err = "Failed to unmarshal join data: " + e;
+            return new ZkInternalJoinErrorMessage(ZkIgnitePaths.aliveInternalId(aliveNodePath),
+                "Failed to unmarshal join data: " + e);
         }
 
         assert dataObj instanceof ZkJoiningNodeData : dataObj;
 
         ZkJoiningNodeData joiningNodeData = (ZkJoiningNodeData)dataObj;
 
-        if (err == null)
-            err = validateJoiningNode(joiningNodeData.node());
+        if (joiningNodeData.partCount() > 1) {
+            joinData = readMultipleParts(rtState.zkClient, joinDataPath + ":", joiningNodeData.partCount());
+
+            // NOTE(review): the reassembled object presumably has partCount() == 0 (the header's part count is
+            // not carried over), but generateNodeJoin stores joiningNodeData.partCount() in the join event -
+            // confirm that data parts are still deleted after the join.
+            try {
+                joiningNodeData = unmarshalZip(joinData);
+            }
+            catch (Exception e) {
+                U.error(log, "Failed to unmarshal joining node data [nodePath=" + aliveNodePath + "']", e);
+
+                return new ZkInternalJoinErrorMessage(ZkIgnitePaths.aliveInternalId(aliveNodePath),
+                    "Failed to unmarshal join data: " + e);
+            }
+        }
+
+        assert joiningNodeData.node() != null : joiningNodeData;
+
+        return joiningNodeData;
+    }
+
+    /**
+     * @param curTop Current nodes.
+     * @param internalId Joined node internal ID.
+     * @param aliveNodePath Joined node path.
+     * @return {@code True} if node join event was generated, {@code false} if join was rejected.
+     * @throws Exception If failed.
+     */
+    private boolean processJoinOnCoordinator(
+        TreeMap<Long, ZookeeperClusterNode> curTop,
+        int internalId,
+        String aliveNodePath)
+        throws Exception
+    {
+        UUID nodeId = ZkIgnitePaths.aliveNodeId(aliveNodePath);
+        UUID prefixId = ZkIgnitePaths.aliveNodePrefixId(aliveNodePath);
+
+        Object data = unmarshalJoinDataOnCoordinator(nodeId, prefixId, aliveNodePath);
 
-        if (err == null) {
+        ZkInternalJoinErrorMessage joinErr = null;
+        ZkJoiningNodeData joiningNodeData = null;
+
+        if (data instanceof ZkJoiningNodeData) {
+            joiningNodeData = (ZkJoiningNodeData)data;
+
+            String err = validateJoiningNode(joiningNodeData.node());
+
+            if (err != null)
+                joinErr = new ZkInternalJoinErrorMessage(ZkIgnitePaths.aliveInternalId(aliveNodePath), err);
+        }
+        else {
+            assert data instanceof ZkInternalJoinErrorMessage : data;
+
+            joinErr = (ZkInternalJoinErrorMessage)data;
+        }
+
+        if (joinErr == null) {
             ZookeeperClusterNode joinedNode = joiningNodeData.node();
 
             assert nodeId.equals(joinedNode.id()) : joiningNodeData.node();
 
-            generateNodeJoin(curTop, joinData, joiningNodeData, internalId);
+            generateNodeJoin(curTop, joiningNodeData, internalId, prefixId);
 
             watchAliveNodeData(aliveNodePath);
 
             return true;
         }
         else {
-            ZkInternalJoinErrorMessage msg = new ZkInternalJoinErrorMessage(internalId, err);
+            // A newly created error is reported to the joining node via its join data; an error read back from
+            // join data (presumably notifyNode == false after unmarshalling) was already reported before.
+            if (joinErr.notifyNode) {
+                String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, prefixId);
 
-            try {
-                zkClient().setData(joinDataPath, marshalZip(msg), -1);
-            }
-            catch (KeeperException.NoNodeException e) {
-                // Ignore, node already failed.
+                try {
+                    zkClient().setData(joinDataPath, marshalZip(joinErr), -1);
+                }
+                catch (KeeperException.NoNodeException e) {
+                    // Ignore, join data was already removed.
+                }
             }
+            else if (log.isInfoEnabled())
+                log.info("Ignore join data, node was failed by previous coordinator: " + aliveNodePath);
 
             zkClient().deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, -1);
 
             return false;
         }
@@ -1046,9 +1176,9 @@ public class ZookeeperDiscoveryImpl {
      */
     private void generateNodeJoin(
         TreeMap<Long, ZookeeperClusterNode> curTop,
-        byte[] joinData,
         ZkJoiningNodeData joiningNodeData,
-        int internalId)
+        int internalId,
+        UUID prefixId) // Prefix ID of the joining node's join data path.
         throws Exception
     {
         ZookeeperClusterNode joinedNode = joiningNodeData.node();
@@ -1085,7 +1215,9 @@ public class ZookeeperDiscoveryImpl {
             rtState.evtsData.evtIdGen,
             rtState.evtsData.topVer,
             joinedNode.id(),
-            joinedNode.internalId());
+            joinedNode.internalId(),
+            prefixId,
+            joiningNodeData.partCount()); // NOTE(review): presumably 0 if data was reassembled from parts - confirm.
 
         evtData.joiningNodeData = joiningNodeData;
 
@@ -1097,14 +1229,12 @@ public class ZookeeperDiscoveryImpl {
 
         long start = System.currentTimeMillis();
 
-        rtState.zkClient.createIfNeeded(zkPaths.joinEventDataPath(evtData.eventId()), joinData, PERSISTENT);
         rtState.zkClient.createIfNeeded(zkPaths.joinEventDataPathForJoined(evtData.eventId()), dataForJoinedBytes, PERSISTENT);
 
         long time = System.currentTimeMillis() - start;
 
         if (log.isInfoEnabled()) {
             log.info("Generated NODE_JOINED event [evt=" + evtData +
-                ", joinedDataSize=" + joinData.length +
                 ", dataForJoinedSize=" + dataForJoinedBytes.length +
                 ", addDataTime=" + time + ']');
         }
@@ -1131,14 +1261,10 @@ public class ZookeeperDiscoveryImpl {
 
         rtState.top.addNode(locNode);
 
-        String path = rtState.locNodeZkPath.substring(rtState.locNodeZkPath.lastIndexOf('/') + 1);
-
-        String joinDataPath = zkPaths.joiningNodeDataPath(locNode.id(), path);
-
-        if (log.isDebugEnabled())
-            log.debug("Delete join data: " + joinDataPath);
-
-        rtState.zkClient.deleteIfExistsAsync(joinDataPath);
+        // Local join data is intentionally not deleted here: it is not copied into the join event anymore, so
+        // other nodes may still need to read it while processing this event (see unmarshalJoinData). It is
+        // deleted by the coordinator once all nodes have processed the join event.
+        // NOTE(review): for multipart join data this relies on the join event carrying the correct part count.
 
         final List<ClusterNode> topSnapshot = Collections.singletonList((ClusterNode)locNode);
 
@@ -1398,9 +1524,7 @@ public class ZookeeperDiscoveryImpl {
                         joiningData = evtData0.joiningNodeData;
                     }
                     else {
-                            String path = zkPaths.joinEventDataPath(evtData.eventId());
-
-                            joiningData = unmarshalZip(rtState.zkClient.getData(path));
+                            joiningData = unmarshalJoinData(evtData0.nodeId, evtData0.joinDataPrefixId);
 
                             DiscoveryDataBag dataBag = new DiscoveryDataBag(evtData0.nodeId);
 
@@ -1846,16 +1970,36 @@ public class ZookeeperDiscoveryImpl {
         if (log.isDebugEnabled())
             log.debug("All nodes processed node join [evtData=" + evtData + ']');
 
-        String evtDataPath = zkPaths.joinEventDataPath(evtData.eventId());
+        deleteJoiningNodeData(evtData.nodeId, evtData.joinDataPrefixId, evtData.joinDataPartCnt);
+
         String dataForJoinedPath = zkPaths.joinEventDataPathForJoined(evtData.eventId());
 
         if (log.isDebugEnabled())
-            log.debug("Delete processed event data [path1=" + evtDataPath + ", path2=" + dataForJoinedPath + ']');
+            log.debug("Delete data for joined node [path=" + dataForJoinedPath + ']');
 
-        rtState.zkClient.deleteIfExistsAsync(evtDataPath);
         rtState.zkClient.deleteIfExistsAsync(dataForJoinedPath);
     }
 
+    /**
+     * Asynchronously deletes join data of the given node, including its data parts if it was split into parts.
+     *
+     * @param nodeId Joining node ID.
+     * @param joinDataPrefixId Prefix ID of the joining node's join data path.
+     * @param partCnt Number of join data parts (values not greater than 1 mean data was not split).
+     * @throws Exception If failed.
+     */
+    private void deleteJoiningNodeData(UUID nodeId, UUID joinDataPrefixId, int partCnt) throws Exception {
+        String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, joinDataPrefixId);
+
+        if (log.isDebugEnabled())
+            log.debug("Delete joining node data [path=" + joinDataPath + ']');
+
+        rtState.zkClient.deleteIfExistsAsync(joinDataPath);
+
+        if (partCnt > 1)
+            deleteMultiplePartsAsync(rtState.zkClient, joinDataPath + ":", partCnt);
+    }
+
     /**
      * @param evtData Event data.
      * @throws Exception If failed.

http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd5a889/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
index ec495cf..0c43f62 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
@@ -54,36 +54,55 @@ public class ZookeeperClientTest extends GridCommonAbstractTest {
         super.afterTest();
     }
 
-//    /**
-//     * @throws Exception If failed.
-//     */
-//    public void testSaveLargeValue() throws Exception {
-//        startZK(1);
-//
-//        final ZookeeperClient client = new ZookeeperClient(log, zkCluster.getConnectString(), 3000, null);
-//
-//        ZooKeeper zk = client.zk();
-//
-//        int s = 1048526 + 1;
-//        // 1048517 11 1048528
-//        // 1048519 9 1048528
-//        // 1048520 8 1048528
-//
-//        String path = "/aaaaaaa";
-//
-//        while (true) {
-//            try {
-//                zk.create(path, new byte[s], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-//
-//                info("Created: " + s + " " + path.length() + " " + (s + path.length()));
-//
-//                break;
-//            }
-//            catch (KeeperException.ConnectionLossException e) {
-//                s -= 1;
-//            }
-//        }
-//    }
+    /**
+     * Checks that data too large for a single znode is split into parts which can be saved and read back.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSaveLargeValue() throws Exception {
+        startZK(1);
+
+        final ZookeeperClient client = new ZookeeperClient(log, zkCluster.getConnectString(), 3000, null);
+
+        byte[] data = new byte[1024 * 1024];
+
+        for (int i = 0; i < data.length; i++)
+            data[i] = (byte)i;
+
+        String basePath = "/ignite";
+
+        assertTrue(client.needSplitNodeData(basePath, data, 2));
+
+        List<byte[]> parts = client.splitNodeData(basePath, data, 2);
+
+        assertTrue(parts.size() > 1);
+
+        ZooKeeper zk = client.zk();
+
+        byte[] res = new byte[data.length];
+
+        int pos = 0;
+
+        for (int i = 0; i < parts.size(); i++) {
+            byte[] part = parts.get(i);
+
+            assertTrue(part.length > 0);
+
+            // Overhead 2 reserved above covers the ':' separator and a single-digit part index.
+            String path0 = basePath + ":" + i;
+
+            zk.create(path0, part, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+            byte[] saved = zk.getData(path0, false, null);
+
+            System.arraycopy(saved, 0, res, pos, saved.length);
+
+            pos += saved.length;
+        }
+
+        assertEquals(data.length, pos);
+        assertTrue(java.util.Arrays.equals(data, res));
+    }
 
     /**
      * @throws Exception If failed.