You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/11/20 18:04:05 UTC
[12/50] [abbrv] ignite git commit: IGNITE-10303: SQL: Moved H2
connection management logic into separate class to simplify further
development. This closes #5418.
IGNITE-10303: SQL: Moved H2 connection management logic into separate class to simplify further development. This closes #5418.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8526adda
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8526adda
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8526adda
Branch: refs/heads/ignite-9720
Commit: 8526adda0bbb069f0179343234b6021e09cc390e
Parents: 16dc88f
Author: devozerov <pp...@gmail.com>
Authored: Sat Nov 17 14:09:59 2018 +0300
Committer: devozerov <pp...@gmail.com>
Committed: Sat Nov 17 14:09:59 2018 +0300
----------------------------------------------------------------------
.../processors/query/GridQueryIndexing.java | 2 +-
.../processors/query/GridQueryProcessor.java | 2 +-
.../internal/processors/query/QueryUtils.java | 3 +
...IgniteClientCacheInitializationFailTest.java | 2 +-
.../processors/query/h2/ConnectionManager.java | 443 ++++++++++++++++++
.../processors/query/h2/IgniteH2Indexing.java | 451 ++-----------------
.../processors/query/h2/dml/UpdatePlan.java | 2 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 2 +-
.../h2/twostep/GridReduceQueryExecutor.java | 2 +-
.../IgniteCacheQueryH2IndexingLeakTest.java | 10 +-
.../cache/index/H2ConnectionLeaksSelfTest.java | 4 +-
.../query/h2/sql/GridQueryParsingTest.java | 2 +-
12 files changed, 504 insertions(+), 421 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8526adda/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 3eb732c..dab2516 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -330,7 +330,7 @@ public interface GridQueryIndexing {
/**
* Cancels all executing queries.
*/
- public void cancelAllQueries();
+ public void onKernalStop();
/**
* Gets database schema from cache name.
http://git-wip-us.apache.org/repos/asf/ignite/blob/8526adda/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index c4f0197..3842d77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -267,7 +267,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
if (cancel && idx != null) {
try {
while (!busyLock.tryBlock(500))
- idx.cancelAllQueries();
+ idx.onKernalStop();
return;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8526adda/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
index e530ab1..6a2c22a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
@@ -80,6 +80,9 @@ public class QueryUtils {
/** Schema for system view. */
public static final String SCHEMA_SYS = "IGNITE";
+ /** Schema for system view. */
+ public static final String SCHEMA_INFORMATION = "INFORMATION_SCHEMA";
+
/** Field name for key. */
public static final String KEY_FIELD_NAME = "_KEY";
http://git-wip-us.apache.org/repos/asf/ignite/blob/8526adda/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
index ba2fec6..5432257 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
@@ -371,7 +371,7 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT
}
/** {@inheritDoc} */
- @Override public void cancelAllQueries() {
+ @Override public void onKernalStop() {
// No-op
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8526adda/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConnectionManager.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConnectionManager.java
new file mode 100644
index 0000000..09400c8
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConnectionManager.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2PlainRowFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.h2.server.web.WebServer;
+import org.h2.tools.Server;
+import org.jetbrains.annotations.Nullable;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_DEBUG_CONSOLE;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_DEBUG_CONSOLE_PORT;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT;
+import static org.apache.ignite.IgniteSystemProperties.getInteger;
+import static org.apache.ignite.IgniteSystemProperties.getString;
+
+/**
+ * H2 connection manager.
+ */
+public class ConnectionManager {
+ /** Default DB options. */
+ private static final String DB_OPTIONS = ";LOCK_MODE=3;MULTI_THREADED=1;DB_CLOSE_ON_EXIT=FALSE" +
+ ";DEFAULT_LOCK_TIMEOUT=10000;FUNCTIONS_IN_SCHEMA=true;OPTIMIZE_REUSE_RESULTS=0;QUERY_CACHE_SIZE=0" +
+ ";MAX_OPERATION_MEMORY=0;BATCH_JOINS=1" +
+ ";ROW_FACTORY=\"" + GridH2PlainRowFactory.class.getName() + "\"" +
+ ";DEFAULT_TABLE_ENGINE=" + GridH2DefaultTableEngine.class.getName();
+
+ /** The period of clean up the {@link #threadConns}. */
+ private static final Long CONN_CLEANUP_PERIOD = 2000L;
+
+ /** The period of clean up the statement cache. */
+ private static final Long STMT_CLEANUP_PERIOD = Long.getLong(IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD, 10_000);
+
+ /** The timeout to remove entry from the statement cache if the thread doesn't perform any queries. */
+ private static final Long STMT_TIMEOUT = Long.getLong(IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT, 600 * 1000);
+
+ /*
+ * Initialize system properties for H2.
+ */
+ static {
+ System.setProperty("h2.objectCache", "false");
+ System.setProperty("h2.serializeJavaObject", "false");
+ System.setProperty("h2.objectCacheMaxPerElementSize", "0"); // Avoid ValueJavaObject caching.
+ System.setProperty("h2.optimizeTwoEquals", "false"); // Makes splitter fail on subqueries in WHERE.
+ System.setProperty("h2.dropRestrict", "false"); // Drop schema with cascade semantics.
+ }
+
+ /** Shared connection pool. */
+ private final ThreadLocalObjectPool<H2ConnectionWrapper> connPool =
+ new ThreadLocalObjectPool<>(this::newConnectionWrapper, 5);
+
+ /** Per-thread connections. */
+ private final ConcurrentMap<Thread, H2ConnectionWrapper> threadConns = new ConcurrentHashMap<>();
+
+ /** Connection cache. */
+ private final ThreadLocal<ThreadLocalObjectPool.Reusable<H2ConnectionWrapper>> threadConn =
+ new ThreadLocal<ThreadLocalObjectPool.Reusable<H2ConnectionWrapper>>() {
+ @Override public ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> get() {
+ ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> reusable = super.get();
+
+ boolean reconnect = true;
+
+ try {
+ reconnect = reusable == null || reusable.object().connection().isClosed();
+ }
+ catch (SQLException e) {
+ U.warn(log, "Failed to check connection status.", e);
+ }
+
+ if (reconnect) {
+ reusable = initialValue();
+
+ set(reusable);
+ }
+
+ return reusable;
+ }
+
+ @Override protected ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> initialValue() {
+ ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> reusableConnection = connPool.borrow();
+
+ threadConns.put(Thread.currentThread(), reusableConnection.object());
+
+ return reusableConnection;
+ }
+ };
+
+ /** Database URL. */
+ private final String dbUrl;
+
+ /** Connection cleanup task. */
+ private final GridTimeoutProcessor.CancelableTask connCleanupTask;
+
+ /** Statement cleanup task. */
+ private final GridTimeoutProcessor.CancelableTask stmtCleanupTask;
+
+ /** H2 connection for INFORMATION_SCHEMA. Holds H2 open until node is stopped. */
+ private volatile Connection sysConn;
+
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Context.
+ */
+ public ConnectionManager(GridKernalContext ctx) throws IgniteCheckedException {
+ dbUrl = "jdbc:h2:mem:" + ctx.localNodeId() + DB_OPTIONS;
+
+ log = ctx.log(ConnectionManager.class);
+
+ org.h2.Driver.load();
+
+ sysConn = connectionNoCache(QueryUtils.SCHEMA_INFORMATION);
+
+ stmtCleanupTask = ctx.timeout().schedule(new Runnable() {
+ @Override public void run() {
+ cleanupStatements();
+ }
+ }, STMT_CLEANUP_PERIOD, STMT_CLEANUP_PERIOD);
+
+ connCleanupTask = ctx.timeout().schedule(new Runnable() {
+ @Override public void run() {
+ cleanupConnections();
+ }
+ }, CONN_CLEANUP_PERIOD, CONN_CLEANUP_PERIOD);
+
+ startDebugConsole();
+ }
+
+ /**
+ * Gets DB connection.
+ *
+ * @param schema Whether to set schema for connection or not.
+ * @return DB connection.
+ */
+ public Connection connectionForThread(@Nullable String schema) {
+ H2ConnectionWrapper c = threadConn.get().object();
+
+ if (c == null)
+ throw new IgniteSQLException("Failed to get DB connection for thread (check log for details).");
+
+ if (schema != null && !F.eq(c.schema(), schema)) {
+ try {
+ c.connection().setSchema(schema);
+ c.schema(schema);
+
+ if (log.isDebugEnabled())
+ log.debug("Set schema: " + schema);
+ }
+ catch (SQLException e) {
+ throw new IgniteSQLException("Failed to set schema for DB connection for thread [schema=" +
+ schema + "]", e);
+ }
+ }
+
+ return c.connection();
+ }
+
+ /**
+ * @return Per-thread connections (for testing purposes only).
+ */
+ public Map<Thread, H2ConnectionWrapper> connectionsForThread() {
+ return threadConns;
+ }
+
+ /**
+ * Removes from cache and returns associated with current thread connection.
+ *
+ * @return Connection associated with current thread.
+ */
+ public ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> detachThreadConnection() {
+ Thread key = Thread.currentThread();
+
+ ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> reusableConnection = threadConn.get();
+
+ H2ConnectionWrapper connection = threadConns.remove(key);
+
+ threadConn.remove();
+
+ assert reusableConnection.object().connection() == connection.connection();
+
+ return reusableConnection;
+ }
+
+ /**
+ * Get connection without cache.
+ *
+ * @param schema Schema name.
+ * @return Connection.
+ */
+ public Connection connectionNoCache(String schema) throws IgniteSQLException {
+ try {
+ Connection conn = DriverManager.getConnection(dbUrl);
+
+ conn.setSchema(schema);
+
+ return conn;
+ }
+ catch (SQLException e) {
+ throw new IgniteSQLException("Failed to initialize system DB connection: " + dbUrl, e);
+ }
+ }
+
+ /**
+ * @return {@link H2StatementCache} associated with current thread.
+ */
+ public H2StatementCache statementCacheForThread() {
+ H2StatementCache statementCache = threadConn.get().object().statementCache();
+
+ statementCache.updateLastUsage();
+
+ return statementCache;
+ }
+
+ /**
+ * Execute SQL statement on specific schema.
+ *
+ * @param schema Schema
+ * @param sql SQL statement.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void executeStatement(String schema, String sql) throws IgniteCheckedException {
+ Statement stmt = null;
+
+ try {
+ Connection c = connectionForThread(schema);
+
+ stmt = c.createStatement();
+
+ stmt.executeUpdate(sql);
+ }
+ catch (SQLException e) {
+ onSqlException();
+
+ throw new IgniteSQLException("Failed to execute statement: " + sql, e);
+ }
+ finally {
+ U.close(stmt, log);
+ }
+ }
+
+ /**
+ * Execute statement on H2 INFORMATION_SCHEMA.
+ *
+ * @param sql SQL statement.
+ */
+ public void executeSystemStatement(String sql) {
+ Statement stmt = null;
+
+ try {
+ stmt = sysConn.createStatement();
+
+ stmt.executeUpdate(sql);
+ }
+ catch (SQLException e) {
+ onSqlException();
+
+ throw new IgniteSQLException("Failed to execute system statement: " + sql, e);
+ }
+ finally {
+ U.close(stmt, log);
+ }
+ }
+
+ /**
+ * Clear statement cache when cache is unregistered..
+ */
+ public void onCacheUnregistered() {
+ threadConns.values().forEach(H2ConnectionWrapper::clearStatementCache);
+ }
+
+ /**
+ * Cancel all queries.
+ */
+ public void onKernalStop() {
+ for (H2ConnectionWrapper c : threadConns.values())
+ U.close(c, log);
+ }
+
+ /**
+ * Close executor.
+ */
+ public void stop() {
+ for (H2ConnectionWrapper c : threadConns.values())
+ U.close(c, log);
+
+ threadConns.clear();
+
+ try (Connection c = connectionNoCache(QueryUtils.SCHEMA_INFORMATION); Statement s = c.createStatement()) {
+ s.execute("SHUTDOWN");
+ }
+ catch (SQLException e) {
+ U.error(log, "Failed to shutdown database.", e);
+ }
+
+ if (stmtCleanupTask != null)
+ stmtCleanupTask.close();
+
+ if (connCleanupTask != null)
+ connCleanupTask.close();
+
+ if (sysConn != null) {
+ U.close(sysConn, log);
+
+ sysConn = null;
+ }
+ }
+
+ /**
+ * Handles SQL exception.
+ */
+ public void onSqlException() {
+ Connection conn = threadConn.get().object().connection();
+
+ threadConn.set(null);
+
+ if (conn != null) {
+ threadConns.remove(Thread.currentThread());
+
+ // Reset connection to receive new one at next call.
+ U.close(conn, log);
+ }
+ }
+
+ /**
+ * Start debug console if needed.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ private void startDebugConsole() throws IgniteCheckedException {
+ try {
+ if (getString(IGNITE_H2_DEBUG_CONSOLE) != null) {
+ Connection c = DriverManager.getConnection(dbUrl);
+
+ int port = getInteger(IGNITE_H2_DEBUG_CONSOLE_PORT, 0);
+
+ WebServer webSrv = new WebServer();
+ Server web = new Server(webSrv, "-webPort", Integer.toString(port));
+ web.start();
+ String url = webSrv.addSession(c);
+
+ U.quietAndInfo(log, "H2 debug console URL: " + url);
+
+ try {
+ Server.openBrowser(url);
+ }
+ catch (Exception e) {
+ U.warn(log, "Failed to open browser: " + e.getMessage());
+ }
+ }
+ }
+ catch (SQLException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+ /**
+ * Create new connection wrapper.
+ *
+ * @return Connection wrapper.
+ */
+ private H2ConnectionWrapper newConnectionWrapper() {
+ try {
+ return new H2ConnectionWrapper(DriverManager.getConnection(dbUrl));
+ }
+ catch (SQLException e) {
+ throw new IgniteSQLException("Failed to initialize DB connection: " + dbUrl, e);
+ }
+ }
+
+ /**
+ * Called periodically to cleanup connections.
+ */
+ private void cleanupConnections() {
+ for (Iterator<Map.Entry<Thread, H2ConnectionWrapper>> it = threadConns.entrySet().iterator(); it.hasNext(); ) {
+ Map.Entry<Thread, H2ConnectionWrapper> entry = it.next();
+
+ Thread t = entry.getKey();
+
+ if (t.getState() == Thread.State.TERMINATED) {
+ U.close(entry.getValue(), log);
+
+ it.remove();
+ }
+ }
+ }
+
+ /**
+ * Called periodically to clean up the statement cache.
+ */
+ private void cleanupStatements() {
+ long now = U.currentTimeMillis();
+
+ for (Iterator<Map.Entry<Thread, H2ConnectionWrapper>> it = threadConns.entrySet().iterator(); it.hasNext(); ) {
+ Map.Entry<Thread, H2ConnectionWrapper> entry = it.next();
+
+ Thread t = entry.getKey();
+
+ if (t.getState() == Thread.State.TERMINATED) {
+ U.close(entry.getValue(), log);
+
+ it.remove();
+ }
+ else if (now - entry.getValue().statementCache().lastUsage() > STMT_TIMEOUT)
+ entry.getValue().clearStatementCache();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8526adda/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 58e09cb..937363a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -22,7 +22,6 @@ import java.lang.reflect.Modifier;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Connection;
-import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -112,9 +111,7 @@ import org.apache.ignite.internal.processors.query.h2.database.io.H2MvccLeafIO;
import org.apache.ignite.internal.processors.query.h2.ddl.DdlStatementsProcessor;
import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2PlainRowFactory;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
@@ -141,7 +138,6 @@ import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryReq
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorImpl;
-import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.sql.SqlParseException;
import org.apache.ignite.internal.sql.SqlParser;
import org.apache.ignite.internal.sql.SqlStrictParseException;
@@ -188,19 +184,11 @@ import org.h2.engine.Session;
import org.h2.engine.SysProperties;
import org.h2.index.Index;
import org.h2.jdbc.JdbcStatement;
-import org.h2.server.web.WebServer;
import org.h2.table.IndexColumn;
-import org.h2.tools.Server;
import org.h2.util.JdbcUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_DEBUG_CONSOLE;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_DEBUG_CONSOLE_PORT;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT;
-import static org.apache.ignite.IgniteSystemProperties.getInteger;
-import static org.apache.ignite.IgniteSystemProperties.getString;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.checkActive;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccEnabled;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.tx;
@@ -237,27 +225,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
*/
static {
PageIO.registerH2(H2InnerIO.VERSIONS, H2LeafIO.VERSIONS, H2MvccInnerIO.VERSIONS, H2MvccLeafIO.VERSIONS);
+
H2ExtrasInnerIO.register();
H2ExtrasLeafIO.register();
-
- // Initialize system properties for H2.
- System.setProperty("h2.objectCache", "false");
- System.setProperty("h2.serializeJavaObject", "false");
- System.setProperty("h2.objectCacheMaxPerElementSize", "0"); // Avoid ValueJavaObject caching.
- System.setProperty("h2.optimizeTwoEquals", "false"); // Makes splitter fail on subqueries in WHERE.
- System.setProperty("h2.dropRestrict", "false"); // Drop schema with cascade semantics.
}
- /** Default DB options. */
- private static final String DB_OPTIONS = ";LOCK_MODE=3;MULTI_THREADED=1;DB_CLOSE_ON_EXIT=FALSE" +
- ";DEFAULT_LOCK_TIMEOUT=10000;FUNCTIONS_IN_SCHEMA=true;OPTIMIZE_REUSE_RESULTS=0;QUERY_CACHE_SIZE=0" +
- ";MAX_OPERATION_MEMORY=0;BATCH_JOINS=1" +
- ";ROW_FACTORY=\"" + GridH2PlainRowFactory.class.getName() + "\"" +
- ";DEFAULT_TABLE_ENGINE=" + GridH2DefaultTableEngine.class.getName();
-
- // Uncomment this setting to get debug output from H2 to sysout.
-// ";TRACE_LEVEL_SYSTEM_OUT=3";
-
/** Dummy metadata for update result. */
public static final List<GridQueryFieldMetadata> UPDATE_RESULT_META = Collections.<GridQueryFieldMetadata>
singletonList(new H2SqlFieldMetadata(null, null, "UPDATED", Long.class.getName(), -1, -1));
@@ -265,23 +237,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** */
private static final int TWO_STEP_QRY_CACHE_SIZE = 1024;
- /** The period of clean up the statement cache. */
- private final Long CLEANUP_STMT_CACHE_PERIOD = Long.getLong(IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD, 10_000);
-
- /** The period of clean up the {@link #conns}. */
- @SuppressWarnings("FieldCanBeLocal")
- private final Long CLEANUP_CONNECTIONS_PERIOD = 2000L;
-
- /** The timeout to remove entry from the statement cache if the thread doesn't perform any queries. */
- private final Long STATEMENT_CACHE_THREAD_USAGE_TIMEOUT =
- Long.getLong(IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT, 600 * 1000);
-
- /** */
- private GridTimeoutProcessor.CancelableTask stmtCacheCleanupTask;
-
- /** */
- private GridTimeoutProcessor.CancelableTask connCleanupTask;
-
/** Logger. */
@LoggerResource
private IgniteLogger log;
@@ -296,13 +251,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
private final ConcurrentMap<String, H2Schema> schemas = new ConcurrentHashMap<>();
/** */
- private String dbUrl = "jdbc:h2:mem:";
-
- /** */
- // TODO https://issues.apache.org/jira/browse/IGNITE-9062
- private final ConcurrentMap<Thread, H2ConnectionWrapper> conns = new ConcurrentHashMap<>();
-
- /** */
private GridMapQueryExecutor mapQryExec;
/** */
@@ -327,43 +275,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
private final H2RowCacheRegistry rowCache = new H2RowCacheRegistry();
/** */
- // TODO https://issues.apache.org/jira/browse/IGNITE-9062
- private final ThreadLocalObjectPool<H2ConnectionWrapper> connectionPool = new ThreadLocalObjectPool<>(IgniteH2Indexing.this::newConnectionWrapper, 5);
-
- /** */
- // TODO https://issues.apache.org/jira/browse/IGNITE-9062
- private final ThreadLocal<ThreadLocalObjectPool.Reusable<H2ConnectionWrapper>> connCache = new ThreadLocal<ThreadLocalObjectPool.Reusable<H2ConnectionWrapper>>() {
- @Override public ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> get() {
- ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> reusable = super.get();
-
- boolean reconnect = true;
-
- try {
- reconnect = reusable == null || reusable.object().connection().isClosed();
- }
- catch (SQLException e) {
- U.warn(log, "Failed to check connection status.", e);
- }
-
- if (reconnect) {
- reusable = initialValue();
-
- set(reusable);
- }
-
- return reusable;
- }
-
- @Override protected ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> initialValue() {
- ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> reusableConnection = connectionPool.borrow();
-
- conns.put(Thread.currentThread(), reusableConnection.object());
-
- return reusableConnection;
- }
- };
-
- /** */
protected volatile GridKernalContext ctx;
/** Cache object value context. */
@@ -394,8 +305,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
};
- /** H2 JDBC connection for INFORMATION_SCHEMA. Holds H2 open until node is stopped. */
- private Connection sysConn;
+ /** Query executor. */
+ private ConnectionManager connMgr;
/**
* @return Kernal context.
@@ -405,48 +316,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
- * @param schema Schema.
- * @return Connection.
- */
- public Connection connectionForSchema(String schema) {
- try {
- return connectionForThread(schema);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
-
- /**
- * @return H2 JDBC connection to INFORMATION_SCHEMA.
- */
- private Connection systemConnection() {
- assert Thread.holdsLock(schemaMux);
-
- if (sysConn == null) {
- try {
- sysConn = DriverManager.getConnection(dbUrl);
-
- sysConn.setSchema("INFORMATION_SCHEMA");
- }
- catch (SQLException e) {
- throw new IgniteSQLException("Failed to initialize system DB connection: " + dbUrl, e);
- }
- }
-
- return sysConn;
- }
-
- /** */
- private H2ConnectionWrapper newConnectionWrapper() {
- try {
- return new H2ConnectionWrapper(DriverManager.getConnection(dbUrl));
- } catch (SQLException e) {
- throw new IgniteSQLException("Failed to initialize DB connection: " + dbUrl, e);
- }
- }
-
- /**
* @param c Connection.
* @param sql SQL.
* @return <b>Cached</b> prepared statement.
@@ -489,7 +358,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
assert useStmtCache || !cachedOnly;
if (useStmtCache) {
- H2StatementCache cache = getStatementsCacheForCurrentThread();
+ H2StatementCache cache = connMgr.statementCacheForThread();
H2CachedStatementKey key = new H2CachedStatementKey(c.getSchema(), sql);
@@ -538,63 +407,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
return c.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
}
- /**
- * @return {@link H2StatementCache} associated with current thread.
- */
- @NotNull private H2StatementCache getStatementsCacheForCurrentThread() {
- H2StatementCache statementCache = connCache.get().object().statementCache();
-
- statementCache.updateLastUsage();
-
- return statementCache;
- }
-
/** {@inheritDoc} */
@Override public PreparedStatement prepareNativeStatement(String schemaName, String sql) {
- Connection conn = connectionForSchema(schemaName);
+ Connection conn = connMgr.connectionForThread(schemaName);
return prepareStatementAndCaches(conn, sql);
}
/**
- * Gets DB connection.
- *
- * @param schema Whether to set schema for connection or not.
- * @return DB connection.
- * @throws IgniteCheckedException In case of error.
- */
- private Connection connectionForThread(@Nullable String schema) throws IgniteCheckedException {
- H2ConnectionWrapper c = connCache.get().object();
-
- if (c == null)
- throw new IgniteCheckedException("Failed to get DB connection for thread (check log for details).");
-
- if (schema != null && !F.eq(c.schema(), schema)) {
- Statement stmt = null;
-
- try {
- stmt = c.connection().createStatement();
-
- stmt.executeUpdate("SET SCHEMA " + H2Utils.withQuotes(schema));
-
- if (log.isDebugEnabled())
- log.debug("Set schema: " + schema);
-
- c.schema(schema);
- }
- catch (SQLException e) {
- throw new IgniteSQLException("Failed to set schema for DB connection for thread [schema=" +
- schema + "]", e);
- }
- finally {
- U.close(stmt, log);
- }
- }
-
- return c.connection();
- }
-
- /**
* Create and register schema if needed.
*
* @param schemaName Schema name.
@@ -642,7 +462,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @param schema Schema name.
*/
private void createSchema0(String schema) {
- executeSystemStatement("CREATE SCHEMA IF NOT EXISTS " + H2Utils.withQuotes(schema));
+ connMgr.executeSystemStatement("CREATE SCHEMA IF NOT EXISTS " + H2Utils.withQuotes(schema));
if (log.isDebugEnabled())
log.debug("Created H2 schema for index database: " + schema);
@@ -654,62 +474,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @param schema Schema name.
*/
private void dropSchema(String schema) {
- executeSystemStatement("DROP SCHEMA IF EXISTS " + H2Utils.withQuotes(schema));
+ connMgr.executeSystemStatement("DROP SCHEMA IF EXISTS " + H2Utils.withQuotes(schema));
if (log.isDebugEnabled())
log.debug("Dropped H2 schema for index database: " + schema);
}
/**
- * @param schema Schema
- * @param sql SQL statement.
- * @throws IgniteCheckedException If failed.
- */
- public void executeStatement(String schema, String sql) throws IgniteCheckedException {
- Statement stmt = null;
-
- try {
- Connection c = connectionForThread(schema);
-
- stmt = c.createStatement();
-
- stmt.executeUpdate(sql);
- }
- catch (SQLException e) {
- onSqlException();
-
- throw new IgniteSQLException("Failed to execute statement: " + sql, e);
- }
- finally {
- U.close(stmt, log);
- }
- }
-
- /**
- * Execute statement on H2 INFORMATION_SCHEMA.
- * @param sql SQL statement.
- */
- public void executeSystemStatement(String sql) {
- assert Thread.holdsLock(schemaMux);
-
- Statement stmt = null;
-
- try {
- stmt = systemConnection().createStatement();
-
- stmt.executeUpdate(sql);
- }
- catch (SQLException e) {
- onSqlException();
-
- throw new IgniteSQLException("Failed to execute statement: " + sql, e);
- }
- finally {
- U.close(stmt, log);
- }
- }
-
- /**
* Binds object to prepared statement.
*
* @param stmt SQL statement.
@@ -734,22 +505,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
}
- /**
- * Handles SQL exception.
- */
- private void onSqlException() {
- Connection conn = connCache.get().object().connection();
-
- connCache.set(null);
-
- if (conn != null) {
- conns.remove(Thread.currentThread());
-
- // Reset connection to receive new one at next call.
- U.close(conn, log);
- }
- }
-
/** {@inheritDoc} */
@Override public void store(GridCacheContext cctx,
GridQueryTypeDescriptor type,
@@ -811,7 +566,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (log.isDebugEnabled())
log.debug("Removing query index table: " + tbl.fullTableName());
- Connection c = connectionForThread(tbl.schemaName());
+ Connection c = connMgr.connectionForThread(tbl.schemaName());
Statement stmt = null;
@@ -826,7 +581,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
stmt.executeUpdate(sql);
}
catch (SQLException e) {
- onSqlException();
+ connMgr.onSqlException();
throw new IgniteSQLException("Failed to drop database index table [type=" + tbl.type().name() +
", table=" + tbl.fullTableName() + "]", IgniteQueryErrorCode.TABLE_DROP_FAILED, e);
@@ -970,7 +725,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
*/
private void executeSql(String schemaName, String sql) throws IgniteCheckedException {
try {
- Connection conn = connectionForSchema(schemaName);
+ Connection conn = connMgr.connectionForThread(schemaName);
try (PreparedStatement stmt = prepareStatement(conn, sql, false)) {
stmt.execute();
@@ -1081,7 +836,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
assert mvccEnabled || mvccTracker == null;
try {
- final Connection conn = connectionForSchema(schemaName);
+ final Connection conn = connMgr.connectionForThread(schemaName);
H2Utils.setupConnection(conn, false, enforceJoinOrder);
@@ -1283,9 +1038,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
*/
public static int operationTimeout(int qryTimeout, IgniteTxAdapter tx) {
if (tx != null) {
- int tm1 = (int)tx.remainingTime(), tm2 = qryTimeout;
+ int remaining = (int)tx.remainingTime();
- return tm1 > 0 && tm2 > 0 ? Math.min(tm1, tm2) : Math.max(tm1, tm2);
+ return remaining > 0 && qryTimeout > 0 ? Math.min(remaining, qryTimeout) : Math.max(remaining, qryTimeout);
}
return qryTimeout;
@@ -1294,7 +1049,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** {@inheritDoc} */
@Override public long streamUpdateQuery(String schemaName, String qry,
@Nullable Object[] params, IgniteDataStreamer<?, ?> streamer) throws IgniteCheckedException {
- final Connection conn = connectionForSchema(schemaName);
+ final Connection conn = connMgr.connectionForThread(schemaName);
final PreparedStatement stmt;
@@ -1318,7 +1073,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
return zeroBatchedStreamedUpdateResult(params.size());
}
- final Connection conn = connectionForSchema(schemaName);
+ final Connection conn = connMgr.connectionForThread(schemaName);
final PreparedStatement stmt = prepareStatementAndCaches(conn, qry);
@@ -1512,7 +1267,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
return rs;
}
catch (SQLException e) {
- onSqlException();
+ connMgr.onSqlException();
throw new IgniteCheckedException(e);
}
@@ -1728,6 +1483,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/** {@inheritDoc} */
+ @SuppressWarnings("deprecation")
@Override public SqlFieldsQuery generateFieldsQuery(String cacheName, SqlQuery qry) {
String schemaName = schema(cacheName);
@@ -2096,7 +1852,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
// Second, let's check if we already have a parsed statement...
PreparedStatement cachedStmt;
- if ((cachedStmt = cachedStatement(connectionForSchema(schemaName), qry.getSql())) != null) {
+ if ((cachedStmt = cachedStatement(connMgr.connectionForThread(schemaName), qry.getSql())) != null) {
Prepared prepared = GridSqlQueryParser.prepared(cachedStmt);
// We may use this cached statement only for local queries and non queries.
@@ -2189,7 +1945,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (!prepared.isQuery()) {
if (DmlStatementsProcessor.isDmlStatement(prepared)) {
try {
- Connection conn = connectionForSchema(schemaName);
+ Connection conn = connMgr.connectionForThread(schemaName);
if (!loc)
return dmlProc.updateSqlFieldsDistributed(schemaName, conn, prepared, qry, cancel);
@@ -2287,7 +2043,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* metadata for two-step query (if needed), evaluated query local execution flag.
*/
private ParsingResult parseAndSplit(String schemaName, SqlFieldsQuery qry, int firstArg) {
- Connection c = connectionForSchema(schemaName);
+ Connection c = connMgr.connectionForThread(schemaName);
// For queries that are explicitly local, we rely on the flag specified in the query
// because this parsing result will be cached and used for queries directly.
@@ -2379,7 +2135,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
// Let's not cache multiple statements and distributed queries as whole two step query will be cached later on.
if (remainingSql != null || hasTwoStep)
- getStatementsCacheForCurrentThread().remove(schemaName, qry.getSql());
+ connMgr.statementCacheForThread().remove(schemaName, qry.getSql());
if (!hasTwoStep)
return new ParsingResult(prepared, newQry, remainingSql, null, null, null);
@@ -2448,7 +2204,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
*/
private GridCacheTwoStepQuery split(Prepared prepared, SqlFieldsQuery qry) throws IgniteCheckedException,
SQLException {
- GridCacheTwoStepQuery res = GridSqlQuerySplitter.split(connectionForThread(qry.getSchema()), prepared,
+ GridCacheTwoStepQuery res = GridSqlQuerySplitter.split(connMgr.connectionForThread(qry.getSchema()), prepared,
qry.getArgs(), qry.isCollocated(), qry.isDistributedJoins(), qry.isEnforceJoinOrder(), this);
List<Integer> cacheIds = collectCacheIds(null, res);
@@ -2474,6 +2230,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @param qry Sql fields query.autoStartTx(qry)
* @return {@code True} if need to start transaction.
*/
+ @SuppressWarnings("SimplifiableIfStatement")
public boolean autoStartTx(SqlFieldsQuery qry) {
if (!mvccEnabled(ctx))
return false;
@@ -2512,7 +2269,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
loc = false;
}
- Connection conn = connectionForSchema(schema);
+ Connection conn = connMgr.connectionForThread(schema);
H2Utils.setupConnection(conn, false, fldsQry.isEnforceJoinOrder());
@@ -2619,7 +2376,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
*/
public UpdateResult mapDistributedUpdate(String schemaName, SqlFieldsQuery fldsQry, IndexingQueryFilter filter,
GridQueryCancel cancel, boolean local) throws IgniteCheckedException {
- Connection conn = connectionForSchema(schemaName);
+ Connection conn = connMgr.connectionForThread(schemaName);
H2Utils.setupConnection(conn, false, fldsQry.isEnforceJoinOrder());
@@ -2703,14 +2460,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
H2TableDescriptor tbl = new H2TableDescriptor(this, schema, type, cctx, isSql);
try {
- Connection conn = connectionForThread(schemaName);
+ Connection conn = connMgr.connectionForThread(schemaName);
createTable(schemaName, schema, tbl, conn);
schema.add(tbl);
}
catch (SQLException e) {
- onSqlException();
+ connMgr.onSqlException();
throw new IgniteCheckedException("Failed to register query type: " + type, e);
}
@@ -2938,62 +2695,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
- * Called periodically by {@link GridTimeoutProcessor} to clean up the statement cache.
- */
- private void cleanupStatementCache() {
- long now = U.currentTimeMillis();
-
- for (Iterator<Map.Entry<Thread, H2ConnectionWrapper>> it = conns.entrySet().iterator(); it.hasNext(); ) {
- Map.Entry<Thread, H2ConnectionWrapper> entry = it.next();
-
- Thread t = entry.getKey();
-
- if (t.getState() == Thread.State.TERMINATED) {
- U.close(entry.getValue(), log);
-
- it.remove();
- }
- else if (now - entry.getValue().statementCache().lastUsage() > STATEMENT_CACHE_THREAD_USAGE_TIMEOUT)
- entry.getValue().clearStatementCache();
- }
- }
-
- /**
- * Called periodically by {@link GridTimeoutProcessor} to clean up the {@link #conns}.
- */
- private void cleanupConnections() {
- for (Iterator<Map.Entry<Thread, H2ConnectionWrapper>> it = conns.entrySet().iterator(); it.hasNext(); ) {
- Map.Entry<Thread, H2ConnectionWrapper> entry = it.next();
-
- Thread t = entry.getKey();
-
- if (t.getState() == Thread.State.TERMINATED) {
- U.close(entry.getValue(), log);
-
- it.remove();
- }
- }
- }
-
- /**
- * Removes from cache and returns associated with current thread connection.
- * @return Connection associated with current thread.
- */
- public ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> detach() {
- Thread key = Thread.currentThread();
-
- ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> reusableConnection = connCache.get();
-
- H2ConnectionWrapper connection = conns.remove(key);
-
- connCache.remove();
-
- assert reusableConnection.object().connection() == connection.connection();
-
- return reusableConnection;
- }
-
- /**
* Rebuild indexes from hash index.
*
* @param cacheName Cache name.
@@ -3060,36 +2761,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
SysProperties.serializeJavaObject = false;
}
- String dbName = (ctx != null ? ctx.localNodeId() : UUID.randomUUID()).toString();
-
- dbUrl = "jdbc:h2:mem:" + dbName + DB_OPTIONS;
-
- org.h2.Driver.load();
-
- try {
- if (getString(IGNITE_H2_DEBUG_CONSOLE) != null) {
- Connection c = DriverManager.getConnection(dbUrl);
-
- int port = getInteger(IGNITE_H2_DEBUG_CONSOLE_PORT, 0);
-
- WebServer webSrv = new WebServer();
- Server web = new Server(webSrv, "-webPort", Integer.toString(port));
- web.start();
- String url = webSrv.addSession(c);
-
- U.quietAndInfo(log, "H2 debug console URL: " + url);
-
- try {
- Server.openBrowser(url);
- }
- catch (Exception e) {
- U.warn(log, "Failed to open browser: " + e.getMessage());
- }
- }
- }
- catch (SQLException e) {
- throw new IgniteCheckedException(e);
- }
+ connMgr = new ConnectionManager(ctx);
if (ctx == null) {
// This is allowed in some tests.
@@ -3129,12 +2801,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
mapQryExec.start(ctx, this);
rdcQryExec.start(ctx, this);
- stmtCacheCleanupTask = ctx.timeout().schedule(new Runnable() {
- @Override public void run() {
- cleanupStatementCache();
- }
- }, CLEANUP_STMT_CACHE_PERIOD, CLEANUP_STMT_CACHE_PERIOD);
-
dmlProc = new DmlStatementsProcessor();
ddlProc = new DdlStatementsProcessor();
@@ -3150,17 +2816,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
createSchema0(QueryUtils.SCHEMA_SYS);
}
- Connection c = connectionForSchema(QueryUtils.SCHEMA_SYS);
-
- for (SqlSystemView view : systemViews(ctx))
- SqlSystemTableEngine.registerView(c, view);
+ try (Connection c = connMgr.connectionNoCache(QueryUtils.SCHEMA_SYS)) {
+ for (SqlSystemView view : systemViews(ctx))
+ SqlSystemTableEngine.registerView(c, view);
+ }
}
catch (SQLException e) {
throw new IgniteCheckedException("Failed to register system view.", e);
}
-
- // Caching this connection in ThreadLocal may lead to memory leaks.
- connCache.set(null);
}
else {
if (log.isDebugEnabled())
@@ -3175,12 +2838,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
JdbcUtils.serializer = h2Serializer();
assert ctx != null;
-
- connCleanupTask = ctx.timeout().schedule(new Runnable() {
- @Override public void run() {
- cleanupConnections();
- }
- }, CLEANUP_CONNECTIONS_PERIOD, CLEANUP_CONNECTIONS_PERIOD);
}
/**
@@ -3348,7 +3005,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
" FOR \"") +
cls.getName() + '.' + m.getName() + '"';
- executeStatement(schema, clause);
+ connMgr.executeStatement(schema, clause);
}
}
}
@@ -3361,39 +3018,18 @@ public class IgniteH2Indexing implements GridQueryIndexing {
mapQryExec.cancelLazyWorkers();
- for (H2ConnectionWrapper c : conns.values())
- U.close(c, log);
-
- conns.clear();
schemas.clear();
cacheName2schema.clear();
- try (Connection c = DriverManager.getConnection(dbUrl); Statement s = c.createStatement()) {
- s.execute("SHUTDOWN");
- }
- catch (SQLException e) {
- U.error(log, "Failed to shutdown database.", e);
- }
-
- if (stmtCacheCleanupTask != null)
- stmtCacheCleanupTask.close();
-
- if (connCleanupTask != null)
- connCleanupTask.close();
-
GridH2QueryContext.clearLocalNodeStop(nodeId);
- if (log.isDebugEnabled())
- log.debug("Cache query index stopped.");
-
// Close system H2 connection to INFORMATION_SCHEMA
synchronized (schemaMux) {
- if (sysConn != null) {
- U.close(sysConn, log);
-
- sysConn = null;
- }
+ connMgr.stop();
}
+
+ if (log.isDebugEnabled())
+ log.debug("Cache query index stopped.");
}
/** {@inheritDoc} */
@@ -3471,7 +3107,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
}
- conns.values().forEach(H2ConnectionWrapper::clearStatementCache);
+ connMgr.onCacheUnregistered();
for (H2TableDescriptor tbl : rmvTbls) {
for (Index idx : tbl.table().getIndexes())
@@ -3628,18 +3264,17 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/** {@inheritDoc} */
- @Override public void cancelAllQueries() {
+ @Override public void onKernalStop() {
mapQryExec.cancelLazyWorkers();
- for (H2ConnectionWrapper c : conns.values())
- U.close(c, log);
+ connMgr.onKernalStop();
}
/**
- * @return Per-thread connections.
+ * @return Query executor.
*/
- public Map<Thread, ?> perThreadConnections() {
- return conns;
+ public ConnectionManager connections() {
+ return connMgr;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/8526adda/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
index ba4b12b..0f8b6d8 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
@@ -647,7 +647,7 @@ public final class UpdatePlan {
/** {@inheritDoc} */
@Override public void beforeDetach() {
- ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> conn0 = conn = idx.detach();
+ ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> conn0 = conn = idx.connections().detachThreadConnection();
if (isClosed())
conn0.recycle();
http://git-wip-us.apache.org/repos/asf/ignite/blob/8526adda/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index c2ea9a8..569cb60 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -875,7 +875,7 @@ public class GridMapQueryExecutor {
.mvccSnapshot(mvccSnapshot)
.lazyWorker(worker);
- Connection conn = h2.connectionForSchema(schemaName);
+ Connection conn = h2.connections().connectionForThread(schemaName);
H2Utils.setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder);
http://git-wip-us.apache.org/repos/asf/ignite/blob/8526adda/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 87c8ce9..36287b3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -656,7 +656,7 @@ public class GridReduceQueryExecutor {
}
final ReduceQueryRun r = new ReduceQueryRun(qryReqId, qry.originalSql(), schemaName,
- h2.connectionForSchema(schemaName), qry.mapQueries().size(), qry.pageSize(),
+ h2.connections().connectionForThread(schemaName), qry.mapQueries().size(), qry.pageSize(),
U.currentTimeMillis(), sfuFut, cancel);
Collection<ClusterNode> nodes;
http://git-wip-us.apache.org/repos/asf/ignite/blob/8526adda/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
index 14593de..9458887 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
@@ -17,7 +17,7 @@
package org.apache.ignite.internal.processors.cache;
-import java.util.concurrent.ConcurrentMap;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteCache;
@@ -119,9 +119,10 @@ public class IgniteCacheQueryH2IndexingLeakTest extends GridCommonAbstractTest {
* @return size of statement cache.
*/
private static int getStatementCacheSize(GridQueryProcessor qryProcessor) {
- IgniteH2Indexing h2Idx = GridTestUtils.getFieldValue(qryProcessor, GridQueryProcessor.class, "idx");
- ConcurrentMap<Thread, H2ConnectionWrapper> conns = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "conns");
+ IgniteH2Indexing h2Idx = (IgniteH2Indexing)qryProcessor.getIndexing();
+
+ Map<Thread, H2ConnectionWrapper> conns = h2Idx.connections().connectionsForThread();
int cntr = 0;
@@ -145,10 +146,10 @@ public class IgniteCacheQueryH2IndexingLeakTest extends GridCommonAbstractTest {
// Open iterator on the created cursor: add entries to the cache.
IgniteInternalFuture<?> fut = multithreadedAsync(
new CAX() {
+ @SuppressWarnings("unchecked")
@Override public void applyx() throws IgniteCheckedException {
while (!stop.get()) {
c.query(new SqlQuery(Integer.class, "_val >= 0")).getAll();
-
c.query(new SqlQuery(Integer.class, "_val >= 1")).getAll();
}
}
@@ -197,6 +198,7 @@ public class IgniteCacheQueryH2IndexingLeakTest extends GridCommonAbstractTest {
// Open iterator on the created cursor: add entries to the cache
IgniteInternalFuture<?> fut = multithreadedAsync(
new CAX() {
+ @SuppressWarnings("unchecked")
@Override public void applyx() throws IgniteCheckedException {
c.query(new SqlQuery(Integer.class, "_val >= 0")).getAll();
http://git-wip-us.apache.org/repos/asf/ignite/blob/8526adda/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
index 7713004..d8d6735 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
@@ -108,7 +108,7 @@ public class H2ConnectionLeaksSelfTest extends GridCommonAbstractTest {
try {
IgniteH2Indexing idx = (IgniteH2Indexing)grid(1).context().query().getIndexing();
- idx.executeStatement(CACHE_NAME, "select *");
+ idx.connections().executeStatement(CACHE_NAME, "select *");
}
catch (Exception e) {
// No-op.
@@ -172,6 +172,6 @@ public class H2ConnectionLeaksSelfTest extends GridCommonAbstractTest {
* @return Per-thread connections.
*/
private Map<Thread, ?> perThreadConnections(int nodeIdx) {
- return ((IgniteH2Indexing)grid(nodeIdx).context().query().getIndexing()).perThreadConnections();
+ return ((IgniteH2Indexing)grid(nodeIdx).context().query().getIndexing()).connections().connectionsForThread();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8526adda/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
index ca24e57..1a4b248 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
@@ -1008,7 +1008,7 @@ public class GridQueryParsingTest extends GridCommonAbstractTest {
String schemaName = idx.schema(DEFAULT_CACHE_NAME);
- return (JdbcConnection)idx.connectionForSchema(schemaName);
+ return (JdbcConnection)idx.connections().connectionForThread(schemaName);
}
/**