You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/20 08:04:40 UTC
[32/50] [abbrv] ignite git commit: zk
zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ed492a4f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ed492a4f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ed492a4f
Branch: refs/heads/ignite-zk
Commit: ed492a4f3aca046b6e196420bd986d0f47232aed
Parents: 78a994a
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 15 16:00:49 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 15 17:08:26 2017 +0300
----------------------------------------------------------------------
.../communication/GridIoMessageFactory.java | 17 -
.../communication/tcp/TcpCommunicationSpi.java | 148 ++-----
.../tcp/internal/ConnectionKey.java | 92 ++++
.../TcpCommunicationConnectionCheckFuture.java | 423 +++++++++++++++++--
...pCommunicationNodeConnectionCheckFuture.java | 30 ++
.../GridTcpCommunicationSpiAbstractTest.java | 83 ++--
6 files changed, 618 insertions(+), 175 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ed492a4f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index a30c439..78cb7a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -117,8 +117,6 @@ import org.apache.ignite.internal.processors.cache.transactions.TxLocksResponse;
import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
-import org.apache.ignite.internal.processors.cluster.ClusterMetricsUpdateMessage;
-import org.apache.ignite.internal.processors.continuous.ContinuousRoutineStartResultMessage;
import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
@@ -186,11 +184,6 @@ public class GridIoMessageFactory implements MessageFactory {
switch (type) {
// -54 is reserved for SQL.
// -46 ... -51 - snapshot messages.
- case -62:
- msg = new TcpCommunicationConnectionCheckMessage();
-
- break;
-
case -61:
msg = new IgniteDiagnosticMessage();
@@ -886,16 +879,6 @@ public class GridIoMessageFactory implements MessageFactory {
break;
- case 129:
- msg = new ClusterMetricsUpdateMessage();
-
- break;
-
- case 130:
- msg = new ContinuousRoutineStartResultMessage();
-
- break;
-
// [-3..119] [124..128] [-23..-27] [-36..-55]- this
// [120..123] - DR
http://git-wip-us.apache.org/repos/asf/ignite/blob/ed492a4f/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 3588a79..afa6953 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -72,6 +72,7 @@ import org.apache.ignite.internal.util.GridConcurrentFactory;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.ipc.IpcEndpoint;
import org.apache.ignite.internal.util.ipc.IpcToNioAdapter;
import org.apache.ignite.internal.util.ipc.shmem.IpcOutOfSystemResourcesException;
@@ -82,7 +83,6 @@ import org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter;
import org.apache.ignite.internal.util.nio.GridDirectParser;
import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
import org.apache.ignite.internal.util.nio.GridNioFilter;
-import org.apache.ignite.internal.util.nio.GridNioFuture;
import org.apache.ignite.internal.util.nio.GridNioMessageReaderFactory;
import org.apache.ignite.internal.util.nio.GridNioMessageTracker;
import org.apache.ignite.internal.util.nio.GridNioMessageWriterFactory;
@@ -135,11 +135,13 @@ import org.apache.ignite.spi.IgniteSpiThread;
import org.apache.ignite.spi.IgniteSpiTimeoutObject;
import org.apache.ignite.spi.communication.CommunicationListener;
import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.internal.ConnectionKey;
+import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture;
+import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationNodeConnectionCheckFuture;
import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage;
import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage2;
import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage;
import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage;
-import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.thread.IgniteThread;
@@ -149,6 +151,8 @@ import org.jsr166.ConcurrentLinkedDeque8;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META;
+import static org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture.CONN_CHECK_DUMMY_KEY;
+import static org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture.SES_FUT_META;
import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.ALREADY_CONNECTED;
import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.NEED_WAIT;
import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.NODE_STOPPING;
@@ -318,12 +322,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
/** Message tracker meta for session. */
private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey();
- /** Session future. */
- public static final int SES_FUT_META = GridNioSessionMetaKey.nextUniqueKey();
-
- /** */
- public static final ConnectionKey CONN_CHECK_DUMMY_KEY = new ConnectionKey(null, -1, -1);
-
/**
* Default local port range (value is <tt>100</tt>).
* See {@link #setLocalPortRange(int)} for details.
@@ -409,17 +407,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
else {
ConnectionKey connId = ses.meta(CONN_IDX_META);
- if (connId != CONN_CHECK_DUMMY_KEY) {
- if (log.isInfoEnabled())
- log.info("Established outgoing communication connection [locAddr=" + ses.localAddress() +
- ", rmtAddr=" + ses.remoteAddress() + ']');
- }
+ if (log.isInfoEnabled())
+ log.info("Established outgoing communication connection [locAddr=" + ses.localAddress() +
+ ", rmtAddr=" + ses.remoteAddress() + ']');
}
}
@Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
ConnectionKey connId = ses.meta(CONN_IDX_META);
+ if (connId == CONN_CHECK_DUMMY_KEY)
+ return;
+
if (connId != null) {
UUID id = connId.nodeId();
@@ -691,7 +690,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
ConnectionKey connKey = ses.meta(CONN_IDX_META);
if (connKey == null) {
- assert ses.accepted() : msg;
+ assert ses.accepted() : ses;
if (!connectGate.tryEnter()) {
if (log.isDebugEnabled())
@@ -714,9 +713,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
}
else {
- metricsLsnr.onMessageReceived(msg, connKey.nodeId());
-
if (msg instanceof RecoveryLastReceivedMessage) {
+ metricsLsnr.onMessageReceived(msg, connKey.nodeId());
+
GridNioRecoveryDescriptor recovery = ses.outRecoveryDescriptor();
if (recovery != null) {
@@ -729,9 +728,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
recovery.ackReceived(msg0.received());
-
- return;
}
+
+ return;
}
else {
GridNioRecoveryDescriptor recovery = ses.inRecoveryDescriptor();
@@ -754,9 +753,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
else if (connKey == CONN_CHECK_DUMMY_KEY) {
assert msg instanceof NodeIdMessage : msg;
- TcpCommunicationConnectionCheckFuture fut = ses.meta(SES_FUT_META);
+ TcpCommunicationNodeConnectionCheckFuture fut = ses.meta(SES_FUT_META);
+
+ assert fut != null : msg;
- fut.onConnected(U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0));
+ fut.onConnected(U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes(), 0));
nioSrvr.closeFromWorkerThread(ses);
@@ -764,6 +765,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
}
+ metricsLsnr.onMessageReceived(msg, connKey.nodeId());
+
IgniteRunnable c;
if (msgQueueLimit > 0) {
@@ -2592,18 +2595,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
sendMessage0(node, msg, null);
}
+ /** {@inheritDoc} */
public IgniteFuture<BitSet> checkConnection(List<ClusterNode> nodes) {
- ClusterNode node = nodes.get(0);
-
- try {
- Collection<InetSocketAddress> addrs = nodeAddresses(node);
+ TcpCommunicationConnectionCheckFuture fut = new TcpCommunicationConnectionCheckFuture(
+ this,
+ log.getLogger(TcpCommunicationConnectionCheckFuture.class),
+ nioSrvr,
+ nodes);
- }
- catch (Exception e) {
- throw new IgniteSpiException(e);
- }
+ fut.init(failureDetectionTimeoutEnabled() ? failureDetectionTimeout() : connTimeout);
- return null;
+ return new IgniteFutureImpl<>(fut);
}
/**
@@ -3028,7 +3030,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
ConnectionKey id = ses.meta(CONN_IDX_META);
if (id != null) {
- ClusterNode node = getSpiContext().node(id.nodeId);
+ ClusterNode node = getSpiContext().node(id.nodeId());
if (node != null && node.isClient()) {
String msg = "Client node outbound message queue size exceeded slowClientQueueLimit, " +
@@ -3049,9 +3051,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
/**
* @param node Node.
* @return Node addresses.
+ * @throws IgniteCheckedException If failed.
+ */
+ private Collection<InetSocketAddress> nodeAddresses(ClusterNode node) throws IgniteCheckedException {
+ return nodeAddresses(node, filterReachableAddresses);
+ }
+
+ /**
+ * @param node Node.
+ * @param filterReachableAddresses Filter addresses flag.
+ * @return Node addresses.
* @throws IgniteCheckedException If node does not have addresses.
*/
- private LinkedHashSet<InetSocketAddress> nodeAddresses(ClusterNode node) throws IgniteCheckedException {
+ public Collection<InetSocketAddress> nodeAddresses(ClusterNode node, boolean filterReachableAddresses)
+ throws IgniteCheckedException {
Collection<String> rmtAddrs0 = node.attribute(createSpiAttributeName(ATTR_ADDRS));
Collection<String> rmtHostNames0 = node.attribute(createSpiAttributeName(ATTR_HOST_NAMES));
Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT));
@@ -3132,7 +3145,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
* @throws IgniteCheckedException If failed.
*/
protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
- LinkedHashSet<InetSocketAddress> addrs = nodeAddresses(node);
+ Collection<InetSocketAddress> addrs = nodeAddresses(node);
GridCommunicationClient client = null;
IgniteCheckedException errs = null;
@@ -4629,77 +4642,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
/**
*
*/
- private static class ConnectionKey {
- /** */
- private final UUID nodeId;
-
- /** */
- private final int idx;
-
- /** */
- private final long connCnt;
-
- /**
- * @param nodeId Node ID.
- * @param idx Connection index.
- * @param connCnt Connection counter (set only for incoming connections).
- */
- ConnectionKey(UUID nodeId, int idx, long connCnt) {
- this.nodeId = nodeId;
- this.idx = idx;
- this.connCnt = connCnt;
- }
-
- /**
- * @return Connection counter.
- */
- long connectCount() {
- return connCnt;
- }
-
- /**
- * @return Node ID.
- */
- UUID nodeId() {
- return nodeId;
- }
-
- /**
- * @return Connection index.
- */
- int connectionIndex() {
- return idx;
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (o == null || getClass() != o.getClass())
- return false;
-
- ConnectionKey key = (ConnectionKey) o;
-
- return idx == key.idx && nodeId.equals(key.nodeId);
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- int res = nodeId.hashCode();
- res = 31 * res + idx;
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(ConnectionKey.class, this);
- }
- }
-
- /**
- *
- */
interface ConnectionPolicy {
/**
* @return Thread connection index.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ed492a4f/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionKey.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionKey.java
new file mode 100644
index 0000000..6716446
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionKey.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp.internal;
+
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ *
+ */
+public class ConnectionKey {
+ /** */
+ private final UUID nodeId;
+
+ /** */
+ private final int idx;
+
+ /** */
+ private final long connCnt;
+
+ /**
+ * @param nodeId Node ID.
+ * @param idx Connection index.
+ * @param connCnt Connection counter (set only for incoming connections).
+ */
+ public ConnectionKey(UUID nodeId, int idx, long connCnt) {
+ this.nodeId = nodeId;
+ this.idx = idx;
+ this.connCnt = connCnt;
+ }
+
+ /**
+ * @return Connection counter.
+ */
+ public long connectCount() {
+ return connCnt;
+ }
+
+ /**
+ * @return Node ID.
+ */
+ public UUID nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * @return Connection index.
+ */
+ public int connectionIndex() {
+ return idx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ ConnectionKey key = (ConnectionKey) o;
+
+ return idx == key.idx && nodeId.equals(key.nodeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = nodeId.hashCode();
+ res = 31 * res + idx;
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ConnectionKey.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ed492a4f/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
index e08ea13..99e1eca 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
@@ -17,84 +17,445 @@
package org.apache.ignite.spi.communication.tcp.internal;
-import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.GridLeanMap;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.IgniteSpiTimeoutObject;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.jetbrains.annotations.Nullable;
/**
*
*/
-public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Boolean> {
+public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<BitSet> implements IgniteSpiTimeoutObject {
+ /** Session future. */
+ public static final int SES_FUT_META = GridNioSessionMetaKey.nextUniqueKey();
+
+ /** */
+ public static final ConnectionKey CONN_CHECK_DUMMY_KEY = new ConnectionKey(null, -1, -1);
+
+ /** */
+ private static final AtomicIntegerFieldUpdater<SingleAddressConnectFuture> connFutDoneUpdater =
+ AtomicIntegerFieldUpdater.newUpdater(SingleAddressConnectFuture.class, "done");
+
+ /** */
+ private static final AtomicIntegerFieldUpdater<MultipleAddressesConnectFuture> connResCntUpdater =
+ AtomicIntegerFieldUpdater.newUpdater(MultipleAddressesConnectFuture.class, "resCnt");
+
+ /** */
+ private final AtomicInteger resCntr = new AtomicInteger();
+
/** */
- private final UUID nodeId;
+ private final List<ClusterNode> nodes;
+
+ /** */
+ private volatile ConnectFuture[] futs;
/** */
private final GridNioServer nioSrvr;
/** */
- private Map<Integer, Object> sesMeta;
+ private final TcpCommunicationSpi spi;
+
+ /** */
+ private final IgniteUuid timeoutObjId = IgniteUuid.randomUuid();
+
+ /** */
+ private final BitSet resBitSet;
+
+ /** */
+ private long endTime;
/** */
- private SocketChannel ch;
+ private final IgniteLogger log;
/**
- * @param nodeId Remote note ID.
+ * @param spi SPI instance.
+ * @param log Logger.
+ * @param nioSrvr NIO server.
+ * @param nodes Nodes to check.
*/
- public TcpCommunicationConnectionCheckFuture(GridNioServer nioSrvr, UUID nodeId) {
+ public TcpCommunicationConnectionCheckFuture(TcpCommunicationSpi spi,
+ IgniteLogger log,
+ GridNioServer nioSrvr,
+ List<ClusterNode> nodes)
+ {
+ this.spi = spi;
+ this.log = log;
this.nioSrvr = nioSrvr;
- this.nodeId = nodeId;
+ this.nodes = nodes;
+
+ resBitSet = new BitSet(nodes.size());
}
/**
- * @param addr
- * @throws IOException
+ * @param timeout Connect timeout.
*/
- public void init(InetSocketAddress addr) throws IOException {
- ch = SocketChannel.open();
+ public void init(long timeout) {
+ ConnectFuture[] futs = new ConnectFuture[nodes.size()];
+
+ UUID locId = spi.getSpiContext().localNode().id();
+
+ for (int i = 0; i < nodes.size(); i++) {
+ ClusterNode node = nodes.get(i);
+
+ if (!node.id().equals(locId)) {
+ if (spi.getSpiContext().node(node.id()) == null) {
+ receivedConnectionStatus(i, false);
+
+ continue;
+ }
+
+ Collection<InetSocketAddress> addrs;
+
+ try {
+ addrs = spi.nodeAddresses(node, false);
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to get node addresses: " + node, e);
- ch.configureBlocking(false);
+ receivedConnectionStatus(i, false);
- ch.socket().setTcpNoDelay(true);
- ch.socket().setKeepAlive(false);
+ continue;
+ }
- boolean connect = ch.connect(addr);
+ if (addrs.size() == 1) {
+ SingleAddressConnectFuture fut = new SingleAddressConnectFuture(i);
- if (!connect) {
- sesMeta = new GridLeanMap<>(2);
+ fut.init(addrs.iterator().next());
- sesMeta.put(TcpCommunicationSpi.CONN_IDX_META, TcpCommunicationSpi.CONN_CHECK_DUMMY_KEY);
- sesMeta.put(TcpCommunicationSpi.SES_FUT_META, this);
+ futs[i] = fut;
+ }
+ else {
+ MultipleAddressesConnectFuture fut = new MultipleAddressesConnectFuture(i);
- nioSrvr.createSession(ch, sesMeta, true, new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() {
- @Override public void apply(IgniteInternalFuture<GridNioSession> fut) {
- if (fut.error() != null)
- onDone(false);
+ fut.init(addrs);
}
- });
+ }
+ else
+ receivedConnectionStatus(i, true);
+ }
+
+ this.futs = futs;
+
+ if (!isDone()) {
+ endTime = System.currentTimeMillis() - timeout;
+
+ spi.getSpiContext().addTimeoutObject(this);
}
}
/**
+ * @param idx Node index.
+ * @param res Success flag.
+ */
+ private void receivedConnectionStatus(int idx, boolean res) {
+ assert resCntr.get() < nodes.size();
+
+ synchronized (resBitSet) {
+ resBitSet.set(idx, res);
+ }
+
+ if (resCntr.incrementAndGet() == nodes.size())
+ onDone(resBitSet);
+ }
+
+ /**
+ * @param nodeIdx Node index.
+ * @return Node ID.
+ */
+ private UUID nodeId(int nodeIdx) {
+ return nodes.get(nodeIdx).id();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid id() {
+ return timeoutObjId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long endTime() {
+ return endTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onTimeout() {
+ if (!isDone())
+ return;
+
+ ConnectFuture[] futs = this.futs;
+
+ for (int i = 0; i < futs.length; i++) {
+ ConnectFuture fut = futs[i];
+
+ if (fut != null)
+ fut.onTimeout();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onDone(@Nullable BitSet res, @Nullable Throwable err) {
+ if (super.onDone(res, err)) {
+ spi.getSpiContext().removeTimeoutObject(this);
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
*
*/
- public void onTimeout() {
- if (super.onDone(false))
- nioSrvr.cancelConnect(ch, sesMeta);
+ private interface ConnectFuture {
+ /**
+ *
+ */
+ void onTimeout();
}
/**
- * @param rmtNodeId
+ *
*/
- public void onConnected(UUID rmtNodeId) {
- onDone(nodeId.equals(rmtNodeId));
+ private class SingleAddressConnectFuture implements TcpCommunicationNodeConnectionCheckFuture, ConnectFuture {
+ /** */
+ final int nodeIdx;
+
+ /** */
+ volatile int done;
+
+ /** */
+ Map<Integer, Object> sesMeta;
+
+ /** */
+ private SocketChannel ch;
+
+ /**
+ * @param nodeIdx Node index.
+ */
+ SingleAddressConnectFuture(int nodeIdx) {
+ this.nodeIdx = nodeIdx;
+ }
+
+ /**
+ * @param addr Node address.
+ */
+ public void init(InetSocketAddress addr) {
+ boolean connect;
+
+ try {
+ ch = SocketChannel.open();
+
+ ch.configureBlocking(false);
+
+ ch.socket().setTcpNoDelay(true);
+ ch.socket().setKeepAlive(false);
+
+ connect = ch.connect(addr);
+ }
+ catch (Exception e) {
+ finish(false);
+
+ return;
+ }
+
+ if (!connect) {
+ sesMeta = new GridLeanMap<>(3);
+
+ // Set dummy key to identify connection-check outgoing connection.
+ sesMeta.put(TcpCommunicationSpi.CONN_IDX_META, CONN_CHECK_DUMMY_KEY);
+ sesMeta.put(SES_FUT_META, this);
+
+ nioSrvr.createSession(ch, sesMeta, true, new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() {
+ @Override public void apply(IgniteInternalFuture<GridNioSession> fut) {
+ if (fut.error() != null)
+ finish(false);
+ }
+ });
+ }
+ }
+
+ /**
+ *
+ */
+ void cancel() {
+ if (finish(false))
+ nioSrvr.cancelConnect(ch, sesMeta);
+ }
+
+ /** {@inheritDoc} */
+ public void onTimeout() {
+ cancel();
+ }
+
+ /** {@inheritDoc} */
+ public void onConnected(UUID rmtNodeId) {
+ finish(nodeId(nodeIdx).equals(rmtNodeId));
+ }
+
+ /**
+ * @param res Result.
+ * @return {@code True} if result was set by this call.
+ */
+ boolean finish(boolean res) {
+ if (connFutDoneUpdater.compareAndSet(this, 0, 1)) {
+ onStatusReceived(res);
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * @param res Result.
+ */
+ void onStatusReceived(boolean res) {
+ receivedConnectionStatus(nodeIdx, res);
+ }
+ }
+
+ /**
+ *
+ */
+ private class MultipleAddressesConnectFuture implements ConnectFuture {
+ /** */
+ volatile int resCnt;
+
+ /** */
+ volatile SingleAddressConnectFuture[] futs;
+
+ /** */
+ final int nodeIdx;
+
+ /**
+ * @param nodeIdx Node index.
+ */
+ MultipleAddressesConnectFuture(int nodeIdx) {
+ this.nodeIdx = nodeIdx;
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onTimeout() {
+ SingleAddressConnectFuture[] futs = this.futs;
+
+ for (int i = 0; i < futs.length; i++) {
+ ConnectFuture fut = futs[i];
+
+ if (fut != null)
+ fut.onTimeout();
+ }
+ }
+
+ /**
+ * @param addrs Node addresses.
+ */
+ void init(Collection<InetSocketAddress> addrs) {
+ SingleAddressConnectFuture[] futs = new SingleAddressConnectFuture[addrs.size()];
+
+ int idx = 0;
+
+ for (InetSocketAddress addr : addrs) {
+ SingleAddressConnectFuture fut = new SingleAddressConnectFuture(nodeIdx) {
+ @Override void onStatusReceived(boolean res) {
+ receivedAddressStatus(res);
+ }
+ };
+
+ fut.init(addr);
+
+ futs[idx++] = fut;
+
+ if (done())
+ return;
+ }
+
+ this.futs = futs;
+
+ // Close race.
+ if (done())
+ cancelFutures();
+ }
+
+ /**
+ * @return {@code True}
+ */
+ private boolean done() {
+ int resCnt0 = resCnt;
+
+ return resCnt0 == Integer.MAX_VALUE || resCnt0 == futs.length;
+ }
+
+ /**
+ *
+ */
+ private void cancelFutures() {
+ SingleAddressConnectFuture[] futs = this.futs;
+
+ if (futs != null) {
+ for (int i = 0; i < futs.length; i++) {
+ SingleAddressConnectFuture fut = futs[i];
+
+ fut.cancel();
+ }
+ }
+ }
+
+ /**
+ * @param res Result.
+ */
+ void receivedAddressStatus(boolean res) {
+ if (res) {
+ for (;;) {
+ int resCnt0 = resCnt;
+
+ if (resCnt0 == Integer.MAX_VALUE)
+ return;
+
+ if (connResCntUpdater.compareAndSet(this, resCnt0, Integer.MAX_VALUE)) {
+ receivedConnectionStatus(nodeIdx, true);
+
+ cancelFutures(); // Cancel others connects if they are still in progress.
+
+ return;
+ }
+ }
+ }
+ else {
+ for (;;) {
+ int resCnt0 = resCnt;
+
+ if (resCnt0 == Integer.MAX_VALUE)
+ return;
+
+ int resCnt1 = resCnt0 + 1;
+
+ if (connResCntUpdater.compareAndSet(this, resCnt0, resCnt1)) {
+ if (resCnt1 == futs.length)
+ receivedConnectionStatus(nodeIdx, false);
+
+ return;
+ }
+ }
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ed492a4f/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationNodeConnectionCheckFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationNodeConnectionCheckFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationNodeConnectionCheckFuture.java
new file mode 100644
index 0000000..c034782
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationNodeConnectionCheckFuture.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp.internal;
+
+import java.util.UUID;
+
+/**
+ *
+ */
+public interface TcpCommunicationNodeConnectionCheckFuture {
+ /**
+ * @param nodeId Remote node ID.
+ */
+ public void onConnected(UUID nodeId);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ed492a4f/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
index caa2d87..e89a4c8 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
@@ -18,20 +18,23 @@
package org.apache.ignite.spi.communication.tcp;
import java.util.ArrayList;
-import java.util.HashSet;
+import java.util.BitSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CyclicBarrier;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.nio.GridCommunicationClient;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.communication.GridAbstractCommunicationSelfTest;
-import org.apache.ignite.spi.communication.GridTestMessage;
import org.apache.ignite.testframework.GridTestUtils;
/**
@@ -39,7 +42,7 @@ import org.apache.ignite.testframework.GridTestUtils;
*/
abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunicationSelfTest<CommunicationSpi> {
/** */
- private static final int SPI_COUNT = 2;
+ private static final int SPI_COUNT = 3;
/** */
public static final int IDLE_CONN_TIMEOUT = 2000;
@@ -92,33 +95,65 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
}
}
- public void testConnectionCheck() {
- for (Map.Entry<UUID, CommunicationSpi<Message>> entry : spis.entrySet()) {
- UUID id = entry.getKey();
+ /**
+ *
+ */
+ public void testCheckConnection1() {
+ for (int i = 0; i < 100; i++) {
+ for (Map.Entry<UUID, CommunicationSpi<Message>> entry : spis.entrySet()) {
+ TcpCommunicationSpi spi = (TcpCommunicationSpi)entry.getValue();
- TcpCommunicationSpi spi = (TcpCommunicationSpi)entry.getValue();
+ List<ClusterNode> checkNodes = new ArrayList<>(nodes);
- List<ClusterNode> checkNodes = new ArrayList<>();
+ assert checkNodes.size() > 1;
- for (ClusterNode node : nodes) {
- if (!id.equals(node.id()))
- checkNodes.add(node);
+ IgniteFuture<BitSet> fut = spi.checkConnection(checkNodes);
+
+ BitSet res = fut.get();
+
+ for (int n = 0; n < checkNodes.size(); n++)
+ assertTrue(res.get(n));
}
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCheckConnection2() throws Exception {
+ final int THREADS = spis.size();
+
+ final CyclicBarrier b = new CyclicBarrier(THREADS);
+
+ List<IgniteInternalFuture> futs = new ArrayList<>();
- spi.checkConnection(checkNodes);
-
- break;
-// for (ClusterNode node : nodes) {
-// synchronized (mux) {
-// if (!msgDestMap.containsKey(entry.getKey()))
-// msgDestMap.put(entry.getKey(), new HashSet<UUID>());
-//
-// msgDestMap.get(entry.getKey()).add(node.id());
-// }
-//
-// entry.getValue().sendMessage(node, new GridTestMessage(entry.getKey(), msgId++, 0));
-// }
+ for (Map.Entry<UUID, CommunicationSpi<Message>> entry : spis.entrySet()) {
+ final TcpCommunicationSpi spi = (TcpCommunicationSpi)entry.getValue();
+
+ futs.add(GridTestUtils.runAsync(new Callable() {
+ @Override public Object call() throws Exception {
+ List<ClusterNode> checkNodes = new ArrayList<>(nodes);
+
+ assert checkNodes.size() > 1;
+
+ b.await();
+
+ for (int i = 0; i < 100; i++) {
+ IgniteFuture<BitSet> fut = spi.checkConnection(checkNodes);
+
+ BitSet res = fut.get();
+
+ for (int n = 0; n < checkNodes.size(); n++)
+ assertTrue(res.get(n));
+ }
+
+ return null;
+ }
+ }));
}
+
+ for (IgniteInternalFuture f : futs)
+ f.get();
}
/** {@inheritDoc} */