You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/20 08:04:37 UTC

[29/50] [abbrv] ignite git commit: zk

zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/60b625a9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/60b625a9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/60b625a9

Branch: refs/heads/ignite-zk
Commit: 60b625a99f956db75f1c2abfdbeb59b1df91166d
Parents: 0b22522
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 15 14:16:36 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 15 14:16:36 2017 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |   6 ++
 .../ignite/internal/util/nio/GridNioServer.java |   2 +-
 .../communication/tcp/TcpCommunicationSpi.java  |  69 ++++++-------
 .../TcpCommunicationConnectionCheckFuture.java  | 100 +++++++++++++++++++
 .../ZkCommunicationErrorProcessFuture.java      |   2 +-
 .../GridTcpCommunicationSpiAbstractTest.java    |  38 ++++++-
 .../ZookeeperDiscoverySpiBasicTest.java         |   6 +-
 7 files changed, 177 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/60b625a9/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 7761020..37a7d8e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -155,6 +155,7 @@ import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.apache.ignite.spi.collision.jobstealing.JobStealingRequest;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationConnectionCheckMessage;
 import org.apache.ignite.spi.communication.tcp.HandshakeMessage;
 import org.apache.ignite.spi.communication.tcp.HandshakeMessage2;
 import org.apache.ignite.spi.communication.tcp.NodeIdMessage;
@@ -186,6 +187,11 @@ public class GridIoMessageFactory implements MessageFactory {
         switch (type) {
             // -54 is reserved for SQL.
             // -46 ... -51 - snapshot messages.
+            case -62:
+                msg = new TcpCommunicationConnectionCheckMessage();
+
+                break;
+
             case -61:
                 msg = new IgniteDiagnosticMessage();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/60b625a9/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 14d55d8..9784549 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -842,7 +842,7 @@ public class GridNioServer<T> {
                 NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, false, meta);
 
                 if (async) {
-                    // assert meta != null;
+                    assert meta != null;
 
                     req.op = NioOperation.CONNECT;
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/60b625a9/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index ca73e7b..d4ccc33 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -135,6 +135,7 @@ import org.apache.ignite.spi.IgniteSpiThread;
 import org.apache.ignite.spi.IgniteSpiTimeoutObject;
 import org.apache.ignite.spi.communication.CommunicationListener;
 import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture;
 import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.thread.IgniteThread;
@@ -308,11 +309,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
     private static final IgniteProductVersion VERSION_SINCE_CLIENT_COULD_WAIT_TO_CONNECT = IgniteProductVersion.fromString("2.1.4");
 
     /** Connection index meta for session. */
-    private static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey();
+    public static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey();
 
     /** Message tracker meta for session. */
     private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey();
 
+    /** Session future. */
+    public static final int SES_FUT_META = GridNioSessionMetaKey.nextUniqueKey();
+
+    /** */
+    public static final ConnectionKey CONN_CHECK_DUMMY_KEY = new ConnectionKey(null, -1, -1);
+
     /**
      * Default local port range (value is <tt>100</tt>).
      * See {@link #setLocalPortRange(int)} for details.
@@ -396,9 +403,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                     }
                 }
                 else {
-                    if (log.isInfoEnabled())
-                        log.info("Established outgoing communication connection [locAddr=" + ses.localAddress() +
-                            ", rmtAddr=" + ses.remoteAddress() + ']');
+                    ConnectionKey connId = ses.meta(CONN_IDX_META);
+
+                    if (connId != CONN_CHECK_DUMMY_KEY) {
+                        if (log.isInfoEnabled())
+                            log.info("Established outgoing communication connection [locAddr=" + ses.localAddress() +
+                                ", rmtAddr=" + ses.remoteAddress() + ']');
+                    }
                 }
             }
 
@@ -676,7 +687,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                 ConnectionKey connKey = ses.meta(CONN_IDX_META);
 
                 if (connKey == null) {
-                    assert ses.accepted() : ses;
+                    assert ses.accepted() : msg;
 
                     if (!connectGate.tryEnter()) {
                         if (log.isDebugEnabled())
@@ -736,6 +747,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                                 recovery.lastAcknowledged(rcvCnt);
                             }
                         }
+                        else if (connKey == CONN_CHECK_DUMMY_KEY) {
+                            assert msg instanceof NodeIdMessage : msg;
+
+                            TcpCommunicationConnectionCheckFuture fut = ses.meta(SES_FUT_META);
+
+                            fut.onConnected(U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0));
+
+                            nioSrvr.closeFromWorkerThread(ses);
+
+                            return;
+                        }
                     }
 
                     IgniteRunnable c;
@@ -2566,45 +2588,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
         sendMessage0(node, msg, null);
     }
 
-    public IgniteFuture<BitSet> pingNodes(List<ClusterNode> nodes) {
+    public IgniteFuture<BitSet> checkConnection(List<ClusterNode> nodes) {
         ClusterNode node = nodes.get(0);
 
         try {
-            LinkedHashSet<InetSocketAddress> addrs = nodeAddresses(node);
-
-            // /172.25.4.90:45012
+            Collection<InetSocketAddress> addrs = nodeAddresses(node);
 
-            for (InetSocketAddress addr : addrs) {
-                SocketChannel ch = SocketChannel.open();
-
-                ch.configureBlocking(false);
-
-                ch.socket().setTcpNoDelay(tcpNoDelay);
-                ch.socket().setKeepAlive(true);
-
-                boolean connect = ch.connect(addr);
-
-                if (!connect) {
-                    GridNioFuture<GridNioSession> fut = nioSrvr.createSession(ch, null, true, new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() {
-                        @Override public void apply(IgniteInternalFuture<GridNioSession> fut) {
-                            try {
-                                GridNioSession ses = fut.get();
-
-                                log.info("Ping connected");
-
-                                nioSrvr.closeFromWorkerThread(ses);
-                            }
-                            catch (Exception e) {
-                                e.printStackTrace();
-                            }
-                        }
-                    });
-
-                    fut.get();
-                }
-                else
-                    log.info("Connected");
-            }
         }
         catch (Exception e) {
             throw new IgniteSpiException(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/60b625a9/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
new file mode 100644
index 0000000..e08ea13
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp.internal;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.SocketChannel;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.GridLeanMap;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+
+/**
+ *
+ */
+public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Boolean> {
+    /** */
+    private final UUID nodeId;
+
+    /** */
+    private final GridNioServer nioSrvr;
+
+    /** */
+    private Map<Integer, Object> sesMeta;
+
+    /** */
+    private SocketChannel ch;
+
+    /**
+     * @param nodeId Remote note ID.
+     */
+    public TcpCommunicationConnectionCheckFuture(GridNioServer nioSrvr, UUID nodeId) {
+        this.nioSrvr = nioSrvr;
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * @param addr
+     * @throws IOException
+     */
+    public void init(InetSocketAddress addr) throws IOException {
+        ch = SocketChannel.open();
+
+        ch.configureBlocking(false);
+
+        ch.socket().setTcpNoDelay(true);
+        ch.socket().setKeepAlive(false);
+
+        boolean connect = ch.connect(addr);
+
+        if (!connect) {
+            sesMeta = new GridLeanMap<>(2);
+
+            sesMeta.put(TcpCommunicationSpi.CONN_IDX_META, TcpCommunicationSpi.CONN_CHECK_DUMMY_KEY);
+            sesMeta.put(TcpCommunicationSpi.SES_FUT_META, this);
+
+            nioSrvr.createSession(ch, sesMeta, true, new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() {
+                @Override public void apply(IgniteInternalFuture<GridNioSession> fut) {
+                    if (fut.error() != null)
+                        onDone(false);
+                }
+            });
+        }
+    }
+
+    /**
+     *
+     */
+    public void onTimeout() {
+        if (super.onDone(false))
+            nioSrvr.cancelConnect(ch, sesMeta);
+    }
+
+    /**
+     * @param rmtNodeId
+     */
+    public void onConnected(UUID rmtNodeId) {
+        onDone(nodeId.equals(rmtNodeId));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/60b625a9/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
index 6812ab0..d7d4bd1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
@@ -151,7 +151,7 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implemen
         throws Exception {
         TcpCommunicationSpi spi = (TcpCommunicationSpi)impl.spi.ignite().configuration().getCommunicationSpi();
 
-        spi.pingNodes(nodes);
+        spi.checkConnection(nodes);
 
         ZkDistributedCollectDataFuture.saveNodeResult(futPath, rtState.zkClient, locNodeOrder, null);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/60b625a9/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
index 54b3a78..caa2d87 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
@@ -17,14 +17,21 @@
 
 package org.apache.ignite.spi.communication.tcp;
 
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.util.nio.GridCommunicationClient;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.IgniteSpiAdapter;
 import org.apache.ignite.spi.communication.CommunicationSpi;
 import org.apache.ignite.spi.communication.GridAbstractCommunicationSelfTest;
+import org.apache.ignite.spi.communication.GridTestMessage;
 import org.apache.ignite.testframework.GridTestUtils;
 
 /**
@@ -32,7 +39,7 @@ import org.apache.ignite.testframework.GridTestUtils;
  */
 abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunicationSelfTest<CommunicationSpi> {
     /** */
-    private static final int SPI_COUNT = 3;
+    private static final int SPI_COUNT = 2;
 
     /** */
     public static final int IDLE_CONN_TIMEOUT = 2000;
@@ -85,6 +92,35 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
         }
     }
 
+    public void testConnectionCheck() {
+        for (Map.Entry<UUID, CommunicationSpi<Message>> entry : spis.entrySet()) {
+            UUID id = entry.getKey();
+
+            TcpCommunicationSpi spi = (TcpCommunicationSpi)entry.getValue();
+
+            List<ClusterNode> checkNodes = new ArrayList<>();
+
+            for (ClusterNode node : nodes) {
+                if (!id.equals(node.id()))
+                    checkNodes.add(node);
+            }
+
+            spi.checkConnection(checkNodes);
+
+            break;
+//            for (ClusterNode node : nodes) {
+//                synchronized (mux) {
+//                    if (!msgDestMap.containsKey(entry.getKey()))
+//                        msgDestMap.put(entry.getKey(), new HashSet<UUID>());
+//
+//                    msgDestMap.get(entry.getKey()).add(node.id());
+//                }
+//
+//                entry.getValue().sendMessage(node, new GridTestMessage(entry.getKey(), msgId++, 0));
+//            }
+        }
+    }
+
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         super.afterTest();

http://git-wip-us.apache.org/repos/asf/ignite/blob/60b625a9/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index 64fcd34..a2e8784 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -1929,7 +1929,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
 
        nodes.add(ignite(2).cluster().localNode());
 
-       spi.pingNodes(nodes);
+       // spi.pingNodes(nodes);
     }
 
     /**
@@ -2628,7 +2628,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override public IgniteFuture<BitSet> pingNodes(List<ClusterNode> nodes) {
+        @Override public IgniteFuture<BitSet> checkConnection(List<ClusterNode> nodes) {
             CountDownLatch pingLatch = this.pingLatch;
 
             try {
@@ -2639,7 +2639,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
                 throw new IgniteException(e);
             }
 
-            return super.pingNodes(nodes);
+            return super.checkConnection(nodes);
         }
     }
 }