You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/20 08:04:37 UTC
[29/50] [abbrv] ignite git commit: zk
zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/60b625a9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/60b625a9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/60b625a9
Branch: refs/heads/ignite-zk
Commit: 60b625a99f956db75f1c2abfdbeb59b1df91166d
Parents: 0b22522
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 15 14:16:36 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 15 14:16:36 2017 +0300
----------------------------------------------------------------------
.../communication/GridIoMessageFactory.java | 6 ++
.../ignite/internal/util/nio/GridNioServer.java | 2 +-
.../communication/tcp/TcpCommunicationSpi.java | 69 ++++++-------
.../TcpCommunicationConnectionCheckFuture.java | 100 +++++++++++++++++++
.../ZkCommunicationErrorProcessFuture.java | 2 +-
.../GridTcpCommunicationSpiAbstractTest.java | 38 ++++++-
.../ZookeeperDiscoverySpiBasicTest.java | 6 +-
7 files changed, 177 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/60b625a9/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 7761020..37a7d8e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -155,6 +155,7 @@ import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.spi.collision.jobstealing.JobStealingRequest;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationConnectionCheckMessage;
import org.apache.ignite.spi.communication.tcp.HandshakeMessage;
import org.apache.ignite.spi.communication.tcp.HandshakeMessage2;
import org.apache.ignite.spi.communication.tcp.NodeIdMessage;
@@ -186,6 +187,11 @@ public class GridIoMessageFactory implements MessageFactory {
switch (type) {
// -54 is reserved for SQL.
// -46 ... -51 - snapshot messages.
+ case -62:
+ msg = new TcpCommunicationConnectionCheckMessage();
+
+ break;
+
case -61:
msg = new IgniteDiagnosticMessage();
http://git-wip-us.apache.org/repos/asf/ignite/blob/60b625a9/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 14d55d8..9784549 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -842,7 +842,7 @@ public class GridNioServer<T> {
NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, false, meta);
if (async) {
- // assert meta != null;
+ assert meta != null;
req.op = NioOperation.CONNECT;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/60b625a9/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index ca73e7b..d4ccc33 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -135,6 +135,7 @@ import org.apache.ignite.spi.IgniteSpiThread;
import org.apache.ignite.spi.IgniteSpiTimeoutObject;
import org.apache.ignite.spi.communication.CommunicationListener;
import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.thread.IgniteThread;
@@ -308,11 +309,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
private static final IgniteProductVersion VERSION_SINCE_CLIENT_COULD_WAIT_TO_CONNECT = IgniteProductVersion.fromString("2.1.4");
/** Connection index meta for session. */
- private static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey();
+ public static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey();
/** Message tracker meta for session. */
private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey();
+ /** Session future. */
+ public static final int SES_FUT_META = GridNioSessionMetaKey.nextUniqueKey();
+
+ /** */
+ public static final ConnectionKey CONN_CHECK_DUMMY_KEY = new ConnectionKey(null, -1, -1);
+
/**
* Default local port range (value is <tt>100</tt>).
* See {@link #setLocalPortRange(int)} for details.
@@ -396,9 +403,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
}
else {
- if (log.isInfoEnabled())
- log.info("Established outgoing communication connection [locAddr=" + ses.localAddress() +
- ", rmtAddr=" + ses.remoteAddress() + ']');
+ ConnectionKey connId = ses.meta(CONN_IDX_META);
+
+ if (connId != CONN_CHECK_DUMMY_KEY) {
+ if (log.isInfoEnabled())
+ log.info("Established outgoing communication connection [locAddr=" + ses.localAddress() +
+ ", rmtAddr=" + ses.remoteAddress() + ']');
+ }
}
}
@@ -676,7 +687,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
ConnectionKey connKey = ses.meta(CONN_IDX_META);
if (connKey == null) {
- assert ses.accepted() : ses;
+ assert ses.accepted() : msg;
if (!connectGate.tryEnter()) {
if (log.isDebugEnabled())
@@ -736,6 +747,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
recovery.lastAcknowledged(rcvCnt);
}
}
+ else if (connKey == CONN_CHECK_DUMMY_KEY) {
+ assert msg instanceof NodeIdMessage : msg;
+
+ TcpCommunicationConnectionCheckFuture fut = ses.meta(SES_FUT_META);
+
+ fut.onConnected(U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0));
+
+ nioSrvr.closeFromWorkerThread(ses);
+
+ return;
+ }
}
IgniteRunnable c;
@@ -2566,45 +2588,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
sendMessage0(node, msg, null);
}
- public IgniteFuture<BitSet> pingNodes(List<ClusterNode> nodes) {
+ public IgniteFuture<BitSet> checkConnection(List<ClusterNode> nodes) {
ClusterNode node = nodes.get(0);
try {
- LinkedHashSet<InetSocketAddress> addrs = nodeAddresses(node);
-
- // /172.25.4.90:45012
+ Collection<InetSocketAddress> addrs = nodeAddresses(node);
- for (InetSocketAddress addr : addrs) {
- SocketChannel ch = SocketChannel.open();
-
- ch.configureBlocking(false);
-
- ch.socket().setTcpNoDelay(tcpNoDelay);
- ch.socket().setKeepAlive(true);
-
- boolean connect = ch.connect(addr);
-
- if (!connect) {
- GridNioFuture<GridNioSession> fut = nioSrvr.createSession(ch, null, true, new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() {
- @Override public void apply(IgniteInternalFuture<GridNioSession> fut) {
- try {
- GridNioSession ses = fut.get();
-
- log.info("Ping connected");
-
- nioSrvr.closeFromWorkerThread(ses);
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
-
- fut.get();
- }
- else
- log.info("Connected");
- }
}
catch (Exception e) {
throw new IgniteSpiException(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/60b625a9/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
new file mode 100644
index 0000000..e08ea13
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp.internal;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.SocketChannel;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.GridLeanMap;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+
+/**
+ * Future for a single connection check: completes with {@code true} only if the connected remote node reports the expected ID.
+ */
+public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Boolean> {
+ /** ID the remote node is expected to report. */
+ private final UUID nodeId;
+
+ /** NIO server used to create the session and to cancel the connect. */
+ private final GridNioServer nioSrvr;
+
+ /** Session meta handed to the NIO server; assigned only when the connect does not finish immediately. */
+ private Map<Integer, Object> sesMeta;
+
+ /** Channel opened by {@link #init(InetSocketAddress)}. */
+ private SocketChannel ch;
+
+ /**
+ * @param nodeId Remote node ID that must be reported for the check to succeed.
+ */
+ public TcpCommunicationConnectionCheckFuture(GridNioServer nioSrvr, UUID nodeId) {
+ this.nioSrvr = nioSrvr;
+ this.nodeId = nodeId;
+ }
+
+ /**
+ * @param addr Address to open a non-blocking connection to.
+ * @throws IOException If opening, configuring or connecting the channel fails.
+ */
+ public void init(InetSocketAddress addr) throws IOException {
+ ch = SocketChannel.open();
+
+ ch.configureBlocking(false);
+
+ ch.socket().setTcpNoDelay(true);
+ ch.socket().setKeepAlive(false);
+
+ boolean connect = ch.connect(addr); // Non-blocking connect: false means it is still in progress.
+
+ if (!connect) {
+ sesMeta = new GridLeanMap<>(2);
+
+ sesMeta.put(TcpCommunicationSpi.CONN_IDX_META, TcpCommunicationSpi.CONN_CHECK_DUMMY_KEY); // Marks the session as a connection check.
+ sesMeta.put(TcpCommunicationSpi.SES_FUT_META, this); // Lets the session handler look this future up from the session.
+
+ nioSrvr.createSession(ch, sesMeta, true, new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() {
+ @Override public void apply(IgniteInternalFuture<GridNioSession> fut) {
+ if (fut.error() != null) // Session creation failed, so the check fails; success is reported via onConnected().
+ onDone(false);
+ }
+ });
+ } // NOTE(review): when connect() returns true immediately nothing here completes the future - confirm this is intended.
+ }
+
+ /**
+ * Tries to complete the future with {@code false}; if {@code onDone} returns {@code true}, cancels the connect on the NIO server.
+ */
+ public void onTimeout() {
+ if (super.onDone(false))
+ nioSrvr.cancelConnect(ch, sesMeta);
+ }
+
+ /**
+ * @param rmtNodeId Node ID reported by the remote side; the future completes with whether it equals the expected ID.
+ */
+ public void onConnected(UUID rmtNodeId) {
+ onDone(nodeId.equals(rmtNodeId));
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/60b625a9/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
index 6812ab0..d7d4bd1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
@@ -151,7 +151,7 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implemen
throws Exception {
TcpCommunicationSpi spi = (TcpCommunicationSpi)impl.spi.ignite().configuration().getCommunicationSpi();
- spi.pingNodes(nodes);
+ spi.checkConnection(nodes);
ZkDistributedCollectDataFuture.saveNodeResult(futPath, rtState.zkClient, locNodeOrder, null);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/60b625a9/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
index 54b3a78..caa2d87 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
@@ -17,14 +17,21 @@
package org.apache.ignite.spi.communication.tcp;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.util.nio.GridCommunicationClient;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.communication.GridAbstractCommunicationSelfTest;
+import org.apache.ignite.spi.communication.GridTestMessage;
import org.apache.ignite.testframework.GridTestUtils;
/**
@@ -32,7 +39,7 @@ import org.apache.ignite.testframework.GridTestUtils;
*/
abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunicationSelfTest<CommunicationSpi> {
/** */
- private static final int SPI_COUNT = 3;
+ private static final int SPI_COUNT = 2;
/** */
public static final int IDLE_CONN_TIMEOUT = 2000;
@@ -85,6 +92,35 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
}
}
+ public void testConnectionCheck() { // Calls checkConnection from one SPI against all nodes with a different ID.
+ for (Map.Entry<UUID, CommunicationSpi<Message>> entry : spis.entrySet()) {
+ UUID id = entry.getKey();
+
+ TcpCommunicationSpi spi = (TcpCommunicationSpi)entry.getValue();
+
+ List<ClusterNode> checkNodes = new ArrayList<>();
+
+ for (ClusterNode node : nodes) {
+ if (!id.equals(node.id())) // Skip the node whose ID is this SPI's map key.
+ checkNodes.add(node);
+ }
+
+ spi.checkConnection(checkNodes); // The returned future is not inspected here.
+
+ break; // Only the first map entry is exercised.
+// for (ClusterNode node : nodes) {
+// synchronized (mux) {
+// if (!msgDestMap.containsKey(entry.getKey()))
+// msgDestMap.put(entry.getKey(), new HashSet<UUID>());
+//
+// msgDestMap.get(entry.getKey()).add(node.id());
+// }
+//
+// entry.getValue().sendMessage(node, new GridTestMessage(entry.getKey(), msgId++, 0));
+// }
+ }
+ }
+
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
http://git-wip-us.apache.org/repos/asf/ignite/blob/60b625a9/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index 64fcd34..a2e8784 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -1929,7 +1929,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
nodes.add(ignite(2).cluster().localNode());
- spi.pingNodes(nodes);
+ // spi.pingNodes(nodes);
}
/**
@@ -2628,7 +2628,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override public IgniteFuture<BitSet> pingNodes(List<ClusterNode> nodes) {
+ @Override public IgniteFuture<BitSet> checkConnection(List<ClusterNode> nodes) {
CountDownLatch pingLatch = this.pingLatch;
try {
@@ -2639,7 +2639,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
throw new IgniteException(e);
}
- return super.pingNodes(nodes);
+ return super.checkConnection(nodes);
}
}
}