You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2018/10/11 11:34:29 UTC
[1/3] ignite git commit: IGNITE-9171: SQL: always execute queries in
lazy mode. This closes #4514. This closes #4538. This closes #4870.
Repository: ignite
Updated Branches:
refs/heads/master a3c2ea3bf -> f97ebff9a
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 7633d2a..24ff297 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -180,8 +180,8 @@ import org.apache.ignite.internal.processors.query.h2.H2ResultSetIteratorNullify
import org.apache.ignite.internal.processors.query.h2.H2StatementCacheSelfTest;
import org.apache.ignite.internal.processors.query.h2.IgniteSqlBigIntegerKeyTest;
import org.apache.ignite.internal.processors.query.h2.IgniteSqlQueryMinMaxTest;
+import org.apache.ignite.internal.processors.query.h2.ObjectPoolSelfTest;
import org.apache.ignite.internal.processors.query.h2.PreparedStatementExSelfTest;
-import org.apache.ignite.internal.processors.query.h2.ThreadLocalObjectPoolSelfTest;
import org.apache.ignite.internal.processors.query.h2.sql.BaseH2CompareQueryTest;
import org.apache.ignite.internal.processors.query.h2.sql.GridQueryParsingTest;
import org.apache.ignite.internal.processors.query.h2.sql.H2CompareBigQueryDistributedJoinsTest;
@@ -475,7 +475,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(SqlUserCommandSelfTest.class);
suite.addTestSuite(EncryptedSqlTableTest.class);
- suite.addTestSuite(ThreadLocalObjectPoolSelfTest.class);
+ suite.addTestSuite(ObjectPoolSelfTest.class);
suite.addTestSuite(H2StatementCacheSelfTest.class);
suite.addTestSuite(PreparedStatementExSelfTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/platforms/cpp/core/include/ignite/cache/query/query_sql_fields.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/cache/query/query_sql_fields.h b/modules/platforms/cpp/core/include/ignite/cache/query/query_sql_fields.h
index 519b2ed..4e4614c 100644
--- a/modules/platforms/cpp/core/include/ignite/cache/query/query_sql_fields.h
+++ b/modules/platforms/cpp/core/include/ignite/cache/query/query_sql_fields.h
@@ -54,7 +54,7 @@ namespace ignite
loc(false),
distributedJoins(false),
enforceJoinOrder(false),
- lazy(false),
+ lazy(true),
args()
{
// No-op.
@@ -73,7 +73,7 @@ namespace ignite
loc(false),
distributedJoins(false),
enforceJoinOrder(false),
- lazy(false),
+ lazy(true),
args()
{
// No-op.
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/platforms/cpp/odbc-test/src/configuration_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc-test/src/configuration_test.cpp b/modules/platforms/cpp/odbc-test/src/configuration_test.cpp
index 4cfc940..c7014f9 100644
--- a/modules/platforms/cpp/odbc-test/src/configuration_test.cpp
+++ b/modules/platforms/cpp/odbc-test/src/configuration_test.cpp
@@ -46,7 +46,7 @@ namespace
const bool testEnforceJoinOrder = true;
const bool testReplicatedOnly = true;
const bool testCollocated = true;
- const bool testLazy = true;
+ const bool testLazy = false;
const bool testSkipReducerOnUpdate = true;
const std::string testAddressStr = testServerHost + ':' + ignite::common::LexicalCast<std::string>(testServerPort);
@@ -234,7 +234,7 @@ void CheckDsnConfig(const Configuration& cfg)
BOOST_CHECK_EQUAL(cfg.IsEnforceJoinOrder(), false);
BOOST_CHECK_EQUAL(cfg.IsReplicatedOnly(), false);
BOOST_CHECK_EQUAL(cfg.IsCollocated(), false);
- BOOST_CHECK_EQUAL(cfg.IsLazy(), false);
+ BOOST_CHECK_EQUAL(cfg.IsLazy(), true);
BOOST_CHECK_EQUAL(cfg.IsSkipReducerOnUpdate(), false);
BOOST_CHECK(cfg.GetAddresses().empty());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/platforms/cpp/odbc/src/config/configuration.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/config/configuration.cpp b/modules/platforms/cpp/odbc/src/config/configuration.cpp
index a99894d..6caf86e 100644
--- a/modules/platforms/cpp/odbc/src/config/configuration.cpp
+++ b/modules/platforms/cpp/odbc/src/config/configuration.cpp
@@ -43,7 +43,7 @@ namespace ignite
const bool Configuration::DefaultValue::enforceJoinOrder = false;
const bool Configuration::DefaultValue::replicatedOnly = false;
const bool Configuration::DefaultValue::collocated = false;
- const bool Configuration::DefaultValue::lazy = false;
+ const bool Configuration::DefaultValue::lazy = true;
const bool Configuration::DefaultValue::skipReducerOnUpdate = false;
const ProtocolVersion& Configuration::DefaultValue::protocolVersion = ProtocolVersion::GetCurrent();
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Linq/CacheLinqTest.Introspection.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Linq/CacheLinqTest.Introspection.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Linq/CacheLinqTest.Introspection.cs
index f5b5baa..67bde2e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Linq/CacheLinqTest.Introspection.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Linq/CacheLinqTest.Introspection.cs
@@ -103,6 +103,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Query.Linq
#pragma warning restore 618 // Type or member is obsolete
fq = fieldsQuery.GetFieldsQuery();
+ fq.Lazy = false;
+
Assert.AreEqual(GetSqlEscapeAll()
? "select _T0.\"Name\" from PERSON_ORG_SCHEMA.\"Person\" as _T0"
: "select _T0.NAME from PERSON_ORG_SCHEMA.Person as _T0",
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/SqlQueryTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/SqlQueryTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/SqlQueryTest.cs
index 02d13f6..d99fd156 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/SqlQueryTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/SqlQueryTest.cs
@@ -150,9 +150,9 @@ namespace Apache.Ignite.Core.Tests.Client.Cache
{
var cache = GetClientCache<Person>();
- cache.PutAll(Enumerable.Range(1, 30000).ToDictionary(x => x, x => new Person(x)));
+ cache.PutAll(Enumerable.Range(1, 1000).ToDictionary(x => x, x => new Person(x)));
- var qry = new SqlFieldsQuery("select * from Person where Name like '%ers%'")
+ var qry = new SqlFieldsQuery("select * from Person p0, Person p1, Person p2'")
{
Timeout = TimeSpan.FromMilliseconds(1)
};
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/SqlFieldsQuery.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/SqlFieldsQuery.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/SqlFieldsQuery.cs
index a93e00d..f81e4ac 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/SqlFieldsQuery.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/SqlFieldsQuery.cs
@@ -36,22 +36,23 @@ namespace Apache.Ignite.Core.Cache.Query
/// <param name="args">Arguments.</param>
public SqlFieldsQuery(string sql, params object[] args) : this(sql, false, args)
{
- // No-op.
+ Lazy = true;
}
- /// <summary>
- /// Constructor,
- /// </summary>
- /// <param name="sql">SQL.</param>
- /// <param name="loc">Whether query should be executed locally.</param>
- /// <param name="args">Arguments.</param>
- public SqlFieldsQuery(string sql, bool loc, params object[] args)
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ /// <param name="sql">SQL.</param>
+ /// <param name="loc">Whether query should be executed locally.</param>
+ /// <param name="args">Arguments.</param>
+ public SqlFieldsQuery(string sql, bool loc, params object[] args)
{
Sql = sql;
Local = loc;
Arguments = args;
PageSize = DefaultPageSize;
+ Lazy = true;
}
/// <summary>
@@ -135,18 +136,20 @@ namespace Apache.Ignite.Core.Cache.Query
/// </summary>
public string Schema { get; set; }
- /// <summary>
- /// Gets or sets a value indicating whether this <see cref="SqlFieldsQuery"/> is lazy.
- /// <para />
- /// By default Ignite attempts to fetch the whole query result set to memory and send it to the client.
- /// For small and medium result sets this provides optimal performance and minimize duration of internal
- /// database locks, thus increasing concurrency.
- /// <para />
- /// If result set is too big to fit in available memory this could lead to excessive GC pauses and even
- /// OutOfMemoryError. Use this flag as a hint for Ignite to fetch result set lazily, thus minimizing memory
- /// consumption at the cost of moderate performance hit.
- /// </summary>
- public bool Lazy { get; set; }
+ /// <summary>
+ /// Gets or sets a value indicating whether this <see cref="SqlFieldsQuery"/> is lazy.
+ /// <para />
+ /// When lazy mode is turned off Ignite attempts to fetch the whole query result set to memory and send it to the client.
+ /// For small and medium result sets this provides optimal performance and minimizes the duration of internal
+ /// database locks, thus increasing concurrency.
+ /// <para />
+ /// If result set is too big to fit in available memory this could lead to excessive GC pauses and even
+ /// OutOfMemoryError. Use this flag as a hint for Ignite to fetch result set lazily, thus minimizing memory
+ /// consumption at the cost of moderate performance hit.
+ /// <para />
+ /// Since version 2.7 queries are executed lazily by default: the default value of this flag is 'true'.
+ /// </summary>
+ public bool Lazy { get; set; }
/// <summary>
/// Returns a <see cref="string" /> that represents this instance.
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/yardstick/config/benchmark-native-sql-cache-select.properties
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/benchmark-native-sql-cache-select.properties b/modules/yardstick/config/benchmark-native-sql-cache-select.properties
new file mode 100644
index 0000000..cb397db
--- /dev/null
+++ b/modules/yardstick/config/benchmark-native-sql-cache-select.properties
@@ -0,0 +1,96 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#
+# Contains benchmarks for select queries
+#
+
+now0=`date +'%H%M%S'`
+
+# JVM options.
+JVM_OPTS=${JVM_OPTS}" -DIGNITE_QUIET=false"
+
+# Uncomment to enable concurrent garbage collection (GC) if you encounter long GC pauses.
+JVM_OPTS=${JVM_OPTS}" \
+-Xms8g \
+-Xmx8g \
+-Xloggc:./gc${now0}.log \
+-XX:+PrintGCDetails \
+-verbose:gc \
+-XX:+UseParNewGC \
+-XX:+UseConcMarkSweepGC \
+-XX:+PrintGCDateStamps \
+"
+
+# Add to JVM_OPTS to generate JFR profile.
+#-XX:+UnlockCommercialFeatures \
+#-XX:+FlightRecorder -XX:StartFlightRecording=delay=300s,duration=120s,filename=#filename#.jfr \
+
+#Ignite version
+ver="RELEASE-"
+
+# List of default probes.
+# Add DStatProbe or VmStatProbe if your OS supports it (e.g. if running on Linux).
+BENCHMARK_DEFAULT_PROBES=ThroughputLatencyProbe,PercentileProbe,DStatProbe
+
+# Packages where the specified benchmark is searched by reflection mechanism.
+BENCHMARK_PACKAGES=org.yardstickframework,org.apache.ignite.yardstick
+
+# Flag which indicates to restart the servers before every benchmark execution.
+RESTART_SERVERS=true
+
+# Probe point writer class name.
+# BENCHMARK_WRITER=
+
+# The benchmark is applicable only for 1 server and 1 driver
+SERVER_HOSTS=127.0.0.1,127.0.0.1
+DRIVER_HOSTS=127.0.0.1
+
+# Remote username.
+# REMOTE_USER=
+
+# Number of nodes, used to wait for the specified number of nodes to start.
+nodesNum=$((`echo ${SERVER_HOSTS} | tr ',' '\n' | wc -l` + `echo ${DRIVER_HOSTS} | tr ',' '\n' | wc -l`))
+
+# Backups count.
+b=1
+
+# Warmup.
+w=60
+
+# Duration.
+d=300
+
+# Threads count.
+t=4
+
+# Sync mode.
+sm=PRIMARY_SYNC
+
+# Jobs.
+j=10
+
+# Run configuration which contains all benchmarks.
+# Note that each benchmark is set to run for 300 seconds (5 min) with warm-up set to 60 seconds (1 minute).
+CONFIGS="\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-sql-query-config.xml -nn ${nodesNum} -w ${w} -d ${d} -t ${t} -sm ${sm} --sqlRange 1 -dn NativeSqlCacheQueryRangeBenchmark -sn IgniteNode --lazy false -ds ${ver}sql-select-native-r1-${b}-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-sql-query-config.xml -nn ${nodesNum} -w ${w} -d ${d} -t ${t} -sm ${sm} --sqlRange 1000 -dn NativeSqlCacheQueryRangeBenchmark -sn IgniteNode --lazy false -ds ${ver}sql-select-native-r1K-${b}-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-sql-query-config.xml -nn ${nodesNum} -w ${w} -d ${d} -t ${t} -sm ${sm} --sqlRange 2000 -dn NativeSqlCacheQueryRangeBenchmark -sn IgniteNode --lazy false -ds ${ver}sql-select-native-r2K-${b}-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-sql-query-config.xml -nn ${nodesNum} -w ${w} -d ${d} -t ${t} -sm ${sm} --sqlRange 0 -dn NativeSqlCacheQueryRangeBenchmark -sn IgniteNode --lazy false -ds ${ver}sql-select-native-r1M-${b}-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-sql-query-config.xml -nn ${nodesNum} -w ${w} -d ${d} -t ${t} -sm ${sm} --sqlRange 1 -dn NativeSqlCacheQueryRangeBenchmark -sn IgniteNode --lazy true -ds ${ver}sql-select-native-r1-lazy-${b}-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-sql-query-config.xml -nn ${nodesNum} -w ${w} -d ${d} -t ${t} -sm ${sm} --sqlRange 1000 -dn NativeSqlCacheQueryRangeBenchmark -sn IgniteNode --lazy true -ds ${ver}sql-select-native-r1K-lazy-${b}-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-sql-query-config.xml -nn ${nodesNum} -w ${w} -d ${d} -t ${t} -sm ${sm} --sqlRange 2000 -dn NativeSqlCacheQueryRangeBenchmark -sn IgniteNode --lazy true -ds ${ver}sql-select-native-r2K-lazy-${b}-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-sql-query-config.xml -nn ${nodesNum} -w ${w} -d ${d} -t ${t} -sm ${sm} --sqlRange 0 -dn NativeSqlCacheQueryRangeBenchmark -sn IgniteNode --lazy true -ds ${ver}sql-select-native-r1M-lazy-${b}-backup,\
+"
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/yardstick/config/benchmark-native-sql-select.properties
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/benchmark-native-sql-select.properties b/modules/yardstick/config/benchmark-native-sql-select.properties
index 0f0b606..23b78be 100644
--- a/modules/yardstick/config/benchmark-native-sql-select.properties
+++ b/modules/yardstick/config/benchmark-native-sql-select.properties
@@ -51,8 +51,8 @@ RESTART_SERVERS=true
# BENCHMARK_WRITER=
# The benchmark is applicable only for 1 server and 1 driver
-SERVER_HOSTS=127.0.0.1
-DRIVER_HOSTS=127.0.0.1
+SERVER_HOSTS=127.0.0.1,127.0.0.1
+DRIVER_HOSTS=127.0.0.1,127.0.0.1,127.0.0.1
# Remote username.
# REMOTE_USER=
@@ -64,7 +64,7 @@ nodesNum=$((`echo ${SERVER_HOSTS} | tr ',' '\n' | wc -l` + `echo ${DRIVER_HOSTS}
b=1
# Warmup.
-w=30
+w=60
# Duration.
d=300
@@ -81,6 +81,13 @@ j=10
# Run configuration which contains all benchmarks.
# Note that each benchmark is set to run for 300 seconds (5 min) with warm-up set to 60 seconds (1 minute).
CONFIGS="\
--cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b ${b} -w ${w} -d ${d} -t ${t} -sm ${sm} --sqlRange 1 -dn NativeSqlQueryRangeBenchmark -sn IgniteNode -ds ${ver}sql-select-native-r1-${b}-backup -cl,\
--cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b ${b} -w ${w} -d ${d} -t ${t} -sm ${sm} --sqlRange 1000 -dn NativeSqlQueryRangeBenchmark -sn IgniteNode -ds ${ver}sql-select-native-r1000-${b}-backup -cl,\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b ${b} -w ${w} -d ${d} -t ${t} -sm ${sm} --sqlRange 1 -dn NativeSqlQueryRangeBenchmark -sn IgniteNode -ds ${ver}sql-select-native-backups-${b}-r1 -cl --lazy false,\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b ${b} -w ${w} -d ${d} -t ${t} -sm ${sm} --sqlRange 1000 -dn NativeSqlQueryRangeBenchmark -sn IgniteNode -ds ${ver}sql-select-native-backups-${b}-r1K -cl --lazy false,\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b ${b} -w ${w} -d ${d} -t ${t} -sm ${sm} --sqlRange 0 -dn NativeSqlQueryRangeBenchmark -sn IgniteNode -ds ${ver}sql-select-native-backups-${b}-r1M -cl --lazy false,\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b ${b} -w ${w} -d ${d} -t ${t} -sm ${sm} --sqlRange 1 -dn NativeSqlQueryRangeBenchmark -sn IgniteNode -ds ${ver}sql-select-native-backups-${b}-r1-lazy -cl --lazy true,\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b ${b} -w ${w} -d ${d} -t ${t} -sm ${sm} --sqlRange 1000 -dn NativeSqlQueryRangeBenchmark -sn IgniteNode -ds ${ver}sql-select-native-backups-${b}-r1K-lazy -cl --lazy true,\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b ${b} -w ${w} -d ${d} -t ${t} -sm ${sm} --sqlRange 0 -dn NativeSqlQueryRangeBenchmark -sn IgniteNode -ds ${ver}sql-select-native-backups-${b}-r1M-lazy -cl --lazy true,\
+"
+CONFIGS="\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b ${b} -w ${w} -d ${d} -t ${t} -sm ${sm} --sqlRange 1 -dn NativeSqlQueryRangeBenchmark -sn IgniteNode -ds ${ver}sql-select-native-backups-${b}-r1 -cl --lazy false -r 10000,\
"
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/yardstick/config/ignite-localhost-sql-query-config.xml
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/ignite-localhost-sql-query-config.xml b/modules/yardstick/config/ignite-localhost-sql-query-config.xml
new file mode 100644
index 0000000..79a4de6
--- /dev/null
+++ b/modules/yardstick/config/ignite-localhost-sql-query-config.xml
@@ -0,0 +1,91 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ Ignite Spring configuration file to startup grid.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
+ <import resource="ignite-base-config.xml"/>
+
+ <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration" parent="base-ignite.cfg">
+ <property name="discoverySpi">
+ <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+ <property name="ipFinder">
+ <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+ <property name="addresses">
+ <list>
+ <value>127.0.0.1:47500</value>
+ <value>127.0.0.1:47501</value>
+ <value>127.0.0.1:47502</value>
+ <value>127.0.0.1:47503</value>
+ <value>127.0.0.1:47504</value>
+ <value>127.0.0.1:47505</value>
+ <value>127.0.0.1:47506</value>
+ <value>127.0.0.1:47507</value>
+ <value>127.0.0.1:47508</value>
+ <value>127.0.0.1:47509</value>
+ </list>
+ </property>
+ </bean>
+ </property>
+ </bean>
+ </property>
+
+ <property name="communicationSpi">
+ <bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
+ <property name="sharedMemoryPort" value="-1"/>
+ </bean>
+ </property>
+
+ <property name="cacheConfiguration">
+ <list>
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="name" value="test"/>
+
+ <property name="backups" value="1"/>
+
+ <property name="cacheMode" value="PARTITIONED"/>
+
+ <property name="atomicityMode" value="ATOMIC"/>
+
+ <property name="queryEntities">
+ <list>
+ <bean class="org.apache.ignite.cache.QueryEntity">
+ <property name="keyType" value="java.lang.Long"/>
+ <property name="valueType" value="TEST_LONG"/>
+ <property name="tableName" value="TEST_LONG"/>
+ <property name="keyFieldName" value="ID"/>
+
+ <property name="fields">
+ <map>
+ <entry key="ID" value="java.lang.Long"/>
+ <entry key="VAL" value="java.lang.Long"/>
+ </map>
+ </property>
+ </bean>
+ </list>
+ </property>
+ </bean>
+ </list>
+ </property>
+ </bean>
+</beans>
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteAbstractBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteAbstractBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteAbstractBenchmark.java
index 5aec308..21442dc 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteAbstractBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteAbstractBenchmark.java
@@ -17,9 +17,10 @@
package org.apache.ignite.yardstick;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCountDownLatch;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteState;
import org.apache.ignite.Ignition;
@@ -29,6 +30,7 @@ import org.yardstickframework.BenchmarkConfiguration;
import org.yardstickframework.BenchmarkDriverAdapter;
import org.yardstickframework.BenchmarkUtils;
+import static org.apache.ignite.events.EventType.EVTS_DISCOVERY;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.yardstickframework.BenchmarkUtils.jcommander;
import static org.yardstickframework.BenchmarkUtils.println;
@@ -37,6 +39,8 @@ import static org.yardstickframework.BenchmarkUtils.println;
* Abstract class for Ignite benchmarks.
*/
public abstract class IgniteAbstractBenchmark extends BenchmarkDriverAdapter {
+ private static final long WAIT_NODES_TIMEOUT = TimeUnit.SECONDS.toMillis(30);
+
/** Arguments. */
protected final IgniteBenchmarkArguments args = new IgniteBenchmarkArguments();
@@ -126,22 +130,30 @@ public abstract class IgniteAbstractBenchmark extends BenchmarkDriverAdapter {
* @throws Exception If failed.
*/
private void waitForNodes() throws Exception {
- final CountDownLatch nodesStartedLatch = new CountDownLatch(1);
+ IgniteCountDownLatch allNodesReady = ignite().countDownLatch("allNodesReady", 1, false, true);
+ // Wait until all nodes are ready, then release the distributed barrier.
ignite().events().localListen(new IgnitePredicate<Event>() {
@Override public boolean apply(Event gridEvt) {
- if (nodesStarted())
- nodesStartedLatch.countDown();
+ if (nodesStarted()) {
+ allNodesReady.countDown();
+ // todo: return false so unregister?
+ }
return true;
}
- }, EVT_NODE_JOINED);
+ }, EVTS_DISCOVERY);
- if (!nodesStarted()) {
- println(cfg, "Waiting for " + (args.nodes() - 1) + " nodes to start...");
+ if (nodesStarted())
+ allNodesReady.countDown();
- nodesStartedLatch.await();
- }
+ // Block on the distributed barrier until member 0 releases it.
+ println(cfg, "Start waiting for cluster to contain " + args.nodes() + ".");
+
+ //todo: timeouts?
+ allNodesReady.await();
+
+ println(cfg, "Cluster is ready.");
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
index 3f4fddc..3bd5e87 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
@@ -289,6 +289,12 @@ public class IgniteBenchmarkArguments {
@GridToStringInclude
public long mvccContentionRange = 10_000;
+ /** Lazy mode flag for SQL query execution. */
+ @Parameter(names = {"--lazy"},
+ arity = 1,
+ description = "Lazy mode for SQL query execution (default true).")
+ private boolean lazy = true;
+
/**
* @return {@code True} if need set {@link DataStorageConfiguration}.
*/
@@ -712,6 +718,13 @@ public class IgniteBenchmarkArguments {
return mvccContentionRange;
}
+ /**
+ * @return Lazy query execution mode.
+ */
+ public boolean isLazy() {
+ return lazy;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return GridToStringBuilder.toString(IgniteBenchmarkArguments.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/AbstractNativeBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/AbstractNativeBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/AbstractNativeBenchmark.java
index 129e6a4..0f99a9d 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/AbstractNativeBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/AbstractNativeBenchmark.java
@@ -21,6 +21,7 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.yardstick.IgniteAbstractBenchmark;
import org.yardstickframework.BenchmarkConfiguration;
+import org.yardstickframework.BenchmarkUtils;
import static org.apache.ignite.yardstick.jdbc.JdbcUtils.fillData;
@@ -33,5 +34,7 @@ public abstract class AbstractNativeBenchmark extends IgniteAbstractBenchmark {
super.setUp(cfg);
fillData(cfg, (IgniteEx)ignite(), args.range(), args.atomicMode());
+
+ BenchmarkUtils.println("Lazy mode: " + args.isLazy());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/JdbcUtils.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/JdbcUtils.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/JdbcUtils.java
index 81d6c17..4466f89 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/JdbcUtils.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/JdbcUtils.java
@@ -17,6 +17,7 @@
package org.apache.ignite.yardstick.jdbc;
+import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.IgniteEx;
@@ -33,30 +34,46 @@ public class JdbcUtils {
* @param cfg Benchmark configuration.
* @param ignite Ignite node.
* @param range Data key range.
+ * @param atomicMode Cache atomic mode.
*/
public static void fillData(BenchmarkConfiguration cfg, IgniteEx ignite, long range, CacheAtomicityMode atomicMode) {
- println(cfg, "Create table...");
+ IgniteSemaphore sem = ignite.semaphore("jdbc-setup", 1, true, true);
- String withExpr = atomicMode != null ? " WITH \"atomicity=" + atomicMode.name() + "\";" : ";";
+ try {
+ if (sem.tryAcquire()) {
+ println(cfg, "Create table...");
- String qry = "CREATE TABLE test_long (id long primary key, val long)" + withExpr;
+ String withExpr = atomicMode != null ? " WITH \"atomicity=" + atomicMode.name() + "\";" : ";";
- println(cfg, "Creating table with schema: " + qry);
+ String qry = "CREATE TABLE test_long (id long primary key, val long)" + withExpr;
- ignite.context().query().querySqlFields(
- new SqlFieldsQuery(qry), true);
+ println(cfg, "Creating table with schema: " + qry);
- println(cfg, "Populate data...");
+ ignite.context().query().querySqlFields(
+ new SqlFieldsQuery(qry), true);
- for (long l = 1; l <= range; ++l) {
- ignite.context().query().querySqlFields(
- new SqlFieldsQuery("insert into test_long (id, val) values (?, ?)")
- .setArgs(l, l + 1), true);
+ println(cfg, "Populate data...");
- if (l % 10000 == 0)
- println(cfg, "Populate " + l);
- }
+ for (long l = 1; l <= range; ++l) {
+ ignite.context().query().querySqlFields(
+ new SqlFieldsQuery("insert into test_long (id, val) values (?, ?)")
+ .setArgs(l, l + 1), true);
+
+ if (l % 10000 == 0)
+ println(cfg, "Populate " + l);
+ }
+
+ println(cfg, "Finished populating data");
+ }
+ else {
+ // Acquire (i.e. wait for setup by another client) and immediately release.
+ println(cfg, "Waits for setup...");
- println(cfg, "Finished populating data");
+ sem.acquire();
+ }
+ }
+ finally {
+ sem.release();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/NativeSqlCacheQueryRangeBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/NativeSqlCacheQueryRangeBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/NativeSqlCacheQueryRangeBenchmark.java
new file mode 100644
index 0000000..e361c9a
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/NativeSqlCacheQueryRangeBenchmark.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.jdbc;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSemaphore;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.yardstick.IgniteAbstractBenchmark;
+import org.yardstickframework.BenchmarkConfiguration;
+import org.yardstickframework.BenchmarkUtils;
+
+import static org.apache.ignite.yardstick.jdbc.JdbcUtils.fillData;
+import static org.yardstickframework.BenchmarkUtils.println;
+
+/**
+ * Native sql benchmark that performs select operations.
+ */
+public class NativeSqlCacheQueryRangeBenchmark extends IgniteAbstractBenchmark {
+    /** Keep-binary view of the "test" cache; every SQL statement of this benchmark goes through it. */
+    private IgniteCache<Object, Object> cache;
+
+    /** {@inheritDoc} */
+    @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
+        super.setUp(cfg);
+
+        fillData();
+
+        BenchmarkUtils.println("Lazy mode: " + args.isLazy());
+    }
+
+    /** Populates {@code test_long} if the "data-setup" permit is free, otherwise waits until it is released. */
+    private void fillData() {
+        IgniteEx ignite = (IgniteEx)ignite();
+
+        IgniteSemaphore sem = ignite.semaphore("data-setup", 1, true, true);
+
+        cache = ignite.getOrCreateCache("test").withKeepBinary();
+
+        // Take the permit before entering try, so that finally never releases a permit this driver does not hold.
+        boolean populate = sem.tryAcquire();
+
+        if (!populate) {
+            println(cfg, "Waits for setup...");
+
+            sem.acquire();
+        }
+
+        try {
+            if (populate) {
+                println(cfg, "Populate data...");
+
+                for (long l = 1; l <= args.range(); ++l) {
+                    cache.query(
+                        new SqlFieldsQuery("insert into test_long(id, val) values (?, ?)")
+                            .setArgs(l, l + 1));
+
+                    if (l % 10000 == 0)
+                        println(cfg, "Populate " + l);
+                }
+
+                println(cfg, "Finished populating data");
+            }
+        }
+        finally {
+            sem.release();
+        }
+    }
+
+    /**
+     * Benchmarked action that performs selects and validates results.
+     * {@inheritDoc}
+     */
+    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+        long expRsSize;
+        SqlFieldsQuery qry;
+
+        if (args.sqlRange() == 1) {
+            qry = new SqlFieldsQuery("SELECT id, val FROM test_long WHERE id = ?");
+
+            qry.setArgs(ThreadLocalRandom.current().nextLong(args.range()) + 1);
+
+            expRsSize = 1;
+        }
+        else if (args.sqlRange() <= 0) {
+            qry = new SqlFieldsQuery("SELECT id, val FROM test_long");
+
+            expRsSize = args.range();
+        }
+        else {
+            qry = new SqlFieldsQuery("SELECT id, val FROM test_long WHERE id BETWEEN ? AND ?");
+
+            // Window starts span 1..(range - sqlRange + 1): the last row is reachable and sqlRange == range is valid.
+            long id = ThreadLocalRandom.current().nextLong(args.range() - args.sqlRange() + 1) + 1;
+            long maxId = id + args.sqlRange() - 1;
+
+            qry.setArgs(id, maxId);
+
+            expRsSize = args.sqlRange();
+        }
+
+        qry.setLazy(args.isLazy());
+
+        long rsSize = 0;
+
+        try (FieldsQueryCursor<List<?>> cursor = cache.query(qry)) {
+            Iterator<List<?>> it = cursor.iterator();
+
+            while (it.hasNext()) {
+                List<?> row = it.next();
+
+                if ((Long)row.get(0) + 1 != (Long)row.get(1))
+                    throw new Exception("Invalid result retrieved");
+
+                rsSize++;
+            }
+        }
+
+        if (rsSize != expRsSize)
+            throw new Exception("Invalid result set size [actual=" + rsSize + ", expected=" + expRsSize + ']');
+
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/NativeSqlQueryRangeBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/NativeSqlQueryRangeBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/NativeSqlQueryRangeBenchmark.java
index 8dcdda7..33630fd 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/NativeSqlQueryRangeBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/NativeSqlQueryRangeBenchmark.java
@@ -17,6 +17,7 @@
package org.apache.ignite.yardstick.jdbc;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
@@ -45,6 +46,11 @@ public class NativeSqlQueryRangeBenchmark extends AbstractNativeBenchmark {
expRsSize = 1;
}
+ else if (args.sqlRange() <= 0) {
+ qry = new SqlFieldsQuery("SELECT id, val FROM test_long");
+
+ expRsSize = args.range();
+ }
else {
qry = new SqlFieldsQuery("SELECT id, val FROM test_long WHERE id BETWEEN ? AND ?");
@@ -56,12 +62,17 @@ public class NativeSqlQueryRangeBenchmark extends AbstractNativeBenchmark {
expRsSize = args.sqlRange();
}
+ qry.setLazy(args.isLazy());
+
long rsSize = 0;
try (FieldsQueryCursor<List<?>> cursor = ((IgniteEx)ignite()).context().query()
.querySqlFields(qry, false)) {
+ Iterator<List<?>> it = cursor.iterator();
+
+ while (it.hasNext()) {
+ List<?> row = it.next();
- for (List<?> row : cursor) {
if ((Long)row.get(0) + 1 != (Long)row.get(1))
throw new Exception("Invalid result retrieved");
[3/3] ignite git commit: IGNITE-9171: SQL: always execute queries in
lazy mode. This closes #4514. This closes #4538. This closes #4870.
Posted by vo...@apache.org.
IGNITE-9171: SQL: always execute queries in lazy mode. This closes #4514. This closes #4538. This closes #4870.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f97ebff9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f97ebff9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f97ebff9
Branch: refs/heads/master
Commit: f97ebff9a59514a681258b46ae1b74c1ce4e0a3b
Parents: a3c2ea3
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Thu Oct 11 14:34:20 2018 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Thu Oct 11 14:34:20 2018 +0300
----------------------------------------------------------------------
.../internal/jdbc2/JdbcConnectionSelfTest.java | 12 +-
.../jdbc/thin/JdbcThinConnectionSelfTest.java | 38 +-
.../jdbc/thin/JdbcThinDataSourceSelfTest.java | 12 +-
.../apache/ignite/IgniteSystemProperties.java | 7 +-
.../ignite/cache/query/SqlFieldsQuery.java | 20 +-
.../jdbc/thin/ConnectionPropertiesImpl.java | 2 +-
.../ignite/internal/jdbc2/JdbcConnection.java | 2 +-
.../ignite/internal/util/IgniteUtils.java | 2 +-
.../query/h2/H2ConnectionWrapper.java | 11 +
.../internal/processors/query/h2/H2Utils.java | 15 +
.../processors/query/h2/IgniteH2Indexing.java | 180 ++++---
.../processors/query/h2/ObjectPool.java | 97 ++++
.../processors/query/h2/ObjectPoolReusable.java | 58 +++
.../query/h2/ThreadLocalObjectPool.java | 103 ----
.../processors/query/h2/dml/UpdatePlan.java | 8 +-
.../query/h2/opt/GridH2QueryContext.java | 33 +-
.../processors/query/h2/opt/GridH2Table.java | 133 ++++-
.../query/h2/twostep/GridMapQueryExecutor.java | 498 ++++++++++---------
.../query/h2/twostep/GridResultPage.java | 7 +-
.../query/h2/twostep/MapNodeResults.java | 13 +-
.../query/h2/twostep/MapQueryLazyWorker.java | 223 +++++++--
.../query/h2/twostep/MapQueryResult.java | 34 +-
.../query/h2/twostep/MapQueryResults.java | 40 +-
...GridCacheLazyQueryPartitionsReleaseTest.java | 2 -
.../IgniteCacheQueryH2IndexingLeakTest.java | 9 +-
...butedQueryStopOnCancelOrTimeoutSelfTest.java | 6 +-
...QueryNodeRestartDistributedJoinSelfTest.java | 14 +-
...ynamicColumnsAbstractConcurrentSelfTest.java | 6 +-
.../cache/index/H2ConnectionLeaksSelfTest.java | 2 +-
.../processors/query/LazyQuerySelfTest.java | 202 +++++++-
.../processors/query/h2/ObjectPoolSelfTest.java | 125 +++++
.../query/h2/ThreadLocalObjectPoolSelfTest.java | 113 -----
.../h2/twostep/RetryCauseMessageSelfTest.java | 16 -
.../IgniteCacheQuerySelfTestSuite.java | 4 +-
.../ignite/cache/query/query_sql_fields.h | 4 +-
.../cpp/odbc-test/src/configuration_test.cpp | 4 +-
.../cpp/odbc/src/config/configuration.cpp | 2 +-
.../Query/Linq/CacheLinqTest.Introspection.cs | 2 +
.../Client/Cache/SqlQueryTest.cs | 4 +-
.../Cache/Query/SqlFieldsQuery.cs | 43 +-
...benchmark-native-sql-cache-select.properties | 96 ++++
.../benchmark-native-sql-select.properties | 17 +-
.../ignite-localhost-sql-query-config.xml | 91 ++++
.../yardstick/IgniteAbstractBenchmark.java | 30 +-
.../yardstick/IgniteBenchmarkArguments.java | 13 +
.../yardstick/jdbc/AbstractNativeBenchmark.java | 3 +
.../apache/ignite/yardstick/jdbc/JdbcUtils.java | 47 +-
.../jdbc/NativeSqlCacheQueryRangeBenchmark.java | 145 ++++++
.../jdbc/NativeSqlQueryRangeBenchmark.java | 13 +-
49 files changed, 1751 insertions(+), 810 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcConnectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcConnectionSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcConnectionSelfTest.java
index d560d74..db0a959 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcConnectionSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcConnectionSelfTest.java
@@ -308,7 +308,7 @@ public class JdbcConnectionSelfTest extends GridCommonAbstractTest {
assertTrue(((JdbcConnection)conn).isEnforceJoinOrder());
assertFalse(((JdbcConnection)conn).isDistributedJoins());
assertFalse(((JdbcConnection)conn).isCollocatedQuery());
- assertFalse(((JdbcConnection)conn).isLazy());
+ assertTrue(((JdbcConnection)conn).isLazy());
assertFalse(((JdbcConnection)conn).skipReducerOnUpdate());
}
@@ -317,7 +317,7 @@ public class JdbcConnectionSelfTest extends GridCommonAbstractTest {
assertFalse(((JdbcConnection)conn).isEnforceJoinOrder());
assertTrue(((JdbcConnection)conn).isDistributedJoins());
assertFalse(((JdbcConnection)conn).isCollocatedQuery());
- assertFalse(((JdbcConnection)conn).isLazy());
+ assertTrue(((JdbcConnection)conn).isLazy());
assertFalse(((JdbcConnection)conn).skipReducerOnUpdate());
}
@@ -326,15 +326,15 @@ public class JdbcConnectionSelfTest extends GridCommonAbstractTest {
assertFalse(((JdbcConnection)conn).isEnforceJoinOrder());
assertFalse(((JdbcConnection)conn).isDistributedJoins());
assertTrue(((JdbcConnection)conn).isCollocatedQuery());
- assertFalse(((JdbcConnection)conn).isLazy());
+ assertTrue(((JdbcConnection)conn).isLazy());
assertFalse(((JdbcConnection)conn).skipReducerOnUpdate());
}
- try (final Connection conn = DriverManager.getConnection(CFG_URL_PREFIX + "lazy=true@" + configURL())) {
+ try (final Connection conn = DriverManager.getConnection(CFG_URL_PREFIX + "lazy=false@" + configURL())) {
assertFalse(((JdbcConnection)conn).isEnforceJoinOrder());
assertFalse(((JdbcConnection)conn).isDistributedJoins());
assertFalse(((JdbcConnection)conn).isCollocatedQuery());
- assertTrue(((JdbcConnection)conn).isLazy());
+ assertFalse(((JdbcConnection)conn).isLazy());
assertFalse(((JdbcConnection)conn).skipReducerOnUpdate());
}
try (final Connection conn = DriverManager.getConnection(CFG_URL_PREFIX + "skipReducerOnUpdate=true@"
@@ -342,7 +342,7 @@ public class JdbcConnectionSelfTest extends GridCommonAbstractTest {
assertFalse(((JdbcConnection)conn).isEnforceJoinOrder());
assertFalse(((JdbcConnection)conn).isDistributedJoins());
assertFalse(((JdbcConnection)conn).isCollocatedQuery());
- assertFalse(((JdbcConnection)conn).isLazy());
+ assertTrue(((JdbcConnection)conn).isLazy());
assertTrue(((JdbcConnection)conn).skipReducerOnUpdate());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
index 80397e6..26c34cf 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
@@ -230,36 +230,36 @@ public class JdbcThinConnectionSelfTest extends JdbcThinAbstractSelfTest {
*/
public void testSqlHints() throws Exception {
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1")) {
- assertHints(conn, false, false, false, false, false, false);
+ assertHints(conn, false, false, false, false, true, false);
}
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?distributedJoins=true")) {
- assertHints(conn, true, false, false, false, false, false);
+ assertHints(conn, true, false, false, false, true, false);
}
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?enforceJoinOrder=true")) {
- assertHints(conn, false, true, false, false, false, false);
+ assertHints(conn, false, true, false, false, true, false);
}
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?collocated=true")) {
- assertHints(conn, false, false, true, false, false, false);
+ assertHints(conn, false, false, true, false, true, false);
}
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?replicatedOnly=true")) {
- assertHints(conn, false, false, false, true, false, false);
+ assertHints(conn, false, false, false, true, true, false);
}
- try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?lazy=true")) {
- assertHints(conn, false, false, false, false, true, false);
+ try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?lazy=false")) {
+ assertHints(conn, false, false, false, false, false, false);
}
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?skipReducerOnUpdate=true")) {
- assertHints(conn, false, false, false, false, false, true);
+ assertHints(conn, false, false, false, false, true, true);
}
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?distributedJoins=true&" +
- "enforceJoinOrder=true&collocated=true&replicatedOnly=true&lazy=true&skipReducerOnUpdate=true")) {
- assertHints(conn, true, true, true, true, true, true);
+ "enforceJoinOrder=true&collocated=true&replicatedOnly=true&lazy=false&skipReducerOnUpdate=true")) {
+ assertHints(conn, true, true, true, true, false, true);
}
}
@@ -270,32 +270,32 @@ public class JdbcThinConnectionSelfTest extends JdbcThinAbstractSelfTest {
*/
public void testSqlHintsSemicolon() throws Exception {
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;distributedJoins=true")) {
- assertHints(conn, true, false, false, false, false, false);
+ assertHints(conn, true, false, false, false, true, false);
}
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;enforceJoinOrder=true")) {
- assertHints(conn, false, true, false, false, false, false);
+ assertHints(conn, false, true, false, false, true, false);
}
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;collocated=true")) {
- assertHints(conn, false, false, true, false, false, false);
+ assertHints(conn, false, false, true, false, true, false);
}
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;replicatedOnly=true")) {
- assertHints(conn, false, false, false, true, false, false);
+ assertHints(conn, false, false, false, true, true, false);
}
- try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;lazy=true")) {
- assertHints(conn, false, false, false, false, true, false);
+ try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;lazy=false")) {
+ assertHints(conn, false, false, false, false, false, false);
}
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;skipReducerOnUpdate=true")) {
- assertHints(conn, false, false, false, false, false, true);
+ assertHints(conn, false, false, false, false, true, true);
}
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1;distributedJoins=true;" +
- "enforceJoinOrder=true;collocated=true;replicatedOnly=true;lazy=true;skipReducerOnUpdate=true")) {
- assertHints(conn, true, true, true, true, true, true);
+ "enforceJoinOrder=true;collocated=true;replicatedOnly=true;lazy=false;skipReducerOnUpdate=true")) {
+ assertHints(conn, true, true, true, true, false, true);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDataSourceSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDataSourceSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDataSourceSelfTest.java
index 6040bed..834b4ca 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDataSourceSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDataSourceSelfTest.java
@@ -142,15 +142,15 @@ public class JdbcThinDataSourceSelfTest extends JdbcThinAbstractSelfTest {
public void testResetUrl() throws Exception {
IgniteJdbcThinDataSource ids = new IgniteJdbcThinDataSource();
- ids.setUrl("jdbc:ignite:thin://127.0.0.1:10800/test?lazy=true");
+ ids.setUrl("jdbc:ignite:thin://127.0.0.1:10800/test?lazy=false");
assertEquals("test", ids.getSchema());
- assertTrue(ids.isLazy());
+ assertFalse(ids.isLazy());
ids.setUrl("jdbc:ignite:thin://mydomain.org,localhost?collocated=true");
assertNull(ids.getSchema());
- assertFalse(ids.isLazy());
+ assertTrue(ids.isLazy());
assertTrue(ids.isCollocated());
}
@@ -168,7 +168,7 @@ public class JdbcThinDataSourceSelfTest extends JdbcThinAbstractSelfTest {
assertFalse(io.connectionProperties().isAutoCloseServerCursor());
assertFalse(io.connectionProperties().isCollocated());
assertFalse(io.connectionProperties().isEnforceJoinOrder());
- assertFalse(io.connectionProperties().isLazy());
+ assertTrue(io.connectionProperties().isLazy());
assertFalse(io.connectionProperties().isDistributedJoins());
assertFalse(io.connectionProperties().isReplicatedOnly());
}
@@ -176,7 +176,7 @@ public class JdbcThinDataSourceSelfTest extends JdbcThinAbstractSelfTest {
ids.setAutoCloseServerCursor(true);
ids.setCollocated(true);
ids.setEnforceJoinOrder(true);
- ids.setLazy(true);
+ ids.setLazy(false);
ids.setDistributedJoins(true);
ids.setReplicatedOnly(true);
@@ -186,7 +186,7 @@ public class JdbcThinDataSourceSelfTest extends JdbcThinAbstractSelfTest {
assertTrue(io.connectionProperties().isAutoCloseServerCursor());
assertTrue(io.connectionProperties().isCollocated());
assertTrue(io.connectionProperties().isEnforceJoinOrder());
- assertTrue(io.connectionProperties().isLazy());
+ assertFalse(io.connectionProperties().isLazy());
assertTrue(io.connectionProperties().isDistributedJoins());
assertTrue(io.connectionProperties().isReplicatedOnly());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 892689c..d05bdb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -485,7 +485,12 @@ public final class IgniteSystemProperties {
/** Disable fallback to H2 SQL parser if the internal SQL parser fails to parse the statement. */
public static final String IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK = "IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK";
- /** Force all SQL queries to be processed lazily regardless of what clients request. */
+ /**
+ * Force all SQL queries to be processed lazily regardless of what clients request.
+ *
+ * @deprecated Since version 2.7.
+ */
+ @Deprecated
public static final String IGNITE_SQL_FORCE_LAZY_RESULT_SET = "IGNITE_SQL_FORCE_LAZY_RESULT_SET";
/** Disable SQL system views. */
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
index 4e12b8c..3e5c706 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
@@ -71,8 +71,8 @@ public class SqlFieldsQuery extends Query<List<?>> {
/** */
private boolean replicatedOnly;
- /** */
- private boolean lazy;
+ /** Lazy mode is default since Ignite v.2.7. */
+ private boolean lazy = true;
/** Partitions for query */
private int[] parts;
@@ -292,19 +292,24 @@ public class SqlFieldsQuery extends Query<List<?>> {
/**
* Sets lazy query execution flag.
* <p>
- * By default Ignite attempts to fetch the whole query result set to memory and send it to the client. For small
- * and medium result sets this provides optimal performance and minimize duration of internal database locks, thus
- * increasing concurrency.
- * <p>
* If result set is too big to fit in available memory this could lead to excessive GC pauses and even
* OutOfMemoryError. Use this flag as a hint for Ignite to fetch result set lazily, thus minimizing memory
* consumption at the cost of moderate performance hit.
+ * Lazy mode is now optimized for small and medium result sets. A small result set is one whose row count
+ * is less than the page size (see {@link #setPageSize}).
* <p>
- * Defaults to {@code false}, meaning that the whole result set is fetched to memory eagerly.
+ * For compatibility with the behavior of previous versions, lazy mode may be switched off. In this case Ignite
+ * attempts to fetch the whole query result set into memory and send it to the client.
+ * <p>
+ * Since version 2.7 lazy mode is used by default.
+ * Defaults to {@code true}, meaning that the result set is fetched lazily if it is possible.
*
* @param lazy Lazy query execution flag.
* @return {@code this} For chaining.
+ *
+ * @deprecated Since Ignite 2.7.
*/
+ @Deprecated
public SqlFieldsQuery setLazy(boolean lazy) {
this.lazy = lazy;
@@ -318,6 +323,7 @@ public class SqlFieldsQuery extends Query<List<?>> {
*
* @return Lazy flag.
*/
+ @Deprecated
public boolean isLazy() {
return lazy;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java
index 51a3837..054807a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java
@@ -84,7 +84,7 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa
/** Lazy query execution property. */
private BooleanProperty lazy = new BooleanProperty(
- "lazy", "Enable lazy query execution", false, false);
+ "lazy", "Enable lazy query execution (lazy mode is used by default since v.2.7)", true, false);
/** Socket send buffer size property. */
private IntegerProperty socketSendBuffer = new IntegerProperty(
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
index c589c06..481794e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
@@ -195,7 +195,7 @@ public class JdbcConnection implements Connection {
collocatedQry = Boolean.parseBoolean(props.getProperty(PROP_COLLOCATED));
distributedJoins = Boolean.parseBoolean(props.getProperty(PROP_DISTRIBUTED_JOINS));
enforceJoinOrder = Boolean.parseBoolean(props.getProperty(PROP_ENFORCE_JOIN_ORDER));
- lazy = Boolean.parseBoolean(props.getProperty(PROP_LAZY));
+ lazy = Boolean.parseBoolean(props.getProperty(PROP_LAZY, "true"));
txAllowed = Boolean.parseBoolean(props.getProperty(PROP_TX_ALLOWED));
stream = Boolean.parseBoolean(props.getProperty(PROP_STREAMING));
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 3ffbb00..5f397d5 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -4054,7 +4054,7 @@ public abstract class IgniteUtils {
rsrc.close();
}
catch (Exception e) {
- warn(log, "Failed to close resource: " + e.getMessage());
+ warn(log, "Failed to close resource: " + e.getMessage(), e);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java
index 425015a..020cd5e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java
@@ -33,6 +33,9 @@ public class H2ConnectionWrapper implements AutoCloseable {
private final Connection conn;
/** */
+ private final Thread initThread;
+
+ /** */
private volatile String schema;
/** */
@@ -43,6 +46,7 @@ public class H2ConnectionWrapper implements AutoCloseable {
*/
H2ConnectionWrapper(Connection conn) {
this.conn = conn;
+ initThread = Thread.currentThread();
initStatementCache();
}
@@ -96,6 +100,13 @@ public class H2ConnectionWrapper implements AutoCloseable {
statementCache = new H2StatementCache(STATEMENT_CACHE_SIZE);
}
+ /**
+ * @return Thread where the connection was created.
+ */
+ public Thread initialThread() {
+ return initThread;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(H2ConnectionWrapper.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
index b9d9d8e..074a3e4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
@@ -237,10 +237,25 @@ public class H2Utils {
* @param enforceJoinOrder Enforce join order of tables.
*/
public static void setupConnection(Connection conn, boolean distributedJoins, boolean enforceJoinOrder) {
+ setupConnection(conn,distributedJoins, enforceJoinOrder, false);
+ }
+
+ /**
+ * @param conn Connection to use.
+ * @param distributedJoins If distributed joins are enabled.
+ * @param enforceJoinOrder Enforce join order of tables.
+ * @param lazy Lazy query execution mode.
+ */
+ public static void setupConnection(
+ Connection conn,
+ boolean distributedJoins,
+ boolean enforceJoinOrder,
+ boolean lazy) {
Session s = session(conn);
s.setForceJoinOrder(enforceJoinOrder);
s.setJoinBatchEnabled(distributedJoins);
+ s.setLazyQueryExecution(lazy);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 7c5f274..c4d5eea 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -137,7 +137,6 @@ import org.apache.ignite.internal.processors.query.h2.sys.view.SqlSystemViewNode
import org.apache.ignite.internal.processors.query.h2.sys.view.SqlSystemViewNodes;
import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor;
import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
-import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
@@ -298,9 +297,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** */
private String dbUrl = "jdbc:h2:mem:";
- /** */
+ /** All connections used by this Ignite instance, per owning thread. Each map of (H2ConnectionWrapper, Boolean) is used as a set. */
// TODO https://issues.apache.org/jira/browse/IGNITE-9062
- private final ConcurrentMap<Thread, H2ConnectionWrapper> conns = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Thread, ConcurrentHashMap<H2ConnectionWrapper, Boolean>> conns = new ConcurrentHashMap<>();
/** */
private GridMapQueryExecutor mapQryExec;
@@ -328,13 +327,23 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** */
// TODO https://issues.apache.org/jira/browse/IGNITE-9062
- private final ThreadLocalObjectPool<H2ConnectionWrapper> connectionPool = new ThreadLocalObjectPool<>(IgniteH2Indexing.this::newConnectionWrapper, 5);
+ private final ThreadLocal<ObjectPool<H2ConnectionWrapper>> connectionPool
+ = new ThreadLocal<ObjectPool<H2ConnectionWrapper>>() {
+ @Override protected ObjectPool<H2ConnectionWrapper> initialValue() {
+ return new ObjectPool<>(
+ IgniteH2Indexing.this::newConnectionWrapper,
+ 50,
+ IgniteH2Indexing.this::closePooledConnectionWrapper,
+ IgniteH2Indexing.this::recycleConnection);
+ }
+ };
/** */
// TODO https://issues.apache.org/jira/browse/IGNITE-9062
- private final ThreadLocal<ThreadLocalObjectPool.Reusable<H2ConnectionWrapper>> connCache = new ThreadLocal<ThreadLocalObjectPool.Reusable<H2ConnectionWrapper>>() {
- @Override public ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> get() {
- ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> reusable = super.get();
+ private final ThreadLocal<ObjectPoolReusable<H2ConnectionWrapper>> connCache
+ = new ThreadLocal<ObjectPoolReusable<H2ConnectionWrapper>>() {
+ @Override public ObjectPoolReusable<H2ConnectionWrapper> get() {
+ ObjectPoolReusable<H2ConnectionWrapper> reusable = super.get();
boolean reconnect = true;
@@ -354,10 +363,21 @@ public class IgniteH2Indexing implements GridQueryIndexing {
return reusable;
}
- @Override protected ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> initialValue() {
- ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> reusableConnection = connectionPool.borrow();
+ @Override protected ObjectPoolReusable<H2ConnectionWrapper> initialValue() {
+ ObjectPool<H2ConnectionWrapper> pool = connectionPool.get();
+
+ ObjectPoolReusable<H2ConnectionWrapper> reusableConnection = pool.borrow();
+
+ ConcurrentHashMap<H2ConnectionWrapper, Boolean> perThreadConns = conns.get(Thread.currentThread());
+
+ ConcurrentHashMap<H2ConnectionWrapper, Boolean> newMap = new ConcurrentHashMap<>();
+
+ perThreadConns = conns.putIfAbsent(Thread.currentThread(), newMap);
+
+ if (perThreadConns == null)
+ perThreadConns = newMap;
- conns.put(Thread.currentThread(), reusableConnection.object());
+ perThreadConns.put(reusableConnection.object(), false);
return reusableConnection;
}
@@ -437,16 +457,54 @@ public class IgniteH2Indexing implements GridQueryIndexing {
return sysConn;
}
- /** */
+ /**
+ * @return Connection wrapper.
+ */
private H2ConnectionWrapper newConnectionWrapper() {
try {
- return new H2ConnectionWrapper(DriverManager.getConnection(dbUrl));
+ Connection c = DriverManager.getConnection(dbUrl);
+ return new H2ConnectionWrapper(c);
} catch (SQLException e) {
throw new IgniteSQLException("Failed to initialize DB connection: " + dbUrl, e);
}
}
/**
+ * @param conn Connection wrapper to close.
+ */
+ private void closePooledConnectionWrapper(H2ConnectionWrapper conn) {
+ conns.get(conn.initialThread()).remove(conn);
+
+ U.closeQuiet(conn);
+ }
+
+ /**
+ * Removes the connection associated with the current thread from the thread-local cache and returns it.
+ * @return Connection associated with current thread.
+ */
+ public ObjectPoolReusable<H2ConnectionWrapper> detachConnection() {
+ ObjectPoolReusable<H2ConnectionWrapper> reusableConnection = connCache.get();
+
+ connCache.remove();
+
+ conns.get(Thread.currentThread()).remove(reusableConnection.object());
+
+ return reusableConnection;
+ }
+
+ /**
+ * Returns the connection to the global collection of all connections.
+ * @param conn Recycled connection.
+ */
+ private void recycleConnection(H2ConnectionWrapper conn) {
+ ConcurrentMap<H2ConnectionWrapper, Boolean> perThreadConns = conns.get(conn.initialThread());
+
+ // May be null when node is stopping.
+ if (perThreadConns != null)
+ perThreadConns.put(conn, false);
+ }
+
+ /**
* @param c Connection.
* @param sql SQL.
* @return <b>Cached</b> prepared statement.
@@ -738,12 +796,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* Handles SQL exception.
*/
private void onSqlException() {
- Connection conn = connCache.get().object().connection();
+ H2ConnectionWrapper conn = connCache.get().object();
connCache.set(null);
if (conn != null) {
- conns.remove(Thread.currentThread());
+ conns.get(Thread.currentThread()).remove(conn);
// Reset connection to receive new one at next call.
U.close(conn, log);
@@ -1390,32 +1448,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
*/
private ResultSet executeSqlQuery(final Connection conn, final PreparedStatement stmt,
int timeoutMillis, @Nullable GridQueryCancel cancel) throws IgniteCheckedException {
- final MapQueryLazyWorker lazyWorker = MapQueryLazyWorker.currentWorker();
-
- if (cancel != null) {
- cancel.set(new Runnable() {
- @Override public void run() {
- if (lazyWorker != null) {
- lazyWorker.submit(new Runnable() {
- @Override public void run() {
- cancelStatement(stmt);
- }
- });
- }
- else
- cancelStatement(stmt);
- }
- });
- }
+ if (cancel != null)
+ cancel.set(() -> cancelStatement(stmt));
Session ses = H2Utils.session(conn);
if (timeoutMillis > 0)
ses.setQueryTimeout(timeoutMillis);
- if (lazyWorker != null)
- ses.setLazyQueryExecution(true);
-
try {
return stmt.executeQuery();
}
@@ -1429,9 +1469,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
finally {
if (timeoutMillis > 0)
ses.setQueryTimeout(0);
-
- if (lazyWorker != null)
- ses.setLazyQueryExecution(false);
}
}
@@ -2541,6 +2578,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
topVer, mvccSnapshot);
}
+ /**
+ * @param flags Flags holder.
+ * @param flag Flag mask to check.
+ * @return {@code true} if flag is set, otherwise returns {@code false}.
+ */
private boolean isFlagSet(int flags, int flag) {
return (flags & flag) == flag;
}
@@ -3018,18 +3060,24 @@ public class IgniteH2Indexing implements GridQueryIndexing {
private void cleanupStatementCache() {
long now = U.currentTimeMillis();
- for (Iterator<Map.Entry<Thread, H2ConnectionWrapper>> it = conns.entrySet().iterator(); it.hasNext(); ) {
- Map.Entry<Thread, H2ConnectionWrapper> entry = it.next();
+ for (Iterator<Map.Entry<Thread, ConcurrentHashMap<H2ConnectionWrapper, Boolean>>> it
+ = conns.entrySet().iterator(); it.hasNext(); ) {
+ Map.Entry<Thread, ConcurrentHashMap<H2ConnectionWrapper, Boolean>> entry = it.next();
Thread t = entry.getKey();
if (t.getState() == Thread.State.TERMINATED) {
- U.close(entry.getValue(), log);
+ for (H2ConnectionWrapper c : entry.getValue().keySet())
+ U.close(c, log);
it.remove();
}
- else if (now - entry.getValue().statementCache().lastUsage() > STATEMENT_CACHE_THREAD_USAGE_TIMEOUT)
- entry.getValue().clearStatementCache();
+ else {
+ for (H2ConnectionWrapper c : entry.getValue().keySet()) {
+ if (now - c.statementCache().lastUsage() > STATEMENT_CACHE_THREAD_USAGE_TIMEOUT)
+ c.clearStatementCache();
+ }
+ }
}
}
@@ -3037,13 +3085,15 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* Called periodically by {@link GridTimeoutProcessor} to clean up the {@link #conns}.
*/
private void cleanupConnections() {
- for (Iterator<Map.Entry<Thread, H2ConnectionWrapper>> it = conns.entrySet().iterator(); it.hasNext(); ) {
- Map.Entry<Thread, H2ConnectionWrapper> entry = it.next();
+ for (Iterator<Map.Entry<Thread, ConcurrentHashMap<H2ConnectionWrapper, Boolean>>> it
+ = conns.entrySet().iterator(); it.hasNext(); ) {
+ Map.Entry<Thread, ConcurrentHashMap<H2ConnectionWrapper, Boolean>> entry = it.next();
Thread t = entry.getKey();
if (t.getState() == Thread.State.TERMINATED) {
- U.close(entry.getValue(), log);
+ for (H2ConnectionWrapper c : entry.getValue().keySet())
+ U.close(c, log);
it.remove();
}
@@ -3051,24 +3101,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
- * Removes from cache and returns associated with current thread connection.
- * @return Connection associated with current thread.
- */
- public ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> detach() {
- Thread key = Thread.currentThread();
-
- ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> reusableConnection = connCache.get();
-
- H2ConnectionWrapper connection = conns.remove(key);
-
- connCache.remove();
-
- assert reusableConnection.object().connection() == connection.connection();
-
- return reusableConnection;
- }
-
- /**
* Rebuild indexes from hash index.
*
* @param cacheName Cache name.
@@ -3433,10 +3465,15 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (log.isDebugEnabled())
log.debug("Stopping cache query index...");
- mapQryExec.cancelLazyWorkers();
+ mapQryExec.stop();
+
+ for (ConcurrentMap<H2ConnectionWrapper, Boolean> perThreadConns : conns.values()) {
+ for (H2ConnectionWrapper c : perThreadConns.keySet())
+ U.close(c, log);
+ }
- for (H2ConnectionWrapper c : conns.values())
- U.close(c, log);
+ connectionPool.remove();
+ connCache.remove();
conns.clear();
schemas.clear();
@@ -3545,7 +3582,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
}
- conns.values().forEach(H2ConnectionWrapper::clearStatementCache);
+ conns.values().forEach(map -> map.keySet().forEach(H2ConnectionWrapper::clearStatementCache));
for (H2TableDescriptor tbl : rmvTbls) {
for (Index idx : tbl.table().getIndexes())
@@ -3703,10 +3740,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** {@inheritDoc} */
@Override public void cancelAllQueries() {
- mapQryExec.cancelLazyWorkers();
-
- for (H2ConnectionWrapper c : conns.values())
- U.close(c, log);
+ for (ConcurrentHashMap<H2ConnectionWrapper, Boolean> perThreadConns : conns.values()) {
+ for (H2ConnectionWrapper c : perThreadConns.keySet())
+ U.close(c, log);
+ }
}
/**
@@ -3756,6 +3793,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
+ * @param twoStepQry Query.
* @return {@code True} is system views exist.
*/
private boolean hasSystemViews(GridCacheTwoStepQuery twoStepQry) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPool.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPool.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPool.java
new file mode 100644
index 0000000..9d2a580
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPool.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * Thread-safe pool for managing a limited number of objects for further reuse.
+ *
+ * @param <E> Pooled objects type.
+ */
+public final class ObjectPool<E extends AutoCloseable> {
+ /** */
+ private final Supplier<E> objectFactory;
+
+ /** */
+ private final ConcurrentLinkedQueue<E> bag = new ConcurrentLinkedQueue<>();
+
+ /** */
+ private final int poolSize;
+
+ /** The function to close object. */
+ private final Consumer<E> closer;
+
+ /** The listener is called when object is returned to the pool. */
+ private final Consumer<E> recycler;
+
+ /**
+ * @param objectFactory Factory used for new objects creation.
+ * @param poolSize Number of objects which pool can contain.
+ * @param closer Function to close object.
+ * @param recycler The listener is called when object is returned to the pool.
+ */
+ public ObjectPool(Supplier<E> objectFactory, int poolSize, Consumer<E> closer, Consumer<E> recycler) {
+ this.objectFactory = objectFactory;
+ this.poolSize = poolSize;
+ this.closer = closer != null ? closer : U::closeQuiet;
+ this.recycler = recycler;
+ }
+
+ /**
+ * Picks an object from the pool if one is present or creates new one otherwise.
+ * Returns an object wrapper which could be returned to the pool.
+ *
+ * @return Reusable object wrapper.
+ */
+ public ObjectPoolReusable<E> borrow() {
+ E pooled = bag.poll();
+
+ return new ObjectPoolReusable<>(this, pooled != null ? pooled : objectFactory.get());
+ }
+
+ /**
+ * Recycles an object.
+ *
+ * @param object Object.
+ */
+ void recycle(E object) {
+ assert object != null : "Already recycled";
+
+ if (bag.size() < poolSize) {
+ bag.add(object);
+
+ if (recycler != null)
+ recycler.accept(object);
+ }
+ else
+ closer.accept(object);
+ }
+
+ /**
+ * Visible for test
+ * @return Pool bag size.
+ */
+ int bagSize() {
+ return bag.size();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPoolReusable.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPoolReusable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPoolReusable.java
new file mode 100644
index 0000000..48fee42
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ObjectPoolReusable.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+/**
+ * Wrapper for a pooled object with capability to return the object to a pool.
+ *
+ * @param <T> Enclosed object type.
+ */
+public class ObjectPoolReusable<T extends AutoCloseable> {
+ /** Object pool to recycle. */
+ private final ObjectPool<T> pool;
+
+ /** Detached object. */
+ private T object;
+
+ /**
+ * @param pool Object pool.
+ * @param object Detached object.
+ */
+ ObjectPoolReusable(ObjectPool<T> pool, T object) {
+ this.pool = pool;
+ this.object = object;
+ }
+
+ /**
+ * @return Enclosed object.
+ */
+ public T object() {
+ return object;
+ }
+
+ /**
+ * Returns an object to a pool or closes it if the pool is already full.
+ */
+ public void recycle() {
+ assert object != null : "Already recycled";
+
+ pool.recycle(object);
+
+ object = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPool.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPool.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPool.java
deleted file mode 100644
index 25daa23..0000000
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPool.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2;
-
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.function.Supplier;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Special pool for managing limited number objects for further reuse.
- * This pool maintains separate object bag for each thread by means of {@link ThreadLocal}.
- * <p>
- * If object is borrowed on one thread and recycled on different then it will be returned to
- * recycling thread bag. For thread-safe use either pooled objects should be thread-safe or
- * <i>happens-before</i> should be established between borrowing object and subsequent recycling.
- *
- * @param <E> pooled objects type
- */
-public final class ThreadLocalObjectPool<E extends AutoCloseable> {
- /**
- * Wrapper for a pooled object with capability to return the object to a pool.
- *
- * @param <T> enclosed object type
- */
- public static class Reusable<T extends AutoCloseable> {
- /** */
- private final ThreadLocalObjectPool<T> pool;
- /** */
- private final T object;
-
- /** */
- private Reusable(ThreadLocalObjectPool<T> pool, T object) {
- this.pool = pool;
- this.object = object;
- }
-
- /**
- * @return enclosed object
- */
- public T object() {
- return object;
- }
-
- /**
- * Returns an object to a pool or closes it if the pool is already full.
- */
- public void recycle() {
- Queue<Reusable<T>> bag = pool.bag.get();
- if (bag.size() < pool.poolSize)
- bag.add(this);
- else
- U.closeQuiet(object);
- }
- }
-
- /** */
- private final Supplier<E> objectFactory;
- /** */
- private final ThreadLocal<Queue<Reusable<E>>> bag = ThreadLocal.withInitial(LinkedList::new);
- /** */
- private final int poolSize;
-
- /**
- * @param objectFactory factory used for new objects creation
- * @param poolSize number of objects which pool can contain
- */
- public ThreadLocalObjectPool(Supplier<E> objectFactory, int poolSize) {
- this.objectFactory = objectFactory;
- this.poolSize = poolSize;
- }
-
- /**
- * Picks an object from the pool if one is present or creates new one otherwise.
- * Returns an object wrapper which could be returned to the pool.
- *
- * @return reusable object wrapper
- */
- public Reusable<E> borrow() {
- Reusable<E> pooled = bag.get().poll();
- return pooled != null ? pooled : new Reusable<>(this, objectFactory.get());
- }
-
- /** Visible for test */
- int bagSize() {
- return bag.get().size();
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
index ba4b12b..31a444e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
@@ -36,7 +36,7 @@ import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
import org.apache.ignite.internal.processors.query.h2.H2ConnectionWrapper;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
-import org.apache.ignite.internal.processors.query.h2.ThreadLocalObjectPool;
+import org.apache.ignite.internal.processors.query.h2.ObjectPoolReusable;
import org.apache.ignite.internal.processors.query.h2.UpdateResult;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
@@ -623,7 +623,7 @@ public final class UpdatePlan {
private final EnlistOperation op;
/** */
- private volatile ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> conn;
+ private volatile ObjectPoolReusable<H2ConnectionWrapper> conn;
/**
* @param idx Indexing.
@@ -647,7 +647,7 @@ public final class UpdatePlan {
/** {@inheritDoc} */
@Override public void beforeDetach() {
- ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> conn0 = conn = idx.detach();
+ ObjectPoolReusable<H2ConnectionWrapper> conn0 = conn = idx.detachConnection();
if (isClosed())
conn0.recycle();
@@ -657,7 +657,7 @@ public final class UpdatePlan {
@Override protected void onClose() {
cur.close();
- ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> conn0 = conn;
+ ObjectPoolReusable<H2ConnectionWrapper> conn0 = conn;
if (conn0 != null)
conn0.recycle();
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
index f12c0f3..9971b78 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
@@ -18,8 +18,10 @@
package org.apache.ignite.internal.processors.query.h2.opt;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -89,7 +91,7 @@ public class GridH2QueryContext {
private MvccSnapshot mvccSnapshot;
/** */
- private MapQueryLazyWorker lazyWorker;
+ private Set<GridH2Table> lockedTables = new HashSet<>();
/**
* @param locNodeId Local node ID.
@@ -351,7 +353,8 @@ public class GridH2QueryContext {
assert qctx.get() == null;
// We need MAP query context to be available to other threads to run distributed joins.
- if (x.key.type == MAP && x.distributedJoinMode() != OFF && qctxs.putIfAbsent(x.key, x) != null)
+ if (x.key.type == MAP && x.distributedJoinMode() != OFF && qctxs.putIfAbsent(x.key, x) != null
+ && MapQueryLazyWorker.currentWorker() == null)
throw new IllegalStateException("Query context is already set.");
qctx.set(x);
@@ -401,10 +404,7 @@ public class GridH2QueryContext {
assert x.key.equals(key);
- if (x.lazyWorker() != null)
- x.lazyWorker().stop(nodeStop);
- else
- x.clearContext(nodeStop);
+ x.clearContext(nodeStop);
return true;
}
@@ -413,7 +413,10 @@ public class GridH2QueryContext {
* @param nodeStop Node is stopping.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- public void clearContext(boolean nodeStop) {
+ public synchronized void clearContext(boolean nodeStop) {
+ if (cleared)
+ return;
+
cleared = true;
List<GridReservable> r = reservations;
@@ -516,20 +519,10 @@ public class GridH2QueryContext {
}
/**
- * @return Lazy worker, if any, or {@code null} if none.
+ * @return The set of tables that have been locked by the current thread.
*/
- public MapQueryLazyWorker lazyWorker() {
- return lazyWorker;
- }
-
- /**
- * @param lazyWorker Lazy worker, if any, or {@code null} if none.
- * @return {@code this}.
- */
- public GridH2QueryContext lazyWorker(MapQueryLazyWorker lazyWorker) {
- this.lazyWorker = lazyWorker;
-
- return this;
+ public Set<GridH2Table> lockedTables() {
+ return lockedTables;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index a612b63..709ded7 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -17,17 +17,6 @@
package org.apache.ignite.internal.processors.query.h2.opt;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.LongAdder;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -37,7 +26,7 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryField;
import org.apache.ignite.internal.processors.query.h2.database.H2RowFactory;
import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
-import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor;
+import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
import org.apache.ignite.internal.util.typedef.F;
import org.h2.command.ddl.CreateTableData;
import org.h2.command.dml.Insert;
@@ -58,6 +47,19 @@ import org.h2.table.TableType;
import org.h2.value.DataType;
import org.jetbrains.annotations.Nullable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.DEFAULT_COLUMNS_COUNT;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.KEY_COL;
@@ -90,6 +92,12 @@ public class GridH2Table extends TableBase {
/** */
private final ReadWriteLock lock;
+ /** Number of reading threads which currently move execution from query pool to dedicated thread. */
+ private final AtomicInteger lazyTransferCnt = new AtomicInteger();
+
+ /** Flag indicating that a writer is waiting for the lock in a loop. */
+ private volatile boolean hasWaitedWriter;
+
/** */
private boolean destroyed;
@@ -265,6 +273,11 @@ public class GridH2Table extends TableBase {
ses.addLock(this);
+ GridH2QueryContext qctx = GridH2QueryContext.get();
+
+ if (qctx != null)
+ qctx.lockedTables().add(this);
+
return false;
}
@@ -291,15 +304,44 @@ public class GridH2Table extends TableBase {
Lock l = exclusive ? lock.writeLock() : lock.readLock();
try {
- if (!exclusive || !GridMapQueryExecutor.FORCE_LAZY)
- l.lockInterruptibly();
- else {
+ if (exclusive) {
+ // Attempting to obtain exclusive lock for DDL.
+ // Lock is considered acquired only if "lazyTransferCnt" is zero, meaning that
+ // currently there are no reader threads moving query execution from query
+ // pool to dedicated thread.
+ // It is possible that reader which is currently transferring execution gets
+ // queued after the write lock we are trying to acquire. So we use timed waiting
+ // and a loop to avoid deadlocks.
for (;;) {
- if (l.tryLock(200, TimeUnit.MILLISECONDS))
- break;
- else
- Thread.yield();
+ if (l.tryLock(200, TimeUnit.MILLISECONDS)) {
+ if (lazyTransferCnt.get() == 0)
+ break;
+ else
+ l.unlock();
+ }
+
+ hasWaitedWriter = true;
+
+ Thread.yield();
}
+
+ hasWaitedWriter = false;
+ }
+ else {
+ // Attempt to acquire read lock (query execution, DML, cache update).
+ // If query is being executed inside a query pool, we do not want it to be blocked
+ // for a long time, as it would prevent other queries from being executed. So we
+ // wait a little and then force transfer to dedicated thread by throwing special
+ // timeout exception.
+ // If query is not in the query pool, then we simply wait for lock acquisition.
+ if (isSqlNotInLazy()) {
+ if (hasWaitedWriter || !l.tryLock(200, TimeUnit.MILLISECONDS)) {
+ throw new GridH2RetryException("Long wait on Table lock: [tableName=" + getName()
+ + ", hasWaitedWriter=" + hasWaitedWriter + ']');
+ }
+ }
+ else
+ l.lockInterruptibly();
}
}
catch (InterruptedException e) {
@@ -321,6 +363,49 @@ public class GridH2Table extends TableBase {
}
/**
+ * Checks whether the table is being locked by an SQL query outside of a lazy worker thread.
+ *
+ * @return {@code True} if is in query pool.
+ */
+ private static boolean isSqlNotInLazy() {
+ return GridH2QueryContext.get() != null && MapQueryLazyWorker.currentWorker() == null;
+ }
+
+ /**
+ * Callback invoked when session is to be transferred to lazy thread. In order to prevent concurrent changes
+ * by DDL during move we increment counter before releasing read lock.
+ *
+ * @param ses Session.
+ */
+ public void onLazyTransferStarted(Session ses) {
+ assert sessions.containsKey(ses) : "Detached session have not locked the table: " + getName();
+
+ lazyTransferCnt.incrementAndGet();
+
+ lock.readLock().unlock();
+ }
+
+ /**
+ * Callback invoked when lazy transfer finished. Acquire the lock, decrement transfer counter.
+ *
+ * @param ses Session to detach.
+ */
+ public void onLazyTransferFinished(Session ses) {
+ assert sessions.containsKey(ses) : "Attached session have not locked the table: " + getName();
+
+ try {
+ lock.readLock().lockInterruptibly();
+
+ lazyTransferCnt.decrementAndGet();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new IgniteInterruptedException("Thread got interrupted while trying to acquire table lock.", e);
+ }
+ }
+
+ /**
* Check if table is not destroyed.
*/
private void ensureNotDestroyed() {
@@ -410,6 +495,11 @@ public class GridH2Table extends TableBase {
if (exclusive == null)
return;
+ GridH2QueryContext qctx = GridH2QueryContext.get();
+
+ if (qctx != null)
+ qctx.lockedTables().remove(this);
+
unlock(exclusive);
}
@@ -949,9 +1039,10 @@ public class GridH2Table extends TableBase {
}
/**
+ * Drop columns.
*
- * @param cols
- * @param ifExists
+ * @param cols Columns.
+ * @param ifExists IF EXISTS flag.
*/
public void dropColumns(List<String> cols, boolean ifExists) {
assert !ifExists || cols.size() == 1;
[2/3] ignite git commit: IGNITE-9171: SQL: always execute queries in
lazy mode. This closes #4514. This closes #4538. This closes #4870.
Posted by vo...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index f228111..9b7d268 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -40,7 +40,6 @@ import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.PartitionLossPolicy;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.cache.query.SqlFieldsQuery;
@@ -58,12 +57,12 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.CompoundLockFuture;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsReservation;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsReservation;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
@@ -71,8 +70,10 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshalla
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.h2.H2ConnectionWrapper;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.ObjectPoolReusable;
import org.apache.ignite.internal.processors.query.h2.ResultSetEnlistFuture;
import org.apache.ignite.internal.processors.query.h2.UpdateResult;
import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
@@ -97,13 +98,13 @@ import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
-import org.apache.ignite.thread.IgniteThread;
+import org.h2.api.ErrorCode;
import org.h2.command.Prepared;
import org.h2.jdbc.JdbcResultSet;
+import org.h2.jdbc.JdbcSQLException;
import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_FORCE_LAZY_RESULT_SET;
import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
@@ -123,9 +124,6 @@ import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2V
@SuppressWarnings("ForLoopReplaceableByForEach")
public class GridMapQueryExecutor {
/** */
- public static final boolean FORCE_LAZY = IgniteSystemProperties.getBoolean(IGNITE_SQL_FORCE_LAZY_RESULT_SET);
-
- /** */
private IgniteLogger log;
/** */
@@ -149,8 +147,8 @@ public class GridMapQueryExecutor {
/** Busy lock for lazy workers. */
private final GridSpinBusyLock lazyWorkerBusyLock = new GridSpinBusyLock();
- /** Lazy worker stop guard. */
- private final AtomicBoolean lazyWorkerStopGuard = new AtomicBoolean();
+ /** Stop guard. */
+ private final AtomicBoolean stopGuard = new AtomicBoolean();
/**
* @param busyLock Busy lock.
@@ -207,18 +205,21 @@ public class GridMapQueryExecutor {
}
/**
- * Cancel active lazy queries and prevent submit of new queries.
+ * Stop query map executor, cleanup resources.
*/
- public void cancelLazyWorkers() {
- if (!lazyWorkerStopGuard.compareAndSet(false, true))
+ public void stop() {
+ if (!stopGuard.compareAndSet(false, true))
return;
- lazyWorkerBusyLock.block();
+ for (MapNodeResults res : qryRess.values())
+ res.cancelAll();
- for (MapQueryLazyWorker worker : lazyWorkers.values())
- worker.stop(false);
+ for (MapQueryLazyWorker w : lazyWorkers.values())
+ w.stop(true);
- lazyWorkers.clear();
+ lazyWorkerBusyLock.block();
+
+ assert lazyWorkers.isEmpty() : "Not cleaned lazy workers: " + lazyWorkers.size();
}
/**
@@ -259,7 +260,7 @@ public class GridMapQueryExecutor {
* @return Busy lock for lazy workers to guard their operations with.
*/
GridSpinBusyLock busyLock() {
- return busyLock;
+ return lazyWorkerBusyLock;
}
/**
@@ -554,6 +555,7 @@ public class GridMapQueryExecutor {
/**
* @param node Node.
* @param req Query request.
+ * @throws IgniteCheckedException On error.
*/
private void onQueryRequest(final ClusterNode node, final GridH2QueryRequest req) throws IgniteCheckedException {
int[] qryParts = req.queryPartitions();
@@ -566,10 +568,14 @@ public class GridMapQueryExecutor {
req.isFlagSet(GridH2QueryRequest.FLAG_IS_LOCAL),
req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS));
+ final GridDhtTxLocalAdapter tx;
+
+ GridH2SelectForUpdateTxDetails txReq = req.txDetails();
+
final boolean enforceJoinOrder = req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER);
final boolean explain = req.isFlagSet(GridH2QueryRequest.FLAG_EXPLAIN);
final boolean replicated = req.isFlagSet(GridH2QueryRequest.FLAG_REPLICATED);
- final boolean lazy = (FORCE_LAZY && req.queries().size() == 1) || req.isFlagSet(GridH2QueryRequest.FLAG_LAZY);
+ final boolean lazy = req.isFlagSet(GridH2QueryRequest.FLAG_LAZY) && txReq == null;
final List<Integer> cacheIds = req.caches();
@@ -578,10 +584,6 @@ public class GridMapQueryExecutor {
final Object[] params = req.parameters();
- final GridDhtTxLocalAdapter tx;
-
- GridH2SelectForUpdateTxDetails txReq = req.txDetails();
-
try {
if (txReq != null) {
// Prepare to run queries.
@@ -736,7 +738,11 @@ public class GridMapQueryExecutor {
* @param parts Explicit partitions for current node.
* @param pageSize Page size.
* @param distributedJoinMode Query distributed join mode.
- * @param lazy Streaming flag.
+ * @param enforceJoinOrder Enforce join order flag.
+ * @param replicated Replicated flag.
+ * @param timeout Query timeout.
+ * @param params Query params.
+ * @param lazy Lazy query execution flag.
* @param mvccSnapshot MVCC snapshot.
* @param tx Transaction.
* @param txDetails TX details, if it's a {@code FOR UPDATE} request, or {@code null}.
@@ -765,75 +771,24 @@ public class GridMapQueryExecutor {
@Nullable final GridH2SelectForUpdateTxDetails txDetails,
@Nullable final CompoundLockFuture lockFut,
@Nullable final AtomicInteger runCntr) {
- MapQueryLazyWorker worker = MapQueryLazyWorker.currentWorker();
-
// In presence of TX, we also must always have matching details.
assert tx == null || txDetails != null;
- boolean inTx = (tx != null);
-
- if (lazy && worker == null) {
- // Lazy queries must be re-submitted to dedicated workers.
- MapQueryLazyWorkerKey key = new MapQueryLazyWorkerKey(node.id(), reqId, segmentId);
- worker = new MapQueryLazyWorker(ctx.igniteInstanceName(), key, log, this);
-
- worker.submit(new Runnable() {
- @Override public void run() {
- onQueryRequest0(
- node,
- reqId,
- segmentId,
- schemaName,
- qrys,
- cacheIds,
- topVer,
- partsMap,
- parts,
- pageSize,
- distributedJoinMode,
- enforceJoinOrder,
- replicated,
- timeout,
- params,
- true,
- mvccSnapshot,
- tx,
- txDetails,
- lockFut,
- runCntr);
- }
- });
-
- if (lazyWorkerBusyLock.enterBusy()) {
- try {
- MapQueryLazyWorker oldWorker = lazyWorkers.put(key, worker);
-
- if (oldWorker != null)
- oldWorker.stop(false);
+ assert !lazy || txDetails == null : "Lazy execution of SELECT FOR UPDATE queries is not supported.";
- IgniteThread thread = new IgniteThread(worker);
-
- thread.start();
- }
- finally {
- lazyWorkerBusyLock.leaveBusy();
- }
- }
- else
- log.info("Ignored query request (node is stopping) [nodeId=" + node.id() + ", reqId=" + reqId + ']');
+ boolean inTx = (tx != null);
- return;
- }
+ MapQueryLazyWorker worker = MapQueryLazyWorker.currentWorker();
- if (lazy && txDetails != null)
- throw new IgniteSQLException("Lazy execution of SELECT FOR UPDATE queries is not supported.");
+ if (lazy && worker == null)
+ worker = createLazyWorker(node, reqId, segmentId);
// Prepare to run queries.
GridCacheContext<?, ?> mainCctx = mainCacheContext(cacheIds);
MapNodeResults nodeRess = resultsForNode(node.id());
- MapQueryResults qr = null;
+ MapQueryResults qryResults = null;
List<GridReservable> reserved = new ArrayList<>();
@@ -847,7 +802,7 @@ public class GridMapQueryExecutor {
if (!F.isEmpty(err)) {
// Unregister lazy worker because re-try may never reach this node again.
if (lazy)
- stopAndUnregisterCurrentLazyWorker();
+ worker.stop(false);
sendRetry(node, reqId, segmentId, err);
@@ -855,10 +810,7 @@ public class GridMapQueryExecutor {
}
}
- qr = new MapQueryResults(h2, reqId, qrys.size(), mainCctx, MapQueryLazyWorker.currentWorker(), inTx);
-
- if (nodeRess.put(reqId, segmentId, qr) != null)
- throw new IllegalStateException();
+ qryResults = new MapQueryResults(h2, reqId, qrys.size(), mainCctx, worker, inTx);
// Prepare query context.
GridH2QueryContext qctx = new GridH2QueryContext(ctx.localNodeId(),
@@ -872,186 +824,207 @@ public class GridMapQueryExecutor {
.pageSize(pageSize)
.topologyVersion(topVer)
.reservations(reserved)
- .mvccSnapshot(mvccSnapshot)
- .lazyWorker(worker);
-
- Connection conn = h2.connectionForSchema(schemaName);
-
- H2Utils.setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder);
-
- GridH2QueryContext.set(qctx);
+ .mvccSnapshot(mvccSnapshot);
// qctx is set, we have to release reservations inside of it.
reserved = null;
- try {
- if (nodeRess.cancelled(reqId)) {
- GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, qctx.type());
+ if (worker != null)
+ worker.queryContext(qctx);
- nodeRess.cancelRequest(reqId);
+ GridH2QueryContext.set(qctx);
- throw new QueryCancelledException();
- }
+ if (nodeRess.put(reqId, segmentId, qryResults) != null)
+ throw new IllegalStateException();
- // Run queries.
- int qryIdx = 0;
+ Connection conn = h2.connectionForSchema(schemaName);
- boolean evt = mainCctx != null && mainCctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED);
+ H2Utils.setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder, lazy);
- for (GridCacheSqlQuery qry : qrys) {
- ResultSet rs = null;
+ if (nodeRess.cancelled(reqId)) {
+ GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, qctx.type());
- boolean removeMapping = false;
+ nodeRess.cancelRequest(reqId);
- // If we are not the target node for this replicated query, just ignore it.
- if (qry.node() == null || (segmentId == 0 && qry.node().equals(ctx.localNodeId()))) {
- String sql = qry.query(); Collection<Object> params0 = F.asList(qry.parameters(params));
+ throw new QueryCancelledException();
+ }
- PreparedStatement stmt;
+ // Run queries.
+ int qryIdx = 0;
- try {
- stmt = h2.prepareStatement(conn, sql, true);
- }
- catch (SQLException e) {
- throw new IgniteCheckedException("Failed to parse SQL query: " + sql, e);
- }
+ boolean evt = mainCctx != null && mainCctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED);
- Prepared p = GridSqlQueryParser.prepared(stmt);
+ for (GridCacheSqlQuery qry : qrys) {
+ ResultSet rs = null;
- if (GridSqlQueryParser.isForUpdateQuery(p)) {
- sql = GridSqlQueryParser.rewriteQueryForUpdateIfNeeded(p, inTx);
- stmt = h2.prepareStatement(conn, sql, true);
- }
+ boolean removeMapping = false;
- h2.bindParameters(stmt, params0);
+ // If we are not the target node for this replicated query, just ignore it.
+ if (qry.node() == null || (segmentId == 0 && qry.node().equals(ctx.localNodeId()))) {
+ String sql = qry.query(); Collection<Object> params0 = F.asList(qry.parameters(params));
- int opTimeout = IgniteH2Indexing.operationTimeout(timeout, tx);
+ PreparedStatement stmt;
- rs = h2.executeSqlQueryWithTimer(stmt, conn, sql, params0, opTimeout, qr.queryCancel(qryIdx));
+ try {
+ stmt = h2.prepareStatement(conn, sql, true);
+ }
+ catch (SQLException e) {
+ throw new IgniteCheckedException("Failed to parse SQL query: " + sql, e);
+ }
- if (inTx) {
- ResultSetEnlistFuture enlistFut = ResultSetEnlistFuture.future(
- ctx.localNodeId(),
- txDetails.version(),
- mvccSnapshot,
- txDetails.threadId(),
- IgniteUuid.randomUuid(),
- txDetails.miniId(),
- parts,
- tx,
- opTimeout,
- mainCctx,
- rs
- );
+ Prepared p = GridSqlQueryParser.prepared(stmt);
- if (lockFut != null)
- lockFut.register(enlistFut);
+ if (GridSqlQueryParser.isForUpdateQuery(p)) {
+ sql = GridSqlQueryParser.rewriteQueryForUpdateIfNeeded(p, inTx);
+ stmt = h2.prepareStatement(conn, sql, true);
+ }
- enlistFut.init();
+ h2.bindParameters(stmt, params0);
- enlistFut.get();
+ int opTimeout = IgniteH2Indexing.operationTimeout(timeout, tx);
- rs.beforeFirst();
- }
+ rs = h2.executeSqlQueryWithTimer(stmt, conn, sql, params0, opTimeout, qryResults.queryCancel(qryIdx));
- if (evt) {
- ctx.event().record(new CacheQueryExecutedEvent<>(
- node,
- "SQL query executed.",
- EVT_CACHE_QUERY_EXECUTED,
- CacheQueryType.SQL.name(),
- mainCctx.name(),
- null,
- qry.query(),
- null,
- null,
- params,
- node.id(),
- null));
- }
+ if (inTx) {
+ ResultSetEnlistFuture enlistFut = ResultSetEnlistFuture.future(
+ ctx.localNodeId(),
+ txDetails.version(),
+ mvccSnapshot,
+ txDetails.threadId(),
+ IgniteUuid.randomUuid(),
+ txDetails.miniId(),
+ parts,
+ tx,
+ opTimeout,
+ mainCctx,
+ rs
+ );
+
+ if (lockFut != null)
+ lockFut.register(enlistFut);
+
+ enlistFut.init();
+
+ enlistFut.get();
+
+ rs.beforeFirst();
+ }
- assert rs instanceof JdbcResultSet : rs.getClass();
+ if (evt) {
+ ctx.event().record(new CacheQueryExecutedEvent<>(
+ node,
+ "SQL query executed.",
+ EVT_CACHE_QUERY_EXECUTED,
+ CacheQueryType.SQL.name(),
+ mainCctx.name(),
+ null,
+ qry.query(),
+ null,
+ null,
+ params,
+ node.id(),
+ null));
}
- qr.addResult(qryIdx, qry, node.id(), rs, params);
+ assert rs instanceof JdbcResultSet : rs.getClass();
+ }
- if (qr.cancelled()) {
- qr.result(qryIdx).close();
+ qryResults.addResult(qryIdx, qry, node.id(), rs, params);
- throw new QueryCancelledException();
- }
+ if (qryResults.cancelled()) {
+ qryResults.result(qryIdx).close();
- if (inTx) {
- if (tx.dht() && (runCntr == null || runCntr.decrementAndGet() == 0)) {
- if (removeMapping = tx.empty() && !tx.queryEnlisted())
- tx.rollbackAsync().get();
- }
+ throw new QueryCancelledException();
+ }
+
+ if (inTx) {
+ if (tx.dht() && (runCntr == null || runCntr.decrementAndGet() == 0)) {
+ if (removeMapping = tx.empty() && !tx.queryEnlisted())
+ tx.rollbackAsync().get();
}
+ }
- // Send the first page.
- if (lockFut == null)
- sendNextPage(nodeRess, node, qr, qryIdx, segmentId, pageSize, removeMapping);
- else {
- GridQueryNextPageResponse msg = prepareNextPage(nodeRess, node, qr, qryIdx, segmentId, pageSize, removeMapping);
-
- if (msg != null) {
- lockFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() {
- @Override public void apply(IgniteInternalFuture<Void> future) {
- try {
- if (node.isLocal())
- h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
- else
- ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL);
- }
- catch (Exception e) {
- U.error(log, e);
- }
+ // Send the first page.
+ if (lockFut == null)
+ sendNextPage(nodeRess, node, qryResults, qryIdx, segmentId, pageSize, removeMapping);
+ else {
+ GridQueryNextPageResponse msg = prepareNextPage(nodeRess, node, qryResults, qryIdx, segmentId, pageSize, removeMapping);
+
+ if (msg != null) {
+ lockFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() {
+ @Override public void apply(IgniteInternalFuture<Void> future) {
+ try {
+ if (node.isLocal())
+ h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
+ else
+ ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL);
}
- });
- }
+ catch (Exception e) {
+ U.error(log, e);
+ }
+ }
+ });
}
-
- qryIdx++;
}
- // All request results are in the memory in result set already, so it's ok to release partitions.
- if (!lazy)
- releaseReservations();
+ qryIdx++;
}
- catch (Throwable e){
+
+ // All request results are in the memory in result set already, so it's ok to release partitions.
+ if (!lazy)
releaseReservations();
+ else if (!qryResults.isAllClosed()) {
+ if (MapQueryLazyWorker.currentWorker() == null) {
+ final ObjectPoolReusable<H2ConnectionWrapper> detachedConn = h2.detachConnection();
- throw e;
+ worker.start(H2Utils.session(conn), detachedConn);
+
+ GridH2QueryContext.clearThreadLocal();
+ }
}
+ else
+ unregisterLazyWorker(worker);
}
catch (Throwable e) {
- if (qr != null) {
- nodeRess.remove(reqId, segmentId, qr);
+ if (qryResults != null) {
+ nodeRess.remove(reqId, segmentId, qryResults);
- qr.cancel(false);
+ qryResults.close();
}
+ else
+ releaseReservations();
- // Unregister worker after possible cancellation.
+ // Stop and unregister worker after possible cancellation.
if (lazy)
- stopAndUnregisterCurrentLazyWorker();
+ worker.stop(false);
- GridH2RetryException retryErr = X.cause(e, GridH2RetryException.class);
+ if (e instanceof QueryCancelledException)
+ sendError(node, reqId, e);
+ else {
+ JdbcSQLException sqlEx = X.cause(e, JdbcSQLException.class);
- if (retryErr != null) {
- final String retryCause = String.format(
- "Failed to execute non-collocated query (will retry) [localNodeId=%s, rmtNodeId=%s, reqId=%s, " +
- "errMsg=%s]", ctx.localNodeId(), node.id(), reqId, retryErr.getMessage()
- );
+ if (sqlEx != null && sqlEx.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED)
+ sendError(node, reqId, new QueryCancelledException());
+ else {
+ GridH2RetryException retryErr = X.cause(e, GridH2RetryException.class);
- sendRetry(node, reqId, segmentId, retryCause);
- }
- else {
- U.error(log, "Failed to execute local query.", e);
+ if (retryErr != null) {
+ final String retryCause = String.format(
+ "Failed to execute non-collocated query (will retry) [localNodeId=%s, rmtNodeId=%s, reqId=%s, " +
+ "errMsg=%s]", ctx.localNodeId(), node.id(), reqId, retryErr.getMessage()
+ );
- sendError(node, reqId, e);
+ sendRetry(node, reqId, segmentId, retryCause);
+ }
+ else {
+ U.error(log, "Failed to execute local query.", e);
+
+ sendError(node, reqId, e);
- if (e instanceof Error)
- throw (Error)e;
+ if (e instanceof Error)
+ throw (Error)e;
+ }
+ }
}
}
finally {
@@ -1060,10 +1033,25 @@ public class GridMapQueryExecutor {
for (int i = 0; i < reserved.size(); i++)
reserved.get(i).release();
}
+
+ if (MapQueryLazyWorker.currentWorker() == null && GridH2QueryContext.get() != null)
+ GridH2QueryContext.clearThreadLocal();
}
}
/**
+ * @param node The node that sent the map query request.
+ * @param reqId Request ID.
+ * @param segmentId Segment ID.
+ * @return Lazy worker.
+ */
+ private MapQueryLazyWorker createLazyWorker(ClusterNode node, long reqId, int segmentId) {
+ MapQueryLazyWorkerKey key = new MapQueryLazyWorkerKey(node.id(), reqId, segmentId);
+
+ return new MapQueryLazyWorker(ctx.igniteInstanceName(), key, log, this);
+ }
+
+ /**
* @param cacheIds Cache ids.
* @return Id of the first cache in list, or {@code null} if list is empty.
*/
@@ -1088,6 +1076,7 @@ public class GridMapQueryExecutor {
/**
* @param node Node.
* @param req DML request.
+ * @throws IgniteCheckedException On error.
*/
private void onDmlRequest(final ClusterNode node, final GridH2DmlRequest req) throws IgniteCheckedException {
int[] parts = req.queryPartitions();
@@ -1255,24 +1244,34 @@ public class GridMapQueryExecutor {
return;
}
- final MapQueryResults qr = nodeRess.get(req.queryRequestId(), req.segmentId());
+ final MapQueryResults qryResults = nodeRess.get(req.queryRequestId(), req.segmentId());
- if (qr == null)
+ if (qryResults == null)
sendError(node, req.queryRequestId(), new CacheException("No query result found for request: " + req));
- else if (qr.cancelled())
+ else if (qryResults.cancelled())
sendError(node, req.queryRequestId(), new QueryCancelledException());
else {
- MapQueryLazyWorker lazyWorker = qr.lazyWorker();
+ MapQueryLazyWorker lazyWorker = qryResults.lazyWorker();
if (lazyWorker != null) {
lazyWorker.submit(new Runnable() {
@Override public void run() {
- sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize(), false);
+ try {
+ sendNextPage(nodeRess, node, qryResults, req.query(), req.segmentId(), req.pageSize(), false);
+ }
+ catch (Throwable e) {
+ JdbcSQLException sqlEx = X.cause(e, JdbcSQLException.class);
+
+ if (sqlEx != null && sqlEx.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED)
+ sendError(node, qryResults.queryRequestId(), new QueryCancelledException());
+ else
+ throw e;
+ }
}
});
}
else
- sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize(), false);
+ sendNextPage(nodeRess, node, qryResults, req.query(), req.segmentId(), req.pageSize(), false);
}
}
@@ -1287,8 +1286,14 @@ public class GridMapQueryExecutor {
* @return Next page.
* @throws IgniteCheckedException If failed.
*/
- private GridQueryNextPageResponse prepareNextPage(MapNodeResults nodeRess, ClusterNode node, MapQueryResults qr, int qry, int segmentId,
- int pageSize, boolean removeMapping) throws IgniteCheckedException {
+ private GridQueryNextPageResponse prepareNextPage(
+ MapNodeResults nodeRess,
+ ClusterNode node,
+ MapQueryResults qr,
+ int qry,
+ int segmentId,
+ int pageSize,
+ boolean removeMapping) throws IgniteCheckedException {
MapQueryResult res = qr.result(qry);
assert res != null;
@@ -1309,8 +1314,11 @@ public class GridMapQueryExecutor {
nodeRess.remove(qr.queryRequestId(), segmentId, qr);
// Release reservations if the last page fetched, all requests are closed and this is a lazy worker.
- if (MapQueryLazyWorker.currentWorker() != null)
+ if (qr.lazyWorker() != null) {
releaseReservations();
+
+ qr.lazyWorker().stop(false);
+ }
}
}
@@ -1342,8 +1350,14 @@ public class GridMapQueryExecutor {
* @param removeMapping Remove mapping flag.
*/
@SuppressWarnings("unchecked")
- private void sendNextPage(MapNodeResults nodeRess, ClusterNode node, MapQueryResults qr, int qry, int segmentId,
- int pageSize, boolean removeMapping) {
+ private void sendNextPage(
+ MapNodeResults nodeRess,
+ ClusterNode node,
+ MapQueryResults qr,
+ int qry,
+ int segmentId,
+ int pageSize,
+ boolean removeMapping) {
try {
GridQueryNextPageResponse msg = prepareNextPage(nodeRess, node, qr, qry, segmentId, pageSize, removeMapping);
@@ -1365,6 +1379,7 @@ public class GridMapQueryExecutor {
* @param node Node.
* @param reqId Request ID.
* @param segmentId Index segment ID.
+ * @param retryCause Description of the retry cause.
*/
private void sendRetry(ClusterNode node, long reqId, int segmentId, String retryCause) {
try {
@@ -1401,25 +1416,11 @@ public class GridMapQueryExecutor {
}
/**
- * Unregister lazy worker if needed (i.e. if we are currently in lazy worker thread).
- */
- public void stopAndUnregisterCurrentLazyWorker() {
- MapQueryLazyWorker worker = MapQueryLazyWorker.currentWorker();
-
- if (worker != null) {
- worker.stop(false);
-
- // Just stop is not enough as worker may be registered, but not started due to exception.
- unregisterLazyWorker(worker);
- }
- }
-
- /**
* Unregister lazy worker.
*
* @param worker Worker.
*/
- public void unregisterLazyWorker(MapQueryLazyWorker worker) {
+ void unregisterLazyWorker(MapQueryLazyWorker worker) {
lazyWorkers.remove(worker.key(), worker);
}
@@ -1429,4 +1430,17 @@ public class GridMapQueryExecutor {
public int registeredLazyWorkers() {
return lazyWorkers.size();
}
+
+ /**
+ * @param worker Worker to register.
+ */
+ void registerLazyWorker(MapQueryLazyWorker worker) {
+ MapQueryLazyWorker oldWorker = lazyWorkers.put(worker.key(), worker);
+
+ if (oldWorker != null) {
+ log.warning("Duplicates lazy worker: [key=" + worker.key() + ']');
+
+ oldWorker.stop(false);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
index 0cb986b..217cfad 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
@@ -17,12 +17,13 @@
package org.apache.ignite.internal.processors.query.h2.twostep;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
import java.util.NoSuchElementException;
+import java.util.RandomAccess;
import java.util.UUID;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
@@ -72,9 +73,11 @@ public class GridResultPage {
Collection<?> plainRows = res.plainRows();
if (plainRows != null) {
+ assert plainRows instanceof RandomAccess : "instance of " + plainRows.getClass();
+
rowsInPage = plainRows.size();
- if (rowsInPage == 0 || ((ArrayList<Value[]>)plainRows).get(0).length == res.columns())
+ if (rowsInPage == 0 || ((List<Value[]>)plainRows).get(0).length == res.columns())
rows = (Iterator<Value[]>)plainRows.iterator();
else {
// If it's a result of SELECT FOR UPDATE (we can tell by difference in number
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
index 48116d3..8f8553a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
@@ -17,12 +17,11 @@
package org.apache.ignite.internal.processors.query.h2.twostep;
-import org.apache.ignite.internal.processors.query.GridQueryCancel;
-import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
-import java.util.concurrent.ConcurrentHashMap;
-
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
@@ -86,10 +85,10 @@ class MapNodeResults {
public void cancelRequest(long reqId) {
for (MapRequestKey key : res.keySet()) {
if (key.requestId() == reqId) {
- MapQueryResults removed = res.remove(key);
+ final MapQueryResults removed = res.remove(key);
if (removed != null)
- removed.cancel(true);
+ removed.cancel();
}
}
@@ -144,7 +143,7 @@ class MapNodeResults {
*/
public void cancelAll() {
for (MapQueryResults ress : res.values())
- ress.cancel(true);
+ ress.cancel();
// Cancel update requests
for (GridQueryCancel upd: updCancels.values())
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
index 98f3df9..1cbab19 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
@@ -20,25 +20,41 @@ package org.apache.ignite.internal.processors.query.h2.twostep;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.query.h2.H2ConnectionWrapper;
+import org.apache.ignite.internal.processors.query.h2.H2Utils;
+import org.apache.ignite.internal.processors.query.h2.ObjectPoolReusable;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.thread.IgniteThread;
+import org.h2.engine.Session;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
+
/**
* Worker for lazy query execution.
*/
public class MapQueryLazyWorker extends GridWorker {
+ /** Poll task timeout in milliseconds. */
+ private static final int POLL_TASK_TIMEOUT_MS = 1000;
+
/** Lazy thread flag. */
private static final ThreadLocal<MapQueryLazyWorker> LAZY_WORKER = new ThreadLocal<>();
/** Active lazy worker count (for testing purposes). */
private static final LongAdder ACTIVE_CNT = new LongAdder();
+ /** Mutex to synchronize worker start/stop. */
+ private final Object mux = new Object();
+
/** Task to be executed. */
private final BlockingQueue<Runnable> tasks = new LinkedBlockingDeque<>();
@@ -51,8 +67,14 @@ public class MapQueryLazyWorker extends GridWorker {
/** Latch decremented when worker finishes. */
private final CountDownLatch stopLatch = new CountDownLatch(1);
- /** Map query result. */
- private volatile MapQueryResult res;
+ /** Query context. */
+ private GridH2QueryContext qctx;
+
+ /** Worker is started flag. */
+ private boolean started;
+
+ /** Detached connection. */
+ private ObjectPoolReusable<H2ConnectionWrapper> detached;
/**
* Constructor.
@@ -70,38 +92,106 @@ public class MapQueryLazyWorker extends GridWorker {
this.exec = exec;
}
+ /**
+ * Start lazy worker for half-processed query.
+ * In this case we have to detach the H2 connection from the current thread and use it for the current query processing.
+ * Also, table locks must be transferred to the lazy thread from the QUERY_POOL thread pool.
+ *
+ * @param ses H2 Session.
+ * @param detached H2 connection detached from current thread.
+ * @throws QueryCancelledException In case query is canceled during the worker start.
+ */
+ void start(Session ses, ObjectPoolReusable<H2ConnectionWrapper> detached) throws QueryCancelledException {
+ synchronized (mux) {
+ if (!exec.busyLock().enterBusy()) {
+ log.warning("Lazy worker isn't started. Node is stopped [key=" + key + ']');
+
+ return;
+ }
+
+ try {
+ if (started)
+ return;
+
+ if (isCancelled) {
+ if (detached != null)
+ detached.recycle();
+
+ throw new QueryCancelledException();
+ }
+
+ if (ses != null)
+ lazyTransferStart(ses);
+
+ this.detached = detached;
+
+ exec.registerLazyWorker(this);
+
+ IgniteThread thread = new IgniteThread(this);
+
+ started = true;
+
+ thread.start();
+ }
+ finally {
+ exec.busyLock().leaveBusy();
+ }
+ }
+ }
+
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
LAZY_WORKER.set(this);
ACTIVE_CNT.increment();
+ boolean lockBusy = false;
+
try {
+ if (qctx != null)
+ GridH2QueryContext.set(qctx);
+
+ if(detached != null)
+ lazyTransferFinish(H2Utils.session(detached.object().connection()));
+
while (!isCancelled()) {
- Runnable task = tasks.take();
+ Runnable task = tasks.poll(POLL_TASK_TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (task != null) {
- if (!exec.busyLock().enterBusy())
- return;
-
try {
task.run();
}
+ catch (Throwable t) {
+ log.warning("Lazy task error", t);
+ }
+ }
+ else {
+ try {
+ lockBusy = false;
+
+ if (!exec.busyLock().enterBusy()) {
+ log.info("Stop lazy worker [key=" + key + ']');
+
+ return;
+ }
+
+ lockBusy = true;
+ }
finally {
- exec.busyLock().leaveBusy();
+ if (lockBusy)
+ exec.busyLock().leaveBusy();
}
}
}
}
finally {
- if (res != null)
- res.close();
+ exec.unregisterLazyWorker(this);
LAZY_WORKER.set(null);
ACTIVE_CNT.decrement();
- exec.unregisterLazyWorker(this);
+ stopLatch.countDown();
}
}
@@ -111,6 +201,9 @@ public class MapQueryLazyWorker extends GridWorker {
* @param task Task to be executed.
*/
public void submit(Runnable task) {
+ if (isCancelled)
+ return;
+
tasks.add(task);
}
@@ -125,45 +218,76 @@ public class MapQueryLazyWorker extends GridWorker {
* Stop the worker.
* @param nodeStop Node is stopping.
*/
- public void stop(final boolean nodeStop) {
- if (MapQueryLazyWorker.currentWorker() == null)
- submit(new Runnable() {
- @Override public void run() {
- stop(nodeStop);
- }
- });
- else {
- GridH2QueryContext qctx = GridH2QueryContext.get();
-
- if (qctx != null) {
+ private void stop0(boolean nodeStop) {
+ synchronized (mux) {
+ if (qctx != null && qctx.distributedJoinMode() == OFF && !qctx.isCleared())
qctx.clearContext(nodeStop);
- GridH2QueryContext.clearThreadLocal();
+ if (detached != null) {
+ detached.recycle();
+
+ detached = null;
}
isCancelled = true;
- stopLatch.countDown();
+ mux.notifyAll();
}
}
/**
- * Await worker stop.
+ * @param task Stop task.
*/
- public void awaitStop() {
- try {
- U.await(stopLatch);
+ public void submitStopTask(Runnable task) {
+ synchronized (mux) {
+ if (LAZY_WORKER.get() != null)
+ task.run();
+ else
+ submit(task);
}
- catch (IgniteInterruptedCheckedException e) {
- throw new IgniteException("Failed to wait for lazy worker stop (interrupted): " + name(), e);
+ }
+
+ /**
+ * Stop the worker.
+ * @param nodeStop Node is stopping.
+ */
+ public void stop(final boolean nodeStop) {
+ synchronized (mux) {
+ if (isCancelled)
+ return;
+
+ if (started && currentWorker() == null) {
+ submit(new Runnable() {
+ @Override public void run() {
+ stop0(nodeStop);
+ }
+ });
+
+ awaitStop();
+ }
+ else if (currentWorker() != null)
+ stop0(nodeStop);
}
}
/**
- * @param res Map query result.
+ * Await worker stop.
*/
- public void result(MapQueryResult res) {
- this.res = res;
+ private void awaitStop() {
+ synchronized (mux) {
+ try {
+ if (!isCancelled)
+ mux.wait();
+
+ U.await(stopLatch);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ throw new IgniteException("Failed to wait for lazy worker stop (interrupted): " + name(), e);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
}
/**
@@ -181,6 +305,13 @@ public class MapQueryLazyWorker extends GridWorker {
}
/**
+ * @param qctx Query context.
+ */
+ public void queryContext(GridH2QueryContext qctx) {
+ this.qctx = qctx;
+ }
+
+ /**
* Construct worker name.
*
* @param instanceName Instance name.
@@ -191,4 +322,32 @@ public class MapQueryLazyWorker extends GridWorker {
return "query-lazy-worker_" + instanceName + "_" + key.nodeId() + "_" + key.queryRequestId() + "_" +
key.segment();
}
+
+ /**
+ * Start session transfer to lazy thread.
+ *
+ * @param ses Session.
+ */
+ private static void lazyTransferStart(Session ses) {
+ GridH2QueryContext qctx = GridH2QueryContext.get();
+
+ assert qctx != null;
+
+ for(GridH2Table tbl : qctx.lockedTables())
+ tbl.onLazyTransferStarted(ses);
+ }
+
+ /**
+ * Finish session transfer to lazy thread.
+ *
+ * @param ses Session.
+ */
+ private static void lazyTransferFinish(Session ses) {
+ GridH2QueryContext qctx = GridH2QueryContext.get();
+
+ assert qctx != null;
+
+ for(GridH2Table tbl : qctx.lockedTables())
+ tbl.onLazyTransferFinished(ses);
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
index fb928c4..5a0c410 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
@@ -22,6 +22,7 @@ import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.events.CacheQueryReadEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -60,6 +61,9 @@ class MapQueryResult {
}
}
+ /** Logger. */
+ private final IgniteLogger log;
+
/** Indexing. */
private final IgniteH2Indexing h2;
@@ -96,26 +100,23 @@ class MapQueryResult {
/** */
private final Object[] params;
- /** Lazy worker. */
- private final MapQueryLazyWorker lazyWorker;
-
/**
+ * @param h2 H2 indexing.
* @param rs Result set.
* @param cctx Cache context.
* @param qrySrcNodeId Query source node.
* @param qry Query.
* @param params Query params.
- * @param lazyWorker Lazy worker.
*/
MapQueryResult(IgniteH2Indexing h2, ResultSet rs, @Nullable GridCacheContext cctx,
- UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params, @Nullable MapQueryLazyWorker lazyWorker) {
+ UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params) {
+ this.log = h2.kernalContext().log(MapQueryResult.class);
this.h2 = h2;
this.cctx = cctx;
this.qry = qry;
this.params = params;
this.qrySrcNodeId = qrySrcNodeId;
this.cpNeeded = F.eq(h2.kernalContext().localNodeId(), qrySrcNodeId);
- this.lazyWorker = lazyWorker;
if (rs != null) {
this.rs = rs;
@@ -174,8 +175,6 @@ class MapQueryResult {
* @return {@code true} If there are no more rows available.
*/
synchronized boolean fetchNextPage(List<Value[]> rows, int pageSize) {
- assert lazyWorker == null || lazyWorker == MapQueryLazyWorker.currentWorker();
-
if (closed)
return true;
@@ -259,30 +258,13 @@ class MapQueryResult {
* Close the result.
*/
public void close() {
- if (lazyWorker != null && MapQueryLazyWorker.currentWorker() == null) {
- lazyWorker.submit(new Runnable() {
- @Override public void run() {
- close();
- }
- });
-
- lazyWorker.awaitStop();
-
- return;
- }
-
synchronized (this) {
- assert lazyWorker == null || lazyWorker == MapQueryLazyWorker.currentWorker();
-
if (closed)
return;
closed = true;
- U.closeQuiet(rs);
-
- if (lazyWorker != null)
- lazyWorker.stop(false);
+ U.close(rs, log);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
index 76527bc..b13137c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
@@ -30,7 +30,7 @@ import org.jetbrains.annotations.Nullable;
* Mapper query results.
*/
class MapQueryResults {
- /** H@ indexing. */
+ /** H2 indexing. */
private final IgniteH2Indexing h2;
/** */
@@ -113,10 +113,7 @@ class MapQueryResults {
* @param params Query arguments.
*/
void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs, Object[] params) {
- MapQueryResult res = new MapQueryResult(h2, rs, cctx, qrySrcNodeId, q, params, lazyWorker);
-
- if (lazyWorker != null)
- lazyWorker.result(res);
+ MapQueryResult res = new MapQueryResult(h2, rs, cctx, qrySrcNodeId, q, params);
if (!results.compareAndSet(qry, null, res))
throw new IllegalStateException();
@@ -139,28 +136,37 @@ class MapQueryResults {
/**
* Cancels the query.
*/
- void cancel(boolean forceQryCancel) {
+ void cancel() {
if (cancelled)
return;
cancelled = true;
for (int i = 0; i < results.length(); i++) {
- MapQueryResult res = results.get(i);
+ GridQueryCancel cancel = cancels[i];
- if (res != null) {
- res.close();
+ if (cancel != null)
+ cancel.cancel();
+ }
- continue;
- }
+ if (lazyWorker == null)
+ close();
+ else {
+ lazyWorker.submitStopTask(this::close);
- // NB: Cancel is already safe even for lazy queries (see implementation of passed Runnable).
- if (forceQryCancel) {
- GridQueryCancel cancel = cancels[i];
+ lazyWorker.stop(false);
+ }
+ }
- if (cancel != null)
- cancel.cancel();
- }
+ /**
+ *
+ */
+ public void close() {
+ for (int i = 0; i < results.length(); i++) {
+ MapQueryResult res = results.get(i);
+
+ if (res != null)
+ res.close();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java
index a112969..a991530 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java
@@ -96,7 +96,6 @@ public class GridCacheLazyQueryPartitionsReleaseTest extends GridCommonAbstractT
int partsFilled = fillAllPartitions(cache, aff);
SqlFieldsQuery qry = new SqlFieldsQuery("select name, age from person")
- .setLazy(true)
.setPageSize(1);
FieldsQueryCursor<List<?>> qryCursor = cache.query(qry);
@@ -143,7 +142,6 @@ public class GridCacheLazyQueryPartitionsReleaseTest extends GridCommonAbstractT
int partsFilled = fillAllPartitions(cache, aff);
SqlFieldsQuery qry = new SqlFieldsQuery("select name, age from person")
- .setLazy(true)
.setPageSize(1);
FieldsQueryCursor<List<?>> qryCursor = cache.query(qry);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
index 59be138..24e2fb2 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
@@ -121,12 +121,15 @@ public class IgniteCacheQueryH2IndexingLeakTest extends GridCommonAbstractTest {
private static int getStatementCacheSize(GridQueryProcessor qryProcessor) {
IgniteH2Indexing h2Idx = GridTestUtils.getFieldValue(qryProcessor, GridQueryProcessor.class, "idx");
- ConcurrentMap<Thread, H2ConnectionWrapper> conns = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "conns");
+ ConcurrentMap<Thread, ConcurrentMap<H2ConnectionWrapper, Boolean>> conns =
+ GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "conns");
int cntr = 0;
- for (H2ConnectionWrapper w : conns.values())
- cntr += w.statementCacheSize();
+ for (ConcurrentMap<H2ConnectionWrapper, Boolean> connPerThread: conns.values()) {
+ for (H2ConnectionWrapper w : connPerThread.keySet())
+ cntr += w.statementCacheSize();
+ }
return cntr;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
index 56fd7b8..8542f43 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
@@ -177,7 +177,6 @@ public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends Gr
private void testQueryCancel(int keyCnt, int valSize, String sql, int timeoutUnits, TimeUnit timeUnit,
boolean timeout) throws Exception {
try (Ignite client = startGrid("client")) {
-
IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
assertEquals(0, cache.localSize());
@@ -204,7 +203,8 @@ public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends Gr
qry.setTimeout(timeoutUnits, timeUnit);
cursor = cache.query(qry);
- } else {
+ }
+ else {
cursor = cache.query(qry);
client.scheduler().runLocal(new Runnable() {
@@ -214,7 +214,7 @@ public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends Gr
}, timeoutUnits, timeUnit);
}
- try(QueryCursor<List<?>> ignored = cursor) {
+ try (QueryCursor<List<?>> ignored = cursor) {
cursor.iterator();
}
catch (CacheException ex) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
index bad5303..3beebff 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
+import org.apache.ignite.testframework.GridTestUtils;
/**
* Test for distributed queries with node restarts.
@@ -101,11 +102,11 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa
assertEquals(broadcastQry, plan.contains("batched:broadcast"));
- final List<List<?>> pRes = grid(0).cache("pu").query(qry0).getAll();
+ final List<List<?>> goldenRes = grid(0).cache("pu").query(qry0).getAll();
Thread.sleep(3000);
- assertEquals(pRes, grid(0).cache("pu").query(qry0).getAll());
+ assertEquals(goldenRes, grid(0).cache("pu").query(qry0).getAll());
final SqlFieldsQuery qry1;
@@ -122,7 +123,7 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa
final List<List<?>> rRes = grid(0).cache("co").query(qry1).getAll();
- assertFalse(pRes.isEmpty());
+ assertFalse(goldenRes.isEmpty());
assertFalse(rRes.isEmpty());
final AtomicInteger qryCnt = new AtomicInteger();
@@ -161,9 +162,12 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa
qry.setPageSize(smallPageSize ? 30 : 1000);
try {
- assertEquals(pRes, cache.query(qry).getAll());
+ assertEquals(goldenRes, cache.query(qry).getAll());
}
catch (CacheException e) {
+ if (!smallPageSize)
+ log.error("Unexpected exception at the test", e);
+
assertTrue("On large page size must retry.", smallPageSize);
boolean failedOnRemoteFetch = false;
@@ -263,7 +267,7 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa
}
}, restartThreadsNum, "restart-thread");
- Thread.sleep(duration);
+ GridTestUtils.waitForCondition(() -> fail.get(), duration);
info("Stopping...");
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
index 072f1ab..4d02b2e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
@@ -627,6 +627,8 @@ public abstract class DynamicColumnsAbstractConcurrentSelfTest extends DynamicCo
* @throws Exception If failed.
*/
public void testQueryConsistencyMultithreaded() throws Exception {
+ final int KEY_COUNT = 5000;
+
// Start complex topology.
ignitionStart(serverConfiguration(1));
ignitionStart(serverConfiguration(2));
@@ -638,7 +640,7 @@ public abstract class DynamicColumnsAbstractConcurrentSelfTest extends DynamicCo
run(cli, createSql);
- put(cli, 0, 5000);
+ put(cli, 0, KEY_COUNT);
final AtomicBoolean stopped = new AtomicBoolean();
@@ -696,7 +698,7 @@ public abstract class DynamicColumnsAbstractConcurrentSelfTest extends DynamicCo
List<Cache.Entry<BinaryObject, BinaryObject>> res = cache.query(
new SqlQuery<BinaryObject, BinaryObject>(valTypeName, "from " + TBL_NAME)).getAll();
- assertEquals(5000, res.size());
+ assertEquals(KEY_COUNT, res.size());
}
return null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
index 7713004..fe45ed6 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
@@ -160,7 +160,7 @@ public class H2ConnectionLeaksSelfTest extends GridCommonAbstractTest {
Map<Thread, ?> conns = perThreadConnections(i);
for(Thread t : conns.keySet())
- log.error("+++ Connection is not closed for thread: " + t.getName());
+ log.error("Connection is not closed for thread: " + t.getName());
}
fail("H2 JDBC connections leak detected. See the log above.");
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java
index d5cc0eb..140eb6e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java
@@ -17,13 +17,21 @@
package org.apache.ignite.internal.processors.query;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
@@ -31,16 +39,11 @@ import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-
/**
* Tests for lazy query execution.
*/
public class LazyQuerySelfTest extends GridCommonAbstractTest {
- /** Keys ocunt. */
+ /** Keys count. */
private static final int KEY_CNT = 200;
/** Base query argument. */
@@ -94,6 +97,91 @@ public class LazyQuerySelfTest extends GridCommonAbstractTest {
}
/**
+ * Test DDL operation on table with high load queries.
+ *
+ * @throws Exception If failed.
+ */
+ public void testTableWriteLockStarvation() throws Exception {
+ final Ignite srv = startGrid(1);
+
+ srv.createCache(cacheConfiguration(4));
+
+ populateBaseQueryData(srv);
+
+ final AtomicBoolean end = new AtomicBoolean(false);
+
+ final int qryThreads = 10;
+
+ final CountDownLatch latch = new CountDownLatch(qryThreads);
+
+ // Do many concurrent queries.
+ IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+ @Override public void run() {
+ latch.countDown();
+
+ while(!end.get()) {
+ FieldsQueryCursor<List<?>> cursor = execute(srv, query(0)
+ .setPageSize(PAGE_SIZE_SMALL));
+
+ cursor.getAll();
+ }
+ }
+ }, qryThreads, "usr-qry");
+
+ latch.await();
+
+ Thread.sleep(500);
+
+ execute(srv, new SqlFieldsQuery("CREATE INDEX PERSON_NAME ON Person (name asc)")).getAll();
+ execute(srv, new SqlFieldsQuery("DROP INDEX PERSON_NAME")).getAll();
+
+ // Test is OK if the DDL operations complete while under high query load.
+ end.set(true);
+ fut.get();
+ }
+
+ /**
+ * Test that reserved partitions are released after the query completes (results are bigger than one page).
+ *
+ * @throws Exception If failed.
+ */
+ public void testReleasePartitionReservationSeveralPagesResults() throws Exception {
+ checkReleasePartitionReservation(PAGE_SIZE_SMALL);
+ }
+
+ /**
+ * Test that reserved partitions are released after the query completes (results fit on one page).
+ *
+ * @throws Exception If failed.
+ */
+ public void testReleasePartitionReservationOnePageResults() throws Exception {
+ checkReleasePartitionReservation(KEY_CNT);
+ }
+
+ /**
+ * Test that reserved partitions are released after the query completes.
+ *
+ * @param pageSize Results page size.
+ * @throws Exception If failed.
+ */
+ public void checkReleasePartitionReservation(int pageSize) throws Exception {
+ Ignite srv1 = startGrid(1);
+ startGrid(2);
+
+ srv1.createCache(cacheConfiguration(1));
+
+ populateBaseQueryData(srv1);
+
+ FieldsQueryCursor<List<?>> cursor = execute(srv1, query(0).setPageSize(pageSize));
+
+ cursor.getAll();
+
+ startGrid(3);
+
+ awaitPartitionMapExchange();
+ }
+
+ /**
* Check local query execution.
*
* @param parallelism Query parallelism.
@@ -151,18 +239,18 @@ public class LazyQuerySelfTest extends GridCommonAbstractTest {
assertNoWorkers();
// Test server node leave with active worker.
- cursor = execute(srv1, baseQuery().setPageSize(PAGE_SIZE_SMALL));
+ FieldsQueryCursor<List<?>> cursor2 = execute(srv1, baseQuery().setPageSize(PAGE_SIZE_SMALL));
try {
- iter = cursor.iterator();
+ Iterator<List<?>> iter2 = cursor2.iterator();
for (int i = 0; i < 30; i++)
- iter.next();
+ iter2.next();
stopGrid(2);
}
finally {
- cursor.close();
+ cursor2.close();
}
assertNoWorkers();
@@ -233,7 +321,55 @@ public class LazyQuerySelfTest extends GridCommonAbstractTest {
}
}
+ checkHoldLazyQuery(node);
+
+ checkShortLazyQuery(node);
+ }
+
+ /**
+ * @param node Ignite node.
+ * @throws Exception If failed.
+ */
+ public void checkHoldLazyQuery(Ignite node) throws Exception {
+ ArrayList rows = new ArrayList<>();
+
+ FieldsQueryCursor<List<?>> cursor0 = execute(node, query(BASE_QRY_ARG).setPageSize(PAGE_SIZE_SMALL));
+
+ // Do many concurrent queries to test full iteration.
+ GridTestUtils.runMultiThreaded(new Runnable() {
+ @Override public void run() {
+ for (int i = 0; i < 5; ++i) {
+ FieldsQueryCursor<List<?>> cursor = execute(node, query(KEY_CNT - PAGE_SIZE_SMALL + 1)
+ .setPageSize(PAGE_SIZE_SMALL));
+
+ cursor.getAll();
+ }
+ }
+ }, 5, "usr-qry");
+
+ for (List<?> row : cursor0)
+ rows.add(row);
+
+ assertBaseQueryResults(rows);
+ }
+
+ /**
+ * @param node Ignite node.
+ * @throws Exception If failed.
+ */
+ public void checkShortLazyQuery(Ignite node) throws Exception {
+ ArrayList rows = new ArrayList<>();
+
+ FieldsQueryCursor<List<?>> cursor0 = execute(node, query(KEY_CNT - PAGE_SIZE_SMALL + 1).setPageSize(PAGE_SIZE_SMALL));
+
+ Iterator<List<?>> it = cursor0.iterator();
+
assertNoWorkers();
+
+ while (it.hasNext())
+ rows.add(it.next());
+
+ assertQueryResults(rows, KEY_CNT - PAGE_SIZE_SMALL + 1);
}
/**
@@ -267,8 +403,11 @@ public class LazyQuerySelfTest extends GridCommonAbstractTest {
* @return Default cache configuration.
*/
private static CacheConfiguration<Long, Person> cacheConfiguration(int parallelism) {
- return new CacheConfiguration<Long, Person>().setName(CACHE_NAME).setIndexedTypes(Long.class, Person.class)
- .setQueryParallelism(parallelism);
+ return new CacheConfiguration<Long, Person>()
+ .setName(CACHE_NAME)
+ .setIndexedTypes(Long.class, Person.class)
+ .setQueryParallelism(parallelism)
+ .setAffinity(new RendezvousAffinityFunction(false, 10));
}
/**
@@ -278,7 +417,7 @@ public class LazyQuerySelfTest extends GridCommonAbstractTest {
* @return Query.
*/
private static SqlFieldsQuery query(long arg) {
- return new SqlFieldsQuery("SELECT id, name FROM Person WHERE id >= ?").setArgs(arg);
+ return new SqlFieldsQuery("SELECT id, name FROM Person WHERE id >= " + arg);
}
/**
@@ -287,13 +426,23 @@ public class LazyQuerySelfTest extends GridCommonAbstractTest {
* @param rows Result rows.
*/
private static void assertBaseQueryResults(List<List<?>> rows) {
- assertEquals(KEY_CNT - BASE_QRY_ARG, rows.size());
+ assertQueryResults(rows, BASE_QRY_ARG);
+ }
+
+ /**
+ * Assert query results for a query with the given lower bound on id.
+ *
+ * @param rows Result rows.
+ * @param resSize Lower bound of the returned ids; the expected row count is {@code KEY_CNT - resSize}.
+ */
+ private static void assertQueryResults(List<List<?>> rows, int resSize) {
+ assertEquals(KEY_CNT - resSize, rows.size());
for (List<?> row : rows) {
Long id = (Long)row.get(0);
String name = (String)row.get(1);
- assertTrue(id >= BASE_QRY_ARG);
+ assertTrue(id >= resSize);
assertEquals(nameForId(id), name);
}
}
@@ -317,7 +466,7 @@ public class LazyQuerySelfTest extends GridCommonAbstractTest {
*/
@SuppressWarnings("unchecked")
private static FieldsQueryCursor<List<?>> execute(Ignite node, SqlFieldsQuery qry) {
- return cache(node).query(qry.setLazy(true));
+ return cache(node).query(qry);
}
/**
@@ -325,8 +474,8 @@ public class LazyQuerySelfTest extends GridCommonAbstractTest {
*
* @throws Exception If failed.
*/
- private static void assertNoWorkers() throws Exception {
- assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ private void assertNoWorkers() throws Exception {
+ if (!GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
for (Ignite node : Ignition.allGrids()) {
IgniteH2Indexing idx = (IgniteH2Indexing) ((IgniteKernal)node).context().query().getIndexing();
@@ -337,7 +486,22 @@ public class LazyQuerySelfTest extends GridCommonAbstractTest {
return MapQueryLazyWorker.activeCount() == 0;
}
- }, 1000L);
+ }, 1000L)) {
+ log.error("Lazy workers on nodes:");
+
+ for (Ignite node : Ignition.allGrids()) {
+ IgniteH2Indexing idx = (IgniteH2Indexing) ((IgniteKernal)node).context().query().getIndexing();
+
+ if (idx.mapQueryExecutor().registeredLazyWorkers() != 0) {
+ log.error("[node=" + node + ", " + "registeredLazyWorkers="
+ + idx.mapQueryExecutor().registeredLazyWorkers() + ']');
+ }
+
+ log.error("Active lazy workers: " + MapQueryLazyWorker.activeCount());
+
+ fail("There are not stopped lazy workers. See error message above.");
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ObjectPoolSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ObjectPoolSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ObjectPoolSelfTest.java
new file mode 100644
index 0000000..9be0870
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ObjectPoolSelfTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests borrowing and recycling of objects through {@link ObjectPool}.
+ */
+public class ObjectPoolSelfTest extends GridCommonAbstractTest {
+ /** Pool under test; the tests below expect its bag to keep at most one recycled object. */
+ private ObjectPool<Obj> pool = new ObjectPool<>(Obj::new, 1, null, null);
+
+ /** Checks that a recycled object is handed out again by the next borrow and is not closed.
+ * @throws Exception If failed.
+ */
+ public void testObjectIsReusedAfterRecycling() throws Exception {
+ ObjectPoolReusable<Obj> r1 = pool.borrow();
+
+ Obj o1 = r1.object();
+
+ r1.recycle();
+
+ ObjectPoolReusable<Obj> r2 = pool.borrow();
+
+ Obj o2 = r2.object();
+
+ assertSame(o1, o2);
+
+ assertFalse(o1.isClosed());
+ }
+
+ /** Checks that two borrows without a recycle in between yield different objects.
+ * @throws Exception If failed.
+ */
+ public void testBorrowedObjectIsNotReturnedTwice() throws Exception {
+ ObjectPoolReusable<Obj> r1 = pool.borrow();
+ ObjectPoolReusable<Obj> r2 = pool.borrow();
+
+ assertNotSame(r1.object(), r2.object());
+ }
+
+ /** Checks that after both wrappers are recycled they expose no object and the second object is closed.
+ * @throws Exception If failed.
+ */
+ public void testObjectShouldBeClosedOnRecycleIfPoolIsFull() throws Exception {
+ ObjectPoolReusable<Obj> r1 = pool.borrow();
+ ObjectPoolReusable<Obj> r2 = pool.borrow();
+
+ Obj o2 = r2.object();
+
+ r1.recycle();
+ r2.recycle();
+
+ assertNull(r1.object());
+ assertNull(r2.object());
+
+ assertTrue(o2.isClosed());
+ }
+
+ /** Checks that the bag size stays at 1 when a second object is recycled.
+ * @throws Exception If failed.
+ */
+ public void testObjectShouldNotBeReturnedIfPoolIsFull() throws Exception {
+ ObjectPoolReusable<Obj> r1 = pool.borrow();
+ ObjectPoolReusable<Obj> r2 = pool.borrow();
+
+ r1.recycle();
+
+ assertEquals(1, pool.bagSize());
+
+ r2.recycle();
+
+ assertEquals(1, pool.bagSize());
+ }
+
+ /** Checks that an object recycled inside an async task is counted in the bag both there and in the test thread.
+ * @throws Exception If failed.
+ */
+ public void testObjectShouldReturnedToBag() throws Exception {
+ ObjectPoolReusable<Obj> r1 = pool.borrow();
+
+ CompletableFuture.runAsync(() -> {
+ r1.recycle();
+
+ assertEquals(1, pool.bagSize());
+ }).join();
+
+ assertEquals(1, pool.bagSize());
+ }
+
+ /** Pooled test object that records whether {@link #close()} has been called. */
+ private static class Obj implements AutoCloseable {
+ /** Closed flag; set to {@code true} by {@link #close()}. */
+ private boolean closed = false;
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ closed = true;
+ }
+
+ /**
+ * @return {@code True} if closed.
+ */
+ public boolean isClosed() {
+ return closed;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPoolSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPoolSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPoolSelfTest.java
deleted file mode 100644
index b7b7a37..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPoolSelfTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2;
-
-import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.processors.query.h2.ThreadLocalObjectPool.Reusable;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-/**
- *
- */
-public class ThreadLocalObjectPoolSelfTest extends GridCommonAbstractTest {
- /** */
- private ThreadLocalObjectPool<Obj> pool = new ThreadLocalObjectPool<>(Obj::new, 1);
-
- /**
- * @throws Exception If failed.
- */
- public void testObjectIsReusedAfterRecycling() throws Exception {
- Reusable<Obj> o1 = pool.borrow();
- o1.recycle();
- Reusable<Obj> o2 = pool.borrow();
-
- assertSame(o1.object(), o2.object());
- assertFalse(o1.object().isClosed());
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testBorrowedObjectIsNotReturnedTwice() throws Exception {
- Reusable<Obj> o1 = pool.borrow();
- Reusable<Obj> o2 = pool.borrow();
-
- assertNotSame(o1.object(), o2.object());
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testObjectShouldBeClosedOnRecycleIfPoolIsFull() throws Exception {
- Reusable<Obj> o1 = pool.borrow();
- Reusable<Obj> o2 = pool.borrow();
- o1.recycle();
- o2.recycle();
-
- assertTrue(o2.object().isClosed());
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testObjectShouldNotBeReturnedIfPoolIsFull() throws Exception {
- Reusable<Obj> o1 = pool.borrow();
- Reusable<Obj> o2 = pool.borrow();
-
- o1.recycle();
-
- assertEquals(1, pool.bagSize());
-
- o2.recycle();
-
- assertEquals(1, pool.bagSize());
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testObjectShouldReturnedToRecyclingThreadBag() throws Exception {
- Reusable<Obj> o1 = pool.borrow();
-
- CompletableFuture.runAsync(() -> {
- o1.recycle();
-
- assertEquals(1, pool.bagSize());
- }).join();
-
- assertEquals(0, pool.bagSize());
- }
-
- /** */
- private static class Obj implements AutoCloseable {
- /** */
- private boolean closed = false;
-
- /** {@inheritDoc} */
- @Override public void close() {
- closed = true;
- }
-
- /**
- * @return {@code True} if closed.
- */
- public boolean isClosed() {
- return closed;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
index dbb2c59..ac467d5 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
@@ -384,11 +384,6 @@ public class RetryCauseMessageSelfTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override public void cancelLazyWorkers() {
- startedExecutor.cancelLazyWorkers();
- }
-
- /** {@inheritDoc} */
@Override GridSpinBusyLock busyLock() {
return startedExecutor.busyLock();
}
@@ -399,19 +394,8 @@ public class RetryCauseMessageSelfTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override public void stopAndUnregisterCurrentLazyWorker() {
- startedExecutor.stopAndUnregisterCurrentLazyWorker();
- }
-
- /** {@inheritDoc} */
- @Override public void unregisterLazyWorker(MapQueryLazyWorker worker) {
- startedExecutor.unregisterLazyWorker(worker);
- }
-
- /** {@inheritDoc} */
@Override public int registeredLazyWorkers() {
return startedExecutor.registeredLazyWorkers();
}
}
-
}