You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/11/14 13:02:33 UTC
[1/3] ignite git commit: zk
Repository: ignite
Updated Branches:
refs/heads/ignite-zk 8ab3b5667 -> 73f5af60c
zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3736abe2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3736abe2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3736abe2
Branch: refs/heads/ignite-zk
Commit: 3736abe27a080ac7962dd8ede9e7b7414c4a82ae
Parents: 8ab3b56
Author: sboikov <sb...@gridgain.com>
Authored: Tue Nov 14 14:14:25 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Nov 14 14:14:25 2017 +0300
----------------------------------------------------------------------
.../discovery/GridDiscoveryManager.java | 5 ++-
.../GridDhtPartitionsExchangeFuture.java | 3 +-
.../spi/discovery/zk/ZookeeperClusterNode.java | 15 ++++++++-
.../spi/discovery/zk/ZookeeperDiscoverySpi.java | 32 ++++++++++++++------
.../zk/ZookeeperDiscoverySpiBasicTest.java | 6 +++-
5 files changed, 45 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3736abe2/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 022dc97..a6737dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -2522,9 +2522,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
switch (type) {
case EVT_NODE_JOINED: {
-// TODO ZK
-// assert !discoOrdered || topVer.topologyVersion() == node.order() : "Invalid topology version [topVer=" + topVer +
-// ", node=" + node + ']';
+ assert !discoOrdered || topVer.topologyVersion() == node.order() : "Invalid topology version [topVer=" + topVer +
+ ", node=" + node + ']';
try {
checkAttributes(F.asList(node));
http://git-wip-us.apache.org/repos/asf/ignite/blob/3736abe2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 5d9186b..d29293e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1598,8 +1598,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
mergedJoinExchMsgs = new LinkedHashMap<>();
if (msg != null) {
- // TODO ZK
- // assert msg.exchangeId().topologyVersion().equals(new AffinityTopologyVersion(node.order()));
+ assert msg.exchangeId().topologyVersion().equals(new AffinityTopologyVersion(node.order()));
if (log.isInfoEnabled()) {
log.info("Merge server join exchange, message received [curFut=" + initialVersion() +
http://git-wip-us.apache.org/repos/asf/ignite/blob/3736abe2/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperClusterNode.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperClusterNode.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperClusterNode.java
index dc0f374..ae638b8 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperClusterNode.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperClusterNode.java
@@ -47,6 +47,9 @@ public class ZookeeperClusterNode implements ClusterNode, Serializable {
private Serializable consistentId;
/** */
+ private long internalOrder;
+
+ /** */
private long order;
/** */
@@ -154,9 +157,19 @@ public class ZookeeperClusterNode implements ClusterNode, Serializable {
return order;
}
+ public long internalOrder() {
+ return internalOrder;
+ }
+
/**
- * @param order Order of the node.
+ * @param internalOrder Order of the node.
*/
+ public void internalOrder(long internalOrder) {
+ assert internalOrder > 0 : internalOrder;
+
+ this.internalOrder = internalOrder;
+ }
+
public void order(long order) {
assert order > 0 : order;
http://git-wip-us.apache.org/repos/asf/ignite/blob/3736abe2/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index ce04d33..bae183d 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -380,12 +380,18 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
zkCurator.getCuratorListenable().addListener(new CuratorListener() {
@Override public void eventReceived(CuratorFramework client, CuratorEvent evt) throws Exception {
- log.info("Curator event: " + evt.getType());
+ IgniteLogger log = ZookeeperDiscoverySpi.this.log;
+
+ if (log != null)
+ log.info("Curator event: " + evt.getType());
}
});
zkCurator.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override public void stateChanged(CuratorFramework client, ConnectionState newState) {
- log.info("Curator event, connection: " + newState);
+ IgniteLogger log = ZookeeperDiscoverySpi.this.log;
+
+ if (log != null)
+ log.info("Curator event, connection: " + newState);
}
});
@@ -644,7 +650,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
assert joinData != null && joinData.node != null && joinData.joiningNodeData != null : joinData;
- joinData.node.order(data.order);
+ joinData.node.internalOrder(data.order);
data.joinData = joinData;
}
@@ -823,7 +829,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
ZKJoiningNodeData joinData = joined.joinData;
synchronized (curTop) {
- curTop.put(joinData.node.order(), joinData.node);
+ joinData.node.order(v);
+
+ curTop.put(joinData.node.internalOrder(), joinData.node);
ZKDiscoveryEvent joinEvt = new ZKDiscoveryEvent(EventType.EVT_NODE_JOINED,
v,
@@ -963,6 +971,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
locJoin = e.evtType == EventType.EVT_NODE_JOINED && e.node.id().equals(locNode.id());
if (locJoin) {
+ assert e.node.order() > 0 && e.node.internalOrder() > 0 : e.node;
+
+ locNode.internalOrder(e.node.internalOrder());
locNode.order(e.node.order());
fireEvt = true;
@@ -980,9 +991,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
synchronized (curTop) {
if (locJoin) {
for (ZookeeperClusterNode node : e.allNodes) {
- assert node.order() > 0 : node;
+ assert node.internalOrder() > 0 : node;
- Object old = curTop.put(node.order(), node);
+ Object old = curTop.put(node.internalOrder(), node);
assert old == null : node;
}
@@ -1006,7 +1017,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
exchange.onExchange(dataBag);
- Object old = curTop.put(node.order(), node);
+ Object old = curTop.put(node.internalOrder(), node);
assert old == null : node;
@@ -1016,7 +1027,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
case EventType.EVT_NODE_FAILED: {
ZookeeperClusterNode node = e.node;
- Object failedNode = curTop.remove(node.order());
+ Object failedNode = curTop.remove(node.internalOrder());
assert failedNode != null : node;
@@ -1160,7 +1171,10 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
private class ZookeeperWatcher implements Watcher {
/** {@inheritDoc} */
@Override public void process(WatchedEvent event) {
- log.info("Process event [type=" + event.getType() + ", state=" + event.getState() + ", path=" + event.getPath() + ']');
+ IgniteLogger log = ZookeeperDiscoverySpi.this.log;
+
+ if (log != null)
+ log.info("Process event [type=" + event.getType() + ", state=" + event.getState() + ", path=" + event.getPath() + ']');
if (event.getType() == Event.EventType.NodeChildrenChanged) {
zk.getChildren(event.getPath(), this, nodesUpdateCallback, null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/3736abe2/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
index dfb2199..b4db065 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
@@ -78,6 +78,9 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
private boolean testSockNio;
/** */
+ private int sesTimeout;
+
+ /** */
private ConcurrentHashMap<String, ZookeeperDiscoverySpi> spis = new ConcurrentHashMap<>();
/** {@inheritDoc} */
@@ -91,7 +94,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi();
- zkSpi.setSessionTimeout(10_000);
+ zkSpi.setSessionTimeout(sesTimeout > 0 ? sesTimeout : 10_000);
spis.put(igniteInstanceName, zkSpi);
@@ -386,6 +389,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
private void connectionRestore_Coordinator(int initNodes, int startNodes, int failCnt) throws Exception {
+ sesTimeout = 30_000;
testSockNio = true;
Ignite node0 = startGrids(initNodes);
[2/3] ignite git commit: zk
Posted by sb...@apache.org.
zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1842bb4c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1842bb4c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1842bb4c
Branch: refs/heads/ignite-zk
Commit: 1842bb4c48ce245a5b69b669087590351de686fa
Parents: 3736abe
Author: sboikov <sb...@gridgain.com>
Authored: Tue Nov 14 15:07:55 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Nov 14 15:10:16 2017 +0300
----------------------------------------------------------------------
.../spi/discovery/zk/ZookeeperDiscoverySpi.java | 33 +++++++++++---------
1 file changed, 19 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1842bb4c/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index bae183d..80d563e 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -97,6 +97,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
private static final String ALIVE_NODES_PATH = CLUSTER_PATH + "/alive";
/** */
+ private static final String CUSTOM_EVTS_PATH = CLUSTER_PATH + "/customEvts";
+
+ /** */
private static final byte[] EMPTY_BYTES = new byte[0];
/** */
@@ -127,7 +130,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
private final JdkMarshaller marsh = new JdkMarshaller();
/** */
- private final NodesUpdateCallback nodesUpdateCallback;
+ private final ZKChildrenUpdateCallback zkChildrenUpdateCallback;
/** */
private final DataUpdateCallback dataUpdateCallback;
@@ -166,7 +169,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
public ZookeeperDiscoverySpi() {
zkWatcher = new ZookeeperWatcher();
- nodesUpdateCallback = new NodesUpdateCallback();
+ zkChildrenUpdateCallback = new ZKChildrenUpdateCallback();
dataUpdateCallback = new DataUpdateCallback();
}
@@ -307,7 +310,12 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
/** {@inheritDoc} */
@Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
// TODO ZK
- //throw new UnsupportedOperationException();
+ try {
+ zkCurator.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(CUSTOM_EVTS_PATH, marshal(msg));
+ }
+ catch (Exception e) {
+ throw new IgniteSpiException(e);
+ }
}
/** {@inheritDoc} */
@@ -318,8 +326,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
/** {@inheritDoc} */
@Override public boolean isClientMode() throws IllegalStateException {
- // TODO ZK
- return false;
+ return locNode.isClient();
}
/**
@@ -354,11 +361,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
}
private boolean igniteClusterStarted() throws Exception {
- boolean started = zkCurator.checkExists().forPath(IGNITE_PATH) != null &&
+ return zkCurator.checkExists().forPath(IGNITE_PATH) != null &&
zkCurator.checkExists().forPath(ALIVE_NODES_PATH) != null &&
!zk.getChildren(ALIVE_NODES_PATH, false).isEmpty();
-
- return started;
}
/** {@inheritDoc} */
@@ -454,8 +459,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
gridStartTime = clusterData.gridStartTime;
zk.getData(EVENTS_PATH, zkWatcher, dataUpdateCallback, null);
- zk.getChildren(JOIN_HIST_PATH, zkWatcher, nodesUpdateCallback, null);
- zk.getChildren(ALIVE_NODES_PATH, zkWatcher, nodesUpdateCallback, null);
+ zk.getChildren(JOIN_HIST_PATH, zkWatcher, zkChildrenUpdateCallback, null);
+ zk.getChildren(ALIVE_NODES_PATH, zkWatcher, zkChildrenUpdateCallback, null);
List<Op> joinOps = new ArrayList<>();
@@ -693,7 +698,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
/**
*
*/
- class NodesUpdateCallback implements AsyncCallback.Children2Callback {
+ class ZKChildrenUpdateCallback implements AsyncCallback.Children2Callback {
@Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
try {
if (children == null || children.isEmpty())
@@ -756,8 +761,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
private final TreeMap<Long, ZookeeperClusterNode> curTop = new TreeMap<>();
/**
- * @param oldNodes
- * @param newNodes
+ * @param oldNodes Previous processed state.
+ * @param newNodes Current state.
*/
private void generateEvents(ZKAliveNodes oldNodes, ZKAliveNodes newNodes) {
assert newNodes != null;
@@ -1177,7 +1182,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
log.info("Process event [type=" + event.getType() + ", state=" + event.getState() + ", path=" + event.getPath() + ']');
if (event.getType() == Event.EventType.NodeChildrenChanged) {
- zk.getChildren(event.getPath(), this, nodesUpdateCallback, null);
+ zk.getChildren(event.getPath(), this, zkChildrenUpdateCallback, null);
} else if (event.getType() == Event.EventType.NodeDataChanged) {
zk.getData(event.getPath(), this, dataUpdateCallback, null);
}
[3/3] ignite git commit: zk
Posted by sb...@apache.org.
zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/73f5af60
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/73f5af60
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/73f5af60
Branch: refs/heads/ignite-zk
Commit: 73f5af60cc6701a88712735f94f14f4fe0cdd92c
Parents: 1842bb4
Author: sboikov <sb...@gridgain.com>
Authored: Tue Nov 14 15:41:47 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Nov 14 16:01:20 2017 +0300
----------------------------------------------------------------------
.../spi/discovery/zk/ZookeeperDiscoverySpi.java | 76 ++++++++++++++------
1 file changed, 56 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/73f5af60/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index 80d563e..cc0e6a4 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -100,6 +101,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
private static final String CUSTOM_EVTS_PATH = CLUSTER_PATH + "/customEvts";
/** */
+ private static final String DISCO_EVTS_HIST_PATH = CLUSTER_PATH + "/evtsHist";
+
+ /** */
private static final byte[] EMPTY_BYTES = new byte[0];
/** */
@@ -439,6 +443,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
initOps.add(Op.create(JOIN_HIST_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
initOps.add(Op.create(ALIVE_NODES_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
initOps.add(Op.create(EVENTS_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+ initOps.add(Op.create(DISCO_EVTS_HIST_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+ initOps.add(Op.create(CUSTOM_EVTS_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
zk.multi(initOps);
}
@@ -569,17 +575,13 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
final UUID nodeId;
/** */
- final String zkPath;
-
- /** */
transient ZKJoiningNodeData joinData;
/**
* @param order Node order.
* @param nodeId Node ID.
*/
- ZKNodeData(String zkPath, long order, UUID nodeId) {
- this.zkPath = zkPath;
+ ZKNodeData(long order, UUID nodeId) {
this.order = order;
this.nodeId = nodeId;
}
@@ -602,9 +604,6 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
@GridToStringInclude
final TreeMap<Long, ZKNodeData> nodesByOrder;
- /** */
- final TreeMap<UUID, ZKNodeData> nodesById;
-
/**
* @param ver
* @param nodesByOrder
@@ -612,11 +611,15 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
ZKAliveNodes(int ver, TreeMap<Long, ZKNodeData> nodesByOrder) {
this.ver = ver;
this.nodesByOrder = nodesByOrder;
+ }
- nodesById = new TreeMap<>();
+ ZKNodeData nodeById(UUID nodeId) {
+ for (ZKNodeData nodeData : nodesByOrder.values()) {
+ if (nodeId.equals(nodeData.nodeId))
+ return nodeData;
+ }
- for (ZKNodeData nodeData : nodesByOrder.values())
- nodesById.put(nodeData.nodeId, nodeData);
+ return null;
}
/** {@inheritDoc} */
@@ -636,7 +639,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
int nodeOrder = Integer.parseInt(path.substring(ID_LEN + 1)) + 1;
- return new ZKNodeData(path, nodeOrder, nodeId);
+ return new ZKNodeData(nodeOrder, nodeId);
}
/** */
@@ -767,7 +770,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
private void generateEvents(ZKAliveNodes oldNodes, ZKAliveNodes newNodes) {
assert newNodes != null;
- ZKNodeData locNode = newNodes.nodesById.get(this.locNode.id());
+ ZKNodeData locNode = newNodes.nodeById(this.locNode.id());
if (locNode == null)
return;
@@ -912,15 +915,22 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
if (curCrdEvts == null) {
expVer = 0;
- newEvents = new ZKDiscoveryEvents(nextJoinOrder, newNodes, evts);
+ newEvents = new ZKDiscoveryEvents(nextJoinOrder, newNodes, new TreeSet<>(evts.keySet()));
}
else {
- TreeMap<Integer, ZKDiscoveryEvent> evts0 = new TreeMap<>(curCrdEvts.evts);
+// TreeMap<Integer, ZKDiscoveryEvent> evts0 = new TreeMap<>(curCrdEvts.evts);
+//
+// for (ZKDiscoveryEvent e : evts.values()) {
+// assert !evts0.containsKey(e.topVer) : "[newEvt=" + e + ", oldEvt=" + evts0.get(e.topVer) + ']';
+//
+// evts0.put(e.topVer, e);
+// }
+ TreeSet<Integer> evts0 = new TreeSet<>(curCrdEvts.evts);
for (ZKDiscoveryEvent e : evts.values()) {
- assert !evts0.containsKey(e.topVer) : "[newEvt=" + e + ", oldEvt=" + evts0.get(e.topVer) + ']';
+ assert !evts0.contains(e.topVer) : "[newEvt=" + e + ", oldEvts=" + evts0 + ']';
- evts0.put(e.topVer, e);
+ evts0.add(e.topVer);
}
newEvents = new ZKDiscoveryEvents(nextJoinOrder, newNodes, evts0);
@@ -928,6 +938,14 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
expVer = curCrdEvts.ver;
}
+ try {
+ for (ZKDiscoveryEvent evt : evts.values())
+ zkCurator.create().withMode(CreateMode.PERSISTENT).forPath(DISCO_EVTS_HIST_PATH + "/" + evt.topVer, marshal(evt));
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to create discovery event node: " + e, e);
+ }
+
newEvents.ver = expVer + 1;
try {
@@ -951,6 +969,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
/** */
private ZKDiscoveryEvent lastEvt;
+ private int lastProcessed = -1;
+
/**
*
*/
@@ -968,10 +988,26 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
newEvts.ver = stat.getVersion();
- for (ZKDiscoveryEvent e : newEvts.evts.values()) {
+ for (Integer evtVer : newEvts.evts) {
+ if (evtVer > lastProcessed)
+ lastProcessed = evtVer;
+ else
+ continue;
+
boolean fireEvt;
boolean locJoin = false;
+ ZKDiscoveryEvent e;
+
+ try {
+ e = unmarshal(zkCurator.getData().forPath(DISCO_EVTS_HIST_PATH + "/" + evtVer));
+ }
+ catch (Exception err) {
+ U.error(log, "Failed to read discovery event: " + err, err);
+
+ continue;
+ }
+
if (lastEvt == null) {
locJoin = e.evtType == EventType.EVT_NODE_JOINED && e.node.id().equals(locNode.id());
@@ -1095,7 +1131,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
/** */
@GridToStringInclude
- final TreeMap<Integer, ZKDiscoveryEvent> evts;
+ final TreeSet<Integer> evts;
/** */
final long nextJoinOrder;
@@ -1104,7 +1140,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
* @param aliveNodes
* @param evts
*/
- ZKDiscoveryEvents(long nextJoinOrder, ZKAliveNodes aliveNodes, TreeMap<Integer, ZKDiscoveryEvent> evts) {
+ ZKDiscoveryEvents(long nextJoinOrder, ZKAliveNodes aliveNodes, TreeSet<Integer> evts) {
this.nextJoinOrder = nextJoinOrder;
this.aliveNodes = aliveNodes;
this.evts = evts;