You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/11/14 13:02:34 UTC
[2/3] ignite git commit: zk
zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1842bb4c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1842bb4c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1842bb4c
Branch: refs/heads/ignite-zk
Commit: 1842bb4c48ce245a5b69b669087590351de686fa
Parents: 3736abe
Author: sboikov <sb...@gridgain.com>
Authored: Tue Nov 14 15:07:55 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Nov 14 15:10:16 2017 +0300
----------------------------------------------------------------------
.../spi/discovery/zk/ZookeeperDiscoverySpi.java | 33 +++++++++++---------
1 file changed, 19 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1842bb4c/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index bae183d..80d563e 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -97,6 +97,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
private static final String ALIVE_NODES_PATH = CLUSTER_PATH + "/alive";
/** */
+ private static final String CUSTOM_EVTS_PATH = CLUSTER_PATH + "/customEvts";
+
+ /** */
private static final byte[] EMPTY_BYTES = new byte[0];
/** */
@@ -127,7 +130,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
private final JdkMarshaller marsh = new JdkMarshaller();
/** */
- private final NodesUpdateCallback nodesUpdateCallback;
+ private final ZKChildrenUpdateCallback zkChildrenUpdateCallback;
/** */
private final DataUpdateCallback dataUpdateCallback;
@@ -166,7 +169,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
public ZookeeperDiscoverySpi() {
zkWatcher = new ZookeeperWatcher();
- nodesUpdateCallback = new NodesUpdateCallback();
+ zkChildrenUpdateCallback = new ZKChildrenUpdateCallback();
dataUpdateCallback = new DataUpdateCallback();
}
@@ -307,7 +310,12 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
/** {@inheritDoc} */
@Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
// TODO ZK
- //throw new UnsupportedOperationException();
+ try {
+ zkCurator.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(CUSTOM_EVTS_PATH, marshal(msg));
+ }
+ catch (Exception e) {
+ throw new IgniteSpiException(e);
+ }
}
/** {@inheritDoc} */
@@ -318,8 +326,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
/** {@inheritDoc} */
@Override public boolean isClientMode() throws IllegalStateException {
- // TODO ZK
- return false;
+ return locNode.isClient();
}
/**
@@ -354,11 +361,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
}
private boolean igniteClusterStarted() throws Exception {
- boolean started = zkCurator.checkExists().forPath(IGNITE_PATH) != null &&
+ return zkCurator.checkExists().forPath(IGNITE_PATH) != null &&
zkCurator.checkExists().forPath(ALIVE_NODES_PATH) != null &&
!zk.getChildren(ALIVE_NODES_PATH, false).isEmpty();
-
- return started;
}
/** {@inheritDoc} */
@@ -454,8 +459,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
gridStartTime = clusterData.gridStartTime;
zk.getData(EVENTS_PATH, zkWatcher, dataUpdateCallback, null);
- zk.getChildren(JOIN_HIST_PATH, zkWatcher, nodesUpdateCallback, null);
- zk.getChildren(ALIVE_NODES_PATH, zkWatcher, nodesUpdateCallback, null);
+ zk.getChildren(JOIN_HIST_PATH, zkWatcher, zkChildrenUpdateCallback, null);
+ zk.getChildren(ALIVE_NODES_PATH, zkWatcher, zkChildrenUpdateCallback, null);
List<Op> joinOps = new ArrayList<>();
@@ -693,7 +698,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
/**
*
*/
- class NodesUpdateCallback implements AsyncCallback.Children2Callback {
+ class ZKChildrenUpdateCallback implements AsyncCallback.Children2Callback {
@Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
try {
if (children == null || children.isEmpty())
@@ -756,8 +761,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
private final TreeMap<Long, ZookeeperClusterNode> curTop = new TreeMap<>();
/**
- * @param oldNodes
- * @param newNodes
+ * @param oldNodes Previous processed state.
+ * @param newNodes Current state.
*/
private void generateEvents(ZKAliveNodes oldNodes, ZKAliveNodes newNodes) {
assert newNodes != null;
@@ -1177,7 +1182,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
log.info("Process event [type=" + event.getType() + ", state=" + event.getState() + ", path=" + event.getPath() + ']');
if (event.getType() == Event.EventType.NodeChildrenChanged) {
- zk.getChildren(event.getPath(), this, nodesUpdateCallback, null);
+ zk.getChildren(event.getPath(), this, zkChildrenUpdateCallback, null);
} else if (event.getType() == Event.EventType.NodeDataChanged) {
zk.getData(event.getPath(), this, dataUpdateCallback, null);
}