You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/11/20 18:04:05 UTC

[12/50] [abbrv] ignite git commit: IGNITE-10303: SQL: Moved H2 connection management logic into separate class to simplify further development. This closes #5418.

IGNITE-10303: SQL: Moved H2 connection management logic into separate class to simplify further development. This closes #5418.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8526adda
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8526adda
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8526adda

Branch: refs/heads/ignite-9720
Commit: 8526adda0bbb069f0179343234b6021e09cc390e
Parents: 16dc88f
Author: devozerov <pp...@gmail.com>
Authored: Sat Nov 17 14:09:59 2018 +0300
Committer: devozerov <pp...@gmail.com>
Committed: Sat Nov 17 14:09:59 2018 +0300

----------------------------------------------------------------------
 .../processors/query/GridQueryIndexing.java     |   2 +-
 .../processors/query/GridQueryProcessor.java    |   2 +-
 .../internal/processors/query/QueryUtils.java   |   3 +
 ...IgniteClientCacheInitializationFailTest.java |   2 +-
 .../processors/query/h2/ConnectionManager.java  | 443 ++++++++++++++++++
 .../processors/query/h2/IgniteH2Indexing.java   | 451 ++-----------------
 .../processors/query/h2/dml/UpdatePlan.java     |   2 +-
 .../query/h2/twostep/GridMapQueryExecutor.java  |   2 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |   2 +-
 .../IgniteCacheQueryH2IndexingLeakTest.java     |  10 +-
 .../cache/index/H2ConnectionLeaksSelfTest.java  |   4 +-
 .../query/h2/sql/GridQueryParsingTest.java      |   2 +-
 12 files changed, 504 insertions(+), 421 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8526adda/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 3eb732c..dab2516 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -330,7 +330,7 @@ public interface GridQueryIndexing {
     /**
      * Cancels all executing queries.
      */
-    public void cancelAllQueries();
+    public void onKernalStop();
 
     /**
      * Gets database schema from cache name.

http://git-wip-us.apache.org/repos/asf/ignite/blob/8526adda/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index c4f0197..3842d77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -267,7 +267,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         if (cancel && idx != null) {
             try {
                 while (!busyLock.tryBlock(500))
-                    idx.cancelAllQueries();
+                    idx.onKernalStop();
 
                 return;
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8526adda/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
index e530ab1..6a2c22a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
@@ -80,6 +80,9 @@ public class QueryUtils {
     /** Schema for system view. */
     public static final String SCHEMA_SYS = "IGNITE";
 
+    /** Schema for system view. */
+    public static final String SCHEMA_INFORMATION = "INFORMATION_SCHEMA";
+
     /** Field name for key. */
     public static final String KEY_FIELD_NAME = "_KEY";
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8526adda/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
index ba2fec6..5432257 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
@@ -371,7 +371,7 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT
         }
 
         /** {@inheritDoc} */
-        @Override public void cancelAllQueries() {
+        @Override public void onKernalStop() {
             // No-op
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8526adda/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConnectionManager.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConnectionManager.java
new file mode 100644
index 0000000..09400c8
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConnectionManager.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2PlainRowFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.h2.server.web.WebServer;
+import org.h2.tools.Server;
+import org.jetbrains.annotations.Nullable;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_DEBUG_CONSOLE;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_DEBUG_CONSOLE_PORT;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT;
+import static org.apache.ignite.IgniteSystemProperties.getInteger;
+import static org.apache.ignite.IgniteSystemProperties.getString;
+
+/**
+ * H2 connection manager.
+ */
+public class ConnectionManager {
+    /** Default DB options. */
+    private static final String DB_OPTIONS = ";LOCK_MODE=3;MULTI_THREADED=1;DB_CLOSE_ON_EXIT=FALSE" +
+        ";DEFAULT_LOCK_TIMEOUT=10000;FUNCTIONS_IN_SCHEMA=true;OPTIMIZE_REUSE_RESULTS=0;QUERY_CACHE_SIZE=0" +
+        ";MAX_OPERATION_MEMORY=0;BATCH_JOINS=1" +
+        ";ROW_FACTORY=\"" + GridH2PlainRowFactory.class.getName() + "\"" +
+        ";DEFAULT_TABLE_ENGINE=" + GridH2DefaultTableEngine.class.getName();
+
+    /** The period of clean up the {@link #threadConns}. */
+    private static final Long CONN_CLEANUP_PERIOD = 2000L;
+
+    /** The period of clean up the statement cache. */
+    private static final Long STMT_CLEANUP_PERIOD = Long.getLong(IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD, 10_000);
+
+    /** The timeout to remove entry from the statement cache if the thread doesn't perform any queries. */
+    private static final Long STMT_TIMEOUT = Long.getLong(IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT, 600 * 1000);
+
+    /*
+     * Initialize system properties for H2.
+     */
+    static {
+        System.setProperty("h2.objectCache", "false");
+        System.setProperty("h2.serializeJavaObject", "false");
+        System.setProperty("h2.objectCacheMaxPerElementSize", "0"); // Avoid ValueJavaObject caching.
+        System.setProperty("h2.optimizeTwoEquals", "false"); // Makes splitter fail on subqueries in WHERE.
+        System.setProperty("h2.dropRestrict", "false"); // Drop schema with cascade semantics.
+    }
+
+    /** Shared connection pool. */
+    private final ThreadLocalObjectPool<H2ConnectionWrapper> connPool =
+        new ThreadLocalObjectPool<>(this::newConnectionWrapper, 5);
+
+    /** Per-thread connections. */
+    private final ConcurrentMap<Thread, H2ConnectionWrapper> threadConns = new ConcurrentHashMap<>();
+
+    /** Connection cache. */
+    private final ThreadLocal<ThreadLocalObjectPool.Reusable<H2ConnectionWrapper>> threadConn =
+        new ThreadLocal<ThreadLocalObjectPool.Reusable<H2ConnectionWrapper>>() {
+        @Override public ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> get() {
+            ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> reusable = super.get();
+
+            boolean reconnect = true;
+
+            try {
+                reconnect = reusable == null || reusable.object().connection().isClosed();
+            }
+            catch (SQLException e) {
+                U.warn(log, "Failed to check connection status.", e);
+            }
+
+            if (reconnect) {
+                reusable = initialValue();
+
+                set(reusable);
+            }
+
+            return reusable;
+        }
+
+        @Override protected ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> initialValue() {
+            ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> reusableConnection = connPool.borrow();
+
+            threadConns.put(Thread.currentThread(), reusableConnection.object());
+
+            return reusableConnection;
+        }
+    };
+
+    /** Database URL. */
+    private final String dbUrl;
+
+    /** Connection cleanup task. */
+    private final GridTimeoutProcessor.CancelableTask connCleanupTask;
+
+    /** Statement cleanup task. */
+    private final GridTimeoutProcessor.CancelableTask stmtCleanupTask;
+
+    /** H2 connection for INFORMATION_SCHEMA. Holds H2 open until node is stopped. */
+    private volatile Connection sysConn;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Context.
+     */
+    public ConnectionManager(GridKernalContext ctx) throws IgniteCheckedException {
+        dbUrl = "jdbc:h2:mem:" + ctx.localNodeId() + DB_OPTIONS;
+
+        log = ctx.log(ConnectionManager.class);
+
+        org.h2.Driver.load();
+
+        sysConn = connectionNoCache(QueryUtils.SCHEMA_INFORMATION);
+
+        stmtCleanupTask = ctx.timeout().schedule(new Runnable() {
+            @Override public void run() {
+                cleanupStatements();
+            }
+        }, STMT_CLEANUP_PERIOD, STMT_CLEANUP_PERIOD);
+
+        connCleanupTask = ctx.timeout().schedule(new Runnable() {
+            @Override public void run() {
+                cleanupConnections();
+            }
+        }, CONN_CLEANUP_PERIOD, CONN_CLEANUP_PERIOD);
+
+        startDebugConsole();
+    }
+
+    /**
+     * Gets DB connection.
+     *
+     * @param schema Whether to set schema for connection or not.
+     * @return DB connection.
+     */
+    public Connection connectionForThread(@Nullable String schema) {
+        H2ConnectionWrapper c = threadConn.get().object();
+
+        if (c == null)
+            throw new IgniteSQLException("Failed to get DB connection for thread (check log for details).");
+
+        if (schema != null && !F.eq(c.schema(), schema)) {
+            try {
+                c.connection().setSchema(schema);
+                c.schema(schema);
+
+                if (log.isDebugEnabled())
+                    log.debug("Set schema: " + schema);
+            }
+            catch (SQLException e) {
+                throw new IgniteSQLException("Failed to set schema for DB connection for thread [schema=" +
+                    schema + "]", e);
+            }
+        }
+
+        return c.connection();
+    }
+
+    /**
+     * @return Per-thread connections (for testing purposes only).
+     */
+    public Map<Thread, H2ConnectionWrapper> connectionsForThread() {
+        return threadConns;
+    }
+
+    /**
+     * Removes from cache and returns associated with current thread connection.
+     *
+     * @return Connection associated with current thread.
+     */
+    public ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> detachThreadConnection() {
+        Thread key = Thread.currentThread();
+
+        ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> reusableConnection = threadConn.get();
+
+        H2ConnectionWrapper connection = threadConns.remove(key);
+
+        threadConn.remove();
+
+        assert reusableConnection.object().connection() == connection.connection();
+
+        return reusableConnection;
+    }
+
+    /**
+     * Get connection without cache.
+     *
+     * @param schema Schema name.
+     * @return Connection.
+     */
+    public Connection connectionNoCache(String schema) throws IgniteSQLException {
+        try {
+            Connection conn = DriverManager.getConnection(dbUrl);
+
+            conn.setSchema(schema);
+
+            return conn;
+        }
+        catch (SQLException e) {
+            throw new IgniteSQLException("Failed to initialize system DB connection: " + dbUrl, e);
+        }
+    }
+
+    /**
+     * @return {@link H2StatementCache} associated with current thread.
+     */
+    public H2StatementCache statementCacheForThread() {
+        H2StatementCache statementCache = threadConn.get().object().statementCache();
+
+        statementCache.updateLastUsage();
+
+        return statementCache;
+    }
+
+    /**
+     * Execute SQL statement on specific schema.
+     *
+     * @param schema Schema
+     * @param sql SQL statement.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void executeStatement(String schema, String sql) throws IgniteCheckedException {
+        Statement stmt = null;
+
+        try {
+            Connection c = connectionForThread(schema);
+
+            stmt = c.createStatement();
+
+            stmt.executeUpdate(sql);
+        }
+        catch (SQLException e) {
+            onSqlException();
+
+            throw new IgniteSQLException("Failed to execute statement: " + sql, e);
+        }
+        finally {
+            U.close(stmt, log);
+        }
+    }
+
+    /**
+     * Execute statement on H2 INFORMATION_SCHEMA.
+     *
+     * @param sql SQL statement.
+     */
+    public void executeSystemStatement(String sql) {
+        Statement stmt = null;
+
+        try {
+            stmt = sysConn.createStatement();
+
+            stmt.executeUpdate(sql);
+        }
+        catch (SQLException e) {
+            onSqlException();
+
+            throw new IgniteSQLException("Failed to execute system statement: " + sql, e);
+        }
+        finally {
+            U.close(stmt, log);
+        }
+    }
+
+    /**
+     * Clear statement cache when cache is unregistered..
+     */
+    public void onCacheUnregistered() {
+        threadConns.values().forEach(H2ConnectionWrapper::clearStatementCache);
+    }
+
+    /**
+     * Cancel all queries.
+     */
+    public void onKernalStop() {
+        for (H2ConnectionWrapper c : threadConns.values())
+            U.close(c, log);
+    }
+
+    /**
+     * Close executor.
+     */
+    public void stop() {
+        for (H2ConnectionWrapper c : threadConns.values())
+            U.close(c, log);
+
+        threadConns.clear();
+
+        try (Connection c = connectionNoCache(QueryUtils.SCHEMA_INFORMATION); Statement s = c.createStatement()) {
+            s.execute("SHUTDOWN");
+        }
+        catch (SQLException e) {
+            U.error(log, "Failed to shutdown database.", e);
+        }
+
+        if (stmtCleanupTask != null)
+            stmtCleanupTask.close();
+
+        if (connCleanupTask != null)
+            connCleanupTask.close();
+
+        if (sysConn != null) {
+            U.close(sysConn, log);
+
+            sysConn = null;
+        }
+    }
+
+    /**
+     * Handles SQL exception.
+     */
+    public void onSqlException() {
+        Connection conn = threadConn.get().object().connection();
+
+        threadConn.set(null);
+
+        if (conn != null) {
+            threadConns.remove(Thread.currentThread());
+
+            // Reset connection to receive new one at next call.
+            U.close(conn, log);
+        }
+    }
+
+    /**
+     * Start debug console if needed.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    private void startDebugConsole() throws IgniteCheckedException {
+        try {
+            if (getString(IGNITE_H2_DEBUG_CONSOLE) != null) {
+                Connection c = DriverManager.getConnection(dbUrl);
+
+                int port = getInteger(IGNITE_H2_DEBUG_CONSOLE_PORT, 0);
+
+                WebServer webSrv = new WebServer();
+                Server web = new Server(webSrv, "-webPort", Integer.toString(port));
+                web.start();
+                String url = webSrv.addSession(c);
+
+                U.quietAndInfo(log, "H2 debug console URL: " + url);
+
+                try {
+                    Server.openBrowser(url);
+                }
+                catch (Exception e) {
+                    U.warn(log, "Failed to open browser: " + e.getMessage());
+                }
+            }
+        }
+        catch (SQLException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * Create new connection wrapper.
+     *
+     * @return Connection wrapper.
+     */
+    private H2ConnectionWrapper newConnectionWrapper() {
+        try {
+            return new H2ConnectionWrapper(DriverManager.getConnection(dbUrl));
+        }
+        catch (SQLException e) {
+            throw new IgniteSQLException("Failed to initialize DB connection: " + dbUrl, e);
+        }
+    }
+
+    /**
+     * Called periodically to cleanup connections.
+     */
+    private void cleanupConnections() {
+        for (Iterator<Map.Entry<Thread, H2ConnectionWrapper>> it = threadConns.entrySet().iterator(); it.hasNext(); ) {
+            Map.Entry<Thread, H2ConnectionWrapper> entry = it.next();
+
+            Thread t = entry.getKey();
+
+            if (t.getState() == Thread.State.TERMINATED) {
+                U.close(entry.getValue(), log);
+
+                it.remove();
+            }
+        }
+    }
+
+    /**
+     * Called periodically to clean up the statement cache.
+     */
+    private void cleanupStatements() {
+        long now = U.currentTimeMillis();
+
+        for (Iterator<Map.Entry<Thread, H2ConnectionWrapper>> it = threadConns.entrySet().iterator(); it.hasNext(); ) {
+            Map.Entry<Thread, H2ConnectionWrapper> entry = it.next();
+
+            Thread t = entry.getKey();
+
+            if (t.getState() == Thread.State.TERMINATED) {
+                U.close(entry.getValue(), log);
+
+                it.remove();
+            }
+            else if (now - entry.getValue().statementCache().lastUsage() > STMT_TIMEOUT)
+                entry.getValue().clearStatementCache();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8526adda/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 58e09cb..937363a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -22,7 +22,6 @@ import java.lang.reflect.Modifier;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.sql.Connection;
-import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -112,9 +111,7 @@ import org.apache.ignite.internal.processors.query.h2.database.io.H2MvccLeafIO;
 import org.apache.ignite.internal.processors.query.h2.ddl.DdlStatementsProcessor;
 import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
 import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2PlainRowFactory;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
@@ -141,7 +138,6 @@ import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryReq
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorImpl;
-import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.sql.SqlParseException;
 import org.apache.ignite.internal.sql.SqlParser;
 import org.apache.ignite.internal.sql.SqlStrictParseException;
@@ -188,19 +184,11 @@ import org.h2.engine.Session;
 import org.h2.engine.SysProperties;
 import org.h2.index.Index;
 import org.h2.jdbc.JdbcStatement;
-import org.h2.server.web.WebServer;
 import org.h2.table.IndexColumn;
-import org.h2.tools.Server;
 import org.h2.util.JdbcUtils;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_DEBUG_CONSOLE;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_DEBUG_CONSOLE_PORT;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT;
-import static org.apache.ignite.IgniteSystemProperties.getInteger;
-import static org.apache.ignite.IgniteSystemProperties.getString;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.checkActive;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccEnabled;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.tx;
@@ -237,27 +225,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      */
     static {
         PageIO.registerH2(H2InnerIO.VERSIONS, H2LeafIO.VERSIONS, H2MvccInnerIO.VERSIONS, H2MvccLeafIO.VERSIONS);
+
         H2ExtrasInnerIO.register();
         H2ExtrasLeafIO.register();
-
-        // Initialize system properties for H2.
-        System.setProperty("h2.objectCache", "false");
-        System.setProperty("h2.serializeJavaObject", "false");
-        System.setProperty("h2.objectCacheMaxPerElementSize", "0"); // Avoid ValueJavaObject caching.
-        System.setProperty("h2.optimizeTwoEquals", "false"); // Makes splitter fail on subqueries in WHERE.
-        System.setProperty("h2.dropRestrict", "false"); // Drop schema with cascade semantics.
     }
 
-    /** Default DB options. */
-    private static final String DB_OPTIONS = ";LOCK_MODE=3;MULTI_THREADED=1;DB_CLOSE_ON_EXIT=FALSE" +
-        ";DEFAULT_LOCK_TIMEOUT=10000;FUNCTIONS_IN_SCHEMA=true;OPTIMIZE_REUSE_RESULTS=0;QUERY_CACHE_SIZE=0" +
-        ";MAX_OPERATION_MEMORY=0;BATCH_JOINS=1" +
-        ";ROW_FACTORY=\"" + GridH2PlainRowFactory.class.getName() + "\"" +
-        ";DEFAULT_TABLE_ENGINE=" + GridH2DefaultTableEngine.class.getName();
-
-        // Uncomment this setting to get debug output from H2 to sysout.
-//        ";TRACE_LEVEL_SYSTEM_OUT=3";
-
     /** Dummy metadata for update result. */
     public static final List<GridQueryFieldMetadata> UPDATE_RESULT_META = Collections.<GridQueryFieldMetadata>
         singletonList(new H2SqlFieldMetadata(null, null, "UPDATED", Long.class.getName(), -1, -1));
@@ -265,23 +237,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /** */
     private static final int TWO_STEP_QRY_CACHE_SIZE = 1024;
 
-    /** The period of clean up the statement cache. */
-    private final Long CLEANUP_STMT_CACHE_PERIOD = Long.getLong(IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD, 10_000);
-
-    /** The period of clean up the {@link #conns}. */
-    @SuppressWarnings("FieldCanBeLocal")
-    private final Long CLEANUP_CONNECTIONS_PERIOD = 2000L;
-
-    /** The timeout to remove entry from the statement cache if the thread doesn't perform any queries. */
-    private final Long STATEMENT_CACHE_THREAD_USAGE_TIMEOUT =
-        Long.getLong(IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT, 600 * 1000);
-
-    /** */
-    private GridTimeoutProcessor.CancelableTask stmtCacheCleanupTask;
-
-    /** */
-    private GridTimeoutProcessor.CancelableTask connCleanupTask;
-
     /** Logger. */
     @LoggerResource
     private IgniteLogger log;
@@ -296,13 +251,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     private final ConcurrentMap<String, H2Schema> schemas = new ConcurrentHashMap<>();
 
     /** */
-    private String dbUrl = "jdbc:h2:mem:";
-
-    /** */
-    // TODO https://issues.apache.org/jira/browse/IGNITE-9062
-    private final ConcurrentMap<Thread, H2ConnectionWrapper> conns = new ConcurrentHashMap<>();
-
-    /** */
     private GridMapQueryExecutor mapQryExec;
 
     /** */
@@ -327,43 +275,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     private final H2RowCacheRegistry rowCache = new H2RowCacheRegistry();
 
     /** */
-    // TODO https://issues.apache.org/jira/browse/IGNITE-9062
-    private final ThreadLocalObjectPool<H2ConnectionWrapper> connectionPool = new ThreadLocalObjectPool<>(IgniteH2Indexing.this::newConnectionWrapper, 5);
-
-    /** */
-    // TODO https://issues.apache.org/jira/browse/IGNITE-9062
-    private final ThreadLocal<ThreadLocalObjectPool.Reusable<H2ConnectionWrapper>> connCache = new ThreadLocal<ThreadLocalObjectPool.Reusable<H2ConnectionWrapper>>() {
-        @Override public ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> get() {
-            ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> reusable = super.get();
-
-            boolean reconnect = true;
-
-            try {
-                reconnect = reusable == null || reusable.object().connection().isClosed();
-            }
-            catch (SQLException e) {
-                U.warn(log, "Failed to check connection status.", e);
-            }
-
-            if (reconnect) {
-                reusable = initialValue();
-
-                set(reusable);
-            }
-
-            return reusable;
-        }
-
-        @Override protected ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> initialValue() {
-            ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> reusableConnection = connectionPool.borrow();
-
-            conns.put(Thread.currentThread(), reusableConnection.object());
-
-            return reusableConnection;
-        }
-    };
-
-    /** */
     protected volatile GridKernalContext ctx;
 
     /** Cache object value context. */
@@ -394,8 +305,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         }
     };
 
-    /** H2 JDBC connection for INFORMATION_SCHEMA. Holds H2 open until node is stopped. */
-    private Connection sysConn;
+    /** Query executor. */
+    private ConnectionManager connMgr;
 
     /**
      * @return Kernal context.
@@ -405,48 +316,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
-     * @param schema Schema.
-     * @return Connection.
-     */
-    public Connection connectionForSchema(String schema) {
-        try {
-            return connectionForThread(schema);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-    }
-
-    /**
-     * @return H2 JDBC connection to INFORMATION_SCHEMA.
-     */
-    private Connection systemConnection() {
-        assert Thread.holdsLock(schemaMux);
-
-        if (sysConn == null) {
-            try {
-                sysConn = DriverManager.getConnection(dbUrl);
-
-                sysConn.setSchema("INFORMATION_SCHEMA");
-            }
-            catch (SQLException e) {
-                throw new IgniteSQLException("Failed to initialize system DB connection: " + dbUrl, e);
-            }
-        }
-
-        return sysConn;
-    }
-
-    /** */
-    private H2ConnectionWrapper newConnectionWrapper() {
-        try {
-            return new H2ConnectionWrapper(DriverManager.getConnection(dbUrl));
-        } catch (SQLException e) {
-            throw new IgniteSQLException("Failed to initialize DB connection: " + dbUrl, e);
-        }
-    }
-
-    /**
      * @param c Connection.
      * @param sql SQL.
      * @return <b>Cached</b> prepared statement.
@@ -489,7 +358,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         assert useStmtCache || !cachedOnly;
 
         if (useStmtCache) {
-            H2StatementCache cache = getStatementsCacheForCurrentThread();
+            H2StatementCache cache = connMgr.statementCacheForThread();
 
             H2CachedStatementKey key = new H2CachedStatementKey(c.getSchema(), sql);
 
@@ -538,63 +407,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             return c.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
     }
 
-    /**
-     * @return {@link H2StatementCache} associated with current thread.
-     */
-    @NotNull private H2StatementCache getStatementsCacheForCurrentThread() {
-        H2StatementCache statementCache = connCache.get().object().statementCache();
-
-        statementCache.updateLastUsage();
-
-        return statementCache;
-    }
-
     /** {@inheritDoc} */
     @Override public PreparedStatement prepareNativeStatement(String schemaName, String sql) {
-        Connection conn = connectionForSchema(schemaName);
+        Connection conn = connMgr.connectionForThread(schemaName);
 
         return prepareStatementAndCaches(conn, sql);
     }
 
     /**
-     * Gets DB connection.
-     *
-     * @param schema Whether to set schema for connection or not.
-     * @return DB connection.
-     * @throws IgniteCheckedException In case of error.
-     */
-    private Connection connectionForThread(@Nullable String schema) throws IgniteCheckedException {
-        H2ConnectionWrapper c = connCache.get().object();
-
-        if (c == null)
-            throw new IgniteCheckedException("Failed to get DB connection for thread (check log for details).");
-
-        if (schema != null && !F.eq(c.schema(), schema)) {
-            Statement stmt = null;
-
-            try {
-                stmt = c.connection().createStatement();
-
-                stmt.executeUpdate("SET SCHEMA " + H2Utils.withQuotes(schema));
-
-                if (log.isDebugEnabled())
-                    log.debug("Set schema: " + schema);
-
-                c.schema(schema);
-            }
-            catch (SQLException e) {
-                throw new IgniteSQLException("Failed to set schema for DB connection for thread [schema=" +
-                    schema + "]", e);
-            }
-            finally {
-                U.close(stmt, log);
-            }
-        }
-
-        return c.connection();
-    }
-
-    /**
      * Create and register schema if needed.
      *
      * @param schemaName Schema name.
@@ -642,7 +462,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param schema Schema name.
      */
     private void createSchema0(String schema) {
-        executeSystemStatement("CREATE SCHEMA IF NOT EXISTS " + H2Utils.withQuotes(schema));
+        connMgr.executeSystemStatement("CREATE SCHEMA IF NOT EXISTS " + H2Utils.withQuotes(schema));
 
         if (log.isDebugEnabled())
             log.debug("Created H2 schema for index database: " + schema);
@@ -654,62 +474,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param schema Schema name.
      */
     private void dropSchema(String schema) {
-        executeSystemStatement("DROP SCHEMA IF EXISTS " + H2Utils.withQuotes(schema));
+        connMgr.executeSystemStatement("DROP SCHEMA IF EXISTS " + H2Utils.withQuotes(schema));
 
         if (log.isDebugEnabled())
             log.debug("Dropped H2 schema for index database: " + schema);
     }
 
     /**
-     * @param schema Schema
-     * @param sql SQL statement.
-     * @throws IgniteCheckedException If failed.
-     */
-    public void executeStatement(String schema, String sql) throws IgniteCheckedException {
-        Statement stmt = null;
-
-        try {
-            Connection c = connectionForThread(schema);
-
-            stmt = c.createStatement();
-
-            stmt.executeUpdate(sql);
-        }
-        catch (SQLException e) {
-            onSqlException();
-
-            throw new IgniteSQLException("Failed to execute statement: " + sql, e);
-        }
-        finally {
-            U.close(stmt, log);
-        }
-    }
-
-    /**
-     * Execute statement on H2 INFORMATION_SCHEMA.
-     * @param sql SQL statement.
-     */
-    public void executeSystemStatement(String sql) {
-        assert Thread.holdsLock(schemaMux);
-
-        Statement stmt = null;
-
-        try {
-            stmt = systemConnection().createStatement();
-
-            stmt.executeUpdate(sql);
-        }
-        catch (SQLException e) {
-            onSqlException();
-
-            throw new IgniteSQLException("Failed to execute statement: " + sql, e);
-        }
-        finally {
-            U.close(stmt, log);
-        }
-    }
-
-    /**
      * Binds object to prepared statement.
      *
      * @param stmt SQL statement.
@@ -734,22 +505,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         }
     }
 
-    /**
-     * Handles SQL exception.
-     */
-    private void onSqlException() {
-        Connection conn = connCache.get().object().connection();
-
-        connCache.set(null);
-
-        if (conn != null) {
-            conns.remove(Thread.currentThread());
-
-            // Reset connection to receive new one at next call.
-            U.close(conn, log);
-        }
-    }
-
     /** {@inheritDoc} */
     @Override public void store(GridCacheContext cctx,
         GridQueryTypeDescriptor type,
@@ -811,7 +566,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         if (log.isDebugEnabled())
             log.debug("Removing query index table: " + tbl.fullTableName());
 
-        Connection c = connectionForThread(tbl.schemaName());
+        Connection c = connMgr.connectionForThread(tbl.schemaName());
 
         Statement stmt = null;
 
@@ -826,7 +581,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             stmt.executeUpdate(sql);
         }
         catch (SQLException e) {
-            onSqlException();
+            connMgr.onSqlException();
 
             throw new IgniteSQLException("Failed to drop database index table [type=" + tbl.type().name() +
                 ", table=" + tbl.fullTableName() + "]", IgniteQueryErrorCode.TABLE_DROP_FAILED, e);
@@ -970,7 +725,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      */
     private void executeSql(String schemaName, String sql) throws IgniteCheckedException {
         try {
-            Connection conn = connectionForSchema(schemaName);
+            Connection conn = connMgr.connectionForThread(schemaName);
 
             try (PreparedStatement stmt = prepareStatement(conn, sql, false)) {
                 stmt.execute();
@@ -1081,7 +836,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         assert mvccEnabled || mvccTracker == null;
 
         try {
-            final Connection conn = connectionForSchema(schemaName);
+            final Connection conn = connMgr.connectionForThread(schemaName);
 
             H2Utils.setupConnection(conn, false, enforceJoinOrder);
 
@@ -1283,9 +1038,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      */
     public static int operationTimeout(int qryTimeout, IgniteTxAdapter tx) {
         if (tx != null) {
-            int tm1 = (int)tx.remainingTime(), tm2 = qryTimeout;
+            int remaining = (int)tx.remainingTime();
 
-            return tm1 > 0 && tm2 > 0 ? Math.min(tm1, tm2) : Math.max(tm1, tm2);
+            return remaining > 0 && qryTimeout > 0 ? Math.min(remaining, qryTimeout) : Math.max(remaining, qryTimeout);
         }
 
         return qryTimeout;
@@ -1294,7 +1049,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /** {@inheritDoc} */
     @Override public long streamUpdateQuery(String schemaName, String qry,
         @Nullable Object[] params, IgniteDataStreamer<?, ?> streamer) throws IgniteCheckedException {
-        final Connection conn = connectionForSchema(schemaName);
+        final Connection conn = connMgr.connectionForThread(schemaName);
 
         final PreparedStatement stmt;
 
@@ -1318,7 +1073,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             return zeroBatchedStreamedUpdateResult(params.size());
         }
 
-        final Connection conn = connectionForSchema(schemaName);
+        final Connection conn = connMgr.connectionForThread(schemaName);
 
         final PreparedStatement stmt = prepareStatementAndCaches(conn, qry);
 
@@ -1512,7 +1267,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             return rs;
         }
         catch (SQLException e) {
-            onSqlException();
+            connMgr.onSqlException();
 
             throw new IgniteCheckedException(e);
         }
@@ -1728,6 +1483,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("deprecation")
     @Override public SqlFieldsQuery generateFieldsQuery(String cacheName, SqlQuery qry) {
         String schemaName = schema(cacheName);
 
@@ -2096,7 +1852,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 // Second, let's check if we already have a parsed statement...
                 PreparedStatement cachedStmt;
 
-                if ((cachedStmt = cachedStatement(connectionForSchema(schemaName), qry.getSql())) != null) {
+                if ((cachedStmt = cachedStatement(connMgr.connectionForThread(schemaName), qry.getSql())) != null) {
                     Prepared prepared = GridSqlQueryParser.prepared(cachedStmt);
 
                     // We may use this cached statement only for local queries and non queries.
@@ -2189,7 +1945,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         if (!prepared.isQuery()) {
             if (DmlStatementsProcessor.isDmlStatement(prepared)) {
                 try {
-                    Connection conn = connectionForSchema(schemaName);
+                    Connection conn = connMgr.connectionForThread(schemaName);
 
                     if (!loc)
                         return dmlProc.updateSqlFieldsDistributed(schemaName, conn, prepared, qry, cancel);
@@ -2287,7 +2043,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      *     metadata for two-step query (if needed), evaluated query local execution flag.
      */
     private ParsingResult parseAndSplit(String schemaName, SqlFieldsQuery qry, int firstArg) {
-        Connection c = connectionForSchema(schemaName);
+        Connection c = connMgr.connectionForThread(schemaName);
 
         // For queries that are explicitly local, we rely on the flag specified in the query
         // because this parsing result will be cached and used for queries directly.
@@ -2379,7 +2135,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         // Let's not cache multiple statements and distributed queries as whole two step query will be cached later on.
         if (remainingSql != null || hasTwoStep)
-            getStatementsCacheForCurrentThread().remove(schemaName, qry.getSql());
+            connMgr.statementCacheForThread().remove(schemaName, qry.getSql());
 
         if (!hasTwoStep)
             return new ParsingResult(prepared, newQry, remainingSql, null, null, null);
@@ -2448,7 +2204,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      */
     private GridCacheTwoStepQuery split(Prepared prepared, SqlFieldsQuery qry) throws IgniteCheckedException,
         SQLException {
-        GridCacheTwoStepQuery res = GridSqlQuerySplitter.split(connectionForThread(qry.getSchema()), prepared,
+        GridCacheTwoStepQuery res = GridSqlQuerySplitter.split(connMgr.connectionForThread(qry.getSchema()), prepared,
             qry.getArgs(), qry.isCollocated(), qry.isDistributedJoins(), qry.isEnforceJoinOrder(), this);
 
         List<Integer> cacheIds = collectCacheIds(null, res);
@@ -2474,6 +2230,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param qry Sql fields query.autoStartTx(qry)
      * @return {@code True} if need to start transaction.
      */
+    @SuppressWarnings("SimplifiableIfStatement")
     public boolean autoStartTx(SqlFieldsQuery qry) {
         if (!mvccEnabled(ctx))
             return false;
@@ -2512,7 +2269,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             loc = false;
         }
 
-        Connection conn = connectionForSchema(schema);
+        Connection conn = connMgr.connectionForThread(schema);
 
         H2Utils.setupConnection(conn, false, fldsQry.isEnforceJoinOrder());
 
@@ -2619,7 +2376,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      */
     public UpdateResult mapDistributedUpdate(String schemaName, SqlFieldsQuery fldsQry, IndexingQueryFilter filter,
         GridQueryCancel cancel, boolean local) throws IgniteCheckedException {
-        Connection conn = connectionForSchema(schemaName);
+        Connection conn = connMgr.connectionForThread(schemaName);
 
         H2Utils.setupConnection(conn, false, fldsQry.isEnforceJoinOrder());
 
@@ -2703,14 +2460,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         H2TableDescriptor tbl = new H2TableDescriptor(this, schema, type, cctx, isSql);
 
         try {
-            Connection conn = connectionForThread(schemaName);
+            Connection conn = connMgr.connectionForThread(schemaName);
 
             createTable(schemaName, schema, tbl, conn);
 
             schema.add(tbl);
         }
         catch (SQLException e) {
-            onSqlException();
+            connMgr.onSqlException();
 
             throw new IgniteCheckedException("Failed to register query type: " + type, e);
         }
@@ -2938,62 +2695,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
-     * Called periodically by {@link GridTimeoutProcessor} to clean up the statement cache.
-     */
-    private void cleanupStatementCache() {
-        long now = U.currentTimeMillis();
-
-        for (Iterator<Map.Entry<Thread, H2ConnectionWrapper>> it = conns.entrySet().iterator(); it.hasNext(); ) {
-            Map.Entry<Thread, H2ConnectionWrapper> entry = it.next();
-
-            Thread t = entry.getKey();
-
-            if (t.getState() == Thread.State.TERMINATED) {
-                U.close(entry.getValue(), log);
-
-                it.remove();
-            }
-            else if (now - entry.getValue().statementCache().lastUsage() > STATEMENT_CACHE_THREAD_USAGE_TIMEOUT)
-                entry.getValue().clearStatementCache();
-        }
-    }
-
-    /**
-     * Called periodically by {@link GridTimeoutProcessor} to clean up the {@link #conns}.
-     */
-    private void cleanupConnections() {
-        for (Iterator<Map.Entry<Thread, H2ConnectionWrapper>> it = conns.entrySet().iterator(); it.hasNext(); ) {
-            Map.Entry<Thread, H2ConnectionWrapper> entry = it.next();
-
-            Thread t = entry.getKey();
-
-            if (t.getState() == Thread.State.TERMINATED) {
-                U.close(entry.getValue(), log);
-
-                it.remove();
-            }
-        }
-    }
-
-    /**
-     * Removes from cache and returns associated with current thread connection.
-     * @return Connection associated with current thread.
-     */
-    public ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> detach() {
-        Thread key = Thread.currentThread();
-
-        ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> reusableConnection = connCache.get();
-
-        H2ConnectionWrapper connection = conns.remove(key);
-
-        connCache.remove();
-
-        assert reusableConnection.object().connection() == connection.connection();
-
-        return reusableConnection;
-    }
-
-    /**
      * Rebuild indexes from hash index.
      *
      * @param cacheName Cache name.
@@ -3060,36 +2761,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             SysProperties.serializeJavaObject = false;
         }
 
-        String dbName = (ctx != null ? ctx.localNodeId() : UUID.randomUUID()).toString();
-
-        dbUrl = "jdbc:h2:mem:" + dbName + DB_OPTIONS;
-
-        org.h2.Driver.load();
-
-        try {
-            if (getString(IGNITE_H2_DEBUG_CONSOLE) != null) {
-                Connection c = DriverManager.getConnection(dbUrl);
-
-                int port = getInteger(IGNITE_H2_DEBUG_CONSOLE_PORT, 0);
-
-                WebServer webSrv = new WebServer();
-                Server web = new Server(webSrv, "-webPort", Integer.toString(port));
-                web.start();
-                String url = webSrv.addSession(c);
-
-                U.quietAndInfo(log, "H2 debug console URL: " + url);
-
-                try {
-                    Server.openBrowser(url);
-                }
-                catch (Exception e) {
-                    U.warn(log, "Failed to open browser: " + e.getMessage());
-                }
-            }
-        }
-        catch (SQLException e) {
-            throw new IgniteCheckedException(e);
-        }
+        connMgr = new ConnectionManager(ctx);
 
         if (ctx == null) {
             // This is allowed in some tests.
@@ -3129,12 +2801,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             mapQryExec.start(ctx, this);
             rdcQryExec.start(ctx, this);
 
-            stmtCacheCleanupTask = ctx.timeout().schedule(new Runnable() {
-                @Override public void run() {
-                    cleanupStatementCache();
-                }
-            }, CLEANUP_STMT_CACHE_PERIOD, CLEANUP_STMT_CACHE_PERIOD);
-
             dmlProc = new DmlStatementsProcessor();
             ddlProc = new DdlStatementsProcessor();
 
@@ -3150,17 +2816,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                         createSchema0(QueryUtils.SCHEMA_SYS);
                     }
 
-                    Connection c = connectionForSchema(QueryUtils.SCHEMA_SYS);
-
-                    for (SqlSystemView view : systemViews(ctx))
-                        SqlSystemTableEngine.registerView(c, view);
+                    try (Connection c = connMgr.connectionNoCache(QueryUtils.SCHEMA_SYS)) {
+                        for (SqlSystemView view : systemViews(ctx))
+                            SqlSystemTableEngine.registerView(c, view);
+                    }
                 }
                 catch (SQLException e) {
                     throw new IgniteCheckedException("Failed to register system view.", e);
                 }
-
-                // Caching this connection in ThreadLocal may lead to memory leaks.
-                connCache.set(null);
             }
             else {
                 if (log.isDebugEnabled())
@@ -3175,12 +2838,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         JdbcUtils.serializer = h2Serializer();
 
         assert ctx != null;
-
-        connCleanupTask = ctx.timeout().schedule(new Runnable() {
-            @Override public void run() {
-                cleanupConnections();
-            }
-        }, CLEANUP_CONNECTIONS_PERIOD, CLEANUP_CONNECTIONS_PERIOD);
     }
 
     /**
@@ -3348,7 +3005,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                         " FOR \"") +
                         cls.getName() + '.' + m.getName() + '"';
 
-                    executeStatement(schema, clause);
+                    connMgr.executeStatement(schema, clause);
                 }
             }
         }
@@ -3361,39 +3018,18 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         mapQryExec.cancelLazyWorkers();
 
-        for (H2ConnectionWrapper c : conns.values())
-            U.close(c, log);
-
-        conns.clear();
         schemas.clear();
         cacheName2schema.clear();
 
-        try (Connection c = DriverManager.getConnection(dbUrl); Statement s = c.createStatement()) {
-            s.execute("SHUTDOWN");
-        }
-        catch (SQLException e) {
-            U.error(log, "Failed to shutdown database.", e);
-        }
-
-        if (stmtCacheCleanupTask != null)
-            stmtCacheCleanupTask.close();
-
-        if (connCleanupTask != null)
-            connCleanupTask.close();
-
         GridH2QueryContext.clearLocalNodeStop(nodeId);
 
-        if (log.isDebugEnabled())
-            log.debug("Cache query index stopped.");
-
         // Close system H2 connection to INFORMATION_SCHEMA
         synchronized (schemaMux) {
-            if (sysConn != null) {
-                U.close(sysConn, log);
-
-                sysConn = null;
-            }
+            connMgr.stop();
         }
+
+        if (log.isDebugEnabled())
+            log.debug("Cache query index stopped.");
     }
 
     /** {@inheritDoc} */
@@ -3471,7 +3107,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 }
             }
 
-            conns.values().forEach(H2ConnectionWrapper::clearStatementCache);
+            connMgr.onCacheUnregistered();
 
             for (H2TableDescriptor tbl : rmvTbls) {
                 for (Index idx : tbl.table().getIndexes())
@@ -3628,18 +3264,17 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /** {@inheritDoc} */
-    @Override public void cancelAllQueries() {
+    @Override public void onKernalStop() {
         mapQryExec.cancelLazyWorkers();
 
-        for (H2ConnectionWrapper c : conns.values())
-            U.close(c, log);
+        connMgr.onKernalStop();
     }
 
     /**
-     * @return Per-thread connections.
+     * @return Query executor.
      */
-    public Map<Thread, ?> perThreadConnections() {
-        return conns;
+    public ConnectionManager connections() {
+        return connMgr;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/8526adda/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
index ba4b12b..0f8b6d8 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
@@ -647,7 +647,7 @@ public final class UpdatePlan {
 
         /** {@inheritDoc} */
         @Override public void beforeDetach() {
-            ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> conn0 = conn = idx.detach();
+            ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> conn0 = conn = idx.connections().detachThreadConnection();
 
             if (isClosed())
                 conn0.recycle();

http://git-wip-us.apache.org/repos/asf/ignite/blob/8526adda/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index c2ea9a8..569cb60 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -875,7 +875,7 @@ public class GridMapQueryExecutor {
                 .mvccSnapshot(mvccSnapshot)
                 .lazyWorker(worker);
 
-            Connection conn = h2.connectionForSchema(schemaName);
+            Connection conn = h2.connections().connectionForThread(schemaName);
 
             H2Utils.setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8526adda/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 87c8ce9..36287b3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -656,7 +656,7 @@ public class GridReduceQueryExecutor {
             }
 
             final ReduceQueryRun r = new ReduceQueryRun(qryReqId, qry.originalSql(), schemaName,
-                h2.connectionForSchema(schemaName), qry.mapQueries().size(), qry.pageSize(),
+                h2.connections().connectionForThread(schemaName), qry.mapQueries().size(), qry.pageSize(),
                 U.currentTimeMillis(), sfuFut, cancel);
 
             Collection<ClusterNode> nodes;

http://git-wip-us.apache.org/repos/asf/ignite/blob/8526adda/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
index 14593de..9458887 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.util.concurrent.ConcurrentMap;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.IgniteCache;
@@ -119,9 +119,10 @@ public class IgniteCacheQueryH2IndexingLeakTest extends GridCommonAbstractTest {
      * @return size of statement cache.
      */
     private static int getStatementCacheSize(GridQueryProcessor qryProcessor) {
-        IgniteH2Indexing h2Idx = GridTestUtils.getFieldValue(qryProcessor, GridQueryProcessor.class, "idx");
 
-        ConcurrentMap<Thread, H2ConnectionWrapper> conns = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "conns");
+        IgniteH2Indexing h2Idx = (IgniteH2Indexing)qryProcessor.getIndexing();
+
+        Map<Thread, H2ConnectionWrapper> conns = h2Idx.connections().connectionsForThread();
 
         int cntr = 0;
 
@@ -145,10 +146,10 @@ public class IgniteCacheQueryH2IndexingLeakTest extends GridCommonAbstractTest {
             // Open iterator on the created cursor: add entries to the cache.
             IgniteInternalFuture<?> fut = multithreadedAsync(
                 new CAX() {
+                    @SuppressWarnings("unchecked")
                     @Override public void applyx() throws IgniteCheckedException {
                         while (!stop.get()) {
                             c.query(new SqlQuery(Integer.class, "_val >= 0")).getAll();
-
                             c.query(new SqlQuery(Integer.class, "_val >= 1")).getAll();
                         }
                     }
@@ -197,6 +198,7 @@ public class IgniteCacheQueryH2IndexingLeakTest extends GridCommonAbstractTest {
             // Open iterator on the created cursor: add entries to the cache
             IgniteInternalFuture<?> fut = multithreadedAsync(
                 new CAX() {
+                    @SuppressWarnings("unchecked")
                     @Override public void applyx() throws IgniteCheckedException {
                         c.query(new SqlQuery(Integer.class, "_val >= 0")).getAll();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8526adda/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
index 7713004..d8d6735 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
@@ -108,7 +108,7 @@ public class H2ConnectionLeaksSelfTest extends GridCommonAbstractTest {
                     try {
                         IgniteH2Indexing idx = (IgniteH2Indexing)grid(1).context().query().getIndexing();
 
-                        idx.executeStatement(CACHE_NAME, "select *");
+                        idx.connections().executeStatement(CACHE_NAME, "select *");
                     }
                     catch (Exception e) {
                         // No-op.
@@ -172,6 +172,6 @@ public class H2ConnectionLeaksSelfTest extends GridCommonAbstractTest {
      * @return Per-thread connections.
      */
     private Map<Thread, ?> perThreadConnections(int nodeIdx) {
-        return ((IgniteH2Indexing)grid(nodeIdx).context().query().getIndexing()).perThreadConnections();
+        return ((IgniteH2Indexing)grid(nodeIdx).context().query().getIndexing()).connections().connectionsForThread();
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8526adda/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
index ca24e57..1a4b248 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
@@ -1008,7 +1008,7 @@ public class GridQueryParsingTest extends GridCommonAbstractTest {
 
         String schemaName = idx.schema(DEFAULT_CACHE_NAME);
 
-        return (JdbcConnection)idx.connectionForSchema(schemaName);
+        return (JdbcConnection)idx.connections().connectionForThread(schemaName);
     }
 
     /**