You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/11/22 10:23:30 UTC
[14/15] ignite git commit: zk
zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e0aba812
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e0aba812
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e0aba812
Branch: refs/heads/ignite-zk
Commit: e0aba812643c0d773359a95b514daead9730ee6e
Parents: 4090eb7
Author: sboikov <sb...@gridgain.com>
Authored: Wed Nov 22 11:47:55 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Nov 22 13:21:21 2017 +0300
----------------------------------------------------------------------
.../zk/internal/ZkDiscoveryCustomEventData.java | 12 +-
.../zk/internal/ZkDiscoveryEventData.java | 7 +
.../discovery/zk/internal/ZkIgnitePaths.java | 11 +-
.../zk/internal/ZookeeperDiscoveryImpl.java | 226 ++++++++++++-------
.../zk/ZookeeperDiscoverySpiBasicTest.java | 4 +-
5 files changed, 177 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e0aba812/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
index 1346c24..5668428 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
@@ -27,6 +27,9 @@ import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
*/
class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData {
/** */
+ private static final int CUSTOM_MSG_ACK_FLAG = 1;
+
+ /** */
final UUID sndNodeId;
/** */
@@ -41,7 +44,7 @@ class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData {
* @param sndNodeId Sender node ID.
* @param evtPath Event path.
*/
- ZkDiscoveryCustomEventData(long evtId, long topVer, UUID sndNodeId, String evtPath) {
+ ZkDiscoveryCustomEventData(long evtId, long topVer, UUID sndNodeId, String evtPath, boolean ack) {
super(evtId, DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, topVer);
assert sndNodeId != null;
@@ -49,6 +52,13 @@ class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData {
this.sndNodeId = sndNodeId;
this.evtPath = evtPath;
+
+ if (ack)
+ flags |= CUSTOM_MSG_ACK_FLAG;
+ }
+
+ boolean ackEvent() {
+ return flagSet(CUSTOM_MSG_ACK_FLAG);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/e0aba812/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
index 9f18f4f..00330e4 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
@@ -45,6 +45,9 @@ abstract class ZkDiscoveryEventData implements Serializable {
/** */
private transient Set<Integer> remainingAcks;
+ /** */
+ int flags;
+
/**
* @param evtType Event type.
* @param topVer Topology version.
@@ -89,6 +92,10 @@ abstract class ZkDiscoveryEventData implements Serializable {
return remainingAcks.isEmpty();
}
+ boolean flagSet(int flag) {
+ return (flags & flag) == flag;
+ }
+
long eventId() {
return evtId;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e0aba812/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
index ad35c05..591f18d 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
@@ -33,6 +33,9 @@ class ZkIgnitePaths {
private static final String CUSTOM_EVTS_DIR = "customEvts";
/** */
+ private static final String CUSTOM_EVTS_ACKS_DIR = "customEvtsAcks";
+
+ /** */
private static final String ALIVE_NODES_DIR = "alive";
/** */
@@ -59,6 +62,9 @@ class ZkIgnitePaths {
/** */
final String customEvtsDir;
+ /** */
+ final String customEvtsAcksDir;
+
/**
* @param basePath Base directory.
* @param clusterName Cluster name.
@@ -73,6 +79,7 @@ class ZkIgnitePaths {
joinDataDir = zkPath(JOIN_DATA_DIR);
evtsPath = zkPath(DISCO_EVENTS_PATH);
customEvtsDir = zkPath(CUSTOM_EVTS_DIR);
+ customEvtsAcksDir = zkPath(CUSTOM_EVTS_ACKS_DIR);
}
/**
@@ -122,7 +129,7 @@ class ZkIgnitePaths {
return evtsPath + "/joined-" + evtId;
}
- String customEventDataPath(String child) {
- return customEvtsDir + "/" + child;
+ String customEventDataPath(boolean ack, String child) {
+ return ack ? customEvtsAcksDir : customEvtsDir + "/" + child;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e0aba812/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 8246e19..5e9c5a3 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -232,6 +232,7 @@ public class ZookeeperDiscoveryImpl {
}
try {
+ // TODO ZK: handle retries.
zkClient.createIfNeeded(zkPaths.customEvtsDir + "/" + locNode.id() + '|', msgBytes, CreateMode.PERSISTENT_SEQUENTIAL);
}
catch (ZookeeperClientFailedException e) {
@@ -347,6 +348,7 @@ public class ZookeeperDiscoveryImpl {
private void startJoin(byte[] joinDataBytes) throws InterruptedException {
try {
// TODO ZK: handle max size.
+ // TODO ZK: handle retries.
String path = zkClient.createIfNeeded(zkPaths.joinDataDir + "/" + locNode.id() + "|",
joinDataBytes,
EPHEMERAL_SEQUENTIAL);
@@ -494,7 +496,7 @@ public class ZookeeperDiscoveryImpl {
byte[] evtsData = zkClient.getData(zkPaths.evtsPath);
if (evtsData.length > 0)
- onEventsUpdate(evtsData);
+ processNewEvents(evtsData);
crd = true;
@@ -511,13 +513,9 @@ public class ZookeeperDiscoveryImpl {
ZkDiscoveryEventData evtData = it.next();
evtData.remainingAcks(top.nodesByOrder.values());
-
- if (evtData.allAcksReceived()) {
- processNodesAckEvent(evtData);
-
- it.remove();
- }
}
+
+ handleProcessedEvents();
}
else {
if (log.isInfoEnabled())
@@ -586,15 +584,17 @@ public class ZookeeperDiscoveryImpl {
Iterator<ZkDiscoveryEventData> it = evtsData.evts.values().iterator();
+ boolean processed = false;
+
while (it.hasNext()) {
ZkDiscoveryEventData evtData = it.next();
- if (evtData.onAckReceived(nodeInternalId, nodeData.lastProcEvt)) {
- processNodesAckEvent(evtData);
-
- it.remove();
- }
+ if (evtData.onAckReceived(nodeInternalId, nodeData.lastProcEvt))
+ processed = true;
}
+
+ if (processed)
+ handleProcessedEvents();
}
}
catch (Throwable e) {
@@ -647,18 +647,24 @@ public class ZookeeperDiscoveryImpl {
}
}
- if (newEvts) {
- long start = System.currentTimeMillis();
+ if (newEvts)
+ saveAndProcessNewEvents();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void saveAndProcessNewEvents() throws Exception {
+ long start = System.currentTimeMillis();
- zkClient.setData(zkPaths.evtsPath, marsh.marshal(evtsData), -1);
+ zkClient.setData(zkPaths.evtsPath, marsh.marshal(evtsData), -1);
- long time = System.currentTimeMillis() - start;
+ long time = System.currentTimeMillis() - start;
- if (log.isInfoEnabled())
- log.info("Discovery coordinator saved new topology events [topVer=" + evtsData.topVer + ", saveTime=" + time + ']');
+ if (log.isInfoEnabled())
+ log.info("Discovery coordinator saved new topology events [topVer=" + evtsData.topVer + ", saveTime=" + time + ']');
- onEventsUpdate(evtsData);
- }
+ processNewEvents(evtsData);
}
/**
@@ -877,7 +883,8 @@ public class ZookeeperDiscoveryImpl {
evtsData.evtIdGen,
evtsData.topVer,
sndNodeId,
- evtE.getValue());
+ evtE.getValue(),
+ false);
evtData.msg = msg;
@@ -899,16 +906,7 @@ public class ZookeeperDiscoveryImpl {
evtsData.procCustEvt = evtE.getKey();
}
- long start = System.currentTimeMillis();
-
- zkClient.setData(zkPaths.evtsPath, marsh.marshal(evtsData), -1);
-
- long time = System.currentTimeMillis() - start;
-
- if (log.isInfoEnabled())
- log.info("Discovery coordinator saved new topology events [topVer=" + evtsData.topVer + ", saveTime=" + time + ']');
-
- onEventsUpdate(evtsData);
+ saveAndProcessNewEvents();
}
}
@@ -916,7 +914,7 @@ public class ZookeeperDiscoveryImpl {
* @param data Marshalled events.
* @throws Exception If failed.
*/
- private void onEventsUpdate(byte[] data) throws Exception {
+ private void processNewEvents(byte[] data) throws Exception {
if (data.length == 0)
return;
@@ -924,7 +922,7 @@ public class ZookeeperDiscoveryImpl {
ZkDiscoveryEventsData evtsData = unmarshal(data);
- onEventsUpdate(evtsData);
+ processNewEvents(evtsData);
this.evtsData = evtsData;
}
@@ -937,7 +935,7 @@ public class ZookeeperDiscoveryImpl {
* @throws Exception If failed.
*/
@SuppressWarnings("unchecked")
- private void onEventsUpdate(ZkDiscoveryEventsData evtsData) throws Exception {
+ private void processNewEvents(ZkDiscoveryEventsData evtsData) throws Exception {
TreeMap<Long, ZkDiscoveryEventData> evts = evtsData.evts;
boolean updateNodeInfo = false;
@@ -1002,6 +1000,9 @@ public class ZookeeperDiscoveryImpl {
case DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT: {
ZkDiscoveryCustomEventData evtData0 = (ZkDiscoveryCustomEventData)evtData;
+ if (evtData0.ackEvent() && evtData0.topologyVersion() < locNode.order())
+ break;
+
DiscoverySpiCustomMessage msg;
if (crd) {
@@ -1010,27 +1011,25 @@ public class ZookeeperDiscoveryImpl {
msg = evtData0.msg;
}
else {
- String path = zkPaths.customEventDataPath(evtData0.evtPath);
+ String path = zkPaths.customEventDataPath(evtData0.ackEvent(),
+ evtData0.evtPath);
msg = unmarshal(zkClient.getData(path));
+
+ evtData0.msg = msg;
}
notifyCustomEvent(evtData0, msg);
+ if (!evtData0.ackEvent())
+ updateNodeInfo = true;
+
break;
}
default:
assert false : "Invalid event: " + evtData;
}
-
- if (crd) {
- if (evtData.allAcksReceived()) {
- processNodesAckEvent(evtData);
-
- it.remove();
- }
- }
}
if (joined) {
@@ -1043,13 +1042,93 @@ public class ZookeeperDiscoveryImpl {
}
}
- if (!crd && updateNodeInfo) {
+ if (crd) {
+ handleProcessedEvents();
+ }
+ else if (updateNodeInfo) {
assert locNodeZkPath != null;
zkClient.setData(locNodeZkPath, marshal(locNodeInfo), -1);
}
}
+ /**
+ * @throws Exception If failed.
+ */
+ private void handleProcessedEvents() throws Exception {
+ Iterator<ZkDiscoveryEventData> it = this.evtsData.evts.values().iterator();
+
+ List<ZkDiscoveryCustomEventData> newEvts = null;
+
+ while (it.hasNext()) {
+ ZkDiscoveryEventData evtData = it.next();
+
+ if (evtData.allAcksReceived()) {
+ switch (evtData.eventType()) {
+ case EventType.EVT_NODE_JOINED: {
+ processNodesAckJoinEvent((ZkDiscoveryNodeJoinEventData)evtData);
+
+ break;
+ }
+
+ case DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT: {
+ DiscoverySpiCustomMessage ack = handleProcessedCustomEvent((ZkDiscoveryCustomEventData)evtData);
+
+ if (ack != null) {
+ evtsData.evtIdGen++;
+
+ long evtId = evtsData.evtIdGen;
+
+ byte[] ackBytes = marshal(ack);
+
+ String evtChildPath = String.valueOf(evtId);
+
+ zkClient.createIfNeeded(
+ zkPaths.customEventDataPath(true, evtChildPath),
+ ackBytes,
+ CreateMode.PERSISTENT);
+
+ ZkDiscoveryCustomEventData ackEvtData = new ZkDiscoveryCustomEventData(
+ evtId,
+ evtData.topologyVersion(), // Use topology version from original event.
+ locNode.id(),
+ evtChildPath,
+ true);
+
+ ackEvtData.msg = ack;
+
+ if (newEvts == null)
+ newEvts = new ArrayList<>();
+
+ newEvts.add(ackEvtData);
+
+ if (log.isInfoEnabled())
+ log.info("Generated CUSTOM event [topVer=" + evtData.topologyVersion() + ", evt=" + ack + ']');
+ }
+
+ break;
+ }
+
+ case EventType.EVT_NODE_FAILED: {
+ log.info("All nodes processed node fail [evtId=" + evtData.eventId() + ']');
+
+ // Do not need cleanup.
+ break;
+ }
+ }
+
+ it.remove();
+ }
+ }
+
+ if (newEvts != null) {
+ for (int i = 0; i < newEvts.size(); i++)
+ evtsData.addEvent(top.nodesByOrder.values(), newEvts.get(i));
+
+ saveAndProcessNewEvents();
+ }
+ }
+
private void processLocalJoin(ZkDiscoveryEventsData evtsData, ZkDiscoveryNodeJoinEventData evtData)
throws Exception
{
@@ -1161,47 +1240,22 @@ public class ZookeeperDiscoveryImpl {
}
/**
- * @param evtData
- * @throws Exception
- */
- private void processNodesAckEvent(ZkDiscoveryEventData evtData) throws Exception {
- switch (evtData.eventType()) {
- case EventType.EVT_NODE_JOINED: {
- processNodesAckJoinEvent((ZkDiscoveryNodeJoinEventData)evtData);
-
- break;
- }
-
- case DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT: {
- processNodesAckCustomEvent((ZkDiscoveryCustomEventData)evtData);
-
- break;
- }
-
- case EventType.EVT_NODE_FAILED: {
- log.info("All nodes processed node fail [evtId=" + evtData.eventId() + ']');
-
- // Do not need cleanup.
- break;
- }
- }
- }
-
- /**
* @param failedNode Failed node.
*/
private void processEventAcksOnNodeFail(ZookeeperClusterNode failedNode) throws Exception {
+ boolean processed = false;
+
for (Iterator<Map.Entry<Long, ZkDiscoveryEventData>> it = evtsData.evts.entrySet().iterator(); it.hasNext();) {
Map.Entry<Long, ZkDiscoveryEventData> e = it.next();
ZkDiscoveryEventData evtData = e.getValue();
- if (evtData.onNodeFail(failedNode)) {
- processNodesAckEvent(evtData);
-
- it.remove();
- }
+ if (evtData.onNodeFail(failedNode))
+ processed = true;
}
+
+ if (processed)
+ handleProcessedEvents();
}
/**
@@ -1218,11 +1272,25 @@ public class ZookeeperDiscoveryImpl {
/**
* @param evtData Event data.
* @throws Exception If failed.
+ * @return Ack message.
*/
- private void processNodesAckCustomEvent(ZkDiscoveryCustomEventData evtData) throws Exception {
+ @Nullable private DiscoverySpiCustomMessage handleProcessedCustomEvent(ZkDiscoveryCustomEventData evtData)
+ throws Exception
+ {
log.info("All nodes processed custom event [evtId=" + evtData.eventId() + ']');
- zkClient.deleteIfExists(zkPaths.customEventDataPath(evtData.evtPath), -1);
+ if (!evtData.ackEvent()) {
+ zkClient.deleteIfExists(zkPaths.customEventDataPath(false, evtData.evtPath), -1);
+
+ assert evtData.msg != null || locNode.order() > evtData.topologyVersion() : evtData;
+
+ if (evtData.msg != null)
+ return evtData.msg.ackMessage();
+ }
+ else
+ zkClient.deleteIfExists(zkPaths.customEventDataPath(true, evtData.evtPath), -1);
+
+ return null;
}
/**
@@ -1352,7 +1420,7 @@ public class ZookeeperDiscoveryImpl {
if (path.equals(zkPaths.evtsPath)) {
if (!crd)
- onEventsUpdate(data);
+ processNewEvents(data);
}
else
U.warn(log, "Data callback for unknown path: " + path);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e0aba812/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
index aa1f836..d579c08 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
@@ -121,7 +121,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
cfg.setCacheConfiguration(ccfg);
- cfg.setMarshaller(new JdkMarshaller());
+ // cfg.setMarshaller(new JdkMarshaller());
cfg.setClientMode(client);
@@ -609,6 +609,8 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
node.compute().broadcast(new DummyCallable(null));
awaitPartitionMapExchange();
+
+ waitForEventsAcks(ignite(0));
}
/**