You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/18 11:34:43 UTC
[1/4] ignite git commit: IGNITE-7212 Fixed infinite loop in TCP
communication SPI skipping local address
Repository: ignite
Updated Branches:
refs/heads/ignite-zk-ce ed492a4f3 -> fc085a4a0
IGNITE-7212 Fixed infinite loop in TCP communication SPI skipping local address
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/86dc36d0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/86dc36d0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/86dc36d0
Branch: refs/heads/ignite-zk-ce
Commit: 86dc36d036f664ace223d3e97445b0016ccab3c0
Parents: e358ae2
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Fri Dec 15 16:37:50 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Dec 15 16:37:50 2017 +0300
----------------------------------------------------------------------
.../apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/86dc36d0/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 69da9ca..3c5b5e9 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -3107,7 +3107,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
log.debug("Skipping local address [addr=" + addr +
", locAddrs=" + node.attribute(createSpiAttributeName(ATTR_ADDRS)) +
", node=" + node + ']');
- continue;
+ break;
}
boolean needWait = false;
[2/4] ignite git commit: IGNITE-7213: Empty class descriptions for
KNNModelFormat
Posted by sb...@apache.org.
IGNITE-7213: Empty class descriptions for KNNModelFormat
this closes #3242
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/91be7aff
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/91be7aff
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/91be7aff
Branch: refs/heads/ignite-zk-ce
Commit: 91be7aff1050b2a62e42ae8ec16cdaf0f2de9629
Parents: 86dc36d
Author: YuriBabak <y....@gmail.com>
Authored: Fri Dec 15 20:22:32 2017 +0300
Committer: Yury Babak <yb...@gridgain.com>
Committed: Fri Dec 15 20:22:32 2017 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/ml/knn/models/KNNModelFormat.java | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91be7aff/modules/ml/src/main/java/org/apache/ignite/ml/knn/models/KNNModelFormat.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/knn/models/KNNModelFormat.java b/modules/ml/src/main/java/org/apache/ignite/ml/knn/models/KNNModelFormat.java
index 17d9842..11a23f5 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/knn/models/KNNModelFormat.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/knn/models/KNNModelFormat.java
@@ -22,7 +22,11 @@ import java.util.Arrays;
import org.apache.ignite.ml.math.distances.DistanceMeasure;
import org.apache.ignite.ml.structures.LabeledDataset;
-/** */
+/**
+ * kNN model representation.
+ *
+ * @see KNNModel
+ */
public class KNNModelFormat implements Serializable {
/** Amount of nearest neighbors. */
private int k;
[4/4] ignite git commit: zk
Posted by sb...@apache.org.
zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fc085a4a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fc085a4a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fc085a4a
Branch: refs/heads/ignite-zk-ce
Commit: fc085a4a018c1f90861a76842f3eebdeee0ba567
Parents: 8101455
Author: sboikov <sb...@gridgain.com>
Authored: Mon Dec 18 10:49:43 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Dec 18 12:50:53 2017 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 4 +-
.../communication/GridIoMessageFactory.java | 12 ++++
.../ignite/internal/util/nio/GridNioServer.java | 59 +++++++++++--------
.../TcpCommunicationConnectionCheckFuture.java | 61 ++++++++++++++++----
.../internal/ZkCommunicationErrorNodeState.java | 44 ++++++++++++++
.../ZkCommunicationErrorProcessFuture.java | 41 ++++++++++---
.../ZkCommunicationErrorResolveResult.java | 3 +
.../ZkDistributedCollectDataFuture.java | 11 ++++
.../discovery/zk/internal/ZkRuntimeState.java | 5 ++
.../zk/internal/ZookeeperDiscoveryImpl.java | 37 ++++++++++--
.../ZookeeperDiscoverySpiBasicTest.java | 23 +++++---
11 files changed, 241 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 9c6271a..81f00e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -300,9 +300,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
@Override public MessageReader reader(UUID rmtNodeId, MessageFactory msgFactory)
throws IgniteCheckedException {
- assert rmtNodeId != null;
- return new DirectMessageReader(msgFactory, U.directProtocolVersion(ctx, rmtNodeId));
+ return new DirectMessageReader(msgFactory,
+ rmtNodeId != null ? U.directProtocolVersion(ctx, rmtNodeId) : GridIoManager.DIRECT_PROTO_VER);
}
};
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 78cb7a8..51a6e25 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -117,6 +117,8 @@ import org.apache.ignite.internal.processors.cache.transactions.TxLocksResponse;
import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
+import org.apache.ignite.internal.processors.cluster.ClusterMetricsUpdateMessage;
+import org.apache.ignite.internal.processors.continuous.ContinuousRoutineStartResultMessage;
import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
@@ -879,6 +881,16 @@ public class GridIoMessageFactory implements MessageFactory {
break;
+ case 129:
+ msg = new ClusterMetricsUpdateMessage();
+
+ break;
+
+
+ case 130:
+ msg = new ContinuousRoutineStartResultMessage();
+
+ break;
// [-3..119] [124..128] [-23..-27] [-36..-55]- this
// [120..123] - DR
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 9784549..e95f957 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -2301,7 +2301,11 @@ public class GridNioServer<T> {
else if (log.isDebugEnabled())
log.debug("Failed to process selector key [ses=" + ses + ", err=" + e + ']');
- close(ses, new GridNioException(e));
+ // Can be null if async connect failed.
+ if (ses != null)
+ close(ses, new GridNioException(e));
+ else
+ closeKey(key);
}
}
}
@@ -2525,6 +2529,34 @@ public class GridNioServer<T> {
}
/**
+ * @param key Key.
+ */
+ private void closeKey(SelectionKey key) {
+ // Shutdown input and output so that remote client will see correct socket close.
+ Socket sock = ((SocketChannel)key.channel()).socket();
+
+ try {
+ try {
+ sock.shutdownInput();
+ }
+ catch (IOException ignored) {
+ // No-op.
+ }
+
+ try {
+ sock.shutdownOutput();
+ }
+ catch (IOException ignored) {
+ // No-op.
+ }
+ }
+ finally {
+ U.close(key, log);
+ U.close(sock, log);
+ }
+ }
+
+ /**
* Closes the session and all associated resources, then notifies the listener.
*
* @param ses Session to be closed.
@@ -2544,8 +2576,6 @@ public class GridNioServer<T> {
sessions.remove(ses);
workerSessions.remove(ses);
- SelectionKey key = ses.key();
-
if (ses.setClosed()) {
ses.onClosed();
@@ -2557,28 +2587,7 @@ public class GridNioServer<T> {
((DirectBuffer)ses.readBuffer()).cleaner().clean();
}
- // Shutdown input and output so that remote client will see correct socket close.
- Socket sock = ((SocketChannel)key.channel()).socket();
-
- try {
- try {
- sock.shutdownInput();
- }
- catch (IOException ignored) {
- // No-op.
- }
-
- try {
- sock.shutdownOutput();
- }
- catch (IOException ignored) {
- // No-op.
- }
- }
- finally {
- U.close(key, log);
- U.close(sock, log);
- }
+ closeKey(ses.key());
if (e != null)
filterChain.onExceptionCaught(ses, e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
index 6cb5622..170ee44 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
@@ -111,13 +111,6 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
resBitSet = new BitSet(nodes.size());
}
- /** {@inheritDoc} */
- @Override public void onEvent(Event evt) {
- assert evt instanceof DiscoveryEvent : evt;
- assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ;
-
- }
-
/**
* @param timeout Connect timeout.
*/
@@ -160,6 +153,8 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
MultipleAddressesConnectFuture fut = new MultipleAddressesConnectFuture(i);
fut.init(addrs);
+
+ futs[i] = fut;
}
}
else
@@ -171,7 +166,7 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
spi.getSpiContext().addLocalEventListener(this, EVT_NODE_LEFT, EVT_NODE_FAILED);
if (!isDone()) {
- endTime = System.currentTimeMillis() - timeout;
+ endTime = System.currentTimeMillis() + timeout;
spi.getSpiContext().addTimeoutObject(this);
}
@@ -211,8 +206,30 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
}
/** {@inheritDoc} */
+ @Override public void onEvent(Event evt) {
+ if (isDone())
+ return;
+
+ assert evt instanceof DiscoveryEvent : evt;
+ assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ;
+
+ UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
+
+ for (int i = 0; i < nodes.size(); i++) {
+ if (nodes.get(i).id().equals(nodeId)) {
+ ConnectFuture fut = futs[i];
+
+ if (fut != null)
+ fut.onNodeFailed();
+
+ return;
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void onTimeout() {
- if (!isDone())
+ if (isDone())
return;
ConnectFuture[] futs = this.futs;
@@ -230,6 +247,8 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
if (super.onDone(res, err)) {
spi.getSpiContext().removeTimeoutObject(this);
+ spi.getSpiContext().removeLocalEventListener(this);
+
return true;
}
@@ -244,6 +263,11 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
*
*/
void onTimeout();
+
+ /**
+ *
+ */
+ void onNodeFailed();
}
/**
@@ -325,11 +349,16 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
finish(nodeId(nodeIdx).equals(rmtNodeId));
}
+ /** {@inheritDoc} */
+ @Override public void onNodeFailed() {
+ cancel();
+ }
+
/**
* @param res Result.
* @return {@code True} if result was set by this call.
*/
- boolean finish(boolean res) {
+ public boolean finish(boolean res) {
if (connFutDoneUpdater.compareAndSet(this, 0, 1)) {
onStatusReceived(res);
@@ -369,6 +398,18 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
}
/** {@inheritDoc} */
+ @Override public void onNodeFailed() {
+ SingleAddressConnectFuture[] futs = this.futs;
+
+ for (int i = 0; i < futs.length; i++) {
+ ConnectFuture fut = futs[i];
+
+ if (fut != null)
+ fut.onNodeFailed();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void onTimeout() {
SingleAddressConnectFuture[] futs = this.futs;
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java
new file mode 100644
index 0000000..ddc310d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.BitSet;
+
+/**
+ *
+ */
+public class ZkCommunicationErrorNodeState implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final BitSet commState;
+
+ /** */
+ private final Exception err;
+
+ /**
+ * @param commState Communication state.
+ * @param err Error if failed to get communication state.
+ */
+ ZkCommunicationErrorNodeState(BitSet commState, Exception err) {
+ this.commState = commState;
+ this.err = err;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
index d7d4bd1..a6294bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
@@ -17,6 +17,7 @@
package org.apache.ignite.spi.discovery.zk.internal;
+import java.util.BitSet;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -29,6 +30,8 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.IgniteSpiTimeoutObject;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
@@ -141,19 +144,41 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implemen
}
/**
- * @param locNodeOrder Local node order.
* @param rtState Runtime state.
* @param futPath Future path.
* @param nodes Nodes to ping.
- * @throws Exception If failed.
*/
- void pingNodesAndNotifyFuture(long locNodeOrder, ZkRuntimeState rtState, String futPath, List<ClusterNode> nodes)
- throws Exception {
- TcpCommunicationSpi spi = (TcpCommunicationSpi)impl.spi.ignite().configuration().getCommunicationSpi();
-
- spi.checkConnection(nodes);
+ void pingNodesAndNotifyFuture(final ZkRuntimeState rtState, final String futPath, List<ClusterNode> nodes) throws Exception {
+ final TcpCommunicationSpi spi = (TcpCommunicationSpi)impl.spi.ignite().configuration().getCommunicationSpi();
+
+ IgniteFuture<BitSet> fut = spi.checkConnection(nodes);
+
+ fut.listen(new IgniteInClosure<IgniteFuture<BitSet>>() {
+ @Override public void apply(final IgniteFuture<BitSet> fut) {
+ // Future completed either from NIO thread or timeout worker, save result from another thread.
+ impl.runInWorkerThread(new ZkRunnable(rtState, impl) {
+ @Override public void run0() throws Exception {
+ BitSet commState = null;
+ Exception err = null;
+
+ try {
+ commState = fut.get();
+ }
+ catch (Exception e) {
+ err = e;
+ }
+
+ ZkCommunicationErrorNodeState state = new ZkCommunicationErrorNodeState(commState, err);
+
+ ZkDistributedCollectDataFuture.saveNodeResult(futPath,
+ rtState.zkClient,
+ impl.localNode().order(),
+ impl.marshalZip(state));
+ }
+ });
- ZkDistributedCollectDataFuture.saveNodeResult(futPath, rtState.zkClient, locNodeOrder, null);
+ }
+ });
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
index 745496b..607f93b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
@@ -31,6 +31,9 @@ class ZkCommunicationErrorResolveResult implements Serializable {
/** */
final GridLongList failedNodes;
+ /**
+ * @param failedNodes
+ */
ZkCommunicationErrorResolveResult(@Nullable GridLongList failedNodes) {
this.failedNodes = failedNodes;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
index e5d2356..19e2acc 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
@@ -107,6 +107,17 @@ class ZkDistributedCollectDataFuture extends GridFutureAdapter<Void> {
}
/**
+ * @param futPath Future path.
+ * @param client Client.
+ * @param nodeOrder Node order.
+ * @return Node result data.
+ * @throws Exception If failed.
+ */
+ static byte[] readNodeResult(String futPath, ZookeeperClient client, long nodeOrder) throws Exception {
+ return client.getData(futPath + "/" + nodeOrder);
+ }
+
+ /**
* @param futResPath Result path.
* @param client Client.
* @param data Result data.
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
index fc03f8d..dc7b1bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
@@ -17,6 +17,8 @@
package org.apache.ignite.spi.discovery.zk.internal;
+import java.util.List;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.spi.IgniteSpiTimeoutObject;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.Watcher;
@@ -73,6 +75,9 @@ class ZkRuntimeState {
/** */
final ZkClusterNodes top = new ZkClusterNodes();
+ /** */
+ List<ClusterNode> commErrProcNodes;
+
/**
* @param prevJoined {@code True} if joined topology before reconnect attempt.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 65bf6e7..62fc581 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -2056,6 +2056,8 @@ public class ZookeeperDiscoveryImpl {
rtState.evtsData.communicationErrorResolveFutureId(null);
+ rtState.commErrProcNodes = null;
+
ZkCommunicationErrorResolveResult res = msg.res;
if (res == null)
@@ -2120,7 +2122,8 @@ public class ZookeeperDiscoveryImpl {
final String futPath = zkPaths.distributedFutureBasePath(msg.id);
final ZkCommunicationErrorProcessFuture fut0 = fut;
- final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot();
+
+ rtState.commErrProcNodes = rtState.top.topologySnapshot();
if (rtState.crd) {
ZkDistributedCollectDataFuture nodeResFut = collectCommunicationStatusFuture(msg.id);
@@ -2130,7 +2133,7 @@ public class ZookeeperDiscoveryImpl {
runInWorkerThread(new ZkRunnable(rtState, this) {
@Override protected void run0() throws Exception {
- fut0.pingNodesAndNotifyFuture(locNode.order(), rtState, futPath, topSnapshot);
+ fut0.pingNodesAndNotifyFuture(rtState, futPath, rtState.commErrProcNodes);
}
});
}
@@ -2145,7 +2148,7 @@ public class ZookeeperDiscoveryImpl {
new Callable<Void>() {
@Override public Void call() throws Exception {
// Future is completed from ZK event thread.
- onCommunicationResolveStatusReceived(rtState);
+ onCommunicationErrorResolveStatusReceived(rtState);
return null;
}
@@ -2157,16 +2160,38 @@ public class ZookeeperDiscoveryImpl {
* @param rtState Runtime state.
* @throws Exception If failed.
*/
- private void onCommunicationResolveStatusReceived(ZkRuntimeState rtState) throws Exception {
+ private void onCommunicationErrorResolveStatusReceived(ZkRuntimeState rtState) throws Exception {
ZkDiscoveryEventsData evtsData = rtState.evtsData;
UUID futId = rtState.evtsData.communicationErrorResolveFutureId();
if (log.isInfoEnabled())
- log.info("Received communication status from all nodes, call resolver [reqId=" + futId + ']');
+ log.info("Received communication status from all nodes [reqId=" + futId + ']');
assert futId != null;
+ String futPath = zkPaths.distributedFutureBasePath(futId);
+
+ List<ClusterNode> initialNodes = rtState.commErrProcNodes;
+
+ assert initialNodes != null;
+
+ rtState.commErrProcNodes = null;
+
+ ZkClusterNodes top = rtState.top;
+
+ List<ZkCommunicationErrorNodeState> nodesRes = new ArrayList<>();
+
+ for (ZookeeperClusterNode node : top.nodesByOrder.values()) {
+ byte[] stateBytes = ZkDistributedCollectDataFuture.readNodeResult(futPath,
+ rtState.zkClient,
+ node.order());
+
+ ZkCommunicationErrorNodeState nodeState = unmarshalZip(stateBytes);
+
+ nodesRes.add(nodeState);
+ }
+
ZkCommunicationErrorResolveFinishMessage msg = new ZkCommunicationErrorResolveFinishMessage(futId);
ZkCommunicationErrorResolveResult res = new ZkCommunicationErrorResolveResult(null);
@@ -2663,7 +2688,7 @@ public class ZookeeperDiscoveryImpl {
* @return Bytes.
* @throws IgniteCheckedException If failed.
*/
- private byte[] marshalZip(Object obj) throws IgniteCheckedException {
+ byte[] marshalZip(Object obj) throws IgniteCheckedException {
assert obj != null;
return U.zip(marsh.marshal(obj));
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index a2e8784..cee2e76 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -1916,20 +1916,27 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
}
/**
- * TODO ZK: move to comm spi tests.
- *
* @throws Exception If failed.
*/
- public void testNodesPing() throws Exception {
- startGrids(3);
+ public void testConnectionCheck() throws Exception {
+ final int NODES = 5;
+
+ startGridsMultiThreaded(NODES);
+
+ for (int i = 0; i < NODES; i++) {
+ Ignite node = ignite(i);
+
+ TcpCommunicationSpi spi = (TcpCommunicationSpi)node.configuration().getCommunicationSpi();
- TcpCommunicationSpi spi = (TcpCommunicationSpi)ignite(1).configuration().getCommunicationSpi();
+ List<ClusterNode> nodes = new ArrayList<>();
- List<ClusterNode> nodes = new ArrayList<>();
+ nodes.addAll(node.cluster().nodes());
- nodes.add(ignite(2).cluster().localNode());
+ BitSet res = spi.checkConnection(nodes).get();
- // spi.pingNodes(nodes);
+ for (int j = 0; j < NODES; j++)
+ assertTrue(res.get(j));
+ }
}
/**
[3/4] ignite git commit: Merge remote-tracking branch
'remotes/origin/master' into ignite-zk-ce
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-zk-ce
# Conflicts:
# modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/81014550
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/81014550
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/81014550
Branch: refs/heads/ignite-zk-ce
Commit: 8101455034a723eadf94dabe4f5d8f1fc0ae5606
Parents: ed492a4 91be7af
Author: sboikov <sb...@gridgain.com>
Authored: Mon Dec 18 10:38:07 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Dec 18 10:38:07 2017 +0300
----------------------------------------------------------------------
.../spi/communication/tcp/TcpCommunicationSpi.java | 3 +--
.../TcpCommunicationConnectionCheckFuture.java | 17 ++++++++++++++++-
.../ignite/ml/knn/models/KNNModelFormat.java | 6 +++++-
3 files changed, 22 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81014550/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81014550/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
index 99e1eca,0000000..6cb5622
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
@@@ -1,461 -1,0 +1,476 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp.internal;
+
+import java.net.InetSocketAddress;
+import java.nio.channels.SocketChannel;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
++import org.apache.ignite.events.DiscoveryEvent;
++import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.IgniteInternalFuture;
++import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.util.GridLeanMap;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.IgniteSpiTimeoutObject;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.jetbrains.annotations.Nullable;
+
++import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
++import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
++
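+/**
+ * Future that checks whether connections can be established to a list of nodes. The result is a
+ * {@link BitSet} in which bit {@code i} is set if the check for the node at index {@code i} succeeded.
+ */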
- public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<BitSet> implements IgniteSpiTimeoutObject {
++public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<BitSet> implements IgniteSpiTimeoutObject, GridLocalEventListener {
+ /** Session future. */
+ public static final int SES_FUT_META = GridNioSessionMetaKey.nextUniqueKey();
+
+ /** */
+ public static final ConnectionKey CONN_CHECK_DUMMY_KEY = new ConnectionKey(null, -1, -1);
+
+ /** */
+ private static final AtomicIntegerFieldUpdater<SingleAddressConnectFuture> connFutDoneUpdater =
+ AtomicIntegerFieldUpdater.newUpdater(SingleAddressConnectFuture.class, "done");
+
+ /** */
+ private static final AtomicIntegerFieldUpdater<MultipleAddressesConnectFuture> connResCntUpdater =
+ AtomicIntegerFieldUpdater.newUpdater(MultipleAddressesConnectFuture.class, "resCnt");
+
+ /** */
+ private final AtomicInteger resCntr = new AtomicInteger();
+
+ /** */
+ private final List<ClusterNode> nodes;
+
+ /** */
+ private volatile ConnectFuture[] futs;
+
+ /** */
+ private final GridNioServer nioSrvr;
+
+ /** */
+ private final TcpCommunicationSpi spi;
+
+ /** */
+ private final IgniteUuid timeoutObjId = IgniteUuid.randomUuid();
+
+ /** */
+ private final BitSet resBitSet;
+
+ /** */
+ private long endTime;
+
+ /** */
+ private final IgniteLogger log;
+
+ /**
+  * Creates a connection check future for the given nodes. No connection is attempted until
+  * {@code init(long)} is called.
+  *
+  * @param spi SPI instance.
+  * @param log Logger.
+  * @param nioSrvr NIO server.
+  * @param nodes Nodes to check.
+  */
+ public TcpCommunicationConnectionCheckFuture(
+     TcpCommunicationSpi spi,
+     IgniteLogger log,
+     GridNioServer nioSrvr,
+     List<ClusterNode> nodes
+ ) {
+     this.nodes = nodes;
+     this.nioSrvr = nioSrvr;
+     this.log = log;
+     this.spi = spi;
+
+     // One result bit per checked node.
+     this.resBitSet = new BitSet(nodes.size());
+ }
+
++ /** {@inheritDoc} */
++ @Override public void onEvent(Event evt) {
++     // Only validates the event kind; no further handling is performed here.
++     assert evt instanceof DiscoveryEvent : evt;
++     assert EVT_NODE_FAILED == evt.type() || EVT_NODE_LEFT == evt.type();
++ }
++
+ /**
+  * Starts the connection checks for all nodes and arms the timeout.
+  * <p>
+  * The local node is reported as reachable without connecting. Nodes that are not known to the SPI context,
+  * or whose addresses cannot be obtained, are reported as unreachable. For every other node a connect
+  * future is started (single- or multi-address, depending on the number of addresses).
+  *
+  * @param timeout Connect timeout in milliseconds.
+  */
+ public void init(long timeout) {
+     ConnectFuture[] futs = new ConnectFuture[nodes.size()];
+
+     UUID locId = spi.getSpiContext().localNode().id();
+
+     for (int i = 0; i < nodes.size(); i++) {
+         ClusterNode node = nodes.get(i);
+
+         if (!node.id().equals(locId)) {
+             if (spi.getSpiContext().node(node.id()) == null) {
+                 receivedConnectionStatus(i, false);
+
+                 continue;
+             }
+
+             Collection<InetSocketAddress> addrs;
+
+             try {
+                 addrs = spi.nodeAddresses(node, false);
+             }
+             catch (Exception e) {
+                 U.error(log, "Failed to get node addresses: " + node, e);
+
+                 receivedConnectionStatus(i, false);
+
+                 continue;
+             }
+
+             if (addrs.size() == 1) {
+                 SingleAddressConnectFuture fut = new SingleAddressConnectFuture(i);
+
+                 fut.init(addrs.iterator().next());
+
+                 futs[i] = fut;
+             }
+             else {
+                 MultipleAddressesConnectFuture fut = new MultipleAddressesConnectFuture(i);
+
+                 fut.init(addrs);
+
+                 // Must be tracked as well, otherwise onTimeout() can never cancel its pending connects.
+                 futs[i] = fut;
+             }
+         }
+         else
+             receivedConnectionStatus(i, true);
+     }
+
+     this.futs = futs;
+
++     spi.getSpiContext().addLocalEventListener(this, EVT_NODE_LEFT, EVT_NODE_FAILED);
++
+     if (!isDone()) {
+         // The deadline lies in the future: current time plus the timeout.
+         endTime = System.currentTimeMillis() + timeout;
+
+         spi.getSpiContext().addTimeoutObject(this);
+     }
+     else {
+         // All results arrived synchronously, before the listener was registered: release it right away.
+         spi.getSpiContext().removeLocalEventListener(this);
+     }
+ }
+
+ /**
+ * @param idx Node index.
+ * @param res Success flag.
+ */
+ private void receivedConnectionStatus(int idx, boolean res) {
+ assert resCntr.get() < nodes.size();
+
+ synchronized (resBitSet) {
+ resBitSet.set(idx, res);
+ }
+
+ if (resCntr.incrementAndGet() == nodes.size())
+ onDone(resBitSet);
+ }
+
+ /**
+  * Looks up the ID of the node stored at the given position of the checked nodes list.
+  *
+  * @param nodeIdx Node index.
+  * @return Node ID.
+  */
+ private UUID nodeId(int nodeIdx) {
+     ClusterNode node = nodes.get(nodeIdx);
+
+     return node.id();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid id() {
+     // Identifier generated once when this future was created.
+     return this.timeoutObjId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long endTime() {
+     // Assigned in init(long) when the timeout object is registered.
+     return this.endTime;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Propagates the timeout to every connect future that is still tracked, unless the overall
+  * result has already been set.
+  */
+ @Override public void onTimeout() {
+     // Nothing left to cancel once the result is set; otherwise pending connects must be timed out.
+     if (isDone())
+         return;
+
+     ConnectFuture[] futs = this.futs;
+
+     for (ConnectFuture fut : futs) {
+         // Slots are null for nodes whose status was resolved without a connect attempt.
+         if (fut != null)
+             fut.onTimeout();
+     }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * On the completing call, also unregisters this future from the SPI context (timeout object and
+  * local event listener) so that it is not retained after completion.
+  */
+ @Override public boolean onDone(@Nullable BitSet res, @Nullable Throwable err) {
+     if (super.onDone(res, err)) {
+         spi.getSpiContext().removeTimeoutObject(this);
+
+         // Registered in init(long); without this the listener is never released.
+         spi.getSpiContext().removeLocalEventListener(this);
+
+         return true;
+     }
+
+     return false;
+ }
+
+ /**
+ * Per-node connection check that can be notified when the overall timeout expires.
+ */
+ private interface ConnectFuture {
+ /**
+ * Called when the connect timeout expires; both implementations in this class cancel their pending connects.
+ */
+ void onTimeout();
+ }
+
+ /**
+ *
+ */
+ private class SingleAddressConnectFuture implements TcpCommunicationNodeConnectionCheckFuture, ConnectFuture {
+ /** */
+ final int nodeIdx;
+
+ /** */
+ volatile int done;
+
+ /** */
+ Map<Integer, Object> sesMeta;
+
+ /** */
+ private SocketChannel ch;
+
+ /**
+ * @param nodeIdx Node index.
+ */
+ SingleAddressConnectFuture(int nodeIdx) {
+ this.nodeIdx = nodeIdx;
+ }
+
+ /**
+ * @param addr Node address.
+ */
+ public void init(InetSocketAddress addr) {
+ boolean connect;
+
+ try {
+ ch = SocketChannel.open();
+
+ ch.configureBlocking(false);
+
+ ch.socket().setTcpNoDelay(true);
+ ch.socket().setKeepAlive(false);
+
+ connect = ch.connect(addr);
+ }
+ catch (Exception e) {
+ finish(false);
+
+ return;
+ }
+
+ if (!connect) {
+ sesMeta = new GridLeanMap<>(3);
+
+ // Set dummy key to identify connection-check outgoing connection.
+ sesMeta.put(TcpCommunicationSpi.CONN_IDX_META, CONN_CHECK_DUMMY_KEY);
+ sesMeta.put(SES_FUT_META, this);
+
+ nioSrvr.createSession(ch, sesMeta, true, new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() {
+ @Override public void apply(IgniteInternalFuture<GridNioSession> fut) {
+ if (fut.error() != null)
+ finish(false);
+ }
+ });
+ }
+ }
+
+ /**
+ *
+ */
+ void cancel() {
+ if (finish(false))
+ nioSrvr.cancelConnect(ch, sesMeta);
+ }
+
+ /** {@inheritDoc} */
+ public void onTimeout() {
+ cancel();
+ }
+
+ /** {@inheritDoc} */
+ public void onConnected(UUID rmtNodeId) {
+ finish(nodeId(nodeIdx).equals(rmtNodeId));
+ }
+
+ /**
+ * @param res Result.
+ * @return {@code True} if result was set by this call.
+ */
+ boolean finish(boolean res) {
+ if (connFutDoneUpdater.compareAndSet(this, 0, 1)) {
+ onStatusReceived(res);
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * @param res Result.
+ */
+ void onStatusReceived(boolean res) {
+ receivedConnectionStatus(nodeIdx, res);
+ }
+ }
+
+ /**
+  * Connection check for a node with several addresses: one {@link SingleAddressConnectFuture} is started
+  * per address. The node is reported reachable as soon as any address succeeds, and unreachable once
+  * all addresses have failed.
+  */
+ private class MultipleAddressesConnectFuture implements ConnectFuture {
+     /** Number of failed address checks so far, or {@code Integer.MAX_VALUE} once some address succeeded. */
+     volatile int resCnt;
+
+     /**
+      * Per-address futures, published by {@code init} after the connects were started. Trailing slots are
+      * {@code null} if {@code init} stopped early.
+      */
+     volatile SingleAddressConnectFuture[] futs;
+
+     /**
+      * Total number of addresses. Set before the first connect is started, because results may be
+      * reported before {@link #futs} is published.
+      */
+     private volatile int addrCnt;
+
+     /** Index of the checked node in the enclosing future's node list. */
+     final int nodeIdx;
+
+     /**
+      * @param nodeIdx Node index.
+      */
+     MultipleAddressesConnectFuture(int nodeIdx) {
+         this.nodeIdx = nodeIdx;
+     }
+
+     /** {@inheritDoc} */
+     @Override public void onTimeout() {
+         // Timing out a single-address future is the same as cancelling it.
+         cancelFutures();
+     }
+
+     /**
+      * Starts a connect to every address until the result is decided.
+      *
+      * @param addrs Node addresses.
+      */
+     void init(Collection<InetSocketAddress> addrs) {
+         if (addrs.isEmpty()) {
+             // Nothing to connect to: report failure instead of leaving the node without any status.
+             receivedConnectionStatus(nodeIdx, false);
+
+             return;
+         }
+
+         SingleAddressConnectFuture[] futs = new SingleAddressConnectFuture[addrs.size()];
+
+         // Must be known before any connect can report a result.
+         addrCnt = futs.length;
+
+         int idx = 0;
+
+         for (InetSocketAddress addr : addrs) {
+             SingleAddressConnectFuture fut = new SingleAddressConnectFuture(nodeIdx) {
+                 @Override void onStatusReceived(boolean res) {
+                     receivedAddressStatus(res);
+                 }
+             };
+
+             fut.init(addr);
+
+             futs[idx++] = fut;
+
+             // Result is already decided: do not start further connects.
+             if (done())
+                 break;
+         }
+
+         // Publish even after an early stop, so that the started connects can still be cancelled.
+         this.futs = futs;
+
+         // Close race.
+         if (done())
+             cancelFutures();
+     }
+
+     /**
+      * @return {@code True} if some address succeeded or all addresses failed.
+      */
+     private boolean done() {
+         int resCnt0 = resCnt;
+
+         return resCnt0 == Integer.MAX_VALUE || resCnt0 == addrCnt;
+     }
+
+     /**
+      * Cancels all per-address futures published so far.
+      */
+     private void cancelFutures() {
+         SingleAddressConnectFuture[] futs = this.futs;
+
+         if (futs != null) {
+             for (SingleAddressConnectFuture fut : futs) {
+                 // Slots stay null when init() stopped before starting all connects.
+                 if (fut != null)
+                     fut.cancel();
+             }
+         }
+     }
+
+     /**
+      * Accounts for the result of one address check and reports the node status once it is decided.
+      *
+      * @param res Result.
+      */
+     void receivedAddressStatus(boolean res) {
+         if (res) {
+             for (;;) {
+                 int resCnt0 = resCnt;
+
+                 if (resCnt0 == Integer.MAX_VALUE)
+                     return;
+
+                 if (connResCntUpdater.compareAndSet(this, resCnt0, Integer.MAX_VALUE)) {
+                     receivedConnectionStatus(nodeIdx, true);
+
+                     cancelFutures(); // Cancel others connects if they are still in progress.
+
+                     return;
+                 }
+             }
+         }
+         else {
+             for (;;) {
+                 int resCnt0 = resCnt;
+
+                 if (resCnt0 == Integer.MAX_VALUE)
+                     return;
+
+                 int resCnt1 = resCnt0 + 1;
+
+                 if (connResCntUpdater.compareAndSet(this, resCnt0, resCnt1)) {
+                     // Compare against the address count: the futures array may not be published yet.
+                     if (resCnt1 == addrCnt)
+                         receivedConnectionStatus(nodeIdx, false);
+
+                     return;
+                 }
+             }
+         }
+     }
+ }
+}