You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2019/12/17 14:55:27 UTC

[ignite] branch ignite-11320 created (now 44132fb)

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a change to branch ignite-11320
in repository https://gitbox.apache.org/repos/asf/ignite.git.


      at 44132fb  IGNITE-11320: Support for individual reconnect in case of best effort affinity mode added.

This branch includes the following new commits:

     new 44132fb  IGNITE-11320: Support for individual reconnect in case of best effort affinity mode added.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[ignite] 01/01: IGNITE-11320: Support for individual reconnect in case of best effort affinity mode added.

Posted by am...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-11320
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 44132fbf53c71acff5e2098606e0289c5bbf6d1a
Author: alapin <la...@gmail.com>
AuthorDate: Wed Jun 5 15:47:35 2019 +0300

    IGNITE-11320: Support for individual reconnect in case of best effort affinity mode added.
---
 ...teJdbcThinDriverAffinityAwarenessTestSuite.java |   2 +
 ...cThinAffinityAwarenessReconnectionSelfTest.java | 397 +++++++++++++++++++
 .../thin/JdbcThinAffinityAwarenessSelfTest.java    | 104 ++---
 .../jdbc/thin/JdbcThinConnectionSelfTest.java      |   5 +-
 .../ignite/internal/jdbc/thin/AffinityCache.java   |   2 +-
 .../internal/jdbc/thin/JdbcThinConnection.java     | 426 +++++++++++++++------
 .../ignite/internal/jdbc/thin/JdbcThinTcpIo.java   |  23 +-
 7 files changed, 765 insertions(+), 194 deletions(-)

diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcThinDriverAffinityAwarenessTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcThinDriverAffinityAwarenessTestSuite.java
index 888d65e..3937fe2 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcThinDriverAffinityAwarenessTestSuite.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcThinDriverAffinityAwarenessTestSuite.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.jdbc.suite;
 
 import org.apache.ignite.jdbc.thin.JdbcThinAbstractSelfTest;
+import org.apache.ignite.jdbc.thin.JdbcThinAffinityAwarenessReconnectionSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinAffinityAwarenessSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinAffinityAwarenessTransactionsSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinConnectionSelfTest;
@@ -38,6 +39,7 @@ import org.junit.runners.Suite;
     JdbcThinStatementSelfTest.class,
     JdbcThinAffinityAwarenessSelfTest.class,
     JdbcThinAffinityAwarenessTransactionsSelfTest.class,
+    JdbcThinAffinityAwarenessReconnectionSelfTest.class,
 })
 public class IgniteJdbcThinDriverAffinityAwarenessTestSuite {
     /**
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessReconnectionSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessReconnectionSelfTest.java
new file mode 100644
index 0000000..f612b5b
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessReconnectionSelfTest.java
@@ -0,0 +1,397 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.logging.Handler;
+import java.util.logging.Level;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.jdbc.thin.JdbcThinConnection;
+import org.apache.ignite.internal.jdbc.thin.JdbcThinTcpIo;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+/**
+ * Jdbc thin affinity awareness reconnection test.
+ */
+public class JdbcThinAffinityAwarenessReconnectionSelfTest extends JdbcThinAbstractSelfTest {
+    /** URL. */
+    private static final String URL = "jdbc:ignite:thin://127.0.0.1:10800..10802?affinityAwareness=true";
+
+    /** Nodes count. */
+    private static final int INITIAL_NODES_CNT = 3;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(INITIAL_NODES_CNT);
+    }
+
+    /**
+     * Check that background connection establishment works as expected.
+     * <p>
+     * Within new reconnection logic in affinity awareness mode when {@code JdbcThinConnection} is created
+     * it eagerly establishes a connection to one and only one ignite node. All other connections to nodes specified in
+     * connection properties are established by background thread.
+     * <p>
+     * So in given test we specify url with 3 different ports and verify that 3 connections will be created:
+     * one in eager mode and two within background thread. It takes some time for background thread to create
+     * a connection, and cause, in addition to that it runs periodically with some delay,
+     * {@code GridTestUtils.waitForCondition} is used in order to check that all expected connections are established.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testBackgroundConnectionEstablishment() throws Exception {
+        try (Connection conn = DriverManager.getConnection(URL)) {
+            Map<UUID, JdbcThinTcpIo> ios = GridTestUtils.getFieldValue(conn, "ios");
+
+            assertConnectionsCount(ios, 3);
+        }
+    }
+
+    /**
+     * Test connection failover:
+     * <ol>
+     * <li>Check that at the beginning of test {@code INITIAL_NODES_CNT} connections are established.</li>
+     * <li>Stop one node, invalidate dead connection (jdbc thin, won't detect that node has gone,
+     *   until it tries to touch it) and verify, that connections count has decremented. </li>
+     * <li>Start, previously stopped node, and check that connections count also restored to initial value.</li>
+     * </ol>
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testConnectionFailover() throws Exception {
+        try (Connection conn = DriverManager.getConnection(URL)) {
+            Map<UUID, JdbcThinTcpIo> ios = GridTestUtils.getFieldValue(conn, "ios");
+
+            assertConnectionsCount(ios, INITIAL_NODES_CNT);
+
+            assertEquals("Unexpected connections count.", INITIAL_NODES_CNT, ios.size());
+
+            stopGrid(1);
+
+            invalidateConnectionToStoppedNode(conn);
+
+            assertEquals("Unexpected connections count.", INITIAL_NODES_CNT - 1, ios.size());
+
+            startGrid(1);
+
+            assertConnectionsCount(ios, INITIAL_NODES_CNT);
+        }
+    }
+
+    /**
+     * Test total connection failover:
+     * <ol>
+     * <li>Check that at the beginning of test {@code INITIAL_NODES_CNT} connections are established.</li>
+     * <li>Stop all nodes, invalidate dead connections (jdbc thin, won't detect that node has gone,
+     *   until it tries to touch it) and verify, that connections count equals to zero. </li>
+     * <li>Start, previously stopped nodes, and check that connections count also restored to initial value.</li>
+     * </ol>
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void  testTotalConnectionFailover() throws Exception {
+        try(Connection conn = DriverManager.getConnection(URL)) {
+            Map<UUID, JdbcThinTcpIo> ios = GridTestUtils.getFieldValue(conn, "ios");
+
+            assertConnectionsCount(ios, INITIAL_NODES_CNT);
+
+            for (int i = 0; i < INITIAL_NODES_CNT; i++) {
+                stopGrid(i);
+                invalidateConnectionToStoppedNode(conn);
+            }
+
+            assertConnectionsCount(ios, 0);
+
+            for (int i = 0; i < INITIAL_NODES_CNT; i++)
+                startGrid(i);
+
+            assertConnectionsCount(ios, INITIAL_NODES_CNT);
+        }
+    }
+
+    /**
+     * Test eager connection failover:
+     * <ol>
+     * <li>Check that at the beginning of test {@code INITIAL_NODES_CNT} connections are established.</li>
+     * <li>Stop all nodes, invalidate dead connections (jdbc thin, won't detect that node has gone,
+     *   until it tries to touch it) and verify, that connections count equals to zero. </li>
+     * <li>Wait for some time, in order for reconnection thread to increase delay between connection attempts,
+     *   because of reconnection failures.</li>
+     * <li>Start, previously stopped nodes, and send simple query immediately. Eager reconnection is expected.
+     * <b>NOTE</b>:There's still a chance that connection would be recreated by background thread and not eager process.
+     *   In order to decrease given possibility we've waited for some time on previous step.</li>
+     * <li>Ensure that after some time all connections will be restored.</li>
+     * </ol>
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testEagerConnectionFailover() throws Exception {
+        try(Connection conn = DriverManager.getConnection(URL)) {
+            Map<UUID, JdbcThinTcpIo> ios = GridTestUtils.getFieldValue(conn, "ios");
+
+            assertConnectionsCount(ios, INITIAL_NODES_CNT);
+
+            for (int i = 0; i < INITIAL_NODES_CNT; i++) {
+                stopGrid(i);
+                invalidateConnectionToStoppedNode(conn);
+            }
+
+            assertEquals("Unexpected connections count.", 0, ios.size());
+
+            doSleep(4 * JdbcThinConnection.RECONNECTION_DELAY);
+
+            for (int i = 0; i < INITIAL_NODES_CNT; i++)
+                startGrid(i);
+
+            conn.createStatement().execute("select 1;");
+
+            assertConnectionsCount(ios, INITIAL_NODES_CNT);
+        }
+    }
+
+    /**
+     * Check that reconnection thread increases delay between unsuccessful connection attempts:
+     * <ol>
+     * <li>Specify two inet addresses one valid and one inoperative.</li>
+     * <li>Wait for specific amount of time. The reconnection logic suppose to increase delays between reconnection
+     *   attempts. The basic idea is very simple: delay is doubled on evey connection failure until connection succeeds
+     *   or until delay exceeds predefined maximum value {@code JdbcThinConnection.RECONNECTION_MAX_DELAY}
+     *   <pre>
+     *   |_|_ _|_ _ _ _|_ _ _ _ _ _ _ _|
+     *   where: '|' is connection attempt;
+     *          '_' is an amount of time that reconnection tread waits, equal to JdbcThinConnection.RECONNECTION_DELAY;
+     *
+     *   so if we wait for 9 * RECONNECTION_DELAY, we expect to see exact four connection attempts:
+     *   |_|_ _|_ _ _ _|_ _^_ _ _ _ _ _|
+     *   </pre>
+     *   </li>
+     * <li>Check that there were exact four reconnection attempts. In order to do this, we check logs, expecting to see
+     * four warning messages there.</li>
+     * </ol>
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testReconnectionDelayIncreasing() throws Exception {
+        Logger log = Logger.getLogger(JdbcThinConnection.class.getName());
+        LogHandler hnd = new LogHandler();
+        hnd.setLevel(Level.ALL);
+        log.setUseParentHandlers(false);
+        log.addHandler(hnd);
+        log.setLevel(Level.ALL);
+
+        try (Connection ignored = DriverManager.getConnection(
+            "jdbc:ignite:thin://127.0.0.1:10800,127.0.0.1:10810?affinityAwareness=true")) {
+            hnd.records.clear();
+
+            doSleep(9 * JdbcThinConnection.RECONNECTION_DELAY);
+
+            assertEquals("Unexpected log records count.", 4, hnd.records.size());
+
+            String expRecordMsg = "Failed to connect to Ignite node " +
+                "[url=jdbc:ignite:thin://127.0.0.1:10800,127.0.0.1:10810]. address = [localhost/127.0.0.1:10810].";
+
+            for (LogRecord record: hnd.records) {
+                assertEquals("Unexpected log record text.", expRecordMsg, record.getMessage());
+                assertEquals("Unexpected log level", Level.WARNING, record.getLevel());
+            }
+        }
+    }
+
+    /**
+     * Check that reconnection thread selectively increases delay between unsuccessful connection attempts:
+     * <ol>
+     * <li>Create {@code JdbcThinConnection} with two valid inet addresses.</li>
+     * <li>Stop one node and invalidate corresponding connection. Ensure that only one connection left.</li>
+     * <li>Wait for specific amount of time. The reconnection logic suppose to increase delays between reconnection
+     *   attempts. The basic idea is very simple: delay is doubled on evey connection failure until connection succeeds
+     *   or until delay exceeds predefined maximum value {@code JdbcThinConnection.RECONNECTION_MAX_DELAY}
+     *   <pre>
+     *   |_|_ _|_ _ _ _|_ _ _ _ _ _ _ _|
+     *   where: '|' is connection attempt;
+     *          '_' is an amount of time that reconnection tread waits, equal to JdbcThinConnection.RECONNECTION_DELAY;
+     *
+     *   so if we wait for 9 * RECONNECTION_DELAY, we expect to see exact four connection attempts:
+     *   |_|_ _|_ _ _ _|_ _^_ _ _ _ _ _|
+     *   </pre>
+     *   </li>
+     * <li>Check that there were exact four reconnection attempts. In order to do this, we check logs, expecting to see
+     *   four warning messages there.</li>
+     * <li>Start previously stopped node.</li>
+     * <li>Wait until next reconnection attempt.</li>
+     * <li>Check that both connections are established and that there are no warning messages within logs.</li>
+     * <li>One more time: stop one node and invalidate corresponding connection.
+     *   Ensure that only one connection left.</li>
+     * <li>Wait for some time.</li>
+     * <li>Ensure that delay between reconnection was reset to initial value.
+     *   In other words, we again expect four warning messages within logs.</li>
+     * </ol>
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testReconnectionDelaySelectiveIncreasing() throws Exception {
+        Logger log = Logger.getLogger(JdbcThinConnection.class.getName());
+        LogHandler hnd = new LogHandler();
+        hnd.setLevel(Level.ALL);
+        log.setUseParentHandlers(false);
+        log.addHandler(hnd);
+        log.setLevel(Level.ALL);
+
+        try (Connection conn = DriverManager.getConnection(
+            "jdbc:ignite:thin://127.0.0.1:10800..10801?affinityAwareness=true")) {
+            // Stop one node and invalidate corresponding connection. Ensure that only one connection left.
+            stopGrid(0);
+
+            invalidateConnectionToStoppedNode(conn);
+
+            Map<UUID, JdbcThinTcpIo> ios = GridTestUtils.getFieldValue(conn, "ios");
+
+            assertEquals("Unexpected connections count.", 1, ios.size());
+
+            hnd.records.clear();
+
+            // Wait for some specific amount of time and ensure that there were exact four reconnection attempts.
+            doSleep(9 * JdbcThinConnection.RECONNECTION_DELAY);
+
+            assertEquals("Unexpected log records count.", 4, hnd.records.size());
+
+            String expRecordMsg = "Failed to connect to Ignite node [url=jdbc:ignite:thin://127.0.0.1:10800..10801]." +
+                " address = [localhost/127.0.0.1:10800].";
+
+            for (LogRecord record: hnd.records) {
+                assertEquals("Unexpected log record text.", expRecordMsg, record.getMessage());
+                assertEquals("Unexpected log level", Level.WARNING, record.getLevel());
+            }
+
+            // Start previously stopped node.
+            startGrid(0);
+
+            hnd.records.clear();
+
+            // Waiting until next reconnection attempt.
+            doSleep(9 * JdbcThinConnection.RECONNECTION_DELAY);
+
+            // Checking that both connections are established and that there are no warning messages within logs.
+            assertEquals("Unexpected log records count.", 0, hnd.records.size());
+
+            assertEquals("Unexpected connections count.", 2, ios.size());
+
+            // One more time: stop one node, invalidate corresponding connection and ensure that only one connection
+            // left.
+            stopGrid(0);
+
+            invalidateConnectionToStoppedNode(conn);
+
+            assertEquals("Unexpected connections count.", 1, ios.size());
+
+            hnd.records.clear();
+
+            // Wait for some time and ensure that delay between reconnection was reset to initial value.
+            doSleep(9 * JdbcThinConnection.RECONNECTION_DELAY);
+
+            assertEquals("Unexpected log records count.", 4, hnd.records.size());
+
+            for (LogRecord record: hnd.records) {
+                assertEquals("Unexpected log record text.", expRecordMsg, record.getMessage());
+                assertEquals("Unexpected log level", Level.WARNING, record.getLevel());
+            }
+
+            startGrid(0);
+        }
+    }
+
+    /**
+     * Assert connections count.
+     *
+     * @param ios Map that holds connections.
+     * @param expConnCnt Expected connections count.
+     */
+    private void assertConnectionsCount(Map<UUID, JdbcThinTcpIo> ios, int expConnCnt)
+        throws IgniteInterruptedCheckedException {
+        boolean allConnectionsEstablished = GridTestUtils.waitForCondition(() -> ios.size() == expConnCnt,
+            10_000);
+
+        assertTrue("Unexpected connections count.", allConnectionsEstablished);
+    }
+
+    /**
+     * Invalidate connection to stopped node. Jdbc thin, won't detect that node has gone, until it tries to touch it.
+     * So sending simple query to randomly chosen connection(socket), sooner or later, will touch dead one,
+     * and thus invalidate it.
+     *
+     * @param conn Connections.
+     */
+    private void invalidateConnectionToStoppedNode(Connection conn) {
+        while (true) {
+            try (Statement stmt = conn.createStatement()) {
+                stmt.execute("select 1");
+            }
+            catch (SQLException e) {
+                return;
+            }
+        }
+    }
+
+    /**
+     * Simple {@code java.util.logging.Handler} implementation in order to check log records
+     * generated by {@code JdbcThinConnection}.
+     */
+    static class LogHandler extends Handler {
+
+        /** Log records. */
+        private final List<LogRecord> records = new ArrayList<>();
+
+        /** {@inheritDoc} */
+        @Override public void publish(LogRecord record) {
+            records.add(record);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void close() {
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void flush() {
+        }
+
+        /**
+         * @return Records.
+         */
+        @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") public List<LogRecord> records() {
+            return records;
+        }
+    }
+}
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessSelfTest.java
index 22e5d0d..f0e632c 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessSelfTest.java
@@ -30,7 +30,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.affinity.AffinityFunction;
@@ -133,14 +132,14 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest
      */
     @Test
     public void testExecuteQueries() throws Exception {
-        checkNodesUsage(null, "select * from Person where _key = 1", 1, 1,
+        checkNodesUsage(null, stmt, "select * from Person where _key = 1", 1, 1,
             false);
 
-        checkNodesUsage(null, "select * from Person where _key = 1 or _key = 2", 2,
+        checkNodesUsage(null, stmt,  "select * from Person where _key = 1 or _key = 2", 2,
             2, false);
 
-        checkNodesUsage(null, "select * from Person where _key in (1, 2)", 2, 2,
-            false);
+        checkNodesUsage(null, stmt, "select * from Person where _key in (1, 2)", 2,
+            2, false);
     }
 
     /**
@@ -155,7 +154,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest
 
         ps.setInt(1, 2);
 
-        checkNodesUsage(ps, null, 1, 1, false);
+        checkNodesUsage(ps, null, null, 1, 1, false);
 
         // Use case 2.
         ps = conn.prepareStatement("select * from Person where _key = ? or _key = ?");
@@ -164,7 +163,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest
 
         ps.setInt(2, 2);
 
-        checkNodesUsage(ps, null, 2, 2, false);
+        checkNodesUsage(ps, null, null, 2, 2, false);
 
         // Use case 3.
         ps = conn.prepareStatement("select * from Person where _key in (?, ?)");
@@ -173,7 +172,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest
 
         ps.setInt(2, 2);
 
-        checkNodesUsage(ps, null, 2, 2, false);
+        checkNodesUsage(ps, null, null, 2, 2, false);
     }
 
     /**
@@ -183,13 +182,13 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest
      */
     @Test
     public void testUpdateQueries() throws Exception {
-        checkNodesUsage(null, "update Person set firstName = 'TestFirstName' where _key = 1",
+        checkNodesUsage(null, stmt, "update Person set firstName = 'TestFirstName' where _key = 1",
             1, 1, true);
 
-        checkNodesUsage(null, "update Person set firstName = 'TestFirstName' where _key = 1 or _key = 2",
+        checkNodesUsage(null, stmt,  "update Person set firstName = 'TestFirstName' where _key = 1 or _key = 2",
             2, 2, true);
 
-        checkNodesUsage(null, "update Person set firstName = 'TestFirstName' where _key in (1, 2)",
+        checkNodesUsage(null, stmt, "update Person set firstName = 'TestFirstName' where _key in (1, 2)",
             2, 2, true);
     }
 
@@ -206,7 +205,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest
 
         ps.setInt(1, 2);
 
-        checkNodesUsage(ps, null, 1, 1, true);
+        checkNodesUsage(ps, null, null, 1, 1, true);
 
         // Use case 2.
         ps = conn.prepareStatement("update Person set firstName = 'TestFirstName' where _key = ? or _key = ?");
@@ -215,7 +214,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest
 
         ps.setInt(2, 2);
 
-        checkNodesUsage(ps, null, 2, 2, true);
+        checkNodesUsage(ps, null, null, 2, 2, true);
 
         // Use case 3.
         ps = conn.prepareStatement("update Person set firstName = 'TestFirstName' where _key in (?, ?)");
@@ -224,7 +223,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest
 
         ps.setInt(2, 2);
 
-        checkNodesUsage(ps, null, 2, 2, true);
+        checkNodesUsage(ps, null, null, 2, 2, true);
     }
 
     /**
@@ -235,12 +234,12 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest
     @Test
     public void testDeleteQueries() throws Exception {
         // In case of simple query like "delete from Person where _key = 1" fast update logic is used,
-        // so parition result is not calculated on the server side - nothing to check.
+        // so partition result is not calculated on the server side - nothing to check.
 
-        checkNodesUsage(null, "delete from Person where _key = 10000 or _key = 20000",
+        checkNodesUsage(null, stmt, "delete from Person where _key = 10000 or _key = 20000",
             2, 0, true);
 
-        checkNodesUsage(null, "delete from Person where _key in (10000, 20000)",
+        checkNodesUsage(null, stmt, "delete from Person where _key in (10000, 20000)",
             2, 0, true);
     }
 
@@ -252,7 +251,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest
     @Test
     public void testDeleteParametrizedQueries() throws Exception {
         // In case of simple query like "delete from Person where _key = ?" fast update logic is used,
-        // so parition result is not calculated on the server side - nothing to check.
+        // so partition result is not calculated on the server side - nothing to check.
 
         // Use case 1.
         PreparedStatement ps = conn.prepareStatement("delete from Person where _key = ? or _key = ?");
@@ -261,7 +260,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest
 
         ps.setInt(2, 2000);
 
-        checkNodesUsage(ps, null, 2, 0, true);
+        checkNodesUsage(ps, null, null, 2, 0, true);
 
         // Use case 2.
         ps = conn.prepareStatement("delete from Person where _key in (?, ?)");
@@ -270,7 +269,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest
 
         ps.setInt(2, 2000);
 
-        checkNodesUsage(ps, null, 2, 0, true);
+        checkNodesUsage(ps, null, null, 2, 0, true);
     }
 
     /**
@@ -352,14 +351,14 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest
 
         fillCache(cacheName);
 
-        checkNodesUsage(null,
+        checkNodesUsage(null, stmt,
             "select * from \"" + cacheName + "\".Person where _key = 1",
             1, 1, false);
     }
 
     /**
      * Check that affinity cache is invalidated in case of changing topology,
-     * detected during partions destribution retrieval.
+     * detected during partitions distribution retrieval.
      *
      * @throws Exception If failed.
      */
@@ -483,7 +482,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest
      * @throws Exception If failed.
      */
     @Test
-    public void testAffinityCacheStoresSchemaBindedQuries() throws Exception {
+    public void testAffinityCacheStoresSchemaBindedQueries() throws Exception {
         final String cacheName = "yacc";
 
         CacheConfiguration<Object, Object> cache = prepareCacheConfig(cacheName);
@@ -515,12 +514,12 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest
     }
 
     /**
-     * Check that affinity cache stores compacted version of partitoins destributions.
+     * Check that affinity cache stores compacted version of partitions distributions.
      *
      * @throws Exception If failed.
      */
     @Test
-    public void testAffinityCacheCompactsPartitonDestributions() throws Exception {
+    public void testAffinityCacheCompactsPartitionDistributions() throws Exception {
         final String cacheName = "yaccc";
 
         CacheConfiguration<Object, Object> cache = prepareCacheConfig(cacheName);
@@ -546,56 +545,16 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest
         assertEquals("Sql sub-cache of affinity cache has unexpected number of elements.",
             2, sqlCache.size());
 
-        assertEquals("Partitions destribution sub-cache of affinity cache has unexpected number of elements.",
+        assertEquals("Partitions distribution sub-cache of affinity cache has unexpected number of elements.",
             2, cachePartitionsDistribution.size());
 
-        // Main assertition of the test: we are checking that partitions destributions for different caches
+        // Main assertion of the test: we are checking that partitions distributions for different caches
         // are equal in therms of (==)
         assertTrue("Partitions distributions are not the same.",
             cachePartitionsDistribution.get(0) == cachePartitionsDistribution.get(1));
     }
 
     /**
-     * Check that affinity awareness works fine after reconnection.
-     *
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testReconnect() throws Exception {
-        checkNodesUsage(null, "select * from Person where _key = 3", 1, 1,
-            false);
-
-        startGrid(7);
-
-        for(int i = 0; i < NODES_CNT; i++)
-            stopGrid(i);
-
-        GridTestUtils.assertThrows(log, new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                stmt.execute("select * from Person where _key = 3");
-
-                return null;
-            }
-        }, SQLException.class, "Failed to communicate with Ignite cluster.");
-
-        for(int i = 0; i < NODES_CNT; i++)
-            startGrid(i);
-
-        stopGrid(4);
-        stopGrid(5);
-        stopGrid(6);
-        stopGrid(7);
-
-        stmt = conn.createStatement();
-
-        // We need this extra query to invalidate obsolete affinity cache
-        stmt.execute("select * from Person where _key = 3");
-
-        checkNodesUsage(null, "select * from Person where _key = 3", 1, 1,
-            false);
-    }
-
-    /**
      * Prepares default cache configuration with given name.
      *
      * @param cacheName Cache name.
@@ -607,6 +566,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest
 
         cache.setName(cacheName);
         cache.setCacheMode(PARTITIONED);
+        cache.setBackups(1);
         cache.setIndexedTypes(
             Integer.class, Person.class
         );
@@ -615,8 +575,8 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest
     }
 
     /**
-     * Utitlity method that executes given query and verifies that expeted number of records was returned.
-     * Besides that given method verified that partitoin result for corresponding query is null.
+     * Utility method that executes given query and verifies that expected number of records was returned.
+     * Besides that given method verified that partition result for corresponding query is null.
      *
      * @param sqlQry Sql query.
      * @param expRowsCnt Expected rows count.
@@ -656,8 +616,8 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest
      * @param dml Flag that signals whether we execute dml or not.
      * @throws Exception If failed.
      */
-    private void checkNodesUsage(PreparedStatement ps, String sql, int maxNodesUsedCnt, int expRowsCnt, boolean dml)
-        throws Exception {
+    private void checkNodesUsage(PreparedStatement ps, Statement stmt, String sql, int maxNodesUsedCnt, int expRowsCnt,
+        boolean dml) throws Exception {
         // Warm up an affinity cache.
         if (ps != null)
             if (dml)
@@ -729,7 +689,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest
                 "], got [" +  nonEmptyMetricsCntr + "]",
             nonEmptyMetricsCntr > 0 && nonEmptyMetricsCntr <= maxNodesUsedCnt);
 
-        assertEquals("Executions count doesn't match expeted value: expected [" +
+        assertEquals("Executions count doesn't match expected value: expected [" +
                 NODES_CNT * QUERY_EXECUTION_MULTIPLIER + "], got [" + qryExecutionsCntr + "]",
             NODES_CNT * QUERY_EXECUTION_MULTIPLIER, qryExecutionsCntr);
     }
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
index 243f5c4..185fa91 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
@@ -286,8 +286,9 @@ public class JdbcThinConnectionSelfTest extends JdbcThinAbstractSelfTest {
      */
     @Test
     public void testSqlHints() throws Exception {
-        try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1")) {
-            assertHints(conn, false, false, false, false, false, false, affinityAwareness);
+        try (Connection conn = DriverManager.getConnection(urlWithAffinityAwarenessFlag)) {
+            assertHints(conn, false, false, false, false, false,
+                false, affinityAwareness);
         }
 
         try (Connection conn = DriverManager.getConnection(urlWithAffinityAwarenessFlag + "&distributedJoins=true")) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/AffinityCache.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/AffinityCache.java
index d582ede..bd4dc4b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/AffinityCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/AffinityCache.java
@@ -107,7 +107,7 @@ public final class AffinityCache {
      * @param cacheId Cache Id.
      * @return Cache partitoins distribution for given cache Id or null.
      */
-    UUID[] cacheDistribution(int cacheId) {
+    public UUID[] cacheDistribution(int cacheId) {
         return cachePartitionsDistribution.get(cacheId);
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
index 065bac9..971acdf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
@@ -43,22 +43,27 @@ import java.sql.Struct;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.internal.jdbc2.JdbcUtils;
@@ -107,6 +112,12 @@ public class JdbcThinConnection implements Connection {
     /** Request timeout period. */
     private static final int REQUEST_TIMEOUT_PERIOD = 1_000;
 
+    /** Reconnection period. */
+    public static final int RECONNECTION_DELAY = 200;
+
+    /** Reconnection maximum period. */
+    private static final int RECONNECTION_MAX_DELAY = 300_000;
+
     /** Network timeout permission */
     private static final String SET_NETWORK_TIMEOUT_PERM = "setNetworkTimeout";
 
@@ -149,15 +160,12 @@ public class JdbcThinConnection implements Connection {
     /** Connection properties. */
     private final ConnectionProperties connProps;
 
-    /** Connected. */
-    private volatile boolean connected;
+    /** The amount of potentially alive {@code JdbcThinTcpIo} instances - connections to server nodes. */
+    private final AtomicInteger connCnt = new AtomicInteger();
 
     /** Tracked statements to close on disconnect. */
     private final Set<JdbcThinStatement> stmts = Collections.newSetFromMap(new IdentityHashMap<>());
 
-    /** Query timeout timer */
-    private final Timer timer;
-
     /** Affinity cache. */
     private AffinityCache affinityCache;
 
@@ -165,10 +173,7 @@ public class JdbcThinConnection implements Connection {
     private volatile JdbcThinTcpIo singleIo;
 
     /** Node Ids tp ignite endpoints. */
-    private final Map<UUID, JdbcThinTcpIo> ios = new ConcurrentHashMap<>();
-
-    /** Ignite endpoints to use for better performance in case of random access. */
-    private JdbcThinTcpIo[] iosArr;
+    private final ConcurrentSkipListMap<UUID, JdbcThinTcpIo> ios = new ConcurrentSkipListMap<>();
 
     /** Server index. */
     private int srvIdx;
@@ -188,6 +193,15 @@ public class JdbcThinConnection implements Connection {
     /** Network timeout. */
     private int netTimeout;
 
+    /** Background periodical maintenance: query timeouts and reconnection handler. */
+    private final ScheduledExecutorService maintenanceExecutor = Executors.newScheduledThreadPool(2);
+
+    /** Cancelable future for query timeout task. */
+    private ScheduledFuture<?> qryTimeoutScheduledFut;
+
+    /** Cancelable future for connections handler task. */
+    private ScheduledFuture<?> connectionsHndScheduledFut;
+
     /**
      * Creates new connection.
      *
@@ -203,32 +217,30 @@ public class JdbcThinConnection implements Connection {
 
         schema = JdbcUtils.normalizeSchema(connProps.getSchema());
 
-        timer = new Timer("query-timeout-timer");
-
         affinityAwareness = connProps.isAffinityAwareness();
 
         ensureConnected();
+
+        if (affinityAwareness)
+            connectionsHndScheduledFut = maintenanceExecutor.scheduleWithFixedDelay(new ConnectionHandlerTask(),
+                0, RECONNECTION_DELAY, TimeUnit.MILLISECONDS);
     }
 
     /**
      * @throws SQLException On connection error.
      */
     private void ensureConnected() throws SQLException {
-        if (connected)
+        if (connCnt.get() > 0)
             return;
 
         assert !closed;
 
         assert ios.isEmpty();
 
-        assert iosArr == null;
-
-        HostAndPortRange[] srvs = connProps.getAddresses();
-
         if (affinityAwareness)
-            connectInAffinityAwarenessMode(srvs);
+            connectInBestEffortAffinityMode();
         else
-            connectInCommonMode(srvs);
+            connectInCommonMode();
     }
 
     /**
@@ -445,6 +457,10 @@ public class JdbcThinConnection implements Connection {
         if (isClosed())
             return;
 
+        closed = true;
+
+        maintenanceExecutor.shutdown();
+
         if (streamState != null) {
             streamState.close();
 
@@ -457,23 +473,17 @@ public class JdbcThinConnection implements Connection {
 
         SQLException err = null;
 
-        closed = true;
-
         if (affinityAwareness) {
             for (JdbcThinTcpIo clioIo : ios.values())
                 clioIo.close();
 
             ios.clear();
-
-            iosArr = null;
         }
         else {
             if (singleIo != null)
                 singleIo.close();
         }
 
-        timer.cancel();
-
         if (err != null)
             throw err;
     }
@@ -858,7 +868,7 @@ public class JdbcThinConnection implements Connection {
         throws SQLException {
         ensureConnected();
 
-        RequestTimeoutTimerTask reqTimeoutTimerTask = null;
+        RequestTimeoutTask reqTimeoutTask = null;
 
         synchronized (mux) {
             if (ownThread != null) {
@@ -870,16 +880,18 @@ public class JdbcThinConnection implements Connection {
             ownThread = Thread.currentThread();
         }
         try {
+            JdbcThinTcpIo cliIo = null;
             try {
-                JdbcThinTcpIo cliIo = stickyIo == null ? cliIo(calculateNodeIds(req)) : stickyIo;
+                cliIo = (stickyIo == null || !stickyIo.connected()) ? cliIo(calculateNodeIds(req)) : stickyIo;
 
                 if (stmt != null && stmt.requestTimeout() != NO_TIMEOUT) {
-                    reqTimeoutTimerTask = new RequestTimeoutTimerTask(
+                    reqTimeoutTask = new RequestTimeoutTask(
                         req instanceof JdbcBulkLoadBatchRequest ? stmt.currentRequestId() : req.requestId(),
                         cliIo,
                         stmt.requestTimeout());
 
-                    timer.schedule(reqTimeoutTimerTask, 0, REQUEST_TIMEOUT_PERIOD);
+                    qryTimeoutScheduledFut = maintenanceExecutor.scheduleAtFixedRate(reqTimeoutTask, 0,
+                        REQUEST_TIMEOUT_PERIOD, TimeUnit.MILLISECONDS);
                 }
 
                 JdbcQueryExecuteRequest qryReq = null;
@@ -892,13 +904,15 @@ public class JdbcThinConnection implements Connection {
                 txIo = res.activeTransaction() ? cliIo : null;
 
                 if (res.status() == IgniteQueryErrorCode.QUERY_CANCELED && stmt != null &&
-                    stmt.requestTimeout() != NO_TIMEOUT && reqTimeoutTimerTask != null && reqTimeoutTimerTask.expired.get()) {
+                    stmt.requestTimeout() != NO_TIMEOUT && reqTimeoutTask != null &&
+                    reqTimeoutTask.expired.get()) {
 
                     throw new SQLTimeoutException(QueryCancelledException.ERR_MSG, SqlStateCode.QUERY_CANCELLED,
                         IgniteQueryErrorCode.QUERY_CANCELED);
                 }
                 else if (res.status() != ClientListenerResponse.STATUS_SUCCESS)
-                    throw new SQLException(res.error(), IgniteQueryErrorCode.codeToSqlState(res.status()), res.status());
+                    throw new SQLException(res.error(), IgniteQueryErrorCode.codeToSqlState(res.status()),
+                        res.status());
 
                 updateAffinityCache(qryReq, res);
 
@@ -908,16 +922,17 @@ public class JdbcThinConnection implements Connection {
                 throw e;
             }
             catch (Exception e) {
-                onDisconnect();
+                onDisconnect(cliIo);
 
                 if (e instanceof SocketTimeoutException)
                     throw new SQLException("Connection timed out.", SqlStateCode.CONNECTION_FAILURE, e);
                 else
-                    throw new SQLException("Failed to communicate with Ignite cluster.", SqlStateCode.CONNECTION_FAILURE, e);
+                    throw new SQLException("Failed to communicate with Ignite cluster.",
+                        SqlStateCode.CONNECTION_FAILURE, e);
             }
             finally {
-                if (stmt != null && stmt.requestTimeout() != NO_TIMEOUT && reqTimeoutTimerTask != null)
-                    reqTimeoutTimerTask.cancel();
+                if (stmt != null && stmt.requestTimeout() != NO_TIMEOUT && reqTimeoutTask != null)
+                    qryTimeoutScheduledFut.cancel(false);
             }
         }
         finally {
@@ -932,7 +947,7 @@ public class JdbcThinConnection implements Connection {
      *
      * @param req Jdbc request for which we'll try to calculate node id.
      * @return node UUID or null if failed to calculate.
-     * @throws IOException If Exception occured during the network partiton destribution retrieval.
+     * @throws IOException If Exception occurred during the network partition distribution retrieval.
      * @throws SQLException If Failed to calculate derived partitions.
      */
     @Nullable private List<UUID> calculateNodeIds(JdbcRequest req) throws IOException, SQLException {
@@ -982,12 +997,12 @@ public class JdbcThinConnection implements Connection {
     }
 
     /**
-     * Retrieve cache destribution for specified cache Id.
+     * Retrieve cache distribution for specified cache Id.
      *
      * @param cacheId Cache Id.
-     * @param partCnt Partitons count.
+     * @param partCnt Partitions count.
      * @return Partitions cache distribution.
-     * @throws IOException If Exception occured during the network partiton destribution retrieval.
+     * @throws IOException If Exception occurred during the network partition distribution retrieval.
      */
     private UUID[] retrieveCacheDistribution(int cacheId, int partCnt) throws IOException {
         UUID[] cacheDistr = affinityCache.cacheDistribution(cacheId);
@@ -997,7 +1012,8 @@ public class JdbcThinConnection implements Connection {
 
         JdbcResponse res;
 
-        res = cliIo(null).sendRequest(new JdbcCachePartitionsRequest(Collections.singleton(cacheId)), null);
+        res = cliIo(null).sendRequest(new JdbcCachePartitionsRequest(Collections.singleton(cacheId)),
+            null);
 
         assert res.status() == ClientListenerResponse.STATUS_SUCCESS;
 
@@ -1007,7 +1023,7 @@ public class JdbcThinConnection implements Connection {
             affinityCache = new AffinityCache(resAffinityVer);
         else if (affinityCache.version().compareTo(resAffinityVer) > 0) {
             // Jdbc thin affinity cache is binded to the newer affinity topology version, so we should ignore retrieved
-            // partition destribution. Given situation might occur in case of concurrent race and is not
+            // partition distribution. Given situation might occur in case of concurrent race and is not
             // possible in single-threaded jdbc thin client, so it's a reserve for the future.
             return null;
         }
@@ -1015,7 +1031,7 @@ public class JdbcThinConnection implements Connection {
         List<JdbcThinAffinityAwarenessMappingGroup> mappings =
             ((JdbcCachePartitionsResult)res.response()).getMappings();
 
-        // Despite the fact that, at this moment, we request partition destribution only for one cache,
+        // Despite the fact that, at this moment, we request partition distribution only for one cache,
         // we might retrieve multiple caches but exactly with same distribution.
         assert mappings.size() == 1;
 
@@ -1046,7 +1062,8 @@ public class JdbcThinConnection implements Connection {
                 return derivedParts.tree().apply(partResDesc.partitionClientContext(), args);
             }
             catch (IgniteCheckedException e) {
-                throw new SQLException("Failed to calculate derived partitions for query.", SqlStateCode.INTERNAL_ERROR);
+                throw new SQLException("Failed to calculate derived partitions for query.",
+                    SqlStateCode.INTERNAL_ERROR);
             }
         }
 
@@ -1061,7 +1078,7 @@ public class JdbcThinConnection implements Connection {
      * @throws SQLException On any error.
      */
     void sendQueryCancelRequest(JdbcQueryCancelRequest req, JdbcThinTcpIo cliIo) throws SQLException {
-        if (!connected)
+        if (connCnt.get() == 0)
             throw new SQLException("Failed to communicate with Ignite cluster.", SqlStateCode.CONNECTION_FAILURE);
 
         assert cliIo != null;
@@ -1082,7 +1099,8 @@ public class JdbcThinConnection implements Connection {
      * @param stickyIO Sticky ignite endpoint.
      * @throws SQLException On any error.
      */
-    private void sendRequestNotWaitResponse(JdbcOrderedBatchExecuteRequest req, JdbcThinTcpIo stickyIO) throws SQLException {
+    private void sendRequestNotWaitResponse(JdbcOrderedBatchExecuteRequest req, JdbcThinTcpIo stickyIO)
+        throws SQLException {
         ensureConnected();
 
         synchronized (mux) {
@@ -1102,12 +1120,13 @@ public class JdbcThinConnection implements Connection {
             throw e;
         }
         catch (Exception e) {
-            onDisconnect();
+            onDisconnect(stickyIO);
 
             if (e instanceof SocketTimeoutException)
                 throw new SQLException("Connection timed out.", SqlStateCode.CONNECTION_FAILURE, e);
             else
-                throw new SQLException("Failed to communicate with Ignite cluster.", SqlStateCode.CONNECTION_FAILURE, e);
+                throw new SQLException("Failed to communicate with Ignite cluster.",
+                    SqlStateCode.CONNECTION_FAILURE, e);
         }
         finally {
             synchronized (mux) {
@@ -1126,24 +1145,20 @@ public class JdbcThinConnection implements Connection {
     /**
      * Called on IO disconnect: close the client IO and opened statements.
      */
-    private void onDisconnect() {
-        if (!connected)
-            return;
+    private void onDisconnect(JdbcThinTcpIo cliIo) {
+        assert connCnt.get() > 0;
 
         if (affinityAwareness) {
-            for (JdbcThinTcpIo clioIo : ios.values())
-                clioIo.close();
-
-            ios.clear();
+            cliIo.close();
 
-            iosArr = null;
+            ios.remove(cliIo.nodeId());
         }
         else {
             if (singleIo != null)
                 singleIo.close();
         }
 
-        connected = false;
+        connCnt.decrementAndGet();
 
         if (streamState != null) {
             streamState.close0();
@@ -1157,8 +1172,6 @@ public class JdbcThinConnection implements Connection {
 
             stmts.clear();
         }
-
-        timer.cancel();
     }
 
     /**
@@ -1308,7 +1321,7 @@ public class JdbcThinConnection implements Connection {
                 if (err0 instanceof SQLException)
                     throw (SQLException)err0;
                 else {
-                    onDisconnect();
+                    onDisconnect(streamingStickyIo);
 
                     if (err0 instanceof SocketTimeoutException)
                         throw new SQLException("Connection timed out.", SqlStateCode.CONNECTION_FAILURE, err0);
@@ -1330,7 +1343,7 @@ public class JdbcThinConnection implements Connection {
         /**
          */
         void close0() {
-            if (connected) {
+            if (connCnt.get() > 0) {
                 try {
                     executeBatch(true);
                 }
@@ -1395,7 +1408,6 @@ public class JdbcThinConnection implements Connection {
      * @param nodeIds Set of node's UUIDs.
      * @return Ignite endpoint to use for request/response transferring.
      */
-    @SuppressWarnings("ZeroLengthArrayAllocation")
     private JdbcThinTcpIo cliIo(List<UUID> nodeIds) {
         if (!affinityAwareness)
             return singleIo;
@@ -1404,12 +1416,12 @@ public class JdbcThinConnection implements Connection {
             return txIo;
 
         if (nodeIds == null || nodeIds.isEmpty())
-            return iosArr[RND.nextInt(iosArr.length)];
+            return randomIo();
 
         JdbcThinTcpIo io = null;
 
         if (nodeIds.size() == 1)
-            io = ios.get(nodeIds.iterator().next());
+            io = ios.get(nodeIds.get(0));
         else {
             int initNodeId = RND.nextInt(nodeIds.size());
 
@@ -1427,7 +1439,42 @@ public class JdbcThinConnection implements Connection {
             }
         }
 
-        return io != null ? io : iosArr[RND.nextInt(iosArr.length)];
+        return io != null ? io : randomIo();
+    }
+
+    /**
+     * Returns random tcpIo, based on random UUID, generated in a custom way with the help of {@code Random}
+     * instead of {@code SecureRandom}. It's valid, cause cryptographically strong pseudo
+     * random number generator is not required in this particular case. {@code Random} is much faster
+     * than {@code SecureRandom}.
+     *
+     * @return random tcpIo
+     */
+    private JdbcThinTcpIo randomIo() {
+        byte[] randomBytes = new byte[16];
+
+        RND.nextBytes(randomBytes);
+
+        randomBytes[6]  &= 0x0f;  /* clear version        */
+        randomBytes[6]  |= 0x40;  /* set to version 4     */
+        randomBytes[8]  &= 0x3f;  /* clear variant        */
+        randomBytes[8]  |= 0x80;  /* set to IETF variant  */
+
+        long msb = 0;
+
+        long lsb = 0;
+
+        for (int i=0; i<8; i++)
+            msb = (msb << 8) | (randomBytes[i] & 0xff);
+
+        for (int i=8; i<16; i++)
+            lsb = (lsb << 8) | (randomBytes[i] & 0xff);
+
+        UUID randomUUID =  new UUID(msb, lsb);
+
+        Map.Entry<UUID, JdbcThinTcpIo> entry = ios.ceilingEntry(randomUUID);
+
+        return entry != null ? entry.getValue() : ios.floorEntry(randomUUID).getValue();
     }
 
     /**
@@ -1457,10 +1504,11 @@ public class JdbcThinConnection implements Connection {
      * Establishes a connection to ignite endpoint, trying all specified hosts and ports one by one.
      * Stops as soon as any connection is established.
      *
-     * @param srvs Ignite endpoints addresses.
      * @throws SQLException If failed to connect to ignite cluster.
      */
-    private void connectInCommonMode(HostAndPortRange[] srvs) throws SQLException {
+    private void connectInCommonMode() throws SQLException {
+        HostAndPortRange[] srvs = connProps.getAddresses();
+
         List<Exception> exceptions = null;
 
         for (int i = 0; i < srvs.length; i++) {
@@ -1481,7 +1529,7 @@ public class JdbcThinConnection implements Connection {
 
                             singleIo = cliIo;
 
-                            connected = true;
+                            connCnt.incrementAndGet();
 
                             return;
                         }
@@ -1513,7 +1561,7 @@ public class JdbcThinConnection implements Connection {
      * @throws SQLException Umbrella exception.
      */
     private void handleConnectExceptions(List<Exception> exceptions) throws SQLException {
-        if (!connected && exceptions != null) {
+        if (connCnt.get() == 0 && exceptions != null) {
             close();
 
             if (exceptions.size() == 1) {
@@ -1540,18 +1588,16 @@ public class JdbcThinConnection implements Connection {
      * Establishes a connection to ignite endpoint, trying all specified hosts and ports one by one.
      * Stops as soon as all iosArr are established.
      *
-     * @param srvs Ignite endpoints addresses.
      * @throws SQLException If failed to connect to at least one ignite endpoint,
      * or if endpoints versions are not the same.
      */
-    @SuppressWarnings("ZeroLengthArrayAllocation")
-    private void connectInAffinityAwarenessMode(HostAndPortRange[] srvs) throws SQLException {
+    private void connectInBestEffortAffinityMode() throws SQLException {
         List<Exception> exceptions = null;
 
-        IgniteProductVersion prevIgniteEnpointVer = null;
+        IgniteProductVersion prevIgniteEndpointVer = null;
 
-        for (int i = 0; i < srvs.length; i++) {
-            HostAndPortRange srv = srvs[i];
+        for (int i = 0; i < connProps.getAddresses().length; i++) {
+            HostAndPortRange srv = connProps.getAddresses()[i];
 
             try {
                 InetAddress[] addrs = InetAddress.getAllByName(srv.host());
@@ -1563,14 +1609,18 @@ public class JdbcThinConnection implements Connection {
                                 new JdbcThinTcpIo(connProps, new InetSocketAddress(addr, port), 0);
 
                             if (!cliIo.isAffinityAwarenessSupported()) {
+                                cliIo.close();
+
                                 throw new SQLException("Failed to connect to Ignite node [url=" +
                                     connProps.getUrl() + "]. address = [" + addr + ':' + port + "]." +
-                                    "Node doesn't support best affort affinity mode.",
+                                    "Node doesn't support affinity awareness mode.",
                                     SqlStateCode.INTERNAL_ERROR);
                             }
 
-                            if (prevIgniteEnpointVer != null && !prevIgniteEnpointVer.equals(cliIo.igniteVersion())) {
+                            if (prevIgniteEndpointVer != null && !prevIgniteEndpointVer.equals(cliIo.igniteVersion())) {
                                 // TODO: 13.02.19 IGNITE-11321 JDBC Thin: implement nodes multi version support.
+                                cliIo.close();
+
                                 throw new SQLException("Failed to connect to Ignite node [url=" +
                                     connProps.getUrl() + "]. address = [" + addr + ':' + port + "]." +
                                     "Different versions of nodes are not supported in affinity awareness mode.",
@@ -1579,17 +1629,18 @@ public class JdbcThinConnection implements Connection {
 
                             cliIo.timeout(netTimeout);
 
-                            JdbcThinTcpIo ioToSameNode = ios.get(cliIo.nodeId());
+                            JdbcThinTcpIo ioToSameNode = ios.putIfAbsent(cliIo.nodeId(), cliIo);
 
-                            // This can happen if the same node has several IPs.
+                            // This can happen if the same node has several IPs or if connection manager background
+                            // timer task runs concurrently.
                             if (ioToSameNode != null)
-                                ioToSameNode.close();
-
-                            ios.put(cliIo.nodeId(), cliIo);
+                                cliIo.close();
+                            else
+                                connCnt.incrementAndGet();
 
-                            connected = true;
+                            prevIgniteEndpointVer = cliIo.igniteVersion();
 
-                            prevIgniteEnpointVer = cliIo.igniteVersion();
+                            return;
                         }
                         catch (Exception exception) {
                             if (exceptions == null)
@@ -1609,14 +1660,49 @@ public class JdbcThinConnection implements Connection {
         }
 
         handleConnectExceptions(exceptions);
+    }
+
+    /**
+     * Recreates affinity cache if affinity topology version was changed and adds partition result to sql cache.
+     *
+     * @param qryReq Query request.
+     * @param res Jdbc Response.
+     */
+    private void updateAffinityCache(JdbcQueryExecuteRequest qryReq, JdbcResponse res) {
+        if (affinityAwareness) {
+            AffinityTopologyVersion resAffVer = res.affinityVersion();
+
+            if (resAffVer != null && (affinityCache == null || affinityCache.version().compareTo(resAffVer) < 0))
+                affinityCache = new AffinityCache(resAffVer);
+
+            // Partition result was requested.
+            if (res.response() instanceof JdbcQueryExecuteResult && qryReq.partitionResponseRequest()) {
+                PartitionResult partRes = ((JdbcQueryExecuteResult)res.response()).partitionResult();
+
+                if (partRes == null || affinityCache.version().equals(partRes.topologyVersion())) {
+                    int cacheId = (partRes != null && partRes.tree() != null) ?
+                        GridCacheUtils.cacheId(partRes.cacheName()) :
+                        -1;
+
+                    PartitionClientContext partClientCtx = partRes != null ?
+                        new PartitionClientContext(partRes.partitionsCount()) :
+                        null;
+
+                    QualifiedSQLQuery qry = new QualifiedSQLQuery(qryReq.schemaName(), qryReq.sqlQuery());
 
-        iosArr = ios.values().toArray(new JdbcThinTcpIo[0]);
+                    JdbcThinPartitionResultDescriptor partResDescr =
+                        new JdbcThinPartitionResultDescriptor(partRes, cacheId, partClientCtx);
+
+                    affinityCache.addSqlQuery(qry, partResDescr);
+                }
+            }
+        }
     }
 
     /**
-     * Request Timeout Timer Task
+     * Request Timeout Task
      */
-    private class RequestTimeoutTimerTask extends TimerTask {
+    private class RequestTimeoutTask implements Runnable {
         /** Request id. */
         private final long reqId;
 
@@ -1633,7 +1719,7 @@ public class JdbcThinConnection implements Connection {
          * @param reqId Request Id to cancel in case of timeout
          * @param initReqTimeout Initial request timeout
          */
-        RequestTimeoutTimerTask(long reqId, JdbcThinTcpIo stickyIO, int initReqTimeout) {
+        RequestTimeoutTask(long reqId, JdbcThinTcpIo stickyIO, int initReqTimeout) {
             this.reqId = reqId;
 
             this.stickyIO = stickyIO;
@@ -1651,7 +1737,9 @@ public class JdbcThinConnection implements Connection {
 
                     sendQueryCancelRequest(new JdbcQueryCancelRequest(reqId), stickyIO);
 
-                    cancel();
+                    qryTimeoutScheduledFut.cancel(false);
+
+                    return;
                 }
 
                 remainingQryTimeout -= REQUEST_TIMEOUT_PERIOD;
@@ -1660,45 +1748,155 @@ public class JdbcThinConnection implements Connection {
                 LOG.log(Level.WARNING,
                     "Request timeout processing failure: unable to cancel request [reqId=" + reqId + ']', e);
 
-                cancel();
+                qryTimeoutScheduledFut.cancel(false);
             }
         }
     }
 
     /**
-     * Recreates affinity cache if affinity topology version was changed and adds partition result to sql cache.
-     *
-     * @param qryReq Query request.
-     * @param res Jdbc Response.
+     * Connection Handler Task
      */
-    private void updateAffinityCache(JdbcQueryExecuteRequest qryReq, JdbcResponse res) {
-        if (affinityAwareness) {
-            AffinityTopologyVersion resAffVer = res.affinityVersion();
+    private class ConnectionHandlerTask  implements Runnable {
+        /** Map with reconnection delays. */
+        private Map<InetSocketAddress, Integer> reconnectionDelays = new HashMap<>();
 
-            if (resAffVer != null && (affinityCache == null || affinityCache.version().compareTo(resAffVer) < 0))
-                affinityCache = new AffinityCache(resAffVer);
+        /** Map with reconnection delays remainder. */
+        private Map<InetSocketAddress, Integer> reconnectionDelaysRemainder = new HashMap<>();
 
-            // Partition result was requested.
-            if (res.response() instanceof JdbcQueryExecuteResult && qryReq.partitionResponseRequest()) {
-                PartitionResult partRes = ((JdbcQueryExecuteResult)res.response()).partitionResult();
+        /** {@inheritDoc} */
+        @Override public void run() {
+            try {
+                for (Map.Entry<InetSocketAddress, Integer> delayEntry : reconnectionDelaysRemainder.entrySet())
+                    reconnectionDelaysRemainder.put(delayEntry.getKey(), delayEntry.getValue() - RECONNECTION_DELAY);
 
-                if (partRes == null || affinityCache.version().equals(partRes.topologyVersion())) {
-                    int cacheId = (partRes != null && partRes.tree() != null) ?
-                        GridCacheUtils.cacheId(partRes.cacheName()) :
-                        -1;
+                Set<InetSocketAddress> aliveSockAddrs =
+                    ios.values().stream().map(JdbcThinTcpIo::socketAddress).collect(Collectors.toSet());
 
-                    PartitionClientContext partClientCtx = partRes != null ?
-                        new PartitionClientContext(partRes.partitionsCount()) :
-                        null;
+                IgniteProductVersion prevIgniteEndpointVer = null;
 
-                    QualifiedSQLQuery qry = new QualifiedSQLQuery(qryReq.schemaName(), qryReq.sqlQuery());
+                for (int i = 0; i < connProps.getAddresses().length; i++) {
+                    HostAndPortRange srv = connProps.getAddresses()[i];
 
-                    JdbcThinPartitionResultDescriptor partResDescr =
-                        new JdbcThinPartitionResultDescriptor(partRes, cacheId, partClientCtx);
+                    try {
+                        InetAddress[] addrs = InetAddress.getAllByName(srv.host());
 
-                    affinityCache.addSqlQuery(qry, partResDescr);
+                        for (InetAddress addr : addrs) {
+                            for (int port = srv.portFrom(); port <= srv.portTo(); ++port) {
+                                InetSocketAddress sockAddr = null;
+
+                                try {
+                                    sockAddr = new InetSocketAddress(addr, port);
+
+                                    if (aliveSockAddrs.contains(sockAddr)) {
+                                        reconnectionDelaysRemainder.remove(sockAddr);
+                                        reconnectionDelays.remove(sockAddr);
+
+                                        continue;
+                                    }
+
+                                    Integer delayRemainder = reconnectionDelaysRemainder.get(sockAddr);
+
+                                    if (delayRemainder != null && delayRemainder != 0)
+                                        continue;
+
+                                    if (closed) {
+                                        maintenanceExecutor.shutdown();
+
+                                        return;
+                                    }
+
+                                    JdbcThinTcpIo cliIo =
+                                        new JdbcThinTcpIo(connProps, new InetSocketAddress(addr, port), 0);
+
+                                    if (!cliIo.isAffinityAwarenessSupported()) {
+                                        processDelay(sockAddr);
+
+                                        LOG.log(Level.WARNING, "Failed to connect to Ignite node [url=" +
+                                            connProps.getUrl() + "]. address = [" + addr + ':' + port + "]." +
+                                            "Node doesn't support best effort affinity mode.");
+
+                                        cliIo.close();
+
+                                        continue;
+                                    }
+
+                                    if (prevIgniteEndpointVer != null &&
+                                        !prevIgniteEndpointVer.equals(cliIo.igniteVersion())) {
+                                        processDelay(sockAddr);
+
+                                        LOG.log(Level.WARNING, "Failed to connect to Ignite node [url=" +
+                                            connProps.getUrl() + "]. address = [" + addr + ':' + port + "]." +
+                                            "Different versions of nodes are not supported in best " +
+                                            "effort affinity mode.");
+
+                                        cliIo.close();
+
+                                        continue;
+                                    }
+
+                                    cliIo.timeout(netTimeout);
+
+                                    JdbcThinTcpIo ioToSameNode = ios.putIfAbsent(cliIo.nodeId(), cliIo);
+
+                                    // This can happen if the same node has several IPs or if ensureConnected() runs
+                                    // concurrently
+                                    if (ioToSameNode != null)
+                                        cliIo.close();
+                                    else
+                                        connCnt.incrementAndGet();
+
+                                    prevIgniteEndpointVer = cliIo.igniteVersion();
+
+                                    if (closed) {
+                                        maintenanceExecutor.shutdown();
+
+                                        cliIo.close();
+
+                                        ios.remove(cliIo.nodeId());
+
+                                        return;
+                                    }
+                                }
+                                catch (Exception exception) {
+                                    if (sockAddr != null)
+                                        processDelay(sockAddr);
+
+                                    LOG.log(Level.WARNING, "Failed to connect to Ignite node [url=" +
+                                        connProps.getUrl() + "]. address = [" + addr + ':' + port + "].");
+                                }
+                            }
+                        }
+                    }
+                    catch (Exception exception) {
+                        LOG.log(Level.WARNING, "Failed to connect to Ignite node [url=" +
+                            connProps.getUrl() + "]. server = [" + srv + "].");
+                    }
                 }
             }
+            catch (Exception e) {
+                LOG.log(Level.WARNING, "Connection handler processing failure. Reconnection processes was stopped."
+                    , e);
+
+                connectionsHndScheduledFut.cancel(false);
+            }
+        }
+
+        /**
+         * Increase reconnection delay if needed and store it to corresponding maps.
+         *
+         * @param sockAddr Socket address.
+         */
+        private void processDelay(InetSocketAddress sockAddr) {
+            Integer delay = reconnectionDelays.get(sockAddr);
+
+            delay = delay == null ? RECONNECTION_DELAY : delay * 2;
+
+            if (delay > RECONNECTION_MAX_DELAY)
+                delay = RECONNECTION_MAX_DELAY;
+
+            reconnectionDelays.put(sockAddr, delay);
+
+            reconnectionDelaysRemainder.put(sockAddr, delay);
         }
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
index 366be79..7663a80 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
@@ -122,7 +122,7 @@ public class JdbcThinTcpIo {
     private final BufferedInputStream in;
 
     /** Connected flag. */
-    private boolean connected;
+    private volatile boolean connected;
 
     /** Ignite server version. */
     private final IgniteProductVersion igniteVer;
@@ -422,10 +422,9 @@ public class JdbcThinTcpIo {
 
         JdbcResponse resp = readResponse();
 
-        if (stmt != null && stmt.isCancelled())
-            return new JdbcResponse(IgniteQueryErrorCode.QUERY_CANCELED, QueryCancelledException.ERR_MSG);
-        else
-            return resp;
+        return stmt != null && stmt.isCancelled() ?
+            new JdbcResponse(IgniteQueryErrorCode.QUERY_CANCELED, QueryCancelledException.ERR_MSG) :
+            resp;
     }
 
     /**
@@ -650,4 +649,18 @@ public class JdbcThinTcpIo {
     public UUID nodeId() {
         return nodeId;
     }
+
+    /**
+     * @return Socket address.
+     */
+    public InetSocketAddress socketAddress() {
+        return sockAddr;
+    }
+
+    /**
+     * @return Connected flag.
+     */
+    public boolean connected() {
+        return connected;
+    }
 }