You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/05 14:57:42 UTC
[1/2] ignite git commit: zk
Repository: ignite
Updated Branches:
refs/heads/ignite-zk e909027fa -> bbd5a889f
zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/961167ab
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/961167ab
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/961167ab
Branch: refs/heads/ignite-zk
Commit: 961167ab2e3637ff8454e5e99196dd7d133e8889
Parents: e909027
Author: sboikov <sb...@gridgain.com>
Authored: Tue Dec 5 14:04:47 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Dec 5 14:23:40 2017 +0300
----------------------------------------------------------------------
.../discovery/zk/internal/ZookeeperDiscoveryImpl.java | 12 +++++++++++-
1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/961167ab/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 6c9d53a..e246a35 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -47,11 +47,13 @@ import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalL
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.spi.IgniteNodeValidationResult;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiTimeoutObject;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
@@ -977,7 +979,15 @@ public class ZookeeperDiscoveryImpl {
U.error(log, "Failed to include node in cluster, node with the same ID already exists [joiningNode=" + node +
", existingNode=" + node0 + ']');
- return "Node with the same ID already exists";
+ return "Node with the same ID already exists: " + node0;
+ }
+
+ IgniteNodeValidationResult err = spi.getSpiContext().validateNode(node);
+
+ if (err != null) {
+ LT.warn(log, err.message());
+
+ return err.sendMessage();
}
return null;
[2/2] ignite git commit: zk
Posted by sb...@apache.org.
zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bbd5a889
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bbd5a889
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bbd5a889
Branch: refs/heads/ignite-zk
Commit: bbd5a889fc980e36c8c500ef221a551e451ad854
Parents: 961167a
Author: sboikov <sb...@gridgain.com>
Authored: Tue Dec 5 14:57:04 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Dec 5 17:57:35 2017 +0300
----------------------------------------------------------------------
.../internal/ZkDiscoveryNodeJoinEventData.java | 16 +-
.../discovery/zk/internal/ZkIgnitePaths.java | 37 +--
.../zk/internal/ZkInternalJoinErrorMessage.java | 3 +
.../zk/internal/ZkJoiningNodeData.java | 15 +-
.../discovery/zk/internal/ZkRuntimeState.java | 3 +
.../discovery/zk/internal/ZookeeperClient.java | 45 +++
.../zk/internal/ZookeeperDiscoveryImpl.java | 289 ++++++++++++++-----
.../zk/internal/ZookeeperClientTest.java | 58 ++--
8 files changed, 328 insertions(+), 138 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd5a889/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
index df4c137..fbf1fc8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
@@ -34,6 +34,12 @@ class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData {
final UUID nodeId;
/** */
+ final int joinDataPartCnt;
+
+ /** */
+ final UUID joinDataPrefixId;
+
+ /** */
transient ZkJoiningNodeData joiningNodeData;
/**
@@ -42,11 +48,19 @@ class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData {
* @param nodeId Joined node ID.
* @param joinedInternalId Joined node internal ID.
*/
- ZkDiscoveryNodeJoinEventData(long evtId, long topVer, UUID nodeId, int joinedInternalId) {
+ ZkDiscoveryNodeJoinEventData(long evtId,
+ long topVer,
+ UUID nodeId,
+ int joinedInternalId,
+ UUID joinDataPrefixId,
+ int joinDataPartCnt)
+ {
super(evtId, EventType.EVT_NODE_JOINED, topVer);
this.nodeId = nodeId;
this.joinedInternalId = joinedInternalId;
+ this.joinDataPrefixId = joinDataPrefixId;
+ this.joinDataPartCnt = joinDataPartCnt;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd5a889/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
index 2478979..e52127a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
@@ -151,14 +151,8 @@ class ZkIgnitePaths {
return clusterDir + "/" + path;
}
- String joiningNodeDataPath(UUID nodeId, String aliveNodePath) {
- int joinSeq = ZkIgnitePaths.aliveJoinDataSequence(aliveNodePath);
-
- return joinDataDir + '/' +
- ZkIgnitePaths.aliveNodePrefixId(aliveNodePath) + ":" +
- nodeId.toString() +
- "|" +
- String.format("%010d", joinSeq);
+ String joiningNodeDataPath(UUID nodeId, UUID prefixId) {
+ return joinDataDir + '/' + prefixId + ":" + nodeId.toString();
}
/**
@@ -175,8 +169,8 @@ class ZkIgnitePaths {
* @param path Alive node zk path.
* @return Node ID.
*/
- static String aliveNodePrefixId(String path) {
- return path.substring(0, ZkIgnitePaths.UUID_LEN);
+ static UUID aliveNodePrefixId(String path) {
+ return UUID.fromString(path.substring(0, ZkIgnitePaths.UUID_LEN));
}
/**
@@ -184,7 +178,7 @@ class ZkIgnitePaths {
* @return Node ID.
*/
static UUID aliveNodeId(String path) {
- // <uuid prefix>:<node id>|<join data seq>|<alive seq>
+ // <uuid prefix>:<node id>|<alive seq>
int startIdx = ZkIgnitePaths.UUID_LEN + 1;
String idStr = path.substring(startIdx, startIdx + ZkIgnitePaths.UUID_LEN);
@@ -193,17 +187,6 @@ class ZkIgnitePaths {
}
/**
- * @param path Alive node zk path.
- * @return Joined node sequence.
- */
- private static int aliveJoinDataSequence(String path) {
- int idx2 = path.lastIndexOf('|');
- int idx1 = path.lastIndexOf('|', idx2 - 1);
-
- return Integer.parseInt(path.substring(idx1 + 1, idx2));
- }
-
- /**
* @param path Event zk path.
* @return Event sequence number.
*/
@@ -230,18 +213,14 @@ class ZkIgnitePaths {
* @param evtId Event ID.
* @return Event zk path.
*/
- String joinEventDataPath(long evtId) {
- return evtsPath + "/" + evtId;
+ String joinEventDataPathForJoined(long evtId) {
+ return evtsPath + "/joined-" + evtId;
}
/**
* @param evtId Event ID.
- * @return Event zk path.
+ * @return Path for custom event ack.
*/
- String joinEventDataPathForJoined(long evtId) {
- return evtsPath + "/joined-" + evtId;
- }
-
String ackEventDataPath(long evtId) {
return customEventDataPath(true, String.valueOf(evtId));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd5a889/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
index e724673..6040c20 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
@@ -25,6 +25,9 @@ class ZkInternalJoinErrorMessage implements ZkInternalMessage {
private static final long serialVersionUID = 0L;
/** */
+ // Transient, so it is not written to ZK with the message: true for messages created locally.
+ // Assuming JDK serialization, a message read back from join data gets the default false, which the
+ // coordinator's join processing treats as "node was already failed by previous coordinator".
+ transient boolean notifyNode = true;
+
+ /** */
final int nodeInternalId;
/** */
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd5a889/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
index 6733ab6..284cbff 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
@@ -30,12 +30,19 @@ class ZkJoiningNodeData implements Serializable {
private static final long serialVersionUID = 0L;
/** */
+ private int partCnt;
+
+ /** */
@GridToStringInclude
- private final ZookeeperClusterNode node;
+ private ZookeeperClusterNode node;
/** */
@GridToStringInclude
- private final Map<Integer, Serializable> discoData;
+ private Map<Integer, Serializable> discoData;
+
+ /**
+ * Creates a header object carrying only the number of parts; node and discovery data are not set.
+ * Used when marshalled join data is too large for a single request and is saved as separate parts.
+ *
+ * @param partCnt Number of parts join data was split into.
+ */
+ ZkJoiningNodeData(int partCnt) {
+ this.partCnt = partCnt;
+ }
/**
* @param node Node.
@@ -49,6 +56,10 @@ class ZkJoiningNodeData implements Serializable {
this.discoData = discoData;
}
+ /**
+ * @return Number of parts join data was split into (set by the part-count constructor;
+ * presumably {@code 0} for objects created with node and discovery data — TODO confirm).
+ */
+ int partCount() {
+ return partCnt;
+ }
+
/**
* @return Node.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd5a889/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
index 660dc42..4653109 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
@@ -33,6 +33,9 @@ class ZkRuntimeState {
int internalOrder;
/** */
+ int joinDataPartCnt;
+
+ /** */
IgniteSpiTimeoutObject joinTimeoutObj;
/** */
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd5a889/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index bc024f1..a806548 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -276,6 +276,51 @@ public class ZookeeperClient implements Watcher {
}
+ /** Max value of path length plus data size accepted in a single create request (found empirically). */
+ private static final int MAX_REQ_SIZE = 1048528;
+
+ /**
+ * @param path Path.
+ * @param data Data.
+ * @param overhead Additional path length reserved for a part suffix.
+ * @return {@code True} if data does not fit into a single request and has to be split.
+ */
+ boolean needSplitNodeData(String path, byte[] data, int overhead) {
+ return requestOverhead(path) + data.length + overhead > MAX_REQ_SIZE;
+ }
+
+ /**
+ * Splits data into consecutive parts each of which fits into a single request.
+ *
+ * @param path Path.
+ * @param data Data, must require split (see {@link #needSplitNodeData}).
+ * @param overhead Additional path length reserved for a part suffix.
+ * @return Parts in order; their concatenation equals {@code data}.
+ */
+ List<byte[]> splitNodeData(String path, byte[] data, int overhead) {
+ int partSize = MAX_REQ_SIZE - requestOverhead(path) - overhead;
+
+ int partCnt = data.length / partSize;
+
+ if (data.length % partSize != 0)
+ partCnt++;
+
+ assert partCnt > 1 : "Do not need split";
+
+ List<byte[]> parts = new ArrayList<>(partCnt);
+
+ int remaining = data.length;
+
+ for (int i = 0; i < partCnt; i++) {
+ int partSize0 = Math.min(remaining, partSize);
+
+ byte[] part = new byte[partSize0];
+
+ // Part i starts at offset i * partSize (only the last part may be shorter).
+ System.arraycopy(data, i * partSize, part, 0, part.length);
+
+ parts.add(part);
+
+ remaining -= partSize0;
+ }
+
+ assert remaining == 0 : remaining;
+
+ return parts;
+ }
+
+ /**
+ * @param path Path.
+ * @return Size accounted for the path in a request (currently its length in characters).
+ */
+ private int requestOverhead(String path) {
+ return path.length();
+ }
+
/**
* @param path Path.
* @param data Data.
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd5a889/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index e246a35..366c162 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -564,6 +564,66 @@ public class ZookeeperDiscoveryImpl {
}
}
+ /**
+ * Deletes parts {@code 0..partCnt-1} saved under {@code basePath} using {@code deleteIfExistsAsync}.
+ *
+ * @param zkClient Client.
+ * @param basePath Base path of the parts.
+ * @param partCnt Number of parts.
+ */
+ private void deleteMultiplePartsAsync(ZookeeperClient zkClient, String basePath, int partCnt) {
+ for (int i = 0; i < partCnt; i++) {
+ String path = multipartPathName(basePath, i);
+
+ zkClient.deleteIfExistsAsync(path);
+ }
+ }
+
+ /**
+ * Reads parts {@code 0..partCnt-1} saved under {@code basePath} and concatenates them in order.
+ *
+ * @param zkClient Client.
+ * @param basePath Base path of the parts.
+ * @param partCnt Number of parts.
+ * @return Concatenated data.
+ * @throws Exception If failed.
+ */
+ private byte[] readMultipleParts(ZookeeperClient zkClient, String basePath, int partCnt)
+ throws Exception {
+ assert partCnt >= 1;
+
+ if (partCnt > 1) {
+ List<byte[]> parts = new ArrayList<>(partCnt);
+
+ int totSize = 0;
+
+ for (int i = 0; i < partCnt; i++) {
+ byte[] part = zkClient.getData(multipartPathName(basePath, i));
+
+ // Keep the part: it is copied into the result below.
+ parts.add(part);
+
+ totSize += part.length;
+ }
+
+ byte[] res = new byte[totSize];
+
+ int pos = 0;
+
+ for (byte[] part : parts) {
+ System.arraycopy(part, 0, res, pos, part.length);
+
+ pos += part.length;
+ }
+
+ return res;
+ }
+ else
+ return zkClient.getData(multipartPathName(basePath, 0));
+ }
+
+ /**
+ * Saves each part under {@link #multipartPathName} of its index.
+ *
+ * @param zkClient Client.
+ * @param basePath Base path of the parts.
+ * @param parts Parts (at least two).
+ * @throws ZookeeperClientFailedException If failed.
+ * @throws InterruptedException If interrupted.
+ */
+ private void saveMultipleParts(ZookeeperClient zkClient, String basePath, List<byte[]> parts)
+ throws ZookeeperClientFailedException, InterruptedException
+ {
+ assert parts.size() > 1;
+
+ for (int i = 0; i < parts.size(); i++) {
+ byte[] part = parts.get(i);
+
+ String path = multipartPathName(basePath, i);
+
+ zkClient.createIfNeeded(path, part, PERSISTENT);
+ }
+ }
+
+ /**
+ * @param basePath Base path.
+ * @param part Part index.
+ * @return Part path: base path followed by the part index zero-padded to at least 4 digits.
+ */
+ private static String multipartPathName(String basePath, int part) {
+ return basePath + String.format("%04d", part);
+ }
+
/**
* @param joinDataBytes Joining node data.
* @throws InterruptedException If interrupted.
@@ -577,23 +637,38 @@ public class ZookeeperDiscoveryImpl {
String prefix = UUID.randomUUID().toString();
- // TODO ZK: handle max size.
-
final ZkRuntimeState rtState = this.rtState;
- String joinDataPath = rtState.zkClient.createSequential(prefix,
- zkPaths.joinDataDir,
- prefix + ":" + locNode.id() + "|",
- joinDataBytes,
- EPHEMERAL_SEQUENTIAL);
+ ZookeeperClient zkClient = rtState.zkClient;
+
+ final int OVERHEAD = 5;
+
+ // TODO ZK: need clean up join data if failed before was able to create alive node.
+ String joinDataPath = zkPaths.joinDataDir + "/" + prefix + ":" + locNode.id();
- // TODO ZK: no need to use sequential
- int seqNum = Integer.parseInt(joinDataPath.substring(joinDataPath.lastIndexOf('|') + 1));
+ if (zkClient.needSplitNodeData(joinDataPath, joinDataBytes, OVERHEAD)) {
+ List<byte[]> parts = zkClient.splitNodeData(joinDataPath, joinDataBytes, OVERHEAD);
+
+ rtState.joinDataPartCnt = parts.size();
+
+ saveMultipleParts(zkClient, joinDataPath + ":", parts);
+
+ joinDataPath = zkClient.createIfNeeded(
+ joinDataPath,
+ marshalZip(new ZkJoiningNodeData(parts.size())),
+ PERSISTENT);
+ }
+ else {
+ joinDataPath = zkClient.createIfNeeded(
+ joinDataPath,
+ joinDataBytes,
+ PERSISTENT);
+ }
- rtState.locNodeZkPath = rtState.zkClient.createSequential(
+ rtState.locNodeZkPath = zkClient.createSequential(
prefix,
zkPaths.aliveNodesDir,
- prefix + ":" + locNode.id() + "|" + seqNum + "|",
+ prefix + ":" + locNode.id() + "|",
null,
EPHEMERAL_SEQUENTIAL);
@@ -605,21 +680,21 @@ public class ZookeeperDiscoveryImpl {
rtState.internalOrder = ZkIgnitePaths.aliveInternalId(rtState.locNodeZkPath);
/*
- If node can not join due to some validation error this error is reported in join data,
- As a minor optimization do not start watch this immediately, but only if do not receive
- join event after timeout.
+ If node can not join due to validation error this error is reported in join data,
+ As a minor optimization do not start watch join data immediately, but only if do not receive
+ join event after some timeout.
*/
rtState.joinTimeoutObj = new CheckJoinStateTimeoutObject(
- joinDataPath,
+ multipartPathName(joinDataPath, 0),
rtState);
spi.getSpiContext().addTimeoutObject(rtState.joinTimeoutObj);
- rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckCoordinatorCallback());
+ zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckCoordinatorCallback());
- rtState.zkClient.getDataAsync(zkPaths.evtsPath, watcher, dataCallback);
+ zkClient.getDataAsync(zkPaths.evtsPath, watcher, dataCallback);
}
- catch (ZookeeperClientFailedException e) {
+ catch (IgniteCheckedException | ZookeeperClientFailedException e) {
throw new IgniteSpiException("Failed to initialize Zookeeper nodes", e);
}
finally {
@@ -889,80 +964,135 @@ public class ZookeeperDiscoveryImpl {
handleProcessedEventsOnNodesFail(failedNodes);
}
- /**
- * @param curTop Current nodes.
- * @param internalId Joined node internal ID.
- * @param aliveNodePath Joined node path.
- * @throws Exception If failed.
- */
- private boolean processJoinOnCoordinator(TreeMap<Long, ZookeeperClusterNode> curTop,
- int internalId,
- String aliveNodePath) throws Exception {
- UUID nodeId = ZkIgnitePaths.aliveNodeId(aliveNodePath);
+ /**
+ * Reads and unmarshals data of a joining node stored under its join data path. If the stored object is a
+ * header with {@code partCount() > 1}, the parts are read via {@link #readMultipleParts} and unmarshalled.
+ *
+ * @param nodeId Joining node ID.
+ * @param prefixId Join data prefix ID.
+ * @return Joining node data.
+ * @throws Exception If failed, including when stored data is not {@link ZkJoiningNodeData}.
+ */
+ private ZkJoiningNodeData unmarshalJoinData(UUID nodeId, UUID prefixId) throws Exception {
+ String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, prefixId);
- String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, aliveNodePath);
- byte[] joinData;
+ byte[] joinData = rtState.zkClient.getData(joinDataPath);
- try {
- joinData = rtState.zkClient.getData(joinDataPath);
- }
- catch (KeeperException.NoNodeException e) {
- U.warn(log, "Failed to read joining node data, node left before join process finished: " + nodeId);
+ Object dataObj = unmarshalZip(joinData);
- return false;
+ if (!(dataObj instanceof ZkJoiningNodeData))
+ throw new Exception("Invalid joined node data: " + dataObj);
+
+ ZkJoiningNodeData joiningNodeData = (ZkJoiningNodeData)dataObj;
+
+ // Header object: actual data was split into parts (see ZkJoiningNodeData(int)).
+ if (joiningNodeData.partCount() > 1) {
+ joinData = readMultipleParts(rtState.zkClient, joinDataPath + ":", joiningNodeData.partCount());
+
+ // NOTE(review): unlike the header above, the reassembled object is not type-checked here.
+ joiningNodeData = unmarshalZip(joinData);
}
- String err = null;
+ return joiningNodeData;
+ }
+
+ /**
+ * Reads and unmarshals join data of the given joining node on the coordinator. Data split into parts
+ * (header with {@code partCount() > 1}) is read via {@link #readMultipleParts} and unmarshalled.
+ *
+ * @param nodeId Joining node ID.
+ * @param prefixId Join data prefix ID (parsed from the alive node path).
+ * @param aliveNodePath Joining node's alive node path (used for logging and for the node internal ID).
+ * @return {@link ZkJoiningNodeData} on success, or {@link ZkInternalJoinErrorMessage} if join data already
+ * contains an error message or could not be unmarshalled.
+ * @throws Exception If failed.
+ */
+ private Object unmarshalJoinDataOnCoordinator(UUID nodeId, UUID prefixId, String aliveNodePath) throws Exception {
+ String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, prefixId);
+
+ // NOTE(review): previous code caught KeeperException.NoNodeException here (node left before join
+ // process finished); now it propagates to the caller — confirm this is intended.
+ byte[] joinData = rtState.zkClient.getData(joinDataPath);
- Object dataObj = null;
+ Object dataObj;
try {
dataObj = unmarshalZip(joinData);
- if (dataObj instanceof ZkInternalJoinErrorMessage) {
- if (log.isInfoEnabled())
- log.info("Ignore join data, node was failed by previous coordinator: " + aliveNodePath);
-
- zkClient().deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, -1);
-
- return false;
- }
+ // Error written by a previous coordinator: return it as is, caller decides what to do.
+ if (dataObj instanceof ZkInternalJoinErrorMessage)
+ return dataObj;
}
catch (Exception e) {
U.error(log, "Failed to unmarshal joining node data [nodePath=" + aliveNodePath + "']", e);
- err = "Failed to unmarshal join data: " + e;
+ return new ZkInternalJoinErrorMessage(ZkIgnitePaths.aliveInternalId(aliveNodePath),
+ "Failed to unmarshal join data: " + e);
}
assert dataObj instanceof ZkJoiningNodeData : dataObj;
ZkJoiningNodeData joiningNodeData = (ZkJoiningNodeData)dataObj;
- if (err == null)
- err = validateJoiningNode(joiningNodeData.node());
+ if (joiningNodeData.partCount() > 1) {
+ joinData = readMultipleParts(rtState.zkClient, joinDataPath + ":", joiningNodeData.partCount());
+
+ try {
+ joiningNodeData = unmarshalZip(joinData);
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to unmarshal joining node data [nodePath=" + aliveNodePath + "']", e);
+
+ return new ZkInternalJoinErrorMessage(ZkIgnitePaths.aliveInternalId(aliveNodePath),
+ "Failed to unmarshal join data: " + e);
+ }
+ }
+
+ assert joiningNodeData.node() != null : joiningNodeData;
+
+ // NOTE(review): after reassembly this is the inner object, whose partCount() may differ from the
+ // header's part count; generateNodeJoin puts joiningNodeData.partCount() into the event, and that value
+ // is later used to delete the parts — confirm parts are not leaked.
+ return joiningNodeData;
+ }
+
+ /**
+ * @param curTop Current nodes.
+ * @param internalId Joined node internal ID.
+ * @param aliveNodePath Joined node path.
+ * @throws Exception If failed.
+ */
+ private boolean processJoinOnCoordinator(
+ TreeMap<Long, ZookeeperClusterNode> curTop,
+ int internalId,
+ String aliveNodePath)
+ throws Exception
+ {
+ UUID nodeId = ZkIgnitePaths.aliveNodeId(aliveNodePath);
+ UUID prefixId = ZkIgnitePaths.aliveNodePrefixId(aliveNodePath);
+
+ Object data = unmarshalJoinDataOnCoordinator(nodeId, prefixId, aliveNodePath);
- if (err == null) {
+ ZkInternalJoinErrorMessage joinErr = null;
+ ZkJoiningNodeData joiningNodeData = null;
+
+ if (data instanceof ZkJoiningNodeData) {
+ joiningNodeData = (ZkJoiningNodeData)data;
+
+ String err = validateJoiningNode(joiningNodeData.node());
+
+ if (err != null)
+ joinErr = new ZkInternalJoinErrorMessage(ZkIgnitePaths.aliveInternalId(aliveNodePath), err);
+ }
+ else {
+ assert data instanceof ZkInternalJoinErrorMessage : data;
+
+ joinErr = (ZkInternalJoinErrorMessage)data;
+ }
+
+ if (joinErr == null) {
ZookeeperClusterNode joinedNode = joiningNodeData.node();
assert nodeId.equals(joinedNode.id()) : joiningNodeData.node();
- generateNodeJoin(curTop, joinData, joiningNodeData, internalId);
+ generateNodeJoin(curTop, joiningNodeData, internalId, prefixId);
watchAliveNodeData(aliveNodePath);
return true;
}
else {
- ZkInternalJoinErrorMessage msg = new ZkInternalJoinErrorMessage(internalId, err);
+ if (joinErr.notifyNode) {
+ String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, prefixId);
- try {
- zkClient().setData(joinDataPath, marshalZip(msg), -1);
- }
- catch (KeeperException.NoNodeException e) {
- // Ignore, node already failed.
+ zkClient().setData(joinDataPath, marshalZip(joinErr), -1);
+
+ zkClient().deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, -1);
}
+ else {
+ if (log.isInfoEnabled())
+ log.info("Ignore join data, node was failed by previous coordinator: " + aliveNodePath);
- zkClient().deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, -1);
+ zkClient().deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, -1);
+ }
return false;
}
@@ -1046,9 +1176,9 @@ public class ZookeeperDiscoveryImpl {
*/
private void generateNodeJoin(
TreeMap<Long, ZookeeperClusterNode> curTop,
- byte[] joinData,
ZkJoiningNodeData joiningNodeData,
- int internalId)
+ int internalId,
+ UUID prefixId)
throws Exception
{
ZookeeperClusterNode joinedNode = joiningNodeData.node();
@@ -1085,7 +1215,9 @@ public class ZookeeperDiscoveryImpl {
rtState.evtsData.evtIdGen,
rtState.evtsData.topVer,
joinedNode.id(),
- joinedNode.internalId());
+ joinedNode.internalId(),
+ prefixId,
+ joiningNodeData.partCount());
evtData.joiningNodeData = joiningNodeData;
@@ -1097,14 +1229,12 @@ public class ZookeeperDiscoveryImpl {
long start = System.currentTimeMillis();
- rtState.zkClient.createIfNeeded(zkPaths.joinEventDataPath(evtData.eventId()), joinData, PERSISTENT);
rtState.zkClient.createIfNeeded(zkPaths.joinEventDataPathForJoined(evtData.eventId()), dataForJoinedBytes, PERSISTENT);
long time = System.currentTimeMillis() - start;
if (log.isInfoEnabled()) {
log.info("Generated NODE_JOINED event [evt=" + evtData +
- ", joinedDataSize=" + joinData.length +
", dataForJoinedSize=" + dataForJoinedBytes.length +
", addDataTime=" + time + ']');
}
@@ -1131,14 +1261,11 @@ public class ZookeeperDiscoveryImpl {
rtState.top.addNode(locNode);
- String path = rtState.locNodeZkPath.substring(rtState.locNodeZkPath.lastIndexOf('/') + 1);
-
- String joinDataPath = zkPaths.joiningNodeDataPath(locNode.id(), path);
-
- if (log.isDebugEnabled())
- log.debug("Delete join data: " + joinDataPath);
+ String locAlivePath = rtState.locNodeZkPath.substring(rtState.locNodeZkPath.lastIndexOf('/') + 1);
- rtState.zkClient.deleteIfExistsAsync(joinDataPath);
+ deleteJoiningNodeData(locNode.id(),
+ ZkIgnitePaths.aliveNodePrefixId(locAlivePath),
+ rtState.joinDataPartCnt);
final List<ClusterNode> topSnapshot = Collections.singletonList((ClusterNode)locNode);
@@ -1398,9 +1525,7 @@ public class ZookeeperDiscoveryImpl {
joiningData = evtData0.joiningNodeData;
}
else {
- String path = zkPaths.joinEventDataPath(evtData.eventId());
-
- joiningData = unmarshalZip(rtState.zkClient.getData(path));
+ joiningData = unmarshalJoinData(evtData0.nodeId, evtData0.joinDataPrefixId);
DiscoveryDataBag dataBag = new DiscoveryDataBag(evtData0.nodeId);
@@ -1846,16 +1971,28 @@ public class ZookeeperDiscoveryImpl {
if (log.isDebugEnabled())
log.debug("All nodes processed node join [evtData=" + evtData + ']');
- String evtDataPath = zkPaths.joinEventDataPath(evtData.eventId());
+ deleteJoiningNodeData(evtData.nodeId, evtData.joinDataPrefixId, evtData.joinDataPartCnt);
+
String dataForJoinedPath = zkPaths.joinEventDataPathForJoined(evtData.eventId());
if (log.isDebugEnabled())
- log.debug("Delete processed event data [path1=" + evtDataPath + ", path2=" + dataForJoinedPath + ']');
+ log.debug("Delete data for joined node [path=" + dataForJoinedPath + ']');
- rtState.zkClient.deleteIfExistsAsync(evtDataPath);
rtState.zkClient.deleteIfExistsAsync(dataForJoinedPath);
}
+ /**
+ * Deletes joining node data and, when it was split into parts ({@code partCnt > 1}), all of its parts.
+ *
+ * @param nodeId Joining node ID.
+ * @param joinDataPrefixId Join data prefix ID.
+ * @param partCnt Number of parts join data was split into.
+ * @throws Exception If failed.
+ */
+ private void deleteJoiningNodeData(UUID nodeId, UUID joinDataPrefixId, int partCnt) throws Exception {
+ String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, joinDataPrefixId);
+
+ if (log.isDebugEnabled())
+ log.debug("Delete joining node data [path=" + joinDataPath + ']');
+
+ rtState.zkClient.deleteIfExistsAsync(joinDataPath);
+
+ boolean split = partCnt > 1;
+
+ if (split)
+ deleteMultiplePartsAsync(rtState.zkClient, joinDataPath + ":", partCnt);
+ }
+
/**
* @param evtData Event data.
* @throws Exception If failed.
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd5a889/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
index ec495cf..0c43f62 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
@@ -54,36 +54,34 @@ public class ZookeeperClientTest extends GridCommonAbstractTest {
super.afterTest();
}
-// /**
-// * @throws Exception If failed.
-// */
-// public void testSaveLargeValue() throws Exception {
-// startZK(1);
-//
-// final ZookeeperClient client = new ZookeeperClient(log, zkCluster.getConnectString(), 3000, null);
-//
-// ZooKeeper zk = client.zk();
-//
-// int s = 1048526 + 1;
-// // 1048517 11 1048528
-// // 1048519 9 1048528
-// // 1048520 8 1048528
-//
-// String path = "/aaaaaaa";
-//
-// while (true) {
-// try {
-// zk.create(path, new byte[s], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-//
-// info("Created: " + s + " " + path.length() + " " + (s + path.length()));
-//
-// break;
-// }
-// catch (KeeperException.ConnectionLossException e) {
-// s -= 1;
-// }
-// }
-// }
+ /**
+ * Checks that data exceeding the request size limit is split into parts each of which can be saved in ZK.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSaveLargeValue() throws Exception {
+ startZK(1);
+
+ final ZookeeperClient client = new ZookeeperClient(log, zkCluster.getConnectString(), 3000, null);
+
+ byte[] data = new byte[1024 * 1024];
+
+ String basePath = "/ignite";
+
+ // Overhead 2 reserves space for the ":<i>" suffix appended to part paths below (i < 10).
+ assertTrue(client.needSplitNodeData(basePath, data, 2));
+
+ List<byte[]> parts = client.splitNodeData(basePath, data, 2);
+
+ assertTrue(parts.size() > 1);
+
+ ZooKeeper zk = client.zk();
+
+ int totSize = 0;
+
+ for (int i = 0; i < parts.size(); i++) {
+ byte[] part = parts.get(i);
+
+ assertTrue(part.length > 0);
+
+ totSize += part.length;
+
+ // Each part needs its own node, otherwise creating the second part fails.
+ String path0 = basePath + ":" + i;
+
+ zk.create(path0, part, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+
+ assertEquals(data.length, totSize);
+ }
/**
* @throws Exception If failed.