You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/27 12:15:02 UTC

[1/5] ignite git commit: IGNITE-6248 - Throw exception on unsupported Java 1.7 releases.

Repository: ignite
Updated Branches:
  refs/heads/ignite-3478 d46a03950 -> 6466adf54


IGNITE-6248 - Throw exception on unsupported Java 1.7 releases.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e228ce36
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e228ce36
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e228ce36

Branch: refs/heads/ignite-3478
Commit: e228ce3600332f1873f7250b7ca2919e2f3607bc
Parents: f0500e2
Author: dkarachentsev <dk...@gridgain.com>
Authored: Fri Oct 27 11:55:04 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Oct 27 11:55:48 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |  6 ++++
 .../org/apache/ignite/internal/IgnitionEx.java  | 34 ++++++++++++++++++--
 2 files changed, 38 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e228ce36/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index d7d4443..4294c71 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -656,6 +656,12 @@ public final class IgniteSystemProperties {
     /** Ignite page memory concurrency level. */
     public static final String IGNITE_OFFHEAP_LOCK_CONCURRENCY_LEVEL = "IGNITE_OFFHEAP_LOCK_CONCURRENCY_LEVEL";
 
+    /**
+     * Start Ignite on versions of JRE 7 older than 1.7.0_71. For Ignite to work properly on such versions,
+     * it may be necessary to disable JIT in some places.
+     */
+    public static final String IGNITE_FORCE_START_JAVA7 = "IGNITE_FORCE_START_JAVA7";
+
     /** Returns true for system properties only avoiding sending sensitive information. */
     private static final IgnitePredicate<Map.Entry<String, String>> PROPS_FILTER = new IgnitePredicate<Map.Entry<String, String>>() {
         @Override public boolean apply(final Map.Entry<String, String> entry) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e228ce36/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 67c771b..d84f8a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -118,6 +118,7 @@ import static org.apache.ignite.IgniteState.STOPPED_ON_SEGMENTATION;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_CLIENT;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_CONFIG_URL;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEP_MODE_OVERRIDE;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_FORCE_START_JAVA7;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_LOCAL_HOST;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_NO_SHUTDOWN_HOOK;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_RESTART_CODE;
@@ -189,9 +190,38 @@ public class IgnitionEx {
     static {
         // Check 1.8 just in case for forward compatibility.
         if (!U.jdkVersion().contains("1.7") &&
-            !U.jdkVersion().contains("1.8"))
-            throw new IllegalStateException("Ignite requires Java 7 or above. Current Java version " +
+            !U.jdkVersion().contains("1.8")) {
+            throw new IllegalStateException("Ignite requires Java 1.7.0_71 or above. Current Java version " +
                 "is not supported: " + U.jdkVersion());
+        }
+
+        String jreVer = U.jreVersion();
+
+        if (jreVer.startsWith("1.7")) {
+            int upd = jreVer.indexOf('_');
+            int beta = jreVer.indexOf('-');
+
+            if (beta < 0)
+                beta = jreVer.length();
+
+            if (upd > 0 && beta > 0) {
+                try {
+                    int update = Integer.parseInt(jreVer.substring(upd + 1, beta));
+
+                    boolean forceJ7 = IgniteSystemProperties.getBoolean(IGNITE_FORCE_START_JAVA7, false);
+
+                    if (update < 71 && !forceJ7) {
+                        throw new IllegalStateException("Ignite requires Java 1.7.0_71 or above. Current Java version " +
+                            "is not supported: " + jreVer);
+                    }
+                    else if (forceJ7)
+                        System.err.println("Ignite requires Java 1.7.0_71 or above. Start on your own risk.");
+                }
+                catch (NumberFormatException ignore) {
+                    // No-op
+                }
+            }
+        }
 
         // To avoid nasty race condition in UUID.randomUUID() in JDK prior to 6u34.
         // For details please see:


[3/5] ignite git commit: ignite-5860 Try process TcpDiscoveryClientReconnectMessage from socket reader instead of always processing it on coordinator.

Posted by sb...@apache.org.
ignite-5860 Try process TcpDiscoveryClientReconnectMessage from socket reader instead of always processing it on coordinator.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/56a63f80
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/56a63f80
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/56a63f80

Branch: refs/heads/ignite-3478
Commit: 56a63f80d1181e53a3e2a4c4f88e42226bbac86e
Parents: 717c549
Author: Denis Mekhanikov <dm...@gmail.com>
Authored: Fri Oct 27 14:12:36 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Oct 27 14:13:40 2017 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  52 +++-
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 311 ++++++++++---------
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |   4 +-
 ...lientDiscoverySpiFailureTimeoutSelfTest.java |  20 +-
 .../tcp/TcpClientDiscoverySpiSelfTest.java      | 275 +++++++++++++++-
 5 files changed, 467 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/56a63f80/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 5dbfe6e..139c110 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -26,6 +26,7 @@ import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -470,7 +471,8 @@ class ClientImpl extends TcpDiscoveryImpl {
     }
 
     /**
-     * @param recon {@code True} if reconnects.
+     * @param prevAddr Address of the router the client was previously connected to if a reconnect is in progress,
+     *      {@code null} otherwise.
      * @param timeout Timeout.
      * @return Opened socket or {@code null} if timeout.
      * @throws InterruptedException If interrupted.
@@ -478,9 +480,9 @@ class ClientImpl extends TcpDiscoveryImpl {
      * @see TcpDiscoverySpi#joinTimeout
      */
     @SuppressWarnings("BusyWait")
-    @Nullable private T2<SocketStream, Boolean> joinTopology(boolean recon, long timeout)
+    @Nullable private T2<SocketStream, Boolean> joinTopology(InetSocketAddress prevAddr, long timeout)
         throws IgniteSpiException, InterruptedException {
-        Collection<InetSocketAddress> addrs = null;
+        List<InetSocketAddress> addrs = null;
 
         long startTime = U.currentTimeMillis();
 
@@ -489,7 +491,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                 throw new InterruptedException();
 
             while (addrs == null || addrs.isEmpty()) {
-                addrs = spi.resolvedAddresses();
+                addrs = new ArrayList<>(spi.resolvedAddresses());
 
                 if (!F.isEmpty(addrs)) {
                     if (log.isDebugEnabled())
@@ -509,22 +511,30 @@ class ClientImpl extends TcpDiscoveryImpl {
                 }
             }
 
-            Collection<InetSocketAddress> addrs0 = new ArrayList<>(addrs);
+            // Process failed node last.
+            if (prevAddr != null) {
+                int idx = addrs.indexOf(prevAddr);
 
-            Iterator<InetSocketAddress> it = addrs.iterator();
+                if (idx != -1)
+                    Collections.swap(addrs, idx, 0);
+            }
+
+            Collection<InetSocketAddress> addrs0 = new ArrayList<>(addrs);
 
             boolean wait = false;
 
-            while (it.hasNext()) {
+            for (int i = addrs.size() - 1; i >= 0; i--) {
                 if (Thread.currentThread().isInterrupted())
                     throw new InterruptedException();
 
-                InetSocketAddress addr = it.next();
+                InetSocketAddress addr = addrs.get(i);
+
+                boolean recon = prevAddr != null;
 
                 T3<SocketStream, Integer, Boolean> sockAndRes = sendJoinRequest(recon, addr);
 
                 if (sockAndRes == null) {
-                    it.remove();
+                    addrs.remove(i);
 
                     continue;
                 }
@@ -852,8 +862,8 @@ class ClientImpl extends TcpDiscoveryImpl {
     }
 
     /** {@inheritDoc} */
-    @Override protected IgniteSpiThread workerThread() {
-        return msgWorker;
+    @Override protected Collection<IgniteSpiThread> threads() {
+        return Arrays.asList(sockWriter, msgWorker);
     }
 
     /**
@@ -1336,15 +1346,20 @@ class ClientImpl extends TcpDiscoveryImpl {
         private boolean clientAck;
 
         /** */
-        private boolean join;
+        private final boolean join;
+
+        /** */
+        private final InetSocketAddress prevAddr;
 
         /**
          * @param join {@code True} if reconnects during join.
+         * @param prevAddr Address of the node that this client was previously connected to.
          */
-        protected Reconnector(boolean join) {
+        protected Reconnector(boolean join, InetSocketAddress prevAddr) {
             super(spi.ignite().name(), "tcp-client-disco-reconnector", log);
 
             this.join = join;
+            this.prevAddr = prevAddr;
         }
 
         /**
@@ -1374,7 +1389,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
             try {
                 while (true) {
-                    T2<SocketStream, Boolean> joinRes = joinTopology(true, timeout);
+                    T2<SocketStream, Boolean> joinRes = joinTopology(prevAddr, timeout);
 
                     if (joinRes == null) {
                         if (join) {
@@ -1609,6 +1624,10 @@ class ClientImpl extends TcpDiscoveryImpl {
                     }
                     else if (msg instanceof SocketClosedMessage) {
                         if (((SocketClosedMessage)msg).sock == currSock) {
+                            Socket sock = currSock.sock;
+
+                            InetSocketAddress prevAddr = new InetSocketAddress(sock.getInetAddress(), sock.getPort());
+
                             currSock = null;
 
                             boolean join = joinLatch.getCount() > 0;
@@ -1637,8 +1656,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                                     assert reconnector == null;
 
-                                    final Reconnector reconnector = new Reconnector(join);
-                                    this.reconnector = reconnector;
+                                    reconnector = new Reconnector(join, prevAddr);
                                     reconnector.start();
                                 }
                             }
@@ -1811,7 +1829,7 @@ class ClientImpl extends TcpDiscoveryImpl {
             T2<SocketStream, Boolean> joinRes;
 
             try {
-                joinRes = joinTopology(false, spi.joinTimeout);
+                joinRes = joinTopology(null, spi.joinTimeout);
             }
             catch (IgniteSpiException e) {
                 joinError(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/56a63f80/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index efe531a..1c3ec2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -219,6 +219,9 @@ class ServerImpl extends TcpDiscoveryImpl {
     /** Pending custom messages that should not be sent between NodeAdded and NodeAddFinished messages. */
     private Queue<TcpDiscoveryCustomEventMessage> pendingCustomMsgs = new ArrayDeque<>();
 
+    /** Messages history used for client reconnect. */
+    private final EnsuredMessageHistory msgHist = new EnsuredMessageHistory();
+
     /** If non-shared IP finder is used this flag shows whether IP finder contains local address. */
     private boolean ipFinderHasLocAddr;
 
@@ -1663,8 +1666,23 @@ class ServerImpl extends TcpDiscoveryImpl {
     }
 
     /** {@inheritDoc} */
-    @Override protected IgniteSpiThread workerThread() {
-        return msgWorker;
+    @Override protected Collection<IgniteSpiThread> threads() {
+        Collection<IgniteSpiThread> threads;
+
+        synchronized (mux) {
+            threads = new ArrayList<>(readers.size() + clientMsgWorkers.size() + 4);
+            threads.addAll(readers);
+        }
+
+        threads.addAll(clientMsgWorkers.values());
+        threads.add(tcpSrvr);
+        threads.add(ipFinderCleaner);
+        threads.add(msgWorker);
+        threads.add(statsPrinter);
+
+        threads.removeAll(Collections.<IgniteSpiThread>singleton(null));
+
+        return threads;
     }
 
     /**
@@ -2122,7 +2140,9 @@ class ServerImpl extends TcpDiscoveryImpl {
             else if (msg instanceof TcpDiscoveryNodeFailedMessage)
                 clearClientAddFinished(((TcpDiscoveryNodeFailedMessage)msg).failedNodeId());
 
-            msgs.add(msg);
+            synchronized (msgs) {
+                msgs.add(msg);
+            }
         }
 
         /**
@@ -2161,14 +2181,16 @@ class ServerImpl extends TcpDiscoveryImpl {
                 // Client connection failed before it received TcpDiscoveryNodeAddedMessage.
                 List<TcpDiscoveryAbstractMessage> res = null;
 
-                for (TcpDiscoveryAbstractMessage msg : msgs) {
-                    if (msg instanceof TcpDiscoveryNodeAddedMessage) {
-                        if (node.id().equals(((TcpDiscoveryNodeAddedMessage)msg).node().id()))
-                            res = new ArrayList<>(msgs.size());
-                    }
+                synchronized (msgs) {
+                    for (TcpDiscoveryAbstractMessage msg : msgs) {
+                        if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+                            if (node.id().equals(((TcpDiscoveryNodeAddedMessage)msg).node().id()))
+                                res = new ArrayList<>(msgs.size());
+                        }
 
-                    if (res != null)
-                        res.add(prepare(msg, node.id()));
+                        if (res != null)
+                            res.add(prepare(msg, node.id()));
+                    }
                 }
 
                 if (log.isDebugEnabled()) {
@@ -2181,20 +2203,26 @@ class ServerImpl extends TcpDiscoveryImpl {
                 return res;
             }
             else {
-                if (msgs.isEmpty())
-                    return Collections.emptyList();
+                Collection<TcpDiscoveryAbstractMessage> cp;
 
-                Collection<TcpDiscoveryAbstractMessage> cp = new ArrayList<>(msgs.size());
+                boolean skip;
 
-                boolean skip = true;
+                synchronized (msgs) {
+                    if (msgs.isEmpty())
+                        return Collections.emptyList();
 
-                for (TcpDiscoveryAbstractMessage msg : msgs) {
-                    if (skip) {
-                        if (msg.id().equals(lastMsgId))
-                            skip = false;
+                    cp = new ArrayList<>(msgs.size());
+
+                    skip = true;
+
+                    for (TcpDiscoveryAbstractMessage msg : msgs) {
+                        if (skip) {
+                            if (msg.id().equals(lastMsgId))
+                                skip = false;
+                        }
+                        else
+                            cp.add(prepare(msg, node.id()));
                     }
-                    else
-                        cp.add(prepare(msg, node.id()));
                 }
 
                 cp = !skip ? cp : null;
@@ -2483,9 +2511,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Pending messages. */
         private final PendingMessages pendingMsgs = new PendingMessages();
 
-        /** Messages history used for client reconnect. */
-        private final EnsuredMessageHistory msgHist = new EnsuredMessageHistory();
-
         /** Last message that updated topology. */
         private TcpDiscoveryAbstractMessage lastMsg;
 
@@ -2659,8 +2684,10 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (msg instanceof TcpDiscoveryJoinRequestMessage)
                 processJoinRequestMessage((TcpDiscoveryJoinRequestMessage)msg);
 
-            else if (msg instanceof TcpDiscoveryClientReconnectMessage)
-                processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg);
+            else if (msg instanceof TcpDiscoveryClientReconnectMessage) {
+                if (sendMessageToRemotes(msg))
+                    sendMessageAcrossRing(msg);
+            }
 
             else if (msg instanceof TcpDiscoveryNodeAddedMessage)
                 processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg);
@@ -2695,9 +2722,6 @@ class ServerImpl extends TcpDiscoveryImpl {
             else
                 assert false : "Unknown message type: " + msg.getClass().getSimpleName();
 
-            if (ensured && redirectToClients(msg))
-                msgHist.add(msg);
-
             if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId())) {
                 // Received a message from remote node.
                 onMessageExchanged();
@@ -2730,6 +2754,9 @@ class ServerImpl extends TcpDiscoveryImpl {
          */
         private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) {
             if (redirectToClients(msg)) {
+                if (spi.ensured(msg))
+                    msgHist.add(msg);
+
                 byte[] msgBytes = null;
 
                 for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) {
@@ -3836,9 +3863,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                 nodeAddedMsg.client(msg.client());
 
                 processNodeAddedMessage(nodeAddedMsg);
-
-                if (nodeAddedMsg.verified())
-                    msgHist.add(nodeAddedMsg);
             }
             else if (sendMessageToRemotes(msg))
                 sendMessageAcrossRing(msg);
@@ -3941,98 +3965,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
-         * Processes client reconnect message.
-         *
-         * @param msg Client reconnect message.
-         */
-        private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) {
-            UUID nodeId = msg.creatorNodeId();
-
-            UUID locNodeId = getLocalNodeId();
-
-            boolean isLocNodeRouter = locNodeId.equals(msg.routerNodeId());
-
-            if (!msg.verified()) {
-                TcpDiscoveryNode node = ring.node(nodeId);
-
-                assert node == null || node.isClient();
-
-                if (node != null) {
-                    node.clientRouterNodeId(msg.routerNodeId());
-                    node.clientAliveTime(spi.clientFailureDetectionTimeout());
-                }
-
-                if (isLocalNodeCoordinator()) {
-                    msg.verify(locNodeId);
-
-                    if (node != null) {
-                        Collection<TcpDiscoveryAbstractMessage> pending = msgHist.messages(msg.lastMessageId(), node);
-
-                        if (pending != null) {
-                            msg.pendingMessages(pending);
-                            msg.success(true);
-
-                            if (log.isDebugEnabled())
-                                log.debug("Accept client reconnect, restored pending messages " +
-                                    "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
-                        }
-                        else {
-                            if (log.isDebugEnabled())
-                                log.debug("Failing reconnecting client node because failed to restore pending " +
-                                    "messages [locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
-
-                            TcpDiscoveryNodeFailedMessage nodeFailedMsg = new TcpDiscoveryNodeFailedMessage(locNodeId,
-                                node.id(), node.internalOrder());
-
-                            processNodeFailedMessage(nodeFailedMsg);
-
-                            if (nodeFailedMsg.verified())
-                                msgHist.add(nodeFailedMsg);
-                        }
-                    }
-                    else if (log.isDebugEnabled())
-                        log.debug("Reconnecting client node is already failed [nodeId=" + nodeId + ']');
-
-                    if (isLocNodeRouter) {
-                        ClientMessageWorker wrk = clientMsgWorkers.get(nodeId);
-
-                        if (wrk != null)
-                            wrk.addMessage(msg);
-                        else if (log.isDebugEnabled())
-                            log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" +
-                                locNodeId + ", clientNodeId=" + nodeId + ']');
-                    }
-                    else {
-                        if (sendMessageToRemotes(msg))
-                            sendMessageAcrossRing(msg);
-                    }
-                }
-                else {
-                    if (sendMessageToRemotes(msg))
-                        sendMessageAcrossRing(msg);
-                }
-            }
-            else {
-                if (isLocalNodeCoordinator())
-                    addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
-
-                if (isLocNodeRouter) {
-                    ClientMessageWorker wrk = clientMsgWorkers.get(nodeId);
-
-                    if (wrk != null)
-                        wrk.addMessage(msg);
-                    else if (log.isDebugEnabled())
-                        log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" +
-                            locNodeId + ", clientNodeId=" + nodeId + ']');
-                }
-                else {
-                    if (ring.hasRemoteNodes() && !isLocalNodeCoordinator())
-                        sendMessageAcrossRing(msg);
-                }
-            }
-        }
-
-        /**
          * Processes node added message.
          *
          * For coordinator node method marks the messages as verified for rest of nodes to apply the
@@ -4078,9 +4010,6 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     processNodeAddFinishedMessage(addFinishMsg);
 
-                    if (addFinishMsg.verified())
-                        msgHist.add(addFinishMsg);
-
                     addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
 
                     return;
@@ -5145,9 +5074,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                                             locNodeId, clientNode.id(), clientNode.internalOrder());
 
                                         processNodeFailedMessage(nodeFailedMsg);
-
-                                        if (nodeFailedMsg.verified())
-                                            msgHist.add(nodeFailedMsg);
                                     }
                                 }
                             }
@@ -5342,9 +5268,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 ackMsg.topologyVersion(msg.topologyVersion());
 
                                 processCustomMessage(ackMsg);
-
-                                if (ackMsg.verified())
-                                    msgHist.add(ackMsg);
                             }
                             catch (IgniteCheckedException e) {
                                 U.error(log, "Failed to marshal discovery custom message.", e);
@@ -5446,12 +5369,8 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (joiningEmpty && isLocalNodeCoordinator()) {
                 TcpDiscoveryCustomEventMessage msg;
 
-                while ((msg = pollPendingCustomeMessage()) != null) {
+                while ((msg = pollPendingCustomeMessage()) != null)
                     processCustomMessage(msg);
-
-                    if (msg.verified())
-                        msgHist.add(msg);
-                }
             }
         }
 
@@ -6005,24 +5924,22 @@ class ServerImpl extends TcpDiscoveryImpl {
                             }
                         }
                         else if (msg instanceof TcpDiscoveryClientReconnectMessage) {
-                            if (clientMsgWrk != null) {
-                                TcpDiscoverySpiState state = spiStateCopy();
+                            TcpDiscoverySpiState state = spiStateCopy();
 
-                                if (state == CONNECTED) {
-                                    spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
+                            if (state == CONNECTED) {
+                                spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
 
-                                    if (clientMsgWrk.getState() == State.NEW)
-                                        clientMsgWrk.start();
+                                if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW)
+                                    clientMsgWrk.start();
 
-                                    msgWorker.addMessage(msg);
+                                processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg);
 
-                                    continue;
-                                }
-                                else {
-                                    spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN, sockTimeout);
+                                continue;
+                            }
+                            else {
+                                spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN, sockTimeout);
 
-                                    break;
-                                }
+                                break;
                             }
                         }
                         else if (msg instanceof TcpDiscoveryDuplicateIdMessage) {
@@ -6266,6 +6183,100 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * Processes client reconnect message.
+         *
+         * @param msg Client reconnect message.
+         */
+        private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) {
+            UUID nodeId = msg.creatorNodeId();
+
+            UUID locNodeId = getLocalNodeId();
+
+            boolean isLocNodeRouter = msg.routerNodeId().equals(locNodeId);
+
+            TcpDiscoveryNode node = ring.node(nodeId);
+
+            assert node == null || node.isClient();
+
+            if (node != null) {
+                node.clientRouterNodeId(msg.routerNodeId());
+                node.clientAliveTime(spi.clientFailureDetectionTimeout());
+            }
+
+            if (!msg.verified()) {
+                if (isLocNodeRouter || isLocalNodeCoordinator()) {
+                    if (node != null) {
+                        Collection<TcpDiscoveryAbstractMessage> pending = msgHist.messages(msg.lastMessageId(), node);
+
+                        if (pending != null) {
+                            msg.verify(locNodeId);
+                            msg.pendingMessages(pending);
+                            msg.success(true);
+
+                            if (log.isDebugEnabled())
+                                log.debug("Accept client reconnect, restored pending messages " +
+                                    "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
+                        }
+                        else if (!isLocalNodeCoordinator()) {
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to restore pending messages for reconnecting client. " +
+                                    "Forwarding reconnection message to coordinator " +
+                                    "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
+                        }
+                        else {
+                            msg.verify(locNodeId);
+
+                            if (log.isDebugEnabled())
+                                log.debug("Failing reconnecting client node because failed to restore pending " +
+                                    "messages [locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
+
+                            TcpDiscoveryNodeFailedMessage nodeFailedMsg = new TcpDiscoveryNodeFailedMessage(locNodeId,
+                                node.id(), node.internalOrder());
+
+                            msgWorker.addMessage(nodeFailedMsg);
+                        }
+                    }
+                    else {
+                        msg.verify(locNodeId);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Reconnecting client node is already failed [nodeId=" + nodeId + ']');
+                    }
+
+                    if (msg.verified() && isLocNodeRouter) {
+                        ClientMessageWorker wrk = clientMsgWorkers.get(nodeId);
+
+                        if (wrk != null)
+                            wrk.addMessage(msg);
+                        else if (log.isDebugEnabled())
+                            log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" +
+                                locNodeId + ", clientNodeId=" + nodeId + ']');
+                    }
+                    else
+                        msgWorker.addMessage(msg);
+                }
+                else
+                    msgWorker.addMessage(msg);
+            }
+            else {
+                if (isLocalNodeCoordinator())
+                    msgWorker.addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
+
+                if (isLocNodeRouter) {
+                    ClientMessageWorker wrk = clientMsgWorkers.get(nodeId);
+
+                    if (wrk != null)
+                        wrk.addMessage(msg);
+                    else if (log.isDebugEnabled())
+                        log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" +
+                            locNodeId + ", clientNodeId=" + nodeId + ']');
+                }
+                else if (ring.hasRemoteNodes() && !isLocalNodeCoordinator())
+                    msgWorker.addMessage(msg);
+            }
+        }
+
+        /**
          * Processes client metrics update message.
          *
          * @param msg Client metrics update message.

http://git-wip-us.apache.org/repos/asf/ignite/blob/56a63f80/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index b31e2e4..f3cf48d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -299,9 +299,9 @@ abstract class TcpDiscoveryImpl {
     /**
      * <strong>FOR TEST ONLY!!!</strong>
      *
-     * @return Worker thread.
+     * @return Worker threads.
      */
-    protected abstract IgniteSpiThread workerThread();
+    protected abstract Collection<IgniteSpiThread> threads();
 
     /**
      * @throws IgniteSpiException If failed.

http://git-wip-us.apache.org/repos/asf/ignite/blob/56a63f80/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
index 689ac72..f1c826a 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
@@ -56,15 +56,9 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
     /** */
     private final static long FAILURE_THRESHOLD = 10_000;
 
-    /** */
-    private final static long CLIENT_FAILURE_THRESHOLD = 30_000;
-
     /** Failure detection timeout for nodes configuration. */
     private static long failureThreshold = FAILURE_THRESHOLD;
 
-    /** Client failure detection timeout for nodes configuration. */
-    private static long clientFailureThreshold = CLIENT_FAILURE_THRESHOLD;
-
     /** */
     private static boolean useTestSpi;
 
@@ -75,7 +69,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
 
     /** {@inheritDoc} */
     @Override protected long clientFailureDetectionTimeout() {
-        return clientFailureThreshold;
+        return clientFailureDetectionTimeout;
     }
 
     /** {@inheritDoc} */
@@ -153,7 +147,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
      */
     public void testFailureTimeoutServerClient() throws Exception {
         failureThreshold = 3000;
-        clientFailureThreshold = 2000;
+        clientFailureDetectionTimeout = 2000;
 
         try {
             startServerNodes(1);
@@ -190,13 +184,12 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
             long detectTime = failureDetectTime[0] - failureTime;
 
             assertTrue("Client node failure detected too fast: " + detectTime + "ms",
-                detectTime > clientFailureThreshold - 200);
+                detectTime > clientFailureDetectionTimeout - 200);
             assertTrue("Client node failure detected too slow:  " + detectTime + "ms",
-                detectTime < clientFailureThreshold + 5000);
+                detectTime < clientFailureDetectionTimeout + 5000);
         }
         finally {
             failureThreshold = FAILURE_THRESHOLD;
-            clientFailureThreshold = CLIENT_FAILURE_THRESHOLD;
         }
     }
 
@@ -207,7 +200,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
      */
     public void testFailureTimeout3Server() throws Exception {
         failureThreshold = 1000;
-        clientFailureThreshold = 10000;
+        clientFailureDetectionTimeout = 10000;
         useTestSpi = true;
 
         try {
@@ -254,11 +247,10 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
             assertTrue("Server node failure detected too fast: " + detectTime + "ms",
                 detectTime > failureThreshold - 100);
             assertTrue("Server node failure detected too slow:  " + detectTime + "ms",
-                detectTime < clientFailureThreshold);
+                detectTime < clientFailureDetectionTimeout);
         }
         finally {
             failureThreshold = FAILURE_THRESHOLD;
-            clientFailureThreshold = CLIENT_FAILURE_THRESHOLD;
             useTestSpi = false;
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/56a63f80/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 329783e..ee88b0f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -62,8 +62,8 @@ import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
 import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
+import org.apache.ignite.spi.IgniteSpiThread;
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage;
@@ -73,6 +73,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
+
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
@@ -87,7 +88,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
  */
 public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     /** */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+    private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
 
     /** */
     protected static final AtomicInteger srvIdx = new AtomicInteger();
@@ -123,6 +124,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     private static CountDownLatch clientFailedLatch;
 
     /** */
+    private static CountDownLatch clientReconnectedLatch;
+
+    /** */
     private static CountDownLatch msgLatch;
 
     /** */
@@ -138,10 +142,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     protected long netTimeout = TcpDiscoverySpi.DFLT_NETWORK_TIMEOUT;
 
     /** */
+    protected Integer reconnectCnt;
+
+    /** */
     private boolean longSockTimeouts;
 
     /** */
-    private long clientFailureDetectionTimeout = 1000;
+    protected long clientFailureDetectionTimeout = 1000;
 
     /** */
     private IgniteInClosure2X<TcpDiscoveryAbstractMessage, Socket> afterWrite;
@@ -207,6 +214,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         disco.setJoinTimeout(joinTimeout);
         disco.setNetworkTimeout(netTimeout);
 
+        if (reconnectCnt != null)
+            disco.setReconnectCount(reconnectCnt);
+
         disco.setClientReconnectDisabled(reconnectDisabled);
 
         if (disco instanceof TestTcpDiscoverySpi)
@@ -253,6 +263,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         clientIpFinder = null;
         joinTimeout = TcpDiscoverySpi.DFLT_JOIN_TIMEOUT;
         netTimeout = TcpDiscoverySpi.DFLT_NETWORK_TIMEOUT;
+        clientFailureDetectionTimeout = 1000;
         longSockTimeouts = false;
 
         assert G.allGrids().isEmpty();
@@ -558,6 +569,221 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Client should reconnect to available server without EVT_CLIENT_NODE_RECONNECTED event.
+     *
+     * @throws Exception If failed.
+     */
+    public void testClientReconnectOnRouterSuspend() throws Exception {
+        reconnectAfterSuspend(false);
+    }
+
+    /**
+     * Client should receive all topology updates after reconnect.
+     *
+     * @throws Exception If failed.
+     */
+    public void testClientReconnectOnRouterSuspendTopologyChange() throws Exception {
+        reconnectAfterSuspend(true);
+    }
+
+    /**
+     * @param changeTop If {@code true}, the topology is changed after the client disconnects.
+     * @throws Exception If failed.
+     */
+    private void reconnectAfterSuspend(boolean changeTop) throws Exception {
+        reconnectCnt = 2;
+
+        startServerNodes(2);
+
+        Ignite srv0 = grid("server-0");
+        TcpDiscoveryNode srv0Node = (TcpDiscoveryNode)srv0.cluster().localNode();
+
+        TcpDiscoveryNode srv1Node = (TcpDiscoveryNode)grid("server-1").cluster().localNode();
+
+        clientIpFinder = new TcpDiscoveryVmIpFinder();
+
+        clientIpFinder.setAddresses(
+            Collections.singleton("localhost:" + srv0Node.discoveryPort()));
+
+        startClientNodes(1);
+
+        Ignite client = grid("client-0");
+        TcpDiscoveryNode clientNode = (TcpDiscoveryNode)client.cluster().localNode();
+        TestTcpDiscoverySpi clientSpi = (TestTcpDiscoverySpi)client.configuration().getDiscoverySpi();
+
+        UUID clientNodeId = clientNode.id();
+
+        checkNodes(2, 1);
+
+        clientIpFinder.setAddresses(Collections.singleton("localhost:" + srv1Node.discoveryPort()));
+
+        srvFailedLatch = new CountDownLatch(1);
+
+        attachListeners(2, 1);
+
+        log.info("Pausing router");
+
+        TestTcpDiscoverySpi srvSpi = (TestTcpDiscoverySpi)srv0.configuration().getDiscoverySpi();
+
+        int joinedNodesNum = 3;
+        final CountDownLatch srvJoinedLatch = new CountDownLatch(joinedNodesNum);
+
+        if (changeTop) {
+            client.events().localListen(new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event e) {
+                    srvJoinedLatch.countDown();
+
+                    return true;
+                }
+            }, EVT_NODE_JOINED);
+        }
+
+        srvSpi.pauseAll(true);
+
+        if (changeTop)
+            startServerNodes(joinedNodesNum);
+
+        try {
+            await(srvFailedLatch, 60_000);
+
+            if (changeTop)
+                await(srvJoinedLatch, 5000);
+
+            assertEquals("connected", clientSpi.getSpiState());
+            assertEquals(clientNodeId, clientNode.id());
+            assertEquals(srv1Node.id(), clientNode.clientRouterNodeId());
+        }
+        finally {
+            srvSpi.resumeAll();
+        }
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testClientReconnectHistoryMissingOnRouter() throws Exception {
+        clientFailureDetectionTimeout = 60000;
+        netTimeout = 60000;
+
+        startServerNodes(2);
+
+        Ignite srv0 = grid("server-0");
+        TcpDiscoveryNode srv0Node = (TcpDiscoveryNode)srv0.cluster().localNode();
+
+        clientIpFinder = new TcpDiscoveryVmIpFinder();
+        clientIpFinder.setAddresses(
+            Collections.singleton("localhost:" + srv0Node.discoveryPort()));
+
+        startClientNodes(1);
+
+        attachListeners(0, 1);
+
+        Ignite client = grid("client-0");
+        TcpDiscoveryNode clientNode = (TcpDiscoveryNode)client.cluster().localNode();
+        TestTcpDiscoverySpi clientSpi = (TestTcpDiscoverySpi)client.configuration().getDiscoverySpi();
+        UUID clientNodeId = clientNode.id();
+
+        checkNodes(2, 1);
+
+        clientSpi.pauseAll(true);
+
+        stopGrid(srv0.name());
+
+        startServerNodes(1);
+
+        Ignite srv2 = grid("server-2");
+        TcpDiscoveryNode srv2Node = (TcpDiscoveryNode)srv2.cluster().localNode();
+        clientIpFinder.setAddresses(
+            Collections.singleton("localhost:" + srv2Node.discoveryPort()));
+
+        clientSpi.resumeAll();
+
+        awaitPartitionMapExchange();
+
+        assertEquals("connected", clientSpi.getSpiState());
+        assertEquals(clientNodeId, clientNode.id());
+        assertEquals(srv2Node.id(), clientNode.clientRouterNodeId());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectAfterPause() throws Exception {
+        startServerNodes(2);
+        startClientNodes(1);
+
+        Ignite client = grid("client-0");
+        TestTcpDiscoverySpi clientSpi = (TestTcpDiscoverySpi)client.configuration().getDiscoverySpi();
+
+        clientReconnectedLatch = new CountDownLatch(1);
+
+        attachListeners(0, 1);
+
+        clientSpi.pauseAll(false);
+
+        try {
+            clientSpi.brakeConnection();
+
+            Thread.sleep(clientFailureDetectionTimeout() * 2);
+        }
+        finally {
+            clientSpi.resumeAll();
+        }
+
+        await(clientReconnectedLatch);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testReconnectAfterMassiveTopologyChange() throws Exception {
+        clientIpFinder = IP_FINDER;
+
+        clientFailureDetectionTimeout = 60000;
+        netTimeout = 60000;
+
+        int initSrvsNum = 5;
+        int killNum = 3;
+        int iterations = 10;
+
+        startServerNodes(initSrvsNum);
+        startClientNodes(1);
+
+        Ignite client = grid("client-0");
+        TcpDiscoveryNode clientNode = (TcpDiscoveryNode)client.cluster().localNode();
+        TestTcpDiscoverySpi clientSpi = (TestTcpDiscoverySpi)client.configuration().getDiscoverySpi();
+        final UUID clientNodeId = clientNode.id();
+
+        final CountDownLatch srvJoinedLatch = new CountDownLatch(iterations * killNum);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event e) {
+                srvJoinedLatch.countDown();
+
+                return true;
+            }
+        }, EVT_NODE_JOINED);
+
+        int minAliveSrvId = 0;
+
+        for (int i = 0; i < iterations; i++) {
+            for (int j = 0; j < killNum; j++) {
+                stopGrid(minAliveSrvId);
+
+                minAliveSrvId++;
+            }
+
+            startServerNodes(killNum);
+
+            awaitPartitionMapExchange();
+        }
+
+        await(srvJoinedLatch);
+        assertEquals("connected", clientSpi.getSpiState());
+        assertEquals(clientNodeId, clientNode.id());
+    }
+
+    /**
      * @throws Exception If failed.
      */
     public void testClientReconnectOnNetworkProblem() throws Exception {
@@ -1410,17 +1636,16 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
 
         srvSpi.failNode(client.cluster().localNode().id(), null);
 
-        if (changeTop) {
-            Ignite g = startGrid("server-" + srvIdx.getAndIncrement());
+        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+        assertTrue(failLatch.await(5000, MILLISECONDS));
 
-            srvNodeIds.add(g.cluster().localNode().id());
+        if (changeTop) {
+            startServerNodes(1);
 
             clientSpi.resumeAll();
         }
 
-        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
         assertTrue(reconnectLatch.await(5000, MILLISECONDS));
-        assertTrue(failLatch.await(5000, MILLISECONDS));
         assertTrue(joinLatch.await(5000, MILLISECONDS));
 
         long topVer = changeTop ? 5L : 4L;
@@ -2026,6 +2251,20 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
                 }, EVT_NODE_FAILED);
             }
         }
+
+        if (clientReconnectedLatch != null) {
+            for (int i = 0; i < clientCnt; i++) {
+                G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() {
+                    @Override public boolean apply(Event evt) {
+                        info("Reconnected event fired on client: " + evt);
+
+                        clientReconnectedLatch.countDown();
+
+                        return true;
+                    }
+                }, EVT_CLIENT_NODE_RECONNECTED);
+            }
+        }
     }
 
     /**
@@ -2095,7 +2334,16 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
      * @throws InterruptedException If interrupted.
      */
     protected void await(CountDownLatch latch) throws InterruptedException {
-        assertTrue("Latch count: " + latch.getCount(), latch.await(awaitTime(), MILLISECONDS));
+        await(latch, awaitTime());
+    }
+
+    /**
+     * @param latch Latch.
+     * @param timeout Timeout.
+     * @throws InterruptedException If interrupted.
+     */
+    protected void await(CountDownLatch latch, long timeout) throws InterruptedException {
+        assertTrue("Latch count: " + latch.getCount(), latch.await(timeout, MILLISECONDS));
     }
 
     /**
@@ -2324,8 +2572,10 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         public void pauseAll(boolean suspend) {
             pauseResumeOperation(true, openSockLock, writeLock);
 
-            if (suspend)
-                impl.workerThread().suspend();
+            if (suspend) {
+                for (Thread t : impl.threads())
+                    t.suspend();
+            }
         }
 
         /**
@@ -2334,7 +2584,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         public void resumeAll() {
             pauseResumeOperation(false, openSockLock, writeLock);
 
-            impl.workerThread().resume();
+            for (IgniteSpiThread t : impl.threads())
+                t.resume();
         }
 
         /** {@inheritDoc} */


[2/5] ignite git commit: IGNITE-6768 .NET: Thin client: Fix cache id calculation

Posted by sb...@apache.org.
IGNITE-6768 .NET: Thin client: Fix cache id calculation

Do not force lower case


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/717c5492
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/717c5492
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/717c5492

Branch: refs/heads/ignite-3478
Commit: 717c549248eb377dd0dc7b28c8707d2e496c5a4e
Parents: e228ce3
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Fri Oct 27 12:56:37 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Fri Oct 27 12:56:37 2017 +0300

----------------------------------------------------------------------
 .../Binary/BinaryBuilderSelfTest.cs             | 10 ++---
 .../Binary/EnumsTest.cs                         |  2 +-
 .../Client/Cache/CacheTest.cs                   | 42 ++++++++++++++++++++
 .../Client/RawSocketTest.cs                     |  2 +-
 .../Impl/Binary/BinaryUtils.cs                  | 30 +++++++++++---
 .../Impl/Binary/Marshaller.cs                   |  4 +-
 .../Impl/Binary/SerializableSerializer.cs       | 18 ++++-----
 7 files changed, 85 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/717c5492/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/BinaryBuilderSelfTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/BinaryBuilderSelfTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/BinaryBuilderSelfTest.cs
index 61f90a3..5837ab1 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/BinaryBuilderSelfTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/BinaryBuilderSelfTest.cs
@@ -1600,7 +1600,7 @@ namespace Apache.Ignite.Core.Tests.Binary
 
             Assert.AreEqual(IdMapper.TestTypeId, _grid.GetBinary().GetTypeId(IdMapper.TestTypeName));
             
-            Assert.AreEqual(BinaryUtils.GetStringHashCode("someTypeName"), _grid.GetBinary().GetTypeId("someTypeName"));
+            Assert.AreEqual(BinaryUtils.GetStringHashCodeLowerCase("someTypeName"), _grid.GetBinary().GetTypeId("someTypeName"));
         }
 
         /// <summary>
@@ -1615,7 +1615,7 @@ namespace Apache.Ignite.Core.Tests.Binary
 
             var binType = bin.GetBinaryType();
 
-            Assert.AreEqual(BinaryUtils.GetStringHashCode(NameMapper.TestTypeName + "_"), binType.TypeId);
+            Assert.AreEqual(BinaryUtils.GetStringHashCodeLowerCase(NameMapper.TestTypeName + "_"), binType.TypeId);
             Assert.AreEqual(17, bin.GetField<int>(NameMapper.TestFieldName));
         }
 
@@ -1666,7 +1666,7 @@ namespace Apache.Ignite.Core.Tests.Binary
             var enumVal = TestEnumRegistered.Two;
             var intVal = (int) enumVal;
             var typeName = GetTypeName(typeof(TestEnumRegistered));
-            var typeId = BinaryUtils.GetStringHashCode(typeName);
+            var typeId = BinaryUtils.GetStringHashCodeLowerCase(typeName);
 
             var binEnums = new[]
             {
@@ -2170,13 +2170,13 @@ namespace Apache.Ignite.Core.Tests.Binary
         /** <inheritdoc /> */
         public int GetTypeId(string typeName)
         {
-            return typeName == TestTypeName ? TestTypeId : BinaryUtils.GetStringHashCode(typeName);
+            return typeName == TestTypeName ? TestTypeId : BinaryUtils.GetStringHashCodeLowerCase(typeName);
         }
 
         /** <inheritdoc /> */
         public int GetFieldId(int typeId, string fieldName)
         {
-            return BinaryUtils.GetStringHashCode(fieldName);
+            return BinaryUtils.GetStringHashCodeLowerCase(fieldName);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/717c5492/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/EnumsTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/EnumsTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/EnumsTest.cs
index 18ef29a..8993fb4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/EnumsTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/EnumsTest.cs
@@ -88,7 +88,7 @@ namespace Apache.Ignite.Core.Tests.Binary
                 else
                 {
                     Assert.AreEqual(string.Format("BinaryEnum [typeId={0}, enumValue={1}]",
-                        BinaryUtils.GetStringHashCode(typeof(T).FullName), binRes.EnumValue), binRes.ToString());
+                        BinaryUtils.GetStringHashCodeLowerCase(typeof(T).FullName), binRes.EnumValue), binRes.ToString());
                 }
             }
             else

http://git-wip-us.apache.org/repos/asf/ignite/blob/717c5492/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTest.cs
index f2dd1de..cfdce73 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTest.cs
@@ -845,6 +845,48 @@ namespace Apache.Ignite.Core.Tests.Client.Cache
             }
         }
 
+        /// <summary>
+        /// Tests various cache names.
+        /// Cache id is calculated as a hash code and passed to the server side; this test verifies correct id
+        /// calculation for different strings.
+        /// </summary>
+        [Test]
+        public void TestCacheNames()
+        {
+            var cacheNames = new[]
+            {
+                "foo-bar",
+                "Foo-Bar",
+                "FOO-BAR",
+                "testCache1",
+                "TestCache2",
+                "TESTCACHE3",
+                new string('c', 100),
+                new string('C', 100),
+                Guid.NewGuid().ToString(),
+                "тест",
+                "Тест",
+                "ТЕСТ",
+                "тест1",
+                "Тест2",
+                "ТЕСТ3"
+            };
+
+            var ignite = Ignition.GetIgnite();
+
+            for (var i = 0; i < cacheNames.Length; i++)
+            {
+                var cacheName = cacheNames[i];
+                ignite.CreateCache<int, string>(cacheName).Put(i, cacheName);
+
+                using (var client = GetClient())
+                {
+                    var cache = client.GetCache<int, string>(cacheName);
+                    Assert.AreEqual(cacheName, cache[i]);
+                }
+            }
+        }
+
         private class Container
         {
             public Container Inner;

http://git-wip-us.apache.org/repos/asf/ignite/blob/717c5492/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/RawSocketTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/RawSocketTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/RawSocketTest.cs
index b637e88..0f1358a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/RawSocketTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/RawSocketTest.cs
@@ -54,7 +54,7 @@ namespace Apache.Ignite.Core.Tests.Client
             {
                 stream.WriteShort(1); // OP_GET
                 stream.WriteLong(1); // Request id.
-                var cacheId = BinaryUtils.GetStringHashCode(cache.Name);
+                var cacheId = BinaryUtils.GetStringHashCodeLowerCase(cache.Name);
                 stream.WriteInt(cacheId);
                 stream.WriteByte(0); // Flags (withSkipStore, etc)
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/717c5492/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs
index 5233db8..20fea02 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs
@@ -23,12 +23,10 @@ namespace Apache.Ignite.Core.Impl.Binary
     using System.Diagnostics;
     using System.Diagnostics.CodeAnalysis;
     using System.IO;
-    using System.Linq;
     using System.Reflection;
     using System.Runtime.InteropServices;
     using System.Text;
     using Apache.Ignite.Core.Binary;
-    using Apache.Ignite.Core.Cache.Affinity;
     using Apache.Ignite.Core.Impl.Binary.IO;
     using Apache.Ignite.Core.Impl.Common;
 
@@ -1037,6 +1035,8 @@ namespace Apache.Ignite.Core.Impl.Binary
             {
                 var elemType = val.GetType().GetElementType();
 
+                Debug.Assert(elemType != null);
+
                 var typeId = ObjTypeId;
 
                 if (elemType != typeof(object))
@@ -1333,9 +1333,9 @@ namespace Apache.Ignite.Core.Impl.Binary
         }
 
         /// <summary>
-        /// Gets the string hash code using Java algorithm.
+        /// Gets the string hash code using Java algorithm, converting English letters to lower case.
         /// </summary>
-        public static int GetStringHashCode(string val)
+        public static int GetStringHashCodeLowerCase(string val)
         {
             if (val == null)
                 return 0;
@@ -1353,6 +1353,26 @@ namespace Apache.Ignite.Core.Impl.Binary
         }
 
         /// <summary>
+        /// Gets the string hash code using Java algorithm.
+        /// </summary>
+        private static int GetStringHashCode(string val)
+        {
+            if (val == null)
+                return 0;
+
+            int hash = 0;
+
+            unchecked
+            {
+                // ReSharper disable once LoopCanBeConvertedToQuery (performance)
+                foreach (var c in val)
+                    hash = 31 * hash + c;
+            }
+
+            return hash;
+        }
+
+        /// <summary>
         /// Gets the cache identifier.
         /// </summary>
         public static int GetCacheId(string cacheName)
@@ -1447,7 +1467,7 @@ namespace Apache.Ignite.Core.Impl.Binary
             }
 
             if (id == 0)
-                id = GetStringHashCode(fieldName);
+                id = GetStringHashCodeLowerCase(fieldName);
 
             if (id == 0)
                 throw new BinaryObjectException("Field ID is zero (please provide ID mapper or change field name) " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/717c5492/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs
index 55b6121..7212cd6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs
@@ -694,7 +694,7 @@ namespace Apache.Ignite.Core.Impl.Binary
 
             if (typeId == 0)
             {
-                typeId = BinaryUtils.GetStringHashCode(typeName);
+                typeId = BinaryUtils.GetStringHashCodeLowerCase(typeName);
             }
 
             AddType(type, typeId, typeName, false, false, null, null, serializer, affKeyFldName, false);
@@ -826,7 +826,7 @@ namespace Apache.Ignite.Core.Impl.Binary
 
             if (id == 0)
             {
-                id = BinaryUtils.GetStringHashCode(typeName);
+                id = BinaryUtils.GetStringHashCodeLowerCase(typeName);
             }
 
             return id;

http://git-wip-us.apache.org/repos/asf/ignite/blob/717c5492/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/SerializableSerializer.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/SerializableSerializer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/SerializableSerializer.cs
index 80f267a..fc91edb 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/SerializableSerializer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/SerializableSerializer.cs
@@ -174,7 +174,7 @@ namespace Apache.Ignite.Core.Impl.Binary
 
             foreach (var dotNetField in dotNetFields)
             {
-                writer.WriteInt(BinaryUtils.GetStringHashCode(dotNetField));
+                writer.WriteInt(BinaryUtils.GetStringHashCodeLowerCase(dotNetField));
             }
         }
 
@@ -617,49 +617,49 @@ namespace Apache.Ignite.Core.Impl.Binary
             {
                 if (fieldType == typeof(byte))
                 {
-                    return dotNetFields.Contains(BinaryUtils.GetStringHashCode(fieldName)) 
+                    return dotNetFields.Contains(BinaryUtils.GetStringHashCodeLowerCase(fieldName)) 
                         ? (sbyte) (byte) fieldVal : fieldVal;
                 }
 
                 if (fieldType == typeof(short))
                 {
-                    return dotNetFields.Contains(BinaryUtils.GetStringHashCode(fieldName)) 
+                    return dotNetFields.Contains(BinaryUtils.GetStringHashCodeLowerCase(fieldName)) 
                         ? (ushort) (short) fieldVal : fieldVal;
                 }
 
                 if (fieldType == typeof(int))
                 {
-                    return dotNetFields.Contains(BinaryUtils.GetStringHashCode(fieldName)) 
+                    return dotNetFields.Contains(BinaryUtils.GetStringHashCodeLowerCase(fieldName)) 
                         ? (uint) (int) fieldVal : fieldVal;
                 }
 
                 if (fieldType == typeof(long))
                 {
-                    return dotNetFields.Contains(BinaryUtils.GetStringHashCode(fieldName)) 
+                    return dotNetFields.Contains(BinaryUtils.GetStringHashCodeLowerCase(fieldName)) 
                         ? (ulong) (long) fieldVal : fieldVal;
                 }
 
                 if (fieldType == typeof(byte[]))
                 {
-                    return dotNetFields.Contains(BinaryUtils.GetStringHashCode(fieldName)) 
+                    return dotNetFields.Contains(BinaryUtils.GetStringHashCodeLowerCase(fieldName)) 
                         ? ConvertArray<byte, sbyte>((byte[]) fieldVal) : fieldVal;
                 }
 
                 if (fieldType == typeof(short[]))
                 {
-                    return dotNetFields.Contains(BinaryUtils.GetStringHashCode(fieldName))
+                    return dotNetFields.Contains(BinaryUtils.GetStringHashCodeLowerCase(fieldName))
                         ? ConvertArray<short, ushort>((short[]) fieldVal) : fieldVal;
                 }
 
                 if (fieldType == typeof(int[]))
                 {
-                    return dotNetFields.Contains(BinaryUtils.GetStringHashCode(fieldName)) 
+                    return dotNetFields.Contains(BinaryUtils.GetStringHashCodeLowerCase(fieldName)) 
                         ? ConvertArray<int, uint>((int[]) fieldVal) : fieldVal;
                 }
 
                 if (fieldType == typeof(long[]))
                 {
-                    return dotNetFields.Contains(BinaryUtils.GetStringHashCode(fieldName)) 
+                    return dotNetFields.Contains(BinaryUtils.GetStringHashCodeLowerCase(fieldName)) 
                         ? ConvertArray<long, ulong>((long[]) fieldVal) : fieldVal;
                 }
             }


[4/5] ignite git commit: .NET: Fix TestAsciiChars

Posted by sb...@apache.org.
.NET: Fix TestAsciiChars


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/031f63c2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/031f63c2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/031f63c2

Branch: refs/heads/ignite-3478
Commit: 031f63c2e295bad90428e6d8e0edb7d7fafd466b
Parents: 56a63f8
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Fri Oct 27 14:29:30 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Fri Oct 27 14:29:30 2017 +0300

----------------------------------------------------------------------
 .../dotnet/Apache.Ignite.Core.Tests/ProjectFilesTest.cs      | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/031f63c2/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProjectFilesTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProjectFilesTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProjectFilesTest.cs
index 0c0d0f9..dff7ed7 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProjectFilesTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProjectFilesTest.cs
@@ -123,7 +123,13 @@ namespace Apache.Ignite.Core.Tests
         [Test]
         public void TestAsciiChars()
         {
-            var allowedFiles = new[] {"BinaryStringTest.cs", "BinarySelfTest.cs", "CacheDmlQueriesTest.cs"};
+            var allowedFiles = new[]
+            {
+                "BinaryStringTest.cs",
+                "BinarySelfTest.cs", 
+                "CacheDmlQueriesTest.cs",
+                "CacheTest.cs"
+            };
 
             var srcFiles = GetDotNetSourceDir()
                 .GetFiles("*.cs", SearchOption.AllDirectories)


[5/5] ignite git commit: Merge remote-tracking branch 'remotes/origin/master' into ignite-3478

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-3478


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6466adf5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6466adf5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6466adf5

Branch: refs/heads/ignite-3478
Commit: 6466adf54f017219a16cf9835ee0214853db269e
Parents: d46a039 031f63c
Author: sboikov <sb...@gridgain.com>
Authored: Fri Oct 27 15:14:39 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Oct 27 15:14:39 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |   6 +
 .../org/apache/ignite/internal/IgnitionEx.java  |  34 +-
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  52 +++-
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 311 ++++++++++---------
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |   4 +-
 ...lientDiscoverySpiFailureTimeoutSelfTest.java |  20 +-
 .../tcp/TcpClientDiscoverySpiSelfTest.java      | 275 +++++++++++++++-
 .../Binary/BinaryBuilderSelfTest.cs             |  10 +-
 .../Binary/EnumsTest.cs                         |   2 +-
 .../Client/Cache/CacheTest.cs                   |  42 +++
 .../Client/RawSocketTest.cs                     |   2 +-
 .../ProjectFilesTest.cs                         |   8 +-
 .../Impl/Binary/BinaryUtils.cs                  |  30 +-
 .../Impl/Binary/Marshaller.cs                   |   4 +-
 .../Impl/Binary/SerializableSerializer.cs       |  18 +-
 15 files changed, 597 insertions(+), 221 deletions(-)
----------------------------------------------------------------------