You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/11/30 13:47:42 UTC
[6/6] ignite git commit: zk
zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/827b7085
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/827b7085
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/827b7085
Branch: refs/heads/ignite-zk
Commit: 827b708545cf851f784998c5faef874bda8e0898
Parents: 4797181
Author: sboikov <sb...@gridgain.com>
Authored: Thu Nov 30 15:23:44 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Nov 30 16:47:05 2017 +0300
----------------------------------------------------------------------
.../binary/CacheObjectBinaryProcessorImpl.java | 3 +
.../discovery/zk/internal/ZookeeperClient.java | 44 ++
.../zk/internal/ZookeeperDiscoveryImpl.java | 500 ++++++++-----------
.../ZookeeperDiscoverySpiBasicTest.java | 31 ++
4 files changed, 287 insertions(+), 291 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/827b7085/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index ed4c520..e7f86cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -440,6 +440,9 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0);
+ if (oldMeta == mergedMeta)
+ return;
+
MetadataUpdateResult res = transport.requestMetadataUpdate(mergedMeta).get();
assert res != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/827b7085/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index ea9b289..73547cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -521,6 +521,13 @@ public class ZookeeperClient implements Watcher {
/**
* @param path Path.
+ */
+ void deleteIfExistsAsync(String path) {
+ new DeleteIfExistsOperation(path).execute();
+ }
+
+ /**
+ * @param path Path.
* @param watcher Watcher.
* @param cb Callback.
*/
@@ -842,6 +849,43 @@ public class ZookeeperClient implements Watcher {
/**
*
*/
+ class DeleteIfExistsOperation implements AsyncCallback.VoidCallback, ZkAsyncOperation {
+ /** */
+ private final String path;
+
+ /**
+ * @param path Path.
+ */
+ DeleteIfExistsOperation(String path) {
+ this.path = path;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void execute() {
+ zk.delete(path, -1, this, null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void processResult(int rc, String path, Object ctx) {
+ if (rc == KeeperException.Code.NONODE.intValue())
+ return;
+
+ if (needRetry(rc)) {
+ U.warn(log, "Failed to execute async operation, connection lost. Will retry after connection restore [" +
+ "path=" + path + ']');
+
+ retryQ.add(this);
+ }
+ else if (rc == KeeperException.Code.SESSIONEXPIRED.intValue())
+ U.warn(log, "Failed to execute async operation, connection lost [path=" + path + ']');
+ else
+ assert rc == 0 : rc;
+ }
+ }
+
+ /**
+ *
+ */
class CreateCallbackWrapper implements AsyncCallback.StringCallback {
/** */
final CreateOperation op;
http://git-wip-us.apache.org/repos/asf/ignite/blob/827b7085/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 0ecbaf3..38f9cbf 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -27,9 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
@@ -127,16 +125,16 @@ public class ZookeeperDiscoveryImpl {
private final int evtsAckThreshold;
/** */
- private ZkRuntimeState state = new ZkRuntimeState(false);
+ private ZkRuntimeState state;
/** */
private volatile ConnectionState connState = ConnectionState.STARTED;
/** */
- private ZkEventWorker evtWorker;
+ private final AtomicBoolean stop = new AtomicBoolean();
/** */
- private final AtomicBoolean stop = new AtomicBoolean();
+ private final Object stateMux = new Object();
/**
* @param log Logger.
@@ -213,7 +211,8 @@ public class ZookeeperDiscoveryImpl {
* @return Ping result.
*/
public boolean pingNode(UUID nodeId) {
- checkState();
+ while (!busyLock.enterBusy())
+ checkState();
// TODO ZK
return node(nodeId) != null;
@@ -225,14 +224,96 @@ public class ZookeeperDiscoveryImpl {
public void reconnect() {
assert clientReconnectEnabled;
- evtWorker.onReconnectRequest();
+ synchronized (stateMux) {
+ if (connState == ConnectionState.STARTED)
+ connState = ConnectionState.DISCONNECTED;
+ else
+ return;
+ }
+
+ state.zkClient.onCloseStart();
+
+ busyLock.block();
+
+ busyLock.unblock();
+
+ state.zkClient.close();
+
+ UUID newId = UUID.randomUUID();
+
+ U.quietAndWarn(log, "Local node will try to reconnect to cluster with new id due to network problems [" +
+ "newId=" + newId +
+ ", prevId=" + locNode.id() +
+ ", locNode=" + locNode + ']');
+
+ doReconnect(newId);
+ }
+
+ /**
+ * @param newId New ID.
+ */
+ private void doReconnect(UUID newId) {
+ locNode.onClientDisconnected(newId);
+
+ if (state.joined) {
+ assert state.evtsData != null;
+
+ lsnr.onDiscovery(EVT_CLIENT_NODE_DISCONNECTED,
+ state.evtsData.topVer,
+ locNode,
+ state.top.topologySnapshot(),
+ Collections.<Long, Collection<ClusterNode>>emptyMap(),
+ null);
+ }
+
+ try {
+ joinTopology0(state.joined);
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to reconnect: " + e, e);
+
+ onSegmented(e);
+ }
+ }
+
+
+ /**
+ * @param e Error.
+ */
+ private void onSegmented(Exception e) {
+ if (state.joined) {
+ synchronized (stateMux) {
+ connState = ConnectionState.STOPPED;
+ }
+
+ zkClient().zk().sync(zkPaths.clusterDir, new SegmentedWatcher(), null);
+ }
+ else
+ joinFut.onDone(e);
+ }
+
+ /**
+ *
+ */
+ class SegmentedWatcher implements AsyncCallback.VoidCallback {
+ @Override public void processResult(int rc, String path, Object ctx) {
+ assert state.evtsData != null;
+
+ lsnr.onDiscovery(EventType.EVT_NODE_SEGMENTED,
+ state.evtsData.topVer,
+ locNode,
+ state.top.topologySnapshot(),
+ Collections.<Long, Collection<ClusterNode>>emptyMap(),
+ null);
+ }
}
/**
* @return Remote nodes.
*/
public Collection<ClusterNode> remoteNodes() {
- checkState();
+ while (!busyLock.enterBusy())
+ checkState();
return state.top.remoteNodes();
}
@@ -258,14 +339,9 @@ public class ZookeeperDiscoveryImpl {
* @return {@code True} if node joined or joining topology.
*/
public boolean knownNode(UUID nodeId) {
- checkState();
-
- if (!busyLock.enterBusy()) {
+ while (!busyLock.enterBusy())
checkState();
- throw new IgniteSpiException("Zookeeper client closed.");
- }
-
try {
List<String> children = state.zkClient.getChildren(zkPaths.aliveNodesDir);
@@ -295,8 +371,6 @@ public class ZookeeperDiscoveryImpl {
* @param msg Message.
*/
public void sendCustomMessage(DiscoverySpiCustomMessage msg) {
- checkState();
-
assert msg != null;
byte[] msgBytes;
@@ -308,12 +382,9 @@ public class ZookeeperDiscoveryImpl {
throw new IgniteSpiException("Failed to marshal custom message: " + msg, e);
}
- if (!busyLock.enterBusy()) {
+ while (!busyLock.enterBusy())
checkState();
- throw new IgniteSpiException("Zookeeper client closed.");
- }
-
try {
String prefix = UUID.randomUUID().toString();
@@ -367,7 +438,9 @@ public class ZookeeperDiscoveryImpl {
/**
* @throws InterruptedException If interrupted.
*/
- private void joinTopology0(boolean reconnect) throws InterruptedException {
+ private void joinTopology0(boolean prevJoined) throws InterruptedException {
+ state = new ZkRuntimeState(prevJoined);
+
DiscoveryDataBag discoDataBag = new DiscoveryDataBag(locNode.id());
exchange.collect(discoDataBag);
@@ -394,14 +467,6 @@ public class ZookeeperDiscoveryImpl {
throw new IgniteSpiException("Failed to create Zookeeper client", e);
}
- if (!reconnect) {
- evtWorker = new ZkEventWorker(igniteInstanceName, "zookeeper-disco-evt-worker", log);
-
- evtWorker.start();
- }
- else
- assert evtWorker != null;
-
startJoin(joinDataBytes);
}
@@ -822,37 +887,32 @@ public class ZookeeperDiscoveryImpl {
if (log.isInfoEnabled())
log.info("Delete join data: " + joinDataPath);
- // TODO ZK async
- state.zkClient.deleteIfExists(joinDataPath, -1);
+ state.zkClient.deleteIfExistsAsync(joinDataPath);
final List<ClusterNode> topSnapshot = Collections.singletonList((ClusterNode)locNode);
- evtWorker.evtsQ.add(new Runnable() {
- @Override public void run() {
- if (connState == ConnectionState.DISCONNECTED)
- connState = ConnectionState.STARTED;
-
- lsnr.onDiscovery(EventType.EVT_NODE_JOINED,
- 1L,
- locNode,
- topSnapshot,
- Collections.<Long, Collection<ClusterNode>>emptyMap(),
- null);
-
- if (state.prevJoined) {
- lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED,
- 1L,
- locNode,
- topSnapshot,
- Collections.<Long, Collection<ClusterNode>>emptyMap(),
- null);
-
- U.quietAndWarn(log, "Client node was reconnected after it was already considered failed.");
- }
+ if (connState == ConnectionState.DISCONNECTED)
+ connState = ConnectionState.STARTED;
- joinFut.onDone();
- }
- });
+ lsnr.onDiscovery(EventType.EVT_NODE_JOINED,
+ 1L,
+ locNode,
+ topSnapshot,
+ Collections.<Long, Collection<ClusterNode>>emptyMap(),
+ null);
+
+ if (state.prevJoined) {
+ lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED,
+ 1L,
+ locNode,
+ topSnapshot,
+ Collections.<Long, Collection<ClusterNode>>emptyMap(),
+ null);
+
+ U.quietAndWarn(log, "Client node was reconnected after it was already considered failed.");
+ }
+
+ joinFut.onDone();
}
/**
@@ -948,8 +1008,8 @@ public class ZookeeperDiscoveryImpl {
state.evtsData.addEvent(state.top.nodesByOrder.values(), evtData, null);
- if (log.isInfoEnabled())
- log.info("Generated CUSTOM event [evt=" + evtData + ", msg=" + msg + ']');
+ if (log.isDebugEnabled())
+ log.debug("Generated CUSTOM event [evt=" + evtData + ", msg=" + msg + ']');
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to unmarshal custom discovery message: " + e, e);
@@ -958,7 +1018,7 @@ public class ZookeeperDiscoveryImpl {
else {
U.warn(log, "Ignore custom event from unknown node: " + sndNodeId);
- state.zkClient.deleteIfExists(evtDataPath, -1);
+ state.zkClient.deleteIfExistsAsync(evtDataPath);
}
state.evtsData.procCustEvt = evtE.getKey();
@@ -1005,7 +1065,7 @@ public class ZookeeperDiscoveryImpl {
* @throws Exception If failed.
*/
@SuppressWarnings("unchecked")
- private void processNewEvents(ZkDiscoveryEventsData evtsData) throws Exception {
+ private void processNewEvents(final ZkDiscoveryEventsData evtsData) throws Exception {
TreeMap<Long, ZkDiscoveryEventData> evts = evtsData.evts;
boolean updateNodeInfo = false;
@@ -1026,8 +1086,8 @@ public class ZookeeperDiscoveryImpl {
processLocalJoin(evtsData, evtData0);
}
else {
- if (log.isInfoEnabled())
- log.info("New discovery event data [evt=" + evtData + ", evtsHist=" + evts.size() + ']');
+ if (log.isDebugEnabled())
+ log.debug("New discovery event data [evt=" + evtData + ", evtsHist=" + evts.size() + ']');
switch (evtData.eventType()) {
case EventType.EVT_NODE_JOINED: {
@@ -1165,40 +1225,35 @@ public class ZookeeperDiscoveryImpl {
final List<ClusterNode> topSnapshot = state.top.topologySnapshot();
- evtWorker.evtsQ.add(new Runnable() {
- @Override public void run() {
- if (connState == ConnectionState.DISCONNECTED)
- connState = ConnectionState.STARTED;
-
- lsnr.onDiscovery(evtData.eventType(),
- evtData.topologyVersion(),
- locNode,
- topSnapshot,
- Collections.<Long, Collection<ClusterNode>>emptyMap(),
- null);
-
- if (state.prevJoined) {
- lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED,
- evtData.topologyVersion(),
- locNode,
- topSnapshot,
- Collections.<Long, Collection<ClusterNode>>emptyMap(),
- null);
-
- U.quietAndWarn(log, "Client node was reconnected after it was already considered failed.");
- }
+ if (connState == ConnectionState.DISCONNECTED)
+ connState = ConnectionState.STARTED;
- joinFut.onDone();
- }
- });
+ lsnr.onDiscovery(evtData.eventType(),
+ evtData.topologyVersion(),
+ locNode,
+ topSnapshot,
+ Collections.<Long, Collection<ClusterNode>>emptyMap(),
+ null);
+
+ if (state.prevJoined) {
+ lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED,
+ evtData.topologyVersion(),
+ locNode,
+ topSnapshot,
+ Collections.<Long, Collection<ClusterNode>>emptyMap(),
+ null);
+
+ U.quietAndWarn(log, "Client node was reconnected after it was already considered failed.");
+ }
+
+ joinFut.onDone();
state.joined = true;
- if (log.isInfoEnabled())
- log.info("Delete data for joined: " + path);
+ if (log.isDebugEnabled())
+ log.debug("Delete data for joined: " + path);
- // TODO ZK: async
- state.zkClient.deleteIfExists(path, -1);
+ state.zkClient.deleteIfExistsAsync(path);
}
/**
@@ -1207,8 +1262,8 @@ public class ZookeeperDiscoveryImpl {
*/
@SuppressWarnings("unchecked")
private void notifyCustomEvent(final ZkDiscoveryCustomEventData evtData, final DiscoverySpiCustomMessage msg) {
- if (log.isInfoEnabled())
- log.info(" [topVer=" + evtData.topologyVersion() + ", msg=" + msg + ']');
+ if (log.isDebugEnabled())
+ log.debug(" [topVer=" + evtData.topologyVersion() + ", msg=" + msg + ']');
final ZookeeperClusterNode sndNode = state.top.nodesById.get(evtData.sndNodeId);
@@ -1216,16 +1271,12 @@ public class ZookeeperDiscoveryImpl {
final List<ClusterNode> topSnapshot = state.top.topologySnapshot();
- evtWorker.evtsQ.add(new Runnable() {
- @Override public void run() {
- lsnr.onDiscovery(evtData.eventType(),
- evtData.topologyVersion(),
- sndNode,
- topSnapshot,
- Collections.<Long, Collection<ClusterNode>>emptyMap(),
- msg);
- }
- });
+ lsnr.onDiscovery(evtData.eventType(),
+ evtData.topologyVersion(),
+ sndNode,
+ topSnapshot,
+ Collections.<Long, Collection<ClusterNode>>emptyMap(),
+ msg);
}
/**
@@ -1245,16 +1296,12 @@ public class ZookeeperDiscoveryImpl {
final List<ClusterNode> topSnapshot = state.top.topologySnapshot();
- evtWorker.evtsQ.add(new Runnable() {
- @Override public void run() {
- lsnr.onDiscovery(evtData.eventType(),
- evtData.topologyVersion(),
- joinedNode,
- topSnapshot,
- Collections.<Long, Collection<ClusterNode>>emptyMap(),
- null);
- }
- });
+ lsnr.onDiscovery(evtData.eventType(),
+ evtData.topologyVersion(),
+ joinedNode,
+ topSnapshot,
+ Collections.<Long, Collection<ClusterNode>>emptyMap(),
+ null);
}
/**
@@ -1268,16 +1315,12 @@ public class ZookeeperDiscoveryImpl {
final List<ClusterNode> topSnapshot = state.top.topologySnapshot();
- evtWorker.evtsQ.add(new Runnable() {
- @Override public void run() {
- lsnr.onDiscovery(evtData.eventType(),
- evtData.topologyVersion(),
- failedNode,
- topSnapshot,
- Collections.<Long, Collection<ClusterNode>>emptyMap(),
- null);
- }
- });
+ lsnr.onDiscovery(evtData.eventType(),
+ evtData.topologyVersion(),
+ failedNode,
+ topSnapshot,
+ Collections.<Long, Collection<ClusterNode>>emptyMap(),
+ null);
}
/**
@@ -1326,8 +1369,8 @@ public class ZookeeperDiscoveryImpl {
String path = zkPaths.ackEventDataPath(evtId);
- if (log.isInfoEnabled())
- log.info("Create ack event: " + path);
+ if (log.isDebugEnabled())
+ log.debug("Create ack event: " + path);
// TODO ZK: delete is previous exists?
state.zkClient.createIfNeeded(
@@ -1349,9 +1392,10 @@ public class ZookeeperDiscoveryImpl {
newEvts.add(ackEvtData);
- if (log.isInfoEnabled()) {
- log.info("Generated CUSTOM event ack [baseEvtId=" + evtData.eventId() +
+ if (log.isDebugEnabled()) {
+ log.debug("Generated CUSTOM event ack [baseEvtId=" + evtData.eventId() +
", evt=" + ackEvtData +
+ ", evtSize=" + ackBytes.length +
", msg=" + ack + ']');
}
}
@@ -1360,8 +1404,8 @@ public class ZookeeperDiscoveryImpl {
}
case EventType.EVT_NODE_FAILED: {
- if (log.isInfoEnabled())
- log.info("All nodes processed node fail [evtData=" + evtData + ']');
+ if (log.isDebugEnabled())
+ log.debug("All nodes processed node fail [evtData=" + evtData + ']');
break; // Do not need addition cleanup.
}
@@ -1417,8 +1461,8 @@ public class ZookeeperDiscoveryImpl {
if (log.isInfoEnabled())
log.info("Delete processed event data [path1=" + evtDataPath + ", path2=" + dataForJoinedPath + ']');
- state.zkClient.deleteIfExists(evtDataPath, -1);
- state.zkClient.deleteIfExists(dataForJoinedPath, -1);
+ state.zkClient.deleteIfExistsAsync(evtDataPath);
+ state.zkClient.deleteIfExistsAsync(dataForJoinedPath);
}
/**
@@ -1429,15 +1473,16 @@ public class ZookeeperDiscoveryImpl {
@Nullable private DiscoverySpiCustomMessage handleProcessedCustomEvent(String ctx, ZkDiscoveryCustomEventData evtData)
throws Exception
{
- if (log.isInfoEnabled())
- log.info("All nodes processed custom event [ctx=" + ctx + ", evtData=" + evtData + ']');
+ if (log.isDebugEnabled())
+ log.debug("All nodes processed custom event [ctx=" + ctx + ", evtData=" + evtData + ']');
if (!evtData.ackEvent()) {
String path = zkPaths.customEventDataPath(false, evtData.evtPath);
- log.info("Delete path: " + path);
+ if (log.isDebugEnabled())
+ log.debug("Delete path: " + path);
- state.zkClient.deleteIfExists(path, -1);
+ state.zkClient.deleteIfExistsAsync(path);
assert evtData.msg != null || locNode.order() > evtData.topologyVersion() : evtData;
@@ -1447,9 +1492,10 @@ public class ZookeeperDiscoveryImpl {
else {
String path = zkPaths.ackEventDataPath(evtData.eventId());
- log.info("Delete path: " + path);
+ if (log.isDebugEnabled())
+ log.debug("Delete path: " + path);
- state.zkClient.deleteIfExists(path, -1);
+ state.zkClient.deleteIfExistsAsync(path);
}
return null;
@@ -1472,7 +1518,9 @@ public class ZookeeperDiscoveryImpl {
log.info("Stop ZookeeperDiscovery [nodeId=" + locNode.id() + ", err=" + e + ']');
- connState = ConnectionState.STOPPED;
+ synchronized (stateMux) {
+ connState = ConnectionState.STOPPED;
+ }
ZookeeperClient zkClient = state.zkClient;
@@ -1481,18 +1529,12 @@ public class ZookeeperDiscoveryImpl {
busyLock.block();
+ busyLock.unblock();
+
joinFut.onDone(e);
if (zkClient != null)
zkClient.close();
-
- ZkEventWorker evtWorker = this.evtWorker;
-
- if (evtWorker != null) {
- evtWorker.interrupt();
-
- evtWorker.join();
- }
}
/**
@@ -1552,165 +1594,27 @@ public class ZookeeperDiscoveryImpl {
/**
*
*/
- private class ZkEventWorker extends IgniteSpiThread {
- /** */
- private final Runnable RECONNECT = new Runnable() {@Override public void run() {}};
-
- /** */
- private final Runnable CONNECTION_LOST = new Runnable() {@Override public void run() {}};
-
- /** */
- private final BlockingQueue<Runnable> evtsQ;
-
- /**
- * @param igniteInstanceName Ignite instance name.
- * @param name Thread name.
- * @param log Logger.
- */
- ZkEventWorker(String igniteInstanceName, String name, IgniteLogger log) {
- super(igniteInstanceName, name, log);
-
- evtsQ = new LinkedBlockingQueue<>();
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException {
- while (!isInterrupted()) {
- Runnable r = evtsQ.take();
-
- if (r == RECONNECT)
- processReconnect();
- if (r == CONNECTION_LOST)
- processConnectionLost();
- else {
- if (!busyLock.enterBusy())
- return;
-
- try {
- r.run();
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- }
- }
-
+ private class ReconnectorThread extends IgniteSpiThread {
/**
*
*/
- void onConnectionLoss() {
- evtsQ.add(CONNECTION_LOST);
+ ReconnectorThread() {
+ super(ZookeeperDiscoveryImpl.this.igniteInstanceName, "zk-reconnector", log);
}
- /**
- *
- */
- void onReconnectRequest() {
- evtsQ.add(RECONNECT);
- }
-
- /**
- *
- */
- private void processReconnect() {
- assert locNode.isClient() : locNode;
-
- if (connState == ConnectionState.DISCONNECTED)
- return;
-
- connState = ConnectionState.DISCONNECTED;
-
- state.zkClient.onCloseStart();
-
+ @Override protected void body() throws InterruptedException {
busyLock.block();
busyLock.unblock();
- state.zkClient.close();
-
UUID newId = UUID.randomUUID();
- U.quietAndWarn(log, "Local node will try to reconnect to cluster with new id due to network problems [" +
+ U.quietAndWarn(log, "Connection to Zookeeper server is lost, local node will try to reconnect with new id [" +
"newId=" + newId +
", prevId=" + locNode.id() +
", locNode=" + locNode + ']');
- reconnect(newId);
- }
-
- /**
- *
- */
- void processConnectionLost() {
- if (clientReconnectEnabled) {
- connState = ConnectionState.DISCONNECTED;
-
- busyLock.block();
-
- busyLock.unblock();
-
- UUID newId = UUID.randomUUID();
-
- U.quietAndWarn(log, "Connection to Zookeeper server is lost, local node will try to reconnect with new id [" +
- "newId=" + newId +
- ", prevId=" + locNode.id() +
- ", locNode=" + locNode + ']');
-
- reconnect(newId);
- }
- else {
- U.warn(log, "Connection to Zookeeper server is lost, local node SEGMENTED.");
-
- onSegmented(new IgniteSpiException("Zookeeper connection loss."));
- }
- }
-
- /**
- * @param newId New ID.
- */
- private void reconnect(UUID newId) {
- locNode.onClientDisconnected(newId);
-
- if (state.joined) {
- assert state.evtsData != null;
-
- lsnr.onDiscovery(EVT_CLIENT_NODE_DISCONNECTED,
- state.evtsData.topVer,
- locNode,
- state.top.topologySnapshot(),
- Collections.<Long, Collection<ClusterNode>>emptyMap(),
- null);
- }
-
- state = new ZkRuntimeState(state.joined);
-
- try {
- joinTopology0(true);
- }
- catch (Exception e) {
- U.error(log, "Failed to reconnect: " + e, e);
-
- onSegmented(e);
- }
- }
-
- /**
- * @param e Error.
- */
- private void onSegmented(Exception e) {
- if (state.joined) {
- assert state.evtsData != null;
-
- lsnr.onDiscovery(EventType.EVT_NODE_SEGMENTED,
- state.evtsData.topVer,
- locNode,
- state.top.topologySnapshot(),
- Collections.<Long, Collection<ClusterNode>>emptyMap(),
- null);
- }
- else
- joinFut.onDone(e);
+ doReconnect(newId);
}
}
@@ -1723,7 +1627,21 @@ public class ZookeeperDiscoveryImpl {
/** {@inheritDoc} */
@Override public void run() {
- evtWorker.onConnectionLoss();
+ if (clientReconnectEnabled) {
+ synchronized (stateMux) {
+ if (connState == ConnectionState.STARTED)
+ connState = ConnectionState.DISCONNECTED;
+ else
+ return;
+ }
+
+ new ReconnectorThread().start();
+ }
+ else {
+ U.warn(log, "Connection to Zookeeper server is lost, local node SEGMENTED.");
+
+ onSegmented(new IgniteSpiException("Zookeeper connection loss."));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/827b7085/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index 8ae84c1..d50e9b9 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.spi.discovery.zk.internal;
import java.io.File;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -306,6 +307,22 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testMetadataUpdate() throws Exception {
+ startGrid(0);
+
+ GridTestUtils.runMultiThreaded(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ ignite(0).configuration().getMarshaller().marshal(new C1());
+ ignite(0).configuration().getMarshaller().marshal(new C2());
+
+ return null;
+ }
+ }, 64, "marshal");
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testClientNodesStatus() throws Exception {
startGrid(0);
@@ -1661,4 +1678,18 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
return data;
}
}
+
+ /**
+ *
+ */
+ private static class C1 implements Serializable {
+ // No-op.
+ }
+
+ /**
+ *
+ */
+ private static class C2 implements Serializable {
+ // No-op.
+ }
}