You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/04/13 09:33:18 UTC
[06/54] [abbrv] ignite git commit: IGNITE-7222 Added ZooKeeper
discovery SPI
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
new file mode 100644
index 0000000..0c79c36
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Zk Communication Error Resolve Start Message.
+ */
+public class ZkCommunicationErrorResolveStartMessage implements DiscoverySpiCustomMessage, ZkInternalMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ final UUID id;
+
+ /**
+ * @param id Unique ID.
+ */
+ ZkCommunicationErrorResolveStartMessage(UUID id) {
+ this.id = id;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isMutable() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean stopProcess() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ZkCommunicationErrorResolveStartMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java
new file mode 100644
index 0000000..d27b717
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.CommunicationFailureContext;
+import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+class ZkCommunicationFailureContext implements CommunicationFailureContext {
+ /** */
+ private static final Comparator<ClusterNode> NODE_ORDER_CMP = new Comparator<ClusterNode>() {
+ @Override public int compare(ClusterNode node1, ClusterNode node2) {
+ return Long.compare(node1.order(), node2.order());
+ }
+ };
+
+ /** */
+ private Set<ClusterNode> killedNodes = new HashSet<>();
+
+ /** */
+ private final Map<UUID, BitSet> nodesState;
+
+ /** */
+ private final List<ClusterNode> initialNodes;
+
+ /** */
+ private final List<ClusterNode> curNodes;
+
+ /** */
+ private final GridCacheSharedContext<?, ?> ctx;
+
+ /**
+ * @param ctx Context.
+ * @param curNodes Current topology snapshot.
+ * @param initialNodes Topology snapshot when communication error resolve started.
+ * @param nodesState Nodes communication state.
+ */
+ ZkCommunicationFailureContext(
+ GridCacheSharedContext<?, ?> ctx,
+ List<ClusterNode> curNodes,
+ List<ClusterNode> initialNodes,
+ Map<UUID, BitSet> nodesState)
+ {
+ this.ctx = ctx;
+ this.curNodes = Collections.unmodifiableList(curNodes);
+ this.initialNodes = initialNodes;
+ this.nodesState = nodesState;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<ClusterNode> topologySnapshot() {
+ return curNodes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean connectionAvailable(ClusterNode node1, ClusterNode node2) {
+ BitSet nodeState = nodesState.get(node1.id());
+
+ if (nodeState == null)
+ throw new IllegalArgumentException("Invalid node: " + node1);
+
+ int nodeIdx = Collections.binarySearch(initialNodes, node2, NODE_ORDER_CMP);
+
+ if (nodeIdx < 0)
+ throw new IllegalArgumentException("Invalid node: " + node2);
+
+ assert nodeIdx < nodeState.size() : nodeIdx;
+
+ return nodeState.get(nodeIdx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<String, CacheConfiguration<?, ?>> startedCaches() {
+ Map<Integer, DynamicCacheDescriptor> cachesMap = ctx.affinity().caches();
+
+ Map<String, CacheConfiguration<?, ?>> res = U.newHashMap(cachesMap.size());
+
+ for (DynamicCacheDescriptor desc : cachesMap.values()) {
+ if (desc.cacheType().userCache())
+ res.put(desc.cacheName(), desc.cacheConfiguration());
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<List<ClusterNode>> cacheAffinity(String cacheName) {
+ if (cacheName == null)
+ throw new NullPointerException("Null cache name.");
+
+ DynamicCacheDescriptor cacheDesc = ctx.affinity().caches().get(CU.cacheId(cacheName));
+
+ if (cacheDesc == null)
+ throw new IllegalArgumentException("Invalid cache name: " + cacheName);
+
+ GridAffinityAssignmentCache aff = ctx.affinity().groupAffinity(cacheDesc.groupId());
+
+ assert aff != null : cacheName;
+
+ return aff.readyAssignments(aff.lastVersion());
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<List<ClusterNode>> cachePartitionOwners(String cacheName) {
+ if (cacheName == null)
+ throw new NullPointerException("Null cache name.");
+
+ DynamicCacheDescriptor cacheDesc = ctx.affinity().caches().get(CU.cacheId(cacheName));
+
+ if (cacheDesc == null)
+ throw new IllegalArgumentException("Invalid cache name: " + cacheName);
+
+ if (cacheDesc.cacheConfiguration().getCacheMode() == CacheMode.LOCAL)
+ return Collections.emptyList();
+
+ CacheGroupContext grp = ctx.cache().cacheGroup(cacheDesc.groupId());
+
+ GridDhtPartitionTopology top;
+
+ if (grp == null) {
+ top = ctx.exchange().clientTopologyIfExists(cacheDesc.groupId());
+
+ assert top != null : cacheName;
+ }
+ else
+ top = grp.topology();
+
+ return top.allOwners();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void killNode(ClusterNode node) {
+ if (node == null)
+ throw new NullPointerException();
+
+ if (Collections.binarySearch(curNodes, node, NODE_ORDER_CMP) < 0)
+ throw new IllegalArgumentException("Invalid node: " + node);
+
+ killedNodes.add(node);
+ }
+
+ /**
+ * @return Nodes to fail.
+ */
+ Set<ClusterNode> killedNodes() {
+ return killedNodes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "ZkCommunicationFailureContext []";
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
new file mode 100644
index 0000000..21dfe62
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+
+/**
+ *
+ */
+class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ final long origEvtId;
+
+ /** */
+ final UUID sndNodeId;
+
+ /** */
+ final String evtPath;
+
+ /** Message instance (can be marshalled as part of ZkDiscoveryCustomEventData or stored in separate znode. */
+ DiscoverySpiCustomMessage msg;
+
+ /** Unmarshalled message. */
+ transient DiscoverySpiCustomMessage resolvedMsg;
+
+ /**
+ * @param evtId Event ID.
+ * @param origEvtId For acknowledge events ID of original event.
+ * @param topVer Topology version.
+ * @param sndNodeId Sender node ID.
+ * @param msg Message instance.
+ * @param evtPath Event path.
+ */
+ ZkDiscoveryCustomEventData(
+ long evtId,
+ long origEvtId,
+ long topVer,
+ UUID sndNodeId,
+ DiscoverySpiCustomMessage msg,
+ String evtPath)
+ {
+ super(evtId, ZK_EVT_CUSTOM_EVT, topVer);
+
+ assert sndNodeId != null;
+ assert msg != null || origEvtId != 0 || !F.isEmpty(evtPath);
+
+ this.origEvtId = origEvtId;
+ this.msg = msg;
+ this.sndNodeId = sndNodeId;
+ this.evtPath = evtPath;
+ }
+
+ /**
+ * @return {@code True} for custom event ack message.
+ */
+ boolean ackEvent() {
+ return origEvtId != 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "ZkDiscoveryCustomEventData [" +
+ "evtId=" + eventId() +
+ ", topVer=" + topologyVersion() +
+ ", sndNode=" + sndNodeId +
+ ", ack=" + ackEvent() +
+ ']';
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
new file mode 100644
index 0000000..d667a17
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Set;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+abstract class ZkDiscoveryEventData implements Serializable {
+ /** */
+ static final byte ZK_EVT_NODE_JOIN = 1;
+
+ /** */
+ static final byte ZK_EVT_NODE_FAILED = 2;
+
+ /** */
+ static final byte ZK_EVT_CUSTOM_EVT = 3;
+
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final long evtId;
+
+ /** */
+ private final byte evtType;
+
+ /** */
+ private final long topVer;
+
+ /** */
+ private transient Set<Long> remainingAcks;
+
+ /** */
+ int flags;
+
+ /**
+ * @param evtId Event ID.
+ * @param evtType Event type.
+ * @param topVer Topology version.
+ */
+ ZkDiscoveryEventData(long evtId, byte evtType, long topVer) {
+ assert evtType == ZK_EVT_NODE_JOIN || evtType == ZK_EVT_NODE_FAILED || evtType == ZK_EVT_CUSTOM_EVT : evtType;
+
+ this.evtId = evtId;
+ this.evtType = evtType;
+ this.topVer = topVer;
+ }
+
+ /**
+ * @param nodes Current nodes in topology.
+ */
+ void initRemainingAcks(Collection<ZookeeperClusterNode> nodes) {
+ assert remainingAcks == null : this;
+
+ remainingAcks = U.newHashSet(nodes.size());
+
+ for (ZookeeperClusterNode node : nodes) {
+ if (!node.isLocal() && node.order() <= topVer) {
+ boolean add = remainingAcks.add(node.internalId());
+
+ assert add : node;
+ }
+ }
+ }
+
+ /**
+ * @param node Node.
+ */
+ void addRemainingAck(ZookeeperClusterNode node) {
+ assert node.order() <= topVer : node;
+
+ boolean add = remainingAcks.add(node.internalId());
+
+ assert add : node;
+ }
+
+ /**
+ * @return {@code True} if all nodes processed event.
+ */
+ boolean allAcksReceived() {
+ return remainingAcks.isEmpty();
+ }
+
+ /**
+ * @return Remaining acks.
+ */
+ Set<Long> remainingAcks() {
+ return remainingAcks;
+ }
+
+ /**
+ * @param nodeInternalId Node ID.
+ * @param ackEvtId Last event ID processed on node.
+ * @return {@code True} if all nodes processed event.
+ */
+ boolean onAckReceived(Long nodeInternalId, long ackEvtId) {
+ assert remainingAcks != null;
+
+ if (ackEvtId >= evtId)
+ remainingAcks.remove(nodeInternalId);
+
+ return remainingAcks.isEmpty();
+ }
+
+ /**
+ * @param node Failed node.
+ * @return {@code True} if all nodes processed event.
+ */
+ boolean onNodeFail(ZookeeperClusterNode node) {
+ assert remainingAcks != null : this;
+
+ remainingAcks.remove(node.internalId());
+
+ return remainingAcks.isEmpty();
+ }
+
+ /**
+ * @param flag Flag mask.
+ * @return {@code True} if flag set.
+ */
+ boolean flagSet(int flag) {
+ return (flags & flag) == flag;
+ }
+
+ /**
+ * @return Event ID.
+ */
+ long eventId() {
+ return evtId;
+ }
+
+ /**
+ * @return Event type.
+ */
+ byte eventType() {
+ return evtType;
+ }
+
+ /**
+ * @return Event topology version.
+ */
+ long topologyVersion() {
+ return topVer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
new file mode 100644
index 0000000..dce861b
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.TreeMap;
+import java.util.UUID;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+class ZkDiscoveryEventsData implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Unique cluster ID (generated when first node in cluster starts). */
+ final UUID clusterId;
+
+ /** Internal order of last processed custom event. */
+ long procCustEvt = -1;
+
+ /** Event ID counter. */
+ long evtIdGen;
+
+ /** Current topology version. */
+ long topVer;
+
+ /** Max node internal order in cluster. */
+ long maxInternalOrder;
+
+ /** Cluster start time (recorded when first node in cluster starts). */
+ final long clusterStartTime;
+
+ /** Events to process. */
+ final TreeMap<Long, ZkDiscoveryEventData> evts;
+
+ /** ID of current active communication error resolve process. */
+ private UUID commErrFutId;
+
+ /**
+ * @param clusterStartTime Start time of first node in cluster.
+ * @return Events.
+ */
+ static ZkDiscoveryEventsData createForNewCluster(long clusterStartTime) {
+ return new ZkDiscoveryEventsData(
+ UUID.randomUUID(),
+ clusterStartTime,
+ 1L,
+ new TreeMap<Long, ZkDiscoveryEventData>()
+ );
+ }
+
+ /**
+ * @param clusterId Cluster ID.
+ * @param topVer Current topology version.
+ * @param clusterStartTime Cluster start time.
+ * @param evts Events history.
+ */
+ private ZkDiscoveryEventsData(
+ UUID clusterId,
+ long clusterStartTime,
+ long topVer,
+ TreeMap<Long, ZkDiscoveryEventData> evts)
+ {
+ this.clusterId = clusterId;
+ this.clusterStartTime = clusterStartTime;
+ this.topVer = topVer;
+ this.evts = evts;
+ }
+
+ /**
+ * @param node Joined node.
+ */
+ void onNodeJoin(ZookeeperClusterNode node) {
+ if (node.internalId() > maxInternalOrder)
+ maxInternalOrder = node.internalId();
+ }
+
+ /**
+ * @return Future ID.
+ */
+ @Nullable UUID communicationErrorResolveFutureId() {
+ return commErrFutId;
+ }
+
+ /**
+ * @param id Future ID.
+ */
+ void communicationErrorResolveFutureId(@Nullable UUID id) {
+ commErrFutId = id;
+ }
+
+ /**
+ * @param nodes Current nodes in topology (these nodes should ack that event processed).
+ * @param evt Event.
+ */
+ void addEvent(Collection<ZookeeperClusterNode> nodes, ZkDiscoveryEventData evt) {
+ Object old = evts.put(evt.eventId(), evt);
+
+ assert old == null : old;
+
+ evt.initRemainingAcks(nodes);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
new file mode 100644
index 0000000..c76158f
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+/**
+ *
+ */
+class ZkDiscoveryNodeFailEventData extends ZkDiscoveryEventData {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private long failedNodeInternalId;
+
+ /**
+ * @param evtId Event ID.
+ * @param topVer Topology version.
+ * @param failedNodeInternalId Failed node ID.
+ */
+ ZkDiscoveryNodeFailEventData(long evtId, long topVer, long failedNodeInternalId) {
+ super(evtId, ZK_EVT_NODE_FAILED, topVer);
+
+ this.failedNodeInternalId = failedNodeInternalId;
+ }
+
+ /**
+ * @return Failed node ID.
+ */
+ long failedNodeInternalId() {
+ return failedNodeInternalId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "ZkDiscoveryNodeFailEventData [" +
+ "evtId=" + eventId() +
+ ", topVer=" + topologyVersion() +
+ ", nodeId=" + failedNodeInternalId + ']';
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
new file mode 100644
index 0000000..e46d52d
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.List;
+
+/**
+ *
+ */
+class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ final List<ZkJoinedNodeEvtData> joinedNodes;
+
+ /** */
+ final int dataForJoinedPartCnt;
+
+ /**
+ * @param evtId Event ID.
+ * @param topVer Topology version.
+ * @param joinedNodes Joined nodes data.
+ * @param dataForJoinedPartCnt Data for joined part count.
+ */
+ ZkDiscoveryNodeJoinEventData(
+ long evtId,
+ long topVer,
+ List<ZkJoinedNodeEvtData> joinedNodes,
+ int dataForJoinedPartCnt)
+ {
+ super(evtId, ZK_EVT_NODE_JOIN, topVer);
+
+ this.joinedNodes = joinedNodes;
+ this.dataForJoinedPartCnt = dataForJoinedPartCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "ZkDiscoveryNodeJoinEventData [" +
+ "evtId=" + eventId() +
+ ", topVer=" + topologyVersion() +
+ ", nodes=" + joinedNodes + ']';
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
new file mode 100644
index 0000000..174d698
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ *
+ */
+class ZkDistributedCollectDataFuture extends GridFutureAdapter<Void> {
+ /** */
+ private final IgniteLogger log;
+
+ /** */
+ private final String futPath;
+
+ /** */
+ private final Set<Long> remainingNodes;
+
+ /** */
+ private final Callable<Void> lsnr;
+
+ /**
+ * @param impl Disovery impl
+ * @param rtState Runtime state.
+ * @param futPath Future path.
+ * @param lsnr Future listener.
+ * @throws Exception If listener call failed.
+ */
+ ZkDistributedCollectDataFuture(
+ ZookeeperDiscoveryImpl impl,
+ ZkRuntimeState rtState,
+ String futPath,
+ Callable<Void> lsnr)
+ throws Exception
+ {
+ this.log = impl.log();
+ this.futPath = futPath;
+ this.lsnr = lsnr;
+
+ ZkClusterNodes top = rtState.top;
+
+ // Assume new nodes can not join while future is in progress.
+
+ remainingNodes = U.newHashSet(top.nodesByOrder.size());
+
+ for (ZookeeperClusterNode node : top.nodesByInternalId.values())
+ remainingNodes.add(node.order());
+
+ NodeResultsWatcher watcher = new NodeResultsWatcher(rtState, impl);
+
+ if (remainingNodes.isEmpty())
+ completeAndNotifyListener();
+ else {
+ if (log.isInfoEnabled()) {
+ log.info("Initialize data collect future [futPath=" + futPath + ", " +
+ "remainingNodes=" + remainingNodes.size() + ']');
+ }
+
+ rtState.zkClient.getChildrenAsync(futPath, watcher, watcher);
+ }
+ }
+
+ /**
+ * @throws Exception If listener call failed.
+ */
+ private void completeAndNotifyListener() throws Exception {
+ if (super.onDone())
+ lsnr.call();
+ }
+
+ /**
+ * @param futPath
+ * @param client
+ * @param nodeOrder
+ * @param data
+ * @throws Exception If failed.
+ */
+ static void saveNodeResult(String futPath, ZookeeperClient client, long nodeOrder, byte[] data) throws Exception {
+ client.createIfNeeded(futPath + "/" + nodeOrder, data, CreateMode.PERSISTENT);
+ }
+
+ /**
+ * @param futPath
+ * @param client
+ * @param nodeOrder
+ * @return Node result data.
+ * @throws Exception If fai.ed
+ */
+ static byte[] readNodeResult(String futPath, ZookeeperClient client, long nodeOrder) throws Exception {
+ return client.getData(futPath + "/" + nodeOrder);
+ }
+
+ /**
+ * @param futResPath Result path.
+ * @param client Client.
+ * @param data Result data.
+ * @throws Exception If failed.
+ */
+ static void saveResult(String futResPath, ZookeeperClient client, byte[] data) throws Exception {
+ client.createIfNeeded(futResPath, data, CreateMode.PERSISTENT);
+ }
+
+ static byte[] readResult(ZookeeperClient client, ZkIgnitePaths paths, UUID futId) throws Exception {
+ return client.getData(paths.distributedFutureResultPath(futId));
+ }
+
+ /**
+ * @param client Client.
+ * @param paths Paths utils.
+ * @param futId Future ID.
+ * @param log Ignite Logger.
+ * @throws Exception If failed.
+ */
+ static void deleteFutureData(ZookeeperClient client,
+ ZkIgnitePaths paths,
+ UUID futId,
+ IgniteLogger log
+ ) throws Exception {
+ // TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8189
+ String evtDir = paths.distributedFutureBasePath(futId);
+
+ try {
+ client.deleteAll(evtDir,
+ client.getChildren(evtDir),
+ -1);
+ }
+ catch (KeeperException.NoNodeException e) {
+ U.log(log, "Node for deletion was not found: " + e.getPath());
+
+ // TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8189
+ }
+
+ client.deleteIfExists(evtDir, -1);
+
+ client.deleteIfExists(paths.distributedFutureResultPath(futId), -1);
+ }
+
+ /**
+ * @param top Current topology.
+ * @throws Exception If listener call failed.
+ */
+ void onTopologyChange(ZkClusterNodes top) throws Exception {
+ if (remainingNodes.isEmpty())
+ return;
+
+ for (Iterator<Long> it = remainingNodes.iterator(); it.hasNext();) {
+ Long nodeOrder = it.next();
+
+ if (!top.nodesByOrder.containsKey(nodeOrder)) {
+ it.remove();
+
+ int remaining = remainingNodes.size();
+
+ if (log.isInfoEnabled()) {
+ log.info("ZkDistributedCollectDataFuture removed remaining failed node [node=" + nodeOrder +
+ ", remaining=" + remaining +
+ ", futPath=" + futPath + ']');
+ }
+
+ if (remaining == 0) {
+ completeAndNotifyListener();
+
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ class NodeResultsWatcher extends ZkAbstractWatcher implements AsyncCallback.Children2Callback {
+ /**
+ * @param rtState Runtime state.
+ * @param impl Discovery impl.
+ */
+ NodeResultsWatcher(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) {
+ super(rtState, impl);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void process0(WatchedEvent evt) {
+ if (evt.getType() == Watcher.Event.EventType.NodeChildrenChanged)
+ rtState.zkClient.getChildrenAsync(evt.getPath(), this, this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
+ if (!onProcessStart())
+ return;
+
+ try {
+ if (!isDone()) {
+ assert rc == 0 : KeeperException.Code.get(rc);
+
+ for (int i = 0; i < children.size(); i++) {
+ Long nodeOrder = Long.parseLong(children.get(i));
+
+ if (remainingNodes.remove(nodeOrder)) {
+ int remaining = remainingNodes.size();
+
+ if (log.isInfoEnabled()) {
+ log.info("ZkDistributedCollectDataFuture added new result [node=" + nodeOrder +
+ ", remaining=" + remaining +
+ ", futPath=" + path + ']');
+ }
+
+ if (remaining == 0)
+ completeAndNotifyListener();
+ }
+ }
+ }
+
+ onProcessEnd();
+ }
+ catch (Throwable e) {
+ onProcessError(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
new file mode 100644
index 0000000..de7291c
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Zk Force Node Fail Message.
+ */
+public class ZkForceNodeFailMessage implements DiscoverySpiCustomMessage, ZkInternalMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ final long nodeInternalId;
+
+ /** */
+ final String warning;
+
+ /**
+ * @param nodeInternalId Node ID.
+ * @param warning Warning to be displayed on all nodes.
+ */
+ ZkForceNodeFailMessage(long nodeInternalId, String warning) {
+ this.nodeInternalId = nodeInternalId;
+ this.warning = warning;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isMutable() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean stopProcess() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ZkForceNodeFailMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
new file mode 100644
index 0000000..9caf00f
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.UUID;
+
+/**
+ *
+ */
+class ZkIgnitePaths {
+ /** */
+ static final String PATH_SEPARATOR = "/";
+
+ /** */
+ private static final byte CLIENT_NODE_FLAG_MASK = 0x01;
+
+ /** */
+ private static final int UUID_LEN = 36;
+
+ /** Directory to store joined node data. */
+ private static final String JOIN_DATA_DIR = "jd";
+
+ /** Directory to store new custom events. */
+ private static final String CUSTOM_EVTS_DIR = "ce";
+
+ /** Directory to store parts of multi-parts custom events. */
+ private static final String CUSTOM_EVTS_PARTS_DIR = "cp";
+
+ /** Directory to store acknowledge messages for custom events. */
+ private static final String CUSTOM_EVTS_ACKS_DIR = "ca";
+
+ /** Directory to store EPHEMERAL znodes for alive cluster nodes. */
+ static final String ALIVE_NODES_DIR = "n";
+
+ /** Path to store discovery events {@link ZkDiscoveryEventsData}. */
+ private static final String DISCO_EVENTS_PATH = "e";
+
+ /** */
+ final String clusterDir;
+
+ /** */
+ final String aliveNodesDir;
+
+ /** */
+ final String joinDataDir;
+
+ /** */
+ final String evtsPath;
+
+ /** */
+ final String customEvtsDir;
+
+ /** */
+ final String customEvtsPartsDir;
+
+ /** */
+ final String customEvtsAcksDir;
+
+ /**
+ * @param zkRootPath Base Zookeeper directory for all Ignite nodes.
+ */
+ ZkIgnitePaths(String zkRootPath) {
+ clusterDir = zkRootPath;
+
+ aliveNodesDir = zkPath(ALIVE_NODES_DIR);
+ joinDataDir = zkPath(JOIN_DATA_DIR);
+ evtsPath = zkPath(DISCO_EVENTS_PATH);
+ customEvtsDir = zkPath(CUSTOM_EVTS_DIR);
+ customEvtsPartsDir = zkPath(CUSTOM_EVTS_PARTS_DIR);
+ customEvtsAcksDir = zkPath(CUSTOM_EVTS_ACKS_DIR);
+ }
+
+ /**
+ * @param path Relative path.
+ * @return Full path.
+ */
+ private String zkPath(String path) {
+ return clusterDir + "/" + path;
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param prefixId Unique prefix ID.
+ * @return Path.
+ */
+ String joiningNodeDataPath(UUID nodeId, UUID prefixId) {
+ return joinDataDir + '/' + prefixId + ":" + nodeId.toString();
+ }
+
+ /**
+ * @param path Alive node zk path.
+ * @return Node internal ID.
+ */
+ static long aliveInternalId(String path) {
+ int idx = path.lastIndexOf('|');
+
+ return Integer.parseInt(path.substring(idx + 1));
+ }
+
+ /**
+ * @param prefix Node unique path prefix.
+ * @param node Node.
+ * @return Path.
+ */
+ String aliveNodePathForCreate(String prefix, ZookeeperClusterNode node) {
+ byte flags = 0;
+
+ if (node.isClient())
+ flags |= CLIENT_NODE_FLAG_MASK;
+
+ return aliveNodesDir + "/" + prefix + ":" + node.id() + ":" + encodeFlags(flags) + "|";
+ }
+
+ /**
+ * @param path Alive node zk path.
+ * @return {@code True} if node is client.
+ */
+ static boolean aliveNodeClientFlag(String path) {
+ return (aliveFlags(path) & CLIENT_NODE_FLAG_MASK) != 0;
+ }
+
+ /**
+ * @param path Alive node zk path.
+ * @return Node ID.
+ */
+ static UUID aliveNodePrefixId(String path) {
+ return UUID.fromString(path.substring(0, ZkIgnitePaths.UUID_LEN));
+ }
+
+ /**
+ * @param path Alive node zk path.
+ * @return Node ID.
+ */
+ static UUID aliveNodeId(String path) {
+ // <uuid prefix>:<node id>:<flags>|<alive seq>
+ int startIdx = ZkIgnitePaths.UUID_LEN + 1;
+
+ String idStr = path.substring(startIdx, startIdx + ZkIgnitePaths.UUID_LEN);
+
+ return UUID.fromString(idStr);
+ }
+
+ /**
+ * @param path Event zk path.
+ * @return Event sequence number.
+ */
+ static int customEventSequence(String path) {
+ int idx = path.lastIndexOf('|');
+
+ return Integer.parseInt(path.substring(idx + 1));
+ }
+
+ /**
+ * @param path Custom event zl path.
+ * @return Event node ID.
+ */
+ static UUID customEventSendNodeId(String path) {
+ // <uuid prefix>:<node id>:<partCnt>|<seq>
+ int startIdx = ZkIgnitePaths.UUID_LEN + 1;
+
+ String idStr = path.substring(startIdx, startIdx + ZkIgnitePaths.UUID_LEN);
+
+ return UUID.fromString(idStr);
+ }
+
+ /**
+ * @param path Event path.
+ * @return Event unique prefix.
+ */
+ static String customEventPrefix(String path) {
+ // <uuid prefix>:<node id>:<partCnt>|<seq>
+
+ return path.substring(0, ZkIgnitePaths.UUID_LEN);
+ }
+
+ /**
+ * @param path Custom event zl path.
+ * @return Event node ID.
+ */
+ static int customEventPartsCount(String path) {
+ // <uuid prefix>:<node id>:<partCnt>|<seq>
+ int startIdx = 2 * ZkIgnitePaths.UUID_LEN + 2;
+
+ String cntStr = path.substring(startIdx, startIdx + 4);
+
+ int partCnt = Integer.parseInt(cntStr);
+
+ assert partCnt >= 1 : partCnt;
+
+ return partCnt;
+ }
+
+ /**
+ * @param prefix Prefix.
+ * @param nodeId Node ID.
+ * @param partCnt Parts count.
+ * @return Path.
+ */
+ String createCustomEventPath(String prefix, UUID nodeId, int partCnt) {
+ return customEvtsDir + "/" + prefix + ":" + nodeId + ":" + String.format("%04d", partCnt) + '|';
+ }
+
+ /**
+ * @param prefix Prefix.
+ * @param nodeId Node ID.
+ * @return Path.
+ */
+ String customEventPartsBasePath(String prefix, UUID nodeId) {
+ return customEvtsPartsDir + "/" + prefix + ":" + nodeId + ":";
+ }
+
+ /**
+ * @param prefix Prefix.
+ * @param nodeId Node ID.
+ * @param part Part number.
+ * @return Path.
+ */
+ String customEventPartPath(String prefix, UUID nodeId, int part) {
+ return customEventPartsBasePath(prefix, nodeId) + String.format("%04d", part);
+ }
+
+ /**
+ * @param evtId Event ID.
+ * @return Event zk path.
+ */
+ String joinEventDataPathForJoined(long evtId) {
+ return evtsPath + "/fj-" + evtId;
+ }
+
+ /**
+ * @param topVer Event topology version.
+ * @return Event zk path.
+ */
+ String joinEventSecuritySubjectPath(long topVer) {
+ return evtsPath + "/s-" + topVer;
+ }
+
+ /**
+ * @param origEvtId ID of original custom event.
+ * @return Path for custom event ack.
+ */
+ String ackEventDataPath(long origEvtId) {
+ assert origEvtId != 0;
+
+ return customEvtsAcksDir + "/" + String.valueOf(origEvtId);
+ }
+
+ /**
+ * @param id Future ID.
+ * @return Future path.
+ */
+ String distributedFutureBasePath(UUID id) {
+ return evtsPath + "/f-" + id;
+ }
+
+ /**
+ * @param id Future ID.
+ * @return Future path.
+ */
+ String distributedFutureResultPath(UUID id) {
+ return evtsPath + "/fr-" + id;
+ }
+
+ /**
+ * @param flags Flags.
+ * @return Flags string.
+ */
+ private static String encodeFlags(byte flags) {
+ int intVal = flags + 128;
+
+ String str = Integer.toString(intVal, 16);
+
+ if (str.length() == 1)
+ str = '0' + str;
+
+ assert str.length() == 2 : str;
+
+ return str;
+ }
+
+ /**
+ * @param path Alive node zk path.
+ * @return Flags.
+ */
+ private static byte aliveFlags(String path) {
+ int startIdx = path.lastIndexOf(':') + 1;
+
+ String flagsStr = path.substring(startIdx, startIdx + 2);
+
+ return (byte)(Integer.parseInt(flagsStr, 16) - 128);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
new file mode 100644
index 0000000..a73312c
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+/**
+ *
+ */
+class ZkInternalJoinErrorMessage implements ZkInternalMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ transient boolean notifyNode = true;
+
+ /** */
+ final long nodeInternalId;
+
+ /** */
+ final String err;
+
+ /**
+ * @param nodeInternalId Joining node internal ID.
+ * @param err Error message.
+ */
+ ZkInternalJoinErrorMessage(long nodeInternalId, String err) {
+ this.nodeInternalId = nodeInternalId;
+ this.err = err;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java
new file mode 100644
index 0000000..c1d56f0
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+
+/**
+ *
+ */
+interface ZkInternalMessage extends Serializable {
+ // No-op.
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
new file mode 100644
index 0000000..e4ae4ba0
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+class ZkJoinEventDataForJoined implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final List<ZookeeperClusterNode> top;
+
+ /** */
+ private final Map<Long, byte[]> discoData;
+
+ /** */
+ private final Map<Long, Long> dupDiscoData;
+
+ /**
+ * @param top Topology.
+ * @param discoData Discovery data.
+ */
+ ZkJoinEventDataForJoined(List<ZookeeperClusterNode> top, Map<Long, byte[]> discoData, @Nullable Map<Long, Long> dupDiscoData) {
+ assert top != null;
+ assert discoData != null && !discoData.isEmpty();
+
+ this.top = top;
+ this.discoData = discoData;
+ this.dupDiscoData = dupDiscoData;
+ }
+
+ byte[] discoveryDataForNode(long nodeOrder) {
+ assert discoData != null;
+
+ byte[] dataBytes = discoData.get(nodeOrder);
+
+ if (dataBytes != null)
+ return dataBytes;
+
+ assert dupDiscoData != null;
+
+ Long dupDataNode = dupDiscoData.get(nodeOrder);
+
+ assert dupDataNode != null;
+
+ dataBytes = discoData.get(dupDataNode);
+
+ assert dataBytes != null;
+
+ return dataBytes;
+ }
+
+ /**
+ * @return Current topology.
+ */
+ List<ZookeeperClusterNode> topology() {
+ assert top != null;
+
+ return top;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java
new file mode 100644
index 0000000..3c367cf
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.UUID;
+
+/**
+ * Zk Joined Node Evt Data.
+ */
+public class ZkJoinedNodeEvtData implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ final long topVer;
+
+ /** */
+ final long joinedInternalId;
+
+ /** */
+ final UUID nodeId;
+
+ /** */
+ final int joinDataPartCnt;
+
+ /** */
+ final int secSubjPartCnt;
+
+ /** */
+ final UUID joinDataPrefixId;
+
+ /** */
+ transient ZkJoiningNodeData joiningNodeData;
+
+ /**
+ * @param topVer Topology version for node join event.
+ * @param nodeId Joined node ID.
+ * @param joinedInternalId Joined node internal ID.
+ * @param joinDataPrefixId Join data unique prefix.
+ * @param joinDataPartCnt Join data part count.
+ * @param secSubjPartCnt Security subject part count.
+ */
+ ZkJoinedNodeEvtData(
+ long topVer,
+ UUID nodeId,
+ long joinedInternalId,
+ UUID joinDataPrefixId,
+ int joinDataPartCnt,
+ int secSubjPartCnt)
+ {
+ this.topVer = topVer;
+ this.nodeId = nodeId;
+ this.joinedInternalId = joinedInternalId;
+ this.joinDataPrefixId = joinDataPrefixId;
+ this.joinDataPartCnt = joinDataPartCnt;
+ this.secSubjPartCnt = secSubjPartCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "ZkJoinedNodeData [id=" + nodeId + ", order=" + topVer + ']';
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
new file mode 100644
index 0000000..ff8311d
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ *
+ */
+class ZkJoiningNodeData implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private int partCnt;
+
+ /** */
+ @GridToStringInclude
+ private ZookeeperClusterNode node;
+
+ /** */
+ @GridToStringInclude
+ private Map<Integer, Serializable> discoData;
+
+ /**
+ * @param partCnt Number of parts in multi-parts message.
+ */
+ ZkJoiningNodeData(int partCnt) {
+ this.partCnt = partCnt;
+ }
+
+ /**
+ * @param node Node.
+ * @param discoData Discovery data.
+ */
+ ZkJoiningNodeData(ZookeeperClusterNode node, Map<Integer, Serializable> discoData) {
+ assert node != null && node.id() != null : node;
+ assert discoData != null;
+
+ this.node = node;
+ this.discoData = discoData;
+ }
+
+ /**
+ * @return Number of parts in multi-parts message.
+ */
+ int partCount() {
+ return partCnt;
+ }
+
+ /**
+ * @return Node.
+ */
+ ZookeeperClusterNode node() {
+ return node;
+ }
+
+ /**
+ * @return Discovery data.
+ */
+ Map<Integer, Serializable> discoveryData() {
+ return discoData;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ZkJoiningNodeData.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
new file mode 100644
index 0000000..626fe74
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+class ZkNoServersMessage implements DiscoverySpiCustomMessage, ZkInternalMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isMutable() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean stopProcess() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ZkNoServersMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java
new file mode 100644
index 0000000..2abfee3
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+/**
+ *
+ */
+class ZkNodeValidateResult {
+ /** */
+ String err;
+
+ /** */
+ byte[] secSubjZipBytes;
+
+ /**
+ * @param err Error.
+ */
+ ZkNodeValidateResult(String err) {
+ this.err = err;
+ }
+
+ /**
+ * @param secSubjZipBytes Marshalled security subject.
+ */
+ ZkNodeValidateResult(byte[] secSubjZipBytes) {
+ this.secSubjZipBytes = secSubjZipBytes;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java
new file mode 100644
index 0000000..965bdc0
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+/**
+ * Zk Runnable.
+ */
+public abstract class ZkRunnable extends ZkAbstractCallabck implements Runnable {
+ /**
+ * @param rtState Runtime state.
+ * @param impl Discovery impl.
+ */
+ ZkRunnable(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) {
+ super(rtState, impl);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ if (!onProcessStart())
+ return;
+
+ try {
+ run0();
+
+ onProcessEnd();
+ }
+ catch (Throwable e) {
+ onProcessError(e);
+ }
+ }
+
+ /**
+ *
+ */
+ protected abstract void run0() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
new file mode 100644
index 0000000..cb04ac3
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.List;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.spi.IgniteSpiTimeoutObject;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.Watcher;
+
+/**
+ *
+ */
+class ZkRuntimeState {
+ /** */
+ ZkWatcher watcher;
+
+ /** */
+ ZkAliveNodeDataWatcher aliveNodeDataWatcher;
+
+ /** */
+ volatile Exception errForClose;
+
+ /** */
+ final boolean prevJoined;
+
+ /** */
+ ZookeeperClient zkClient;
+
+ /** */
+ long internalOrder;
+
+ /** */
+ int joinDataPartCnt;
+
+ /** */
+ long gridStartTime;
+
+ /** */
+ volatile boolean joined;
+
+ /** */
+ ZkDiscoveryEventsData evtsData;
+
+ /** */
+ boolean crd;
+
+ /** */
+ String locNodeZkPath;
+
+ /** */
+ final ZkAliveNodeData locNodeInfo = new ZkAliveNodeData();
+
+ /** */
+ int procEvtCnt;
+
+ /** */
+ final ZkClusterNodes top = new ZkClusterNodes();
+
+ /** */
+ List<ClusterNode> commErrProcNodes;
+
+ /** Timeout callback registering watcher for join error
+ * (set this watcher after timeout as a minor optimization).
+ */
+ ZkTimeoutObject joinErrTo;
+
+ /** Timeout callback set to wait for join timeout. */
+ ZkTimeoutObject joinTo;
+
+ /** Timeout callback to update processed events counter. */
+ ZkTimeoutObject procEvtsUpdateTo;
+
+ /** */
+ boolean updateAlives;
+
+ /**
+ * @param prevJoined {@code True} if joined topology before reconnect attempt.
+ */
+ ZkRuntimeState(boolean prevJoined) {
+ this.prevJoined = prevJoined;
+ }
+
+ /**
+ * @param watcher Watcher.
+ * @param aliveNodeDataWatcher Alive nodes data watcher.
+ */
+ void init(ZkWatcher watcher, ZkAliveNodeDataWatcher aliveNodeDataWatcher) {
+ this.watcher = watcher;
+ this.aliveNodeDataWatcher = aliveNodeDataWatcher;
+ }
+
+ /**
+ * @param err Error.
+ */
+ void onCloseStart(Exception err) {
+ assert err != null;
+
+ errForClose = err;
+
+ ZookeeperClient zkClient = this.zkClient;
+
+ if (zkClient != null)
+ zkClient.onCloseStart();
+ }
+
+ /**
+ *
+ */
+ interface ZkWatcher extends Watcher, AsyncCallback.Children2Callback, AsyncCallback.DataCallback {
+ // No-op.
+ }
+
+ /**
+ *
+ */
+ interface ZkAliveNodeDataWatcher extends Watcher, AsyncCallback.DataCallback {
+ // No-op.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkTimeoutObject.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkTimeoutObject.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkTimeoutObject.java
new file mode 100644
index 0000000..4d3d5b4
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkTimeoutObject.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.IgniteSpiTimeoutObject;
+
+/**
+ *
+ */
+abstract class ZkTimeoutObject implements IgniteSpiTimeoutObject {
+ /** */
+ private final IgniteUuid id = IgniteUuid.randomUuid();
+
+ /** */
+ private final long endTime;
+
+ /** */
+ volatile boolean cancelled;
+
+ /**
+ * @param timeout Timeout.
+ */
+ ZkTimeoutObject(long timeout) {
+ long endTime = timeout >= 0 ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
+
+ this.endTime = endTime >= 0 ? endTime : Long.MAX_VALUE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public final IgniteUuid id() {
+ return id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public final long endTime() {
+ return endTime;
+ }
+}