You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2018/10/11 11:34:31 UTC
[3/3] ignite git commit: IGNITE-9171: SQL: always execute queries in
lazy mode. This closes #4514. This closes #4538. This closes #4870.
IGNITE-9171: SQL: always execute queries in lazy mode. This closes #4514. This closes #4538. This closes #4870.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f97ebff9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f97ebff9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f97ebff9
Branch: refs/heads/master
Commit: f97ebff9a59514a681258b46ae1b74c1ce4e0a3b
Parents: a3c2ea3
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Thu Oct 11 14:34:20 2018 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Thu Oct 11 14:34:20 2018 +0300
----------------------------------------------------------------------
.../internal/jdbc2/JdbcConnectionSelfTest.java | 12 +-
.../jdbc/thin/JdbcThinConnectionSelfTest.java | 38 +-
.../jdbc/thin/JdbcThinDataSourceSelfTest.java | 12 +-
.../apache/ignite/IgniteSystemProperties.java | 7 +-
.../ignite/cache/query/SqlFieldsQuery.java | 20 +-
.../jdbc/thin/ConnectionPropertiesImpl.java | 2 +-
.../ignite/internal/jdbc2/JdbcConnection.java | 2 +-
.../ignite/internal/util/IgniteUtils.java | 2 +-
.../query/h2/H2ConnectionWrapper.java | 11 +
.../internal/processors/query/h2/H2Utils.java | 15 +
.../processors/query/h2/IgniteH2Indexing.java | 180 ++++---
.../processors/query/h2/ObjectPool.java | 97 ++++
.../processors/query/h2/ObjectPoolReusable.java | 58 +++
.../query/h2/ThreadLocalObjectPool.java | 103 ----
.../processors/query/h2/dml/UpdatePlan.java | 8 +-
.../query/h2/opt/GridH2QueryContext.java | 33 +-
.../processors/query/h2/opt/GridH2Table.java | 133 ++++-
.../query/h2/twostep/GridMapQueryExecutor.java | 498 ++++++++++---------
.../query/h2/twostep/GridResultPage.java | 7 +-
.../query/h2/twostep/MapNodeResults.java | 13 +-
.../query/h2/twostep/MapQueryLazyWorker.java | 223 +++++++--
.../query/h2/twostep/MapQueryResult.java | 34 +-
.../query/h2/twostep/MapQueryResults.java | 40 +-
...GridCacheLazyQueryPartitionsReleaseTest.java | 2 -
.../IgniteCacheQueryH2IndexingLeakTest.java | 9 +-
...butedQueryStopOnCancelOrTimeoutSelfTest.java | 6 +-
...QueryNodeRestartDistributedJoinSelfTest.java | 14 +-
...ynamicColumnsAbstractConcurrentSelfTest.java | 6 +-
.../cache/index/H2ConnectionLeaksSelfTest.java | 2 +-
.../processors/query/LazyQuerySelfTest.java | 202 +++++++-
.../processors/query/h2/ObjectPoolSelfTest.java | 125 +++++
.../query/h2/ThreadLocalObjectPoolSelfTest.java | 113 -----
.../h2/twostep/RetryCauseMessageSelfTest.java | 16 -
.../IgniteCacheQuerySelfTestSuite.java | 4 +-
.../ignite/cache/query/query_sql_fields.h | 4 +-
.../cpp/odbc-test/src/configuration_test.cpp | 4 +-
.../cpp/odbc/src/config/configuration.cpp | 2 +-
.../Query/Linq/CacheLinqTest.Introspection.cs | 2 +
.../Client/Cache/SqlQueryTest.cs | 4 +-
.../Cache/Query/SqlFieldsQuery.cs | 43 +-
...benchmark-native-sql-cache-select.properties | 96 ++++
.../benchmark-native-sql-select.properties | 17 +-
.../ignite-localhost-sql-query-config.xml | 91 ++++
.../yardstick/IgniteAbstractBenchmark.java | 30 +-
.../yardstick/IgniteBenchmarkArguments.java | 13 +
.../yardstick/jdbc/AbstractNativeBenchmark.java | 3 +
.../apache/ignite/yardstick/jdbc/JdbcUtils.java | 47 +-
.../jdbc/NativeSqlCacheQueryRangeBenchmark.java | 145 ++++++
.../jdbc/NativeSqlQueryRangeBenchmark.java | 13 +-
49 files changed, 1751 insertions(+), 810 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcConnectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcConnectionSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcConnectionSelfTest.java
index d560d74..db0a959 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcConnectionSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcConnectionSelfTest.java
@@ -308,7 +308,7 @@ public class JdbcConnectionSelfTest extends GridCommonAbstractTest {
assertTrue(((JdbcConnection)conn).isEnforceJoinOrder());
assertFalse(((JdbcConnection)conn).isDistributedJoins());
assertFalse(((JdbcConnection)conn).isCollocatedQuery());
- assertFalse(((JdbcConnection)conn).isLazy());
+ assertTrue(((JdbcConnection)conn).isLazy());
assertFalse(((JdbcConnection)conn).skipReducerOnUpdate());
}
@@ -317,7 +317,7 @@ public class JdbcConnectionSelfTest extends GridCommonAbstractTest {
assertFalse(((JdbcConnection)conn).isEnforceJoinOrder());
assertTrue(((JdbcConnection)conn).isDistributedJoins());
assertFalse(((JdbcConnection)conn).isCollocatedQuery());
- assertFalse(((JdbcConnection)conn).isLazy());
+ assertTrue(((JdbcConnection)conn).isLazy());
assertFalse(((JdbcConnection)conn).skipReducerOnUpdate());
}
@@ -326,15 +326,15 @@ public class JdbcConnectionSelfTest extends GridCommonAbstractTest {
assertFalse(((JdbcConnection)conn).isEnforceJoinOrder());
assertFalse(((JdbcConnection)conn).isDistributedJoins());
assertTrue(((JdbcConnection)conn).isCollocatedQuery());
- assertFalse(((JdbcConnection)conn).isLazy());
+ assertTrue(((JdbcConnection)conn).isLazy());
assertFalse(((JdbcConnection)conn).skipReducerOnUpdate());
}
- try (final Connection conn = DriverManager.getConnection(CFG_URL_PREFIX + "lazy=true@" + configURL())) {
+ try (final Connection conn = DriverManager.getConnection(CFG_URL_PREFIX + "lazy=false@" + configURL())) {
assertFalse(((JdbcConnection)conn).isEnforceJoinOrder());
assertFalse(((JdbcConnection)conn).isDistributedJoins());
assertFalse(((JdbcConnection)conn).isCollocatedQuery());
- assertTrue(((JdbcConnection)conn).isLazy());
+ assertFalse(((JdbcConnection)conn).isLazy());
assertFalse(((JdbcConnection)conn).skipReducerOnUpdate());
}
try (final Connection conn = DriverManager.getConnection(CFG_URL_PREFIX + "skipReducerOnUpdate=true@"
@@ -342,7 +342,7 @@ public class JdbcConnectionSelfTest extends GridCommonAbstractTest {
assertFalse(((JdbcConnection)conn).isEnforceJoinOrder());
assertFalse(((JdbcConnection)conn).isDistributedJoins());
assertFalse(((JdbcConnection)conn).isCollocatedQuery());
- assertFalse(((JdbcConnection)conn).isLazy());
+ assertTrue(((JdbcConnection)conn).isLazy());
assertTrue(((JdbcConnection)conn).skipReducerOnUpdate());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
index 80397e6..26c34cf 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
@@ -230,36 +230,36 @@ public class JdbcThinConnectionSelfTest extends JdbcThinAbstractSelfTest {
*/
public void testSqlHints() throws Exception {
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1")) {
- assertHints(conn, false, false, false, false, false, false);
+ assertHints(conn, false, false, false, false, true, false);
}
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?distributedJoins=true")) {
- assertHints(conn, true, false, false, false, false, false);
+ assertHints(conn, true, false, false, false, true, false);
}
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?enforceJoinOrder=true")) {
- assertHints(conn, false, true, false, false, false, false);
+ assertHints(conn, false, true, false, false, true, false);
}
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?collocated=true")) {
- assertHints(conn, false, false, true, false, false, false);
+ assertHints(conn, false, false, true, false, true, false);
}
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?replicatedOnly=true")) {
- assertHints(conn, false, false, false, true, false, false);
+ assertHints(conn, false, false, false, true, true, false);
}
- try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?lazy=true")) {
- assertHints(conn, false, false, false, false, true, false);
+ try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?lazy=false")) {
+ assertHints(conn, false, false, false, false, false, false);
}
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?skipReducerOnUpdate=true")) {
- assertHints(conn, false, false, false, false, false, true);
+ assertHints(conn, false, false, false, false, true, true);
}
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?distributedJoins=true&" +
- "enforceJoinOrder=true&collocated=true&replicatedOnly=true&lazy=true&skipReducerOnUpdate=true")) {
- assertHints(conn, true, true, true, true, true, true);
+ "enforceJoinOrder=true&collocated=true&replicatedOnly=true&lazy=false&skipReducerOnUpdate=true")) {
+ assertHints(conn, true, true, true, true, false, true);
}
}
@@ -270,32 +270,32 @@ public class JdbcThinConnectionSelfTest extends JdbcThinAbstractSelfTest {
*/
public void testSqlHintsSemicolon() throws Exception {
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;distributedJoins=true")) {
- assertHints(conn, true, false, false, false, false, false);
+ assertHints(conn, true, false, false, false, true, false);
}
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;enforceJoinOrder=true")) {
- assertHints(conn, false, true, false, false, false, false);
+ assertHints(conn, false, true, false, false, true, false);
}
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;collocated=true")) {
- assertHints(conn, false, false, true, false, false, false);
+ assertHints(conn, false, false, true, false, true, false);
}
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;replicatedOnly=true")) {
- assertHints(conn, false, false, false, true, false, false);
+ assertHints(conn, false, false, false, true, true, false);
}
- try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;lazy=true")) {
- assertHints(conn, false, false, false, false, true, false);
+ try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;lazy=false")) {
+ assertHints(conn, false, false, false, false, false, false);
}
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;skipReducerOnUpdate=true")) {
- assertHints(conn, false, false, false, false, false, true);
+ assertHints(conn, false, false, false, false, true, true);
}
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;distributedJoins=true;" +
- "enforceJoinOrder=true;collocated=true;replicatedOnly=true;lazy=true;skipReducerOnUpdate=true")) {
- assertHints(conn, true, true, true, true, true, true);
+ "enforceJoinOrder=true;collocated=true;replicatedOnly=true;lazy=false;skipReducerOnUpdate=true")) {
+ assertHints(conn, true, true, true, true, false, true);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDataSourceSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDataSourceSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDataSourceSelfTest.java
index 6040bed..834b4ca 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDataSourceSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDataSourceSelfTest.java
@@ -142,15 +142,15 @@ public class JdbcThinDataSourceSelfTest extends JdbcThinAbstractSelfTest {
public void testResetUrl() throws Exception {
IgniteJdbcThinDataSource ids = new IgniteJdbcThinDataSource();
- ids.setUrl("jdbc:ignite:thin://127.0.0.1:10800/test?lazy=true");
+ ids.setUrl("jdbc:ignite:thin://127.0.0.1:10800/test?lazy=false");
assertEquals("test", ids.getSchema());
- assertTrue(ids.isLazy());
+ assertFalse(ids.isLazy());
ids.setUrl("jdbc:ignite:thin://mydomain.org,localhost?collocated=true");
assertNull(ids.getSchema());
- assertFalse(ids.isLazy());
+ assertTrue(ids.isLazy());
assertTrue(ids.isCollocated());
}
@@ -168,7 +168,7 @@ public class JdbcThinDataSourceSelfTest extends JdbcThinAbstractSelfTest {
assertFalse(io.connectionProperties().isAutoCloseServerCursor());
assertFalse(io.connectionProperties().isCollocated());
assertFalse(io.connectionProperties().isEnforceJoinOrder());
- assertFalse(io.connectionProperties().isLazy());
+ assertTrue(io.connectionProperties().isLazy());
assertFalse(io.connectionProperties().isDistributedJoins());
assertFalse(io.connectionProperties().isReplicatedOnly());
}
@@ -176,7 +176,7 @@ public class JdbcThinDataSourceSelfTest extends JdbcThinAbstractSelfTest {
ids.setAutoCloseServerCursor(true);
ids.setCollocated(true);
ids.setEnforceJoinOrder(true);
- ids.setLazy(true);
+ ids.setLazy(false);
ids.setDistributedJoins(true);
ids.setReplicatedOnly(true);
@@ -186,7 +186,7 @@ public class JdbcThinDataSourceSelfTest extends JdbcThinAbstractSelfTest {
assertTrue(io.connectionProperties().isAutoCloseServerCursor());
assertTrue(io.connectionProperties().isCollocated());
assertTrue(io.connectionProperties().isEnforceJoinOrder());
- assertTrue(io.connectionProperties().isLazy());
+ assertFalse(io.connectionProperties().isLazy());
assertTrue(io.connectionProperties().isDistributedJoins());
assertTrue(io.connectionProperties().isReplicatedOnly());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 892689c..d05bdb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -485,7 +485,12 @@ public final class IgniteSystemProperties {
/** Disable fallback to H2 SQL parser if the internal SQL parser fails to parse the statement. */
public static final String IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK = "IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK";
- /** Force all SQL queries to be processed lazily regardless of what clients request. */
+ /**
+ * Force all SQL queries to be processed lazily regardless of what clients request.
+ *
+ * @deprecated Since version 2.7.
+ */
+ @Deprecated
public static final String IGNITE_SQL_FORCE_LAZY_RESULT_SET = "IGNITE_SQL_FORCE_LAZY_RESULT_SET";
/** Disable SQL system views. */
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
index 4e12b8c..3e5c706 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
@@ -71,8 +71,8 @@ public class SqlFieldsQuery extends Query<List<?>> {
/** */
private boolean replicatedOnly;
- /** */
- private boolean lazy;
+ /** Lazy mode is default since Ignite v.2.7. */
+ private boolean lazy = true;
/** Partitions for query */
private int[] parts;
@@ -292,19 +292,24 @@ public class SqlFieldsQuery extends Query<List<?>> {
/**
* Sets lazy query execution flag.
* <p>
- * By default Ignite attempts to fetch the whole query result set to memory and send it to the client. For small
- * and medium result sets this provides optimal performance and minimize duration of internal database locks, thus
- * increasing concurrency.
- * <p>
* If result set is too big to fit in available memory this could lead to excessive GC pauses and even
* OutOfMemoryError. Use this flag as a hint for Ignite to fetch result set lazily, thus minimizing memory
* consumption at the cost of moderate performance hit.
+ * Now lazy mode is optimized for small and medium result set. Small result set means results rows count
+ * less then page size (see {@link #setPageSize}).
* <p>
- * Defaults to {@code false}, meaning that the whole result set is fetched to memory eagerly.
+ * To compatibility with previous version behavior lazy mode may be switched off. In this case Ignite attempts
+ * to fetch the whole query result set to memory and send it to the client.
+ * <p>
+ * Since version 2.7 lazy mode is used by default.
+ * Defaults to {@code true}, meaning that the result set is fetched lazily if it is possible.
*
* @param lazy Lazy query execution flag.
* @return {@code this} For chaining.
+ *
+ * @deprecated Since Ignite 2.7.
*/
+ @Deprecated
public SqlFieldsQuery setLazy(boolean lazy) {
this.lazy = lazy;
@@ -318,6 +323,7 @@ public class SqlFieldsQuery extends Query<List<?>> {
*
* @return Lazy flag.
*/
+ @Deprecated
public boolean isLazy() {
return lazy;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java
index 51a3837..054807a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java
@@ -84,7 +84,7 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa
/** Lazy query execution property. */
private BooleanProperty lazy = new BooleanProperty(
- "lazy", "Enable lazy query execution", false, false);
+ "lazy", "Enable lazy query execution (lazy mode is used by default since v.2.7)", true, false);
/** Socket send buffer size property. */
private IntegerProperty socketSendBuffer = new IntegerProperty(
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
index c589c06..481794e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
@@ -195,7 +195,7 @@ public class JdbcConnection implements Connection {
collocatedQry = Boolean.parseBoolean(props.getProperty(PROP_COLLOCATED));
distributedJoins = Boolean.parseBoolean(props.getProperty(PROP_DISTRIBUTED_JOINS));
enforceJoinOrder = Boolean.parseBoolean(props.getProperty(PROP_ENFORCE_JOIN_ORDER));
- lazy = Boolean.parseBoolean(props.getProperty(PROP_LAZY));
+ lazy = Boolean.parseBoolean(props.getProperty(PROP_LAZY, "true"));
txAllowed = Boolean.parseBoolean(props.getProperty(PROP_TX_ALLOWED));
stream = Boolean.parseBoolean(props.getProperty(PROP_STREAMING));
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 3ffbb00..5f397d5 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -4054,7 +4054,7 @@ public abstract class IgniteUtils {
rsrc.close();
}
catch (Exception e) {
- warn(log, "Failed to close resource: " + e.getMessage());
+ warn(log, "Failed to close resource: " + e.getMessage(), e);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java
index 425015a..020cd5e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java
@@ -33,6 +33,9 @@ public class H2ConnectionWrapper implements AutoCloseable {
private final Connection conn;
/** */
+ private final Thread intiThread;
+
+ /** */
private volatile String schema;
/** */
@@ -43,6 +46,7 @@ public class H2ConnectionWrapper implements AutoCloseable {
*/
H2ConnectionWrapper(Connection conn) {
this.conn = conn;
+ intiThread = Thread.currentThread();
initStatementCache();
}
@@ -96,6 +100,13 @@ public class H2ConnectionWrapper implements AutoCloseable {
statementCache = new H2StatementCache(STATEMENT_CACHE_SIZE);
}
+ /**
+ * @return Thread where the connection was created.
+ */
+ public Thread initialThread() {
+ return intiThread;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(H2ConnectionWrapper.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
index b9d9d8e..074a3e4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
@@ -237,10 +237,25 @@ public class H2Utils {
* @param enforceJoinOrder Enforce join order of tables.
*/
public static void setupConnection(Connection conn, boolean distributedJoins, boolean enforceJoinOrder) {
+ setupConnection(conn,distributedJoins, enforceJoinOrder, false);
+ }
+
+ /**
+ * @param conn Connection to use.
+ * @param distributedJoins If distributed joins are enabled.
+ * @param enforceJoinOrder Enforce join order of tables.
+ * @param lazy Lazy query execution mode.
+ */
+ public static void setupConnection(
+ Connection conn,
+ boolean distributedJoins,
+ boolean enforceJoinOrder,
+ boolean lazy) {
Session s = session(conn);
s.setForceJoinOrder(enforceJoinOrder);
s.setJoinBatchEnabled(distributedJoins);
+ s.setLazyQueryExecution(lazy);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 7c5f274..c4d5eea 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -137,7 +137,6 @@ import org.apache.ignite.internal.processors.query.h2.sys.view.SqlSystemViewNode
import org.apache.ignite.internal.processors.query.h2.sys.view.SqlSystemViewNodes;
import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor;
import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
-import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
@@ -298,9 +297,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** */
private String dbUrl = "jdbc:h2:mem:";
- /** */
+ /** All connections are used by Ignite instance. Map of (H2ConnectionWrapper, Boolean) is used as a Set. */
// TODO https://issues.apache.org/jira/browse/IGNITE-9062
- private final ConcurrentMap<Thread, H2ConnectionWrapper> conns = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Thread, ConcurrentHashMap<H2ConnectionWrapper, Boolean>> conns = new ConcurrentHashMap<>();
/** */
private GridMapQueryExecutor mapQryExec;
@@ -328,13 +327,23 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** */
// TODO https://issues.apache.org/jira/browse/IGNITE-9062
- private final ThreadLocalObjectPool<H2ConnectionWrapper> connectionPool = new ThreadLocalObjectPool<>(IgniteH2Indexing.this::newConnectionWrapper, 5);
+ private final ThreadLocal<ObjectPool<H2ConnectionWrapper>> connectionPool
+ = new ThreadLocal<ObjectPool<H2ConnectionWrapper>>() {
+ @Override protected ObjectPool<H2ConnectionWrapper> initialValue() {
+ return new ObjectPool<>(
+ IgniteH2Indexing.this::newConnectionWrapper,
+ 50,
+ IgniteH2Indexing.this::closePooledConnectionWrapper,
+ IgniteH2Indexing.this::recycleConnection);
+ }
+ };
/** */
// TODO https://issues.apache.org/jira/browse/IGNITE-9062
- private final ThreadLocal<ThreadLocalObjectPool.Reusable<H2ConnectionWrapper>> connCache = new ThreadLocal<ThreadLocalObjectPool.Reusable<H2ConnectionWrapper>>() {
- @Override public ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> get() {
- ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> reusable = super.get();
+ private final ThreadLocal<ObjectPoolReusable<H2ConnectionWrapper>> connCache
+ = new ThreadLocal<ObjectPoolReusable<H2ConnectionWrapper>>() {
+ @Override public ObjectPoolReusable<H2ConnectionWrapper> get() {
+ ObjectPoolReusable<H2ConnectionWrapper> reusable = super.get();
boolean reconnect = true;
@@ -354,10 +363,21 @@ public class IgniteH2Indexing implements GridQueryIndexing {
return reusable;
}
- @Override protected ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> initialValue() {
- ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> reusableConnection = connectionPool.borrow();
+ @Override protected ObjectPoolReusable<H2ConnectionWrapper> initialValue() {
+ ObjectPool<H2ConnectionWrapper> pool = connectionPool.get();
+
+ ObjectPoolReusable<H2ConnectionWrapper> reusableConnection = pool.borrow();
+
+ ConcurrentHashMap<H2ConnectionWrapper, Boolean> perThreadConns = conns.get(Thread.currentThread());
+
+ ConcurrentHashMap<H2ConnectionWrapper, Boolean> newMap = new ConcurrentHashMap<>();
+
+ perThreadConns = conns.putIfAbsent(Thread.currentThread(), newMap);
+
+ if (perThreadConns == null)
+ perThreadConns = newMap;
- conns.put(Thread.currentThread(), reusableConnection.object());
+ perThreadConns.put(reusableConnection.object(), false);
return reusableConnection;
}
@@ -437,16 +457,54 @@ public class IgniteH2Indexing implements GridQueryIndexing {
return sysConn;
}
- /** */
+ /**
+ * @return Connection wrapper.
+ */
private H2ConnectionWrapper newConnectionWrapper() {
try {
- return new H2ConnectionWrapper(DriverManager.getConnection(dbUrl));
+ Connection c = DriverManager.getConnection(dbUrl);
+ return new H2ConnectionWrapper(c);
} catch (SQLException e) {
throw new IgniteSQLException("Failed to initialize DB connection: " + dbUrl, e);
}
}
/**
+ * @param conn Connection wrapper to close.
+ */
+ private void closePooledConnectionWrapper(H2ConnectionWrapper conn) {
+ conns.get(conn.initialThread()).remove(conn);
+
+ U.closeQuiet(conn);
+ }
+
+ /**
+ * Removes from threadlocal cache and returns associated with current thread connection.
+ * @return Connection associated with current thread.
+ */
+ public ObjectPoolReusable<H2ConnectionWrapper> detachConnection() {
+ ObjectPoolReusable<H2ConnectionWrapper> reusableConnection = connCache.get();
+
+ connCache.remove();
+
+ conns.get(Thread.currentThread()).remove(reusableConnection.object());
+
+ return reusableConnection;
+ }
+
+ /**
+ * Return connection to the glob all connection collection.
+ * @param conn Recycled connection.
+ */
+ private void recycleConnection(H2ConnectionWrapper conn) {
+ ConcurrentMap<H2ConnectionWrapper, Boolean> perThreadConns = conns.get(conn.initialThread());
+
+ // Mau be null when node is stopping.
+ if (perThreadConns != null)
+ perThreadConns.put(conn, false);
+ }
+
+ /**
* @param c Connection.
* @param sql SQL.
* @return <b>Cached</b> prepared statement.
@@ -738,12 +796,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* Handles SQL exception.
*/
private void onSqlException() {
- Connection conn = connCache.get().object().connection();
+ H2ConnectionWrapper conn = connCache.get().object();
connCache.set(null);
if (conn != null) {
- conns.remove(Thread.currentThread());
+ conns.get(Thread.currentThread()).remove(conn);
// Reset connection to receive new one at next call.
U.close(conn, log);
@@ -1390,32 +1448,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
*/
private ResultSet executeSqlQuery(final Connection conn, final PreparedStatement stmt,
int timeoutMillis, @Nullable GridQueryCancel cancel) throws IgniteCheckedException {
- final MapQueryLazyWorker lazyWorker = MapQueryLazyWorker.currentWorker();
-
- if (cancel != null) {
- cancel.set(new Runnable() {
- @Override public void run() {
- if (lazyWorker != null) {
- lazyWorker.submit(new Runnable() {
- @Override public void run() {
- cancelStatement(stmt);
- }
- });
- }
- else
- cancelStatement(stmt);
- }
- });
- }
+ if (cancel != null)
+ cancel.set(() -> cancelStatement(stmt));
Session ses = H2Utils.session(conn);
if (timeoutMillis > 0)
ses.setQueryTimeout(timeoutMillis);
- if (lazyWorker != null)
- ses.setLazyQueryExecution(true);
-
try {
return stmt.executeQuery();
}
@@ -1429,9 +1469,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
finally {
if (timeoutMillis > 0)
ses.setQueryTimeout(0);
-
- if (lazyWorker != null)
- ses.setLazyQueryExecution(false);
}
}
@@ -2541,6 +2578,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
topVer, mvccSnapshot);
}
+ /**
+ * @param flags Flags holder.
+ * @param flag Flag mask to check.
+ * @return {@code true} if flag is set, otherwise returns {@code false}.
+ */
private boolean isFlagSet(int flags, int flag) {
return (flags & flag) == flag;
}
@@ -3018,18 +3060,24 @@ public class IgniteH2Indexing implements GridQueryIndexing {
private void cleanupStatementCache() {
long now = U.currentTimeMillis();
- for (Iterator<Map.Entry<Thread, H2ConnectionWrapper>> it = conns.entrySet().iterator(); it.hasNext(); ) {
- Map.Entry<Thread, H2ConnectionWrapper> entry = it.next();
+ for (Iterator<Map.Entry<Thread, ConcurrentHashMap<H2ConnectionWrapper, Boolean>>> it
+ = conns.entrySet().iterator(); it.hasNext(); ) {
+ Map.Entry<Thread, ConcurrentHashMap<H2ConnectionWrapper, Boolean>> entry = it.next();
Thread t = entry.getKey();
if (t.getState() == Thread.State.TERMINATED) {
- U.close(entry.getValue(), log);
+ for (H2ConnectionWrapper c : entry.getValue().keySet())
+ U.close(c, log);
it.remove();
}
- else if (now - entry.getValue().statementCache().lastUsage() > STATEMENT_CACHE_THREAD_USAGE_TIMEOUT)
- entry.getValue().clearStatementCache();
+ else {
+ for (H2ConnectionWrapper c : entry.getValue().keySet()) {
+ if (now - c.statementCache().lastUsage() > STATEMENT_CACHE_THREAD_USAGE_TIMEOUT)
+ c.clearStatementCache();
+ }
+ }
}
}
@@ -3037,13 +3085,15 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* Called periodically by {@link GridTimeoutProcessor} to clean up the {@link #conns}.
*/
private void cleanupConnections() {
- for (Iterator<Map.Entry<Thread, H2ConnectionWrapper>> it = conns.entrySet().iterator(); it.hasNext(); ) {
- Map.Entry<Thread, H2ConnectionWrapper> entry = it.next();
+ for (Iterator<Map.Entry<Thread, ConcurrentHashMap<H2ConnectionWrapper, Boolean>>> it
+ = conns.entrySet().iterator(); it.hasNext(); ) {
+ Map.Entry<Thread, ConcurrentHashMap<H2ConnectionWrapper, Boolean>> entry = it.next();
Thread t = entry.getKey();
if (t.getState() == Thread.State.TERMINATED) {
- U.close(entry.getValue(), log);
+ for (H2ConnectionWrapper c : entry.getValue().keySet())
+ U.close(c, log);
it.remove();
}
@@ -3051,24 +3101,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
- * Removes from cache and returns associated with current thread connection.
- * @return Connection associated with current thread.
- */
- public ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> detach() {
- Thread key = Thread.currentThread();
-
- ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> reusableConnection = connCache.get();
-
- H2ConnectionWrapper connection = conns.remove(key);
-
- connCache.remove();
-
- assert reusableConnection.object().connection() == connection.connection();
-
- return reusableConnection;
- }
-
- /**
* Rebuild indexes from hash index.
*
* @param cacheName Cache name.
@@ -3433,10 +3465,15 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (log.isDebugEnabled())
log.debug("Stopping cache query index...");
- mapQryExec.cancelLazyWorkers();
+ mapQryExec.stop();
+
+ for (ConcurrentMap<H2ConnectionWrapper, Boolean> perThreadConns : conns.values()) {
+ for (H2ConnectionWrapper c : perThreadConns.keySet())
+ U.close(c, log);
+ }
- for (H2ConnectionWrapper c : conns.values())
- U.close(c, log);
+ connectionPool.remove();
+ connCache.remove();
conns.clear();
schemas.clear();
@@ -3545,7 +3582,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
}
- conns.values().forEach(H2ConnectionWrapper::clearStatementCache);
+ conns.values().forEach(map -> map.keySet().forEach(H2ConnectionWrapper::clearStatementCache));
for (H2TableDescriptor tbl : rmvTbls) {
for (Index idx : tbl.table().getIndexes())
@@ -3703,10 +3740,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** {@inheritDoc} */
@Override public void cancelAllQueries() {
- mapQryExec.cancelLazyWorkers();
-
- for (H2ConnectionWrapper c : conns.values())
- U.close(c, log);
+ for (ConcurrentHashMap<H2ConnectionWrapper, Boolean> perThreadConns : conns.values()) {
+ for (H2ConnectionWrapper c : perThreadConns.keySet())
+ U.close(c, log);
+ }
}
/**
@@ -3756,6 +3793,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
+ * @param twoStepQry Query.
* @return {@code True} is system views exist.
*/
private boolean hasSystemViews(GridCacheTwoStepQuery twoStepQry) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPool.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPool.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPool.java
new file mode 100644
index 0000000..9d2a580
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPool.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * Thread-safe pool for managing limited number objects for further reuse.
+ *
+ * @param <E> Pooled objects type.
+ */
+public final class ObjectPool<E extends AutoCloseable> {
+ /** */
+ private final Supplier<E> objectFactory;
+
+ /** */
+ private final ConcurrentLinkedQueue<E> bag = new ConcurrentLinkedQueue<>();
+
+ /** */
+ private final int poolSize;
+
+ /** The function to close object. */
+ private final Consumer<E> closer;
+
+ /** The listener is called when object is returned to the pool. */
+ private final Consumer<E> recycler;
+
+ /**
+ * @param objectFactory Factory used for new objects creation.
+ * @param poolSize Number of objects which pool can contain.
+ * @param closer Function to close object.
+ * @param recycler The listener is called when object is returned to the pool.
+ */
+ public ObjectPool(Supplier<E> objectFactory, int poolSize, Consumer<E> closer, Consumer<E> recycler) {
+ this.objectFactory = objectFactory;
+ this.poolSize = poolSize;
+ this.closer = closer != null ? closer : U::closeQuiet;
+ this.recycler = recycler;
+ }
+
+ /**
+ * Picks an object from the pool if one is present or creates new one otherwise.
+ * Returns an object wrapper which could be returned to the pool.
+ *
+ * @return Reusable object wrapper.
+ */
+ public ObjectPoolReusable<E> borrow() {
+ E pooled = bag.poll();
+
+ return new ObjectPoolReusable<>(this, pooled != null ? pooled : objectFactory.get());
+ }
+
+ /**
+ * Recycles an object.
+ *
+ * @param object Object.
+ */
+ void recycle(E object) {
+ assert object != null : "Already recycled";
+
+ if (bag.size() < poolSize) {
+ bag.add(object);
+
+ if (recycler != null)
+ recycler.accept(object);
+ }
+ else
+ closer.accept(object);
+ }
+
+ /**
+ * Visible for test
+ * @return Pool bag size.
+ */
+ int bagSize() {
+ return bag.size();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPoolReusable.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPoolReusable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPoolReusable.java
new file mode 100644
index 0000000..48fee42
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPoolReusable.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+/**
+ * Wrapper for a pooled object with capability to return the object to a pool.
+ *
+ * @param <T> Enclosed object type.
+ */
+public class ObjectPoolReusable<T extends AutoCloseable> {
+ /** Object pool to recycle. */
+ private final ObjectPool<T> pool;
+
+ /** Detached object. */
+ private T object;
+
+ /**
+ * @param pool Object pool.
+ * @param object Detached object.
+ */
+ ObjectPoolReusable(ObjectPool<T> pool, T object) {
+ this.pool = pool;
+ this.object = object;
+ }
+
+ /**
+ * @return Enclosed object.
+ */
+ public T object() {
+ return object;
+ }
+
+ /**
+ * Returns an object to a pool or closes it if the pool is already full.
+ */
+ public void recycle() {
+ assert object != null : "Already recycled";
+
+ pool.recycle(object);
+
+ object = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPool.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPool.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPool.java
deleted file mode 100644
index 25daa23..0000000
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPool.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2;
-
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.function.Supplier;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Special pool for managing limited number objects for further reuse.
- * This pool maintains separate object bag for each thread by means of {@link ThreadLocal}.
- * <p>
- * If object is borrowed on one thread and recycled on different then it will be returned to
- * recycling thread bag. For thread-safe use either pooled objects should be thread-safe or
- * <i>happens-before</i> should be established between borrowing object and subsequent recycling.
- *
- * @param <E> pooled objects type
- */
-public final class ThreadLocalObjectPool<E extends AutoCloseable> {
- /**
- * Wrapper for a pooled object with capability to return the object to a pool.
- *
- * @param <T> enclosed object type
- */
- public static class Reusable<T extends AutoCloseable> {
- /** */
- private final ThreadLocalObjectPool<T> pool;
- /** */
- private final T object;
-
- /** */
- private Reusable(ThreadLocalObjectPool<T> pool, T object) {
- this.pool = pool;
- this.object = object;
- }
-
- /**
- * @return enclosed object
- */
- public T object() {
- return object;
- }
-
- /**
- * Returns an object to a pool or closes it if the pool is already full.
- */
- public void recycle() {
- Queue<Reusable<T>> bag = pool.bag.get();
- if (bag.size() < pool.poolSize)
- bag.add(this);
- else
- U.closeQuiet(object);
- }
- }
-
- /** */
- private final Supplier<E> objectFactory;
- /** */
- private final ThreadLocal<Queue<Reusable<E>>> bag = ThreadLocal.withInitial(LinkedList::new);
- /** */
- private final int poolSize;
-
- /**
- * @param objectFactory factory used for new objects creation
- * @param poolSize number of objects which pool can contain
- */
- public ThreadLocalObjectPool(Supplier<E> objectFactory, int poolSize) {
- this.objectFactory = objectFactory;
- this.poolSize = poolSize;
- }
-
- /**
- * Picks an object from the pool if one is present or creates new one otherwise.
- * Returns an object wrapper which could be returned to the pool.
- *
- * @return reusable object wrapper
- */
- public Reusable<E> borrow() {
- Reusable<E> pooled = bag.get().poll();
- return pooled != null ? pooled : new Reusable<>(this, objectFactory.get());
- }
-
- /** Visible for test */
- int bagSize() {
- return bag.get().size();
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
index ba4b12b..31a444e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
@@ -36,7 +36,7 @@ import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
import org.apache.ignite.internal.processors.query.h2.H2ConnectionWrapper;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
-import org.apache.ignite.internal.processors.query.h2.ThreadLocalObjectPool;
+import org.apache.ignite.internal.processors.query.h2.ObjectPoolReusable;
import org.apache.ignite.internal.processors.query.h2.UpdateResult;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
@@ -623,7 +623,7 @@ public final class UpdatePlan {
private final EnlistOperation op;
/** */
- private volatile ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> conn;
+ private volatile ObjectPoolReusable<H2ConnectionWrapper> conn;
/**
* @param idx Indexing.
@@ -647,7 +647,7 @@ public final class UpdatePlan {
/** {@inheritDoc} */
@Override public void beforeDetach() {
- ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> conn0 = conn = idx.detach();
+ ObjectPoolReusable<H2ConnectionWrapper> conn0 = conn = idx.detachConnection();
if (isClosed())
conn0.recycle();
@@ -657,7 +657,7 @@ public final class UpdatePlan {
@Override protected void onClose() {
cur.close();
- ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> conn0 = conn;
+ ObjectPoolReusable<H2ConnectionWrapper> conn0 = conn;
if (conn0 != null)
conn0.recycle();
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
index f12c0f3..9971b78 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
@@ -18,8 +18,10 @@
package org.apache.ignite.internal.processors.query.h2.opt;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -89,7 +91,7 @@ public class GridH2QueryContext {
private MvccSnapshot mvccSnapshot;
/** */
- private MapQueryLazyWorker lazyWorker;
+ private Set<GridH2Table> lockedTables = new HashSet<>();
/**
* @param locNodeId Local node ID.
@@ -351,7 +353,8 @@ public class GridH2QueryContext {
assert qctx.get() == null;
// We need MAP query context to be available to other threads to run distributed joins.
- if (x.key.type == MAP && x.distributedJoinMode() != OFF && qctxs.putIfAbsent(x.key, x) != null)
+ if (x.key.type == MAP && x.distributedJoinMode() != OFF && qctxs.putIfAbsent(x.key, x) != null
+ && MapQueryLazyWorker.currentWorker() == null)
throw new IllegalStateException("Query context is already set.");
qctx.set(x);
@@ -401,10 +404,7 @@ public class GridH2QueryContext {
assert x.key.equals(key);
- if (x.lazyWorker() != null)
- x.lazyWorker().stop(nodeStop);
- else
- x.clearContext(nodeStop);
+ x.clearContext(nodeStop);
return true;
}
@@ -413,7 +413,10 @@ public class GridH2QueryContext {
* @param nodeStop Node is stopping.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- public void clearContext(boolean nodeStop) {
+ public synchronized void clearContext(boolean nodeStop) {
+ if (cleared)
+ return;
+
cleared = true;
List<GridReservable> r = reservations;
@@ -516,20 +519,10 @@ public class GridH2QueryContext {
}
/**
- * @return Lazy worker, if any, or {@code null} if none.
+ * @return The set of tables have been locked by current thread.
*/
- public MapQueryLazyWorker lazyWorker() {
- return lazyWorker;
- }
-
- /**
- * @param lazyWorker Lazy worker, if any, or {@code null} if none.
- * @return {@code this}.
- */
- public GridH2QueryContext lazyWorker(MapQueryLazyWorker lazyWorker) {
- this.lazyWorker = lazyWorker;
-
- return this;
+ public Set<GridH2Table> lockedTables() {
+ return lockedTables;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index a612b63..709ded7 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -17,17 +17,6 @@
package org.apache.ignite.internal.processors.query.h2.opt;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.LongAdder;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -37,7 +26,7 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryField;
import org.apache.ignite.internal.processors.query.h2.database.H2RowFactory;
import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
-import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor;
+import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
import org.apache.ignite.internal.util.typedef.F;
import org.h2.command.ddl.CreateTableData;
import org.h2.command.dml.Insert;
@@ -58,6 +47,19 @@ import org.h2.table.TableType;
import org.h2.value.DataType;
import org.jetbrains.annotations.Nullable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.DEFAULT_COLUMNS_COUNT;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.KEY_COL;
@@ -90,6 +92,12 @@ public class GridH2Table extends TableBase {
/** */
private final ReadWriteLock lock;
+ /** Number of reading threads which currently move execution from query pool to dedicated thread. */
+ private final AtomicInteger lazyTransferCnt = new AtomicInteger();
+
+ /** Has writer that waits lock in the loop. */
+ private volatile boolean hasWaitedWriter;
+
/** */
private boolean destroyed;
@@ -265,6 +273,11 @@ public class GridH2Table extends TableBase {
ses.addLock(this);
+ GridH2QueryContext qctx = GridH2QueryContext.get();
+
+ if (qctx != null)
+ qctx.lockedTables().add(this);
+
return false;
}
@@ -291,15 +304,44 @@ public class GridH2Table extends TableBase {
Lock l = exclusive ? lock.writeLock() : lock.readLock();
try {
- if (!exclusive || !GridMapQueryExecutor.FORCE_LAZY)
- l.lockInterruptibly();
- else {
+ if (exclusive) {
+ // Attempting to obtain exclusive lock for DDL.
+ // Lock is considered acquired only if "lazyTransferCnt" is zero, meaning that
+ // currently there are no reader threads moving query execution from query
+ // pool to dedicated thread.
+ // It is possible that reader which is currently transferring execution gets
+ // queued after the write lock we are trying to acquire. So we use timed waiting
+ // and a loop to avoid deadlocks.
for (;;) {
- if (l.tryLock(200, TimeUnit.MILLISECONDS))
- break;
- else
- Thread.yield();
+ if (l.tryLock(200, TimeUnit.MILLISECONDS)) {
+ if (lazyTransferCnt.get() == 0)
+ break;
+ else
+ l.unlock();
+ }
+
+ hasWaitedWriter = true;
+
+ Thread.yield();
}
+
+ hasWaitedWriter = false;
+ }
+ else {
+ // Attempt to acquire read lock (query execution, DML, cache update).
+ // If query is being executed inside a query pool, we do not want it to be blocked
+ // for a long time, as it would prevent other queries from being executed. So we
+ // wait a little and then force transfer to dedicated thread by throwing special
+ // timeout exception.GridNioSslSelfTest
+ // If query is not in the query pool, then we simply wait for lock acquisition.
+ if (isSqlNotInLazy()) {
+ if (hasWaitedWriter || !l.tryLock(200, TimeUnit.MILLISECONDS)) {
+ throw new GridH2RetryException("Long wait on Table lock: [tableName=" + getName()
+ + ", hasWaitedWriter=" + hasWaitedWriter + ']');
+ }
+ }
+ else
+ l.lockInterruptibly();
}
}
catch (InterruptedException e) {
@@ -321,6 +363,49 @@ public class GridH2Table extends TableBase {
}
/**
+ * Check if table is being locked in not lazy thread by SQL query.
+ *
+ * @return {@code True} if is in query pool.
+ */
+ private static boolean isSqlNotInLazy() {
+ return GridH2QueryContext.get() != null && MapQueryLazyWorker.currentWorker() == null;
+ }
+
+ /**
+ * Callback invoked when session is to be transferred to lazy thread. In order to prevent concurrent changes
+ * by DDL during move we increment counter before releasing read lock.
+ *
+ * @param ses Session.
+ */
+ public void onLazyTransferStarted(Session ses) {
+ assert sessions.containsKey(ses) : "Detached session have not locked the table: " + getName();
+
+ lazyTransferCnt.incrementAndGet();
+
+ lock.readLock().unlock();
+ }
+
+ /**
+ * Callback invoked when lazy transfer finished. Acquire the lock, decrement transfer counter.
+ *
+ * @param ses Session to detach.
+ */
+ public void onLazyTransferFinished(Session ses) {
+ assert sessions.containsKey(ses) : "Attached session have not locked the table: " + getName();
+
+ try {
+ lock.readLock().lockInterruptibly();
+
+ lazyTransferCnt.decrementAndGet();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new IgniteInterruptedException("Thread got interrupted while trying to acquire table lock.", e);
+ }
+ }
+
+ /**
* Check if table is not destroyed.
*/
private void ensureNotDestroyed() {
@@ -410,6 +495,11 @@ public class GridH2Table extends TableBase {
if (exclusive == null)
return;
+ GridH2QueryContext qctx = GridH2QueryContext.get();
+
+ if (qctx != null)
+ qctx.lockedTables().remove(this);
+
unlock(exclusive);
}
@@ -949,9 +1039,10 @@ public class GridH2Table extends TableBase {
}
/**
+ * Drop columns.
*
- * @param cols
- * @param ifExists
+ * @param cols Columns.
+ * @param ifExists IF EXISTS flag.
*/
public void dropColumns(List<String> cols, boolean ifExists) {
assert !ifExists || cols.size() == 1;