You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/18 11:34:46 UTC
[4/4] ignite git commit: zk
zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fc085a4a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fc085a4a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fc085a4a
Branch: refs/heads/ignite-zk-ce
Commit: fc085a4a018c1f90861a76842f3eebdeee0ba567
Parents: 8101455
Author: sboikov <sb...@gridgain.com>
Authored: Mon Dec 18 10:49:43 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Dec 18 12:50:53 2017 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 4 +-
.../communication/GridIoMessageFactory.java | 12 ++++
.../ignite/internal/util/nio/GridNioServer.java | 59 +++++++++++--------
.../TcpCommunicationConnectionCheckFuture.java | 61 ++++++++++++++++----
.../internal/ZkCommunicationErrorNodeState.java | 44 ++++++++++++++
.../ZkCommunicationErrorProcessFuture.java | 41 ++++++++++---
.../ZkCommunicationErrorResolveResult.java | 3 +
.../ZkDistributedCollectDataFuture.java | 11 ++++
.../discovery/zk/internal/ZkRuntimeState.java | 5 ++
.../zk/internal/ZookeeperDiscoveryImpl.java | 37 ++++++++++--
.../ZookeeperDiscoverySpiBasicTest.java | 23 +++++---
11 files changed, 241 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 9c6271a..81f00e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -300,9 +300,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
@Override public MessageReader reader(UUID rmtNodeId, MessageFactory msgFactory)
throws IgniteCheckedException {
- assert rmtNodeId != null;
- return new DirectMessageReader(msgFactory, U.directProtocolVersion(ctx, rmtNodeId));
+ return new DirectMessageReader(msgFactory,
+ rmtNodeId != null ? U.directProtocolVersion(ctx, rmtNodeId) : GridIoManager.DIRECT_PROTO_VER);
}
};
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 78cb7a8..51a6e25 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -117,6 +117,8 @@ import org.apache.ignite.internal.processors.cache.transactions.TxLocksResponse;
import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
+import org.apache.ignite.internal.processors.cluster.ClusterMetricsUpdateMessage;
+import org.apache.ignite.internal.processors.continuous.ContinuousRoutineStartResultMessage;
import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
@@ -879,6 +881,16 @@ public class GridIoMessageFactory implements MessageFactory {
break;
+ case 129:
+ msg = new ClusterMetricsUpdateMessage();
+
+ break;
+
+
+ case 130:
+ msg = new ContinuousRoutineStartResultMessage();
+
+ break;
// [-3..119] [124..128] [-23..-27] [-36..-55]- this
// [120..123] - DR
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 9784549..e95f957 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -2301,7 +2301,11 @@ public class GridNioServer<T> {
else if (log.isDebugEnabled())
log.debug("Failed to process selector key [ses=" + ses + ", err=" + e + ']');
- close(ses, new GridNioException(e));
+ // Can be null if async connect failed.
+ if (ses != null)
+ close(ses, new GridNioException(e));
+ else
+ closeKey(key);
}
}
}
@@ -2525,6 +2529,34 @@ public class GridNioServer<T> {
}
/**
+ * @param key Key.
+ */
+ private void closeKey(SelectionKey key) {
+ // Shutdown input and output so that remote client will see correct socket close.
+ Socket sock = ((SocketChannel)key.channel()).socket();
+
+ try {
+ try {
+ sock.shutdownInput();
+ }
+ catch (IOException ignored) {
+ // No-op.
+ }
+
+ try {
+ sock.shutdownOutput();
+ }
+ catch (IOException ignored) {
+ // No-op.
+ }
+ }
+ finally {
+ U.close(key, log);
+ U.close(sock, log);
+ }
+ }
+
+ /**
* Closes the session and all associated resources, then notifies the listener.
*
* @param ses Session to be closed.
@@ -2544,8 +2576,6 @@ public class GridNioServer<T> {
sessions.remove(ses);
workerSessions.remove(ses);
- SelectionKey key = ses.key();
-
if (ses.setClosed()) {
ses.onClosed();
@@ -2557,28 +2587,7 @@ public class GridNioServer<T> {
((DirectBuffer)ses.readBuffer()).cleaner().clean();
}
- // Shutdown input and output so that remote client will see correct socket close.
- Socket sock = ((SocketChannel)key.channel()).socket();
-
- try {
- try {
- sock.shutdownInput();
- }
- catch (IOException ignored) {
- // No-op.
- }
-
- try {
- sock.shutdownOutput();
- }
- catch (IOException ignored) {
- // No-op.
- }
- }
- finally {
- U.close(key, log);
- U.close(sock, log);
- }
+ closeKey(ses.key());
if (e != null)
filterChain.onExceptionCaught(ses, e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
index 6cb5622..170ee44 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
@@ -111,13 +111,6 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
resBitSet = new BitSet(nodes.size());
}
- /** {@inheritDoc} */
- @Override public void onEvent(Event evt) {
- assert evt instanceof DiscoveryEvent : evt;
- assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ;
-
- }
-
/**
* @param timeout Connect timeout.
*/
@@ -160,6 +153,8 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
MultipleAddressesConnectFuture fut = new MultipleAddressesConnectFuture(i);
fut.init(addrs);
+
+ futs[i] = fut;
}
}
else
@@ -171,7 +166,7 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
spi.getSpiContext().addLocalEventListener(this, EVT_NODE_LEFT, EVT_NODE_FAILED);
if (!isDone()) {
- endTime = System.currentTimeMillis() - timeout;
+ endTime = System.currentTimeMillis() + timeout;
spi.getSpiContext().addTimeoutObject(this);
}
@@ -211,8 +206,30 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
}
/** {@inheritDoc} */
+ @Override public void onEvent(Event evt) {
+ if (isDone())
+ return;
+
+ assert evt instanceof DiscoveryEvent : evt;
+ assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ;
+
+ UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
+
+ for (int i = 0; i < nodes.size(); i++) {
+ if (nodes.get(i).id().equals(nodeId)) {
+ ConnectFuture fut = futs[i];
+
+ if (fut != null)
+ fut.onNodeFailed();
+
+ return;
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void onTimeout() {
- if (!isDone())
+ if (isDone())
return;
ConnectFuture[] futs = this.futs;
@@ -230,6 +247,8 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
if (super.onDone(res, err)) {
spi.getSpiContext().removeTimeoutObject(this);
+ spi.getSpiContext().removeLocalEventListener(this);
+
return true;
}
@@ -244,6 +263,11 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
*
*/
void onTimeout();
+
+ /**
+ *
+ */
+ void onNodeFailed();
}
/**
@@ -325,11 +349,16 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
finish(nodeId(nodeIdx).equals(rmtNodeId));
}
+ /** {@inheritDoc} */
+ @Override public void onNodeFailed() {
+ cancel();
+ }
+
/**
* @param res Result.
* @return {@code True} if result was set by this call.
*/
- boolean finish(boolean res) {
+ public boolean finish(boolean res) {
if (connFutDoneUpdater.compareAndSet(this, 0, 1)) {
onStatusReceived(res);
@@ -369,6 +398,18 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
}
/** {@inheritDoc} */
+ @Override public void onNodeFailed() {
+ SingleAddressConnectFuture[] futs = this.futs;
+
+ for (int i = 0; i < futs.length; i++) {
+ ConnectFuture fut = futs[i];
+
+ if (fut != null)
+ fut.onNodeFailed();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void onTimeout() {
SingleAddressConnectFuture[] futs = this.futs;
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java
new file mode 100644
index 0000000..ddc310d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.BitSet;
+
+/**
+ *
+ */
+public class ZkCommunicationErrorNodeState implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final BitSet commState;
+
+ /** */
+ private final Exception err;
+
+ /**
+ * @param commState Communication state.
+ * @param err Error if failed get communication state..
+ */
+ ZkCommunicationErrorNodeState(BitSet commState, Exception err) {
+ this.commState = commState;
+ this.err = err;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
index d7d4bd1..a6294bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
@@ -17,6 +17,7 @@
package org.apache.ignite.spi.discovery.zk.internal;
+import java.util.BitSet;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -29,6 +30,8 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.IgniteSpiTimeoutObject;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
@@ -141,19 +144,41 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implemen
}
/**
- * @param locNodeOrder Local node order.
* @param rtState Runtime state.
* @param futPath Future path.
* @param nodes Nodes to ping.
- * @throws Exception If failed.
*/
- void pingNodesAndNotifyFuture(long locNodeOrder, ZkRuntimeState rtState, String futPath, List<ClusterNode> nodes)
- throws Exception {
- TcpCommunicationSpi spi = (TcpCommunicationSpi)impl.spi.ignite().configuration().getCommunicationSpi();
-
- spi.checkConnection(nodes);
+ void pingNodesAndNotifyFuture(final ZkRuntimeState rtState, final String futPath, List<ClusterNode> nodes) throws Exception {
+ final TcpCommunicationSpi spi = (TcpCommunicationSpi)impl.spi.ignite().configuration().getCommunicationSpi();
+
+ IgniteFuture<BitSet> fut = spi.checkConnection(nodes);
+
+ fut.listen(new IgniteInClosure<IgniteFuture<BitSet>>() {
+ @Override public void apply(final IgniteFuture<BitSet> fut) {
+ // Future completed either from NIO thread or timeout worker, save result from another thread.
+ impl.runInWorkerThread(new ZkRunnable(rtState, impl) {
+ @Override public void run0() throws Exception {
+ BitSet commState = null;
+ Exception err = null;
+
+ try {
+ commState = fut.get();
+ }
+ catch (Exception e) {
+ err = e;
+ }
+
+ ZkCommunicationErrorNodeState state = new ZkCommunicationErrorNodeState(commState, err);
+
+ ZkDistributedCollectDataFuture.saveNodeResult(futPath,
+ rtState.zkClient,
+ impl.localNode().order(),
+ impl.marshalZip(state));
+ }
+ });
- ZkDistributedCollectDataFuture.saveNodeResult(futPath, rtState.zkClient, locNodeOrder, null);
+ }
+ });
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
index 745496b..607f93b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
@@ -31,6 +31,9 @@ class ZkCommunicationErrorResolveResult implements Serializable {
/** */
final GridLongList failedNodes;
+ /**
+ * @param failedNodes
+ */
ZkCommunicationErrorResolveResult(@Nullable GridLongList failedNodes) {
this.failedNodes = failedNodes;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
index e5d2356..19e2acc 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
@@ -107,6 +107,17 @@ class ZkDistributedCollectDataFuture extends GridFutureAdapter<Void> {
}
/**
+ * @param futPath
+ * @param client
+ * @param nodeOrder
+ * @return Node result data.
+ * @throws Exception If fai.ed
+ */
+ static byte[] readNodeResult(String futPath, ZookeeperClient client, long nodeOrder) throws Exception {
+ return client.getData(futPath + "/" + nodeOrder);
+ }
+
+ /**
* @param futResPath Result path.
* @param client Client.
* @param data Result data.
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
index fc03f8d..dc7b1bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
@@ -17,6 +17,8 @@
package org.apache.ignite.spi.discovery.zk.internal;
+import java.util.List;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.spi.IgniteSpiTimeoutObject;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.Watcher;
@@ -73,6 +75,9 @@ class ZkRuntimeState {
/** */
final ZkClusterNodes top = new ZkClusterNodes();
+ /** */
+ List<ClusterNode> commErrProcNodes;
+
/**
* @param prevJoined {@code True} if joined topology before reconnect attempt.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 65bf6e7..62fc581 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -2056,6 +2056,8 @@ public class ZookeeperDiscoveryImpl {
rtState.evtsData.communicationErrorResolveFutureId(null);
+ rtState.commErrProcNodes = null;
+
ZkCommunicationErrorResolveResult res = msg.res;
if (res == null)
@@ -2120,7 +2122,8 @@ public class ZookeeperDiscoveryImpl {
final String futPath = zkPaths.distributedFutureBasePath(msg.id);
final ZkCommunicationErrorProcessFuture fut0 = fut;
- final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot();
+
+ rtState.commErrProcNodes = rtState.top.topologySnapshot();
if (rtState.crd) {
ZkDistributedCollectDataFuture nodeResFut = collectCommunicationStatusFuture(msg.id);
@@ -2130,7 +2133,7 @@ public class ZookeeperDiscoveryImpl {
runInWorkerThread(new ZkRunnable(rtState, this) {
@Override protected void run0() throws Exception {
- fut0.pingNodesAndNotifyFuture(locNode.order(), rtState, futPath, topSnapshot);
+ fut0.pingNodesAndNotifyFuture(rtState, futPath, rtState.commErrProcNodes);
}
});
}
@@ -2145,7 +2148,7 @@ public class ZookeeperDiscoveryImpl {
new Callable<Void>() {
@Override public Void call() throws Exception {
// Future is completed from ZK event thread.
- onCommunicationResolveStatusReceived(rtState);
+ onCommunicationErrorResolveStatusReceived(rtState);
return null;
}
@@ -2157,16 +2160,38 @@ public class ZookeeperDiscoveryImpl {
* @param rtState Runtime state.
* @throws Exception If failed.
*/
- private void onCommunicationResolveStatusReceived(ZkRuntimeState rtState) throws Exception {
+ private void onCommunicationErrorResolveStatusReceived(ZkRuntimeState rtState) throws Exception {
ZkDiscoveryEventsData evtsData = rtState.evtsData;
UUID futId = rtState.evtsData.communicationErrorResolveFutureId();
if (log.isInfoEnabled())
- log.info("Received communication status from all nodes, call resolver [reqId=" + futId + ']');
+ log.info("Received communication status from all nodes [reqId=" + futId + ']');
assert futId != null;
+ String futPath = zkPaths.distributedFutureBasePath(futId);
+
+ List<ClusterNode> initialNodes = rtState.commErrProcNodes;
+
+ assert initialNodes != null;
+
+ rtState.commErrProcNodes = null;
+
+ ZkClusterNodes top = rtState.top;
+
+ List<ZkCommunicationErrorNodeState> nodesRes = new ArrayList<>();
+
+ for (ZookeeperClusterNode node : top.nodesByOrder.values()) {
+ byte[] stateBytes = ZkDistributedCollectDataFuture.readNodeResult(futPath,
+ rtState.zkClient,
+ node.order());
+
+ ZkCommunicationErrorNodeState nodeState = unmarshalZip(stateBytes);
+
+ nodesRes.add(nodeState);
+ }
+
ZkCommunicationErrorResolveFinishMessage msg = new ZkCommunicationErrorResolveFinishMessage(futId);
ZkCommunicationErrorResolveResult res = new ZkCommunicationErrorResolveResult(null);
@@ -2663,7 +2688,7 @@ public class ZookeeperDiscoveryImpl {
* @return Bytes.
* @throws IgniteCheckedException If failed.
*/
- private byte[] marshalZip(Object obj) throws IgniteCheckedException {
+ byte[] marshalZip(Object obj) throws IgniteCheckedException {
assert obj != null;
return U.zip(marsh.marshal(obj));
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index a2e8784..cee2e76 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -1916,20 +1916,27 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
}
/**
- * TODO ZK: move to comm spi tests.
- *
* @throws Exception If failed.
*/
- public void testNodesPing() throws Exception {
- startGrids(3);
+ public void testConnectionCheck() throws Exception {
+ final int NODES = 5;
+
+ startGridsMultiThreaded(NODES);
+
+ for (int i = 0; i < NODES; i++) {
+ Ignite node = ignite(i);
+
+ TcpCommunicationSpi spi = (TcpCommunicationSpi)node.configuration().getCommunicationSpi();
- TcpCommunicationSpi spi = (TcpCommunicationSpi)ignite(1).configuration().getCommunicationSpi();
+ List<ClusterNode> nodes = new ArrayList<>();
- List<ClusterNode> nodes = new ArrayList<>();
+ nodes.addAll(node.cluster().nodes());
- nodes.add(ignite(2).cluster().localNode());
+ BitSet res = spi.checkConnection(nodes).get();
- // spi.pingNodes(nodes);
+ for (int j = 0; j < NODES; j++)
+ assertTrue(res.get(j));
+ }
}
/**