You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ko...@apache.org on 2022/08/26 15:29:03 UTC
[ignite-3] branch main updated: IGNITE-17359 Sql. Implement session auto expiration (#1004)
This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 35eda6eee8 IGNITE-17359 Sql. Implement session auto expiration (#1004)
35eda6eee8 is described below
commit 35eda6eee8601d1c34a1cf5c074b2fa169bb4a9a
Author: Evgeniy Stanilovskiy <st...@gmail.com>
AuthorDate: Fri Aug 26 18:28:58 2022 +0300
IGNITE-17359 Sql. Implement session auto expiration (#1004)
---
.../apache/ignite/internal/sql/ResultSetImpl.java | 9 +-
.../main/java/org/apache/ignite/sql/Session.java | 42 +++++-
.../requests/sql/ClientSqlCursorCloseRequest.java | 5 +-
.../sql/ClientSqlCursorNextPageRequest.java | 7 +-
.../requests/sql/ClientSqlExecuteRequest.java | 29 ++--
...orCloseRequest.java => ClientSqlResultSet.java} | 47 +++++--
.../ignite/internal/client/sql/ClientSession.java | 30 ++++-
.../internal/client/sql/ClientSessionBuilder.java | 33 ++++-
.../ignite/internal/client/sql/ClientSql.java | 2 +-
.../org/apache/ignite/client/ClientSqlTest.java | 4 +-
.../ignite/client/fakes/FakeAsyncResultSet.java | 5 +-
.../client/fakes/FakeIgniteQueryProcessor.java | 2 +-
.../apache/ignite/client/fakes/FakeSession.java | 23 +++-
.../ignite/client/fakes/FakeSessionBuilder.java | 30 ++++-
.../dotnet/Apache.Ignite.Tests/FakeServer.cs | 1 +
.../dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs | 2 +-
.../Table/RecordViewBinaryTests.cs | 1 -
.../Table/RecordViewPocoTests.cs | 1 -
.../dotnet/Apache.Ignite/Internal/Sql/Sql.cs | 1 +
.../ignite/internal/sql/api/ItCommonApiTest.java | 97 ++++++++++++++
.../internal/table/ItPublicApiColocationTest.java | 2 +
.../internal/sql/api/SessionBuilderImpl.java | 34 +++--
.../ignite/internal/sql/api/SessionImpl.java | 15 ++-
.../ignite/internal/sql/engine/QueryProcessor.java | 3 +-
.../internal/sql/engine/SqlQueryProcessor.java | 15 ++-
.../internal/sql/engine/session/Session.java | 5 +
.../sql/engine/session/SessionManager.java | 73 +++++++++-
.../internal/sql/engine/IgniteSqlApiTest.java | 4 +-
.../sql/engine/exec/MockedStructuresTest.java | 2 +-
.../sql/engine/session/SessionManagerTest.java | 148 +++++++++++++++++++++
30 files changed, 580 insertions(+), 92 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/internal/sql/ResultSetImpl.java b/modules/api/src/main/java/org/apache/ignite/internal/sql/ResultSetImpl.java
index 281ac63ad9..2eaef7a2d2 100644
--- a/modules/api/src/main/java/org/apache/ignite/internal/sql/ResultSetImpl.java
+++ b/modules/api/src/main/java/org/apache/ignite/internal/sql/ResultSetImpl.java
@@ -19,7 +19,10 @@ package org.apache.ignite.internal.sql;
import java.util.Iterator;
import java.util.NoSuchElementException;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
+import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.sql.NoRowSetExpectedException;
import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.ResultSetMetadata;
@@ -117,7 +120,11 @@ public class ResultSetImpl implements ResultSet {
if (curPage.hasNext()) {
return true;
} else if (nextPageStage != null) {
- curRes = nextPageStage.toCompletableFuture().join();
+ try {
+ curRes = nextPageStage.toCompletableFuture().join();
+ } catch (CompletionException ex) {
+ throw (IgniteException) ExceptionUtils.unwrapCause(ex);
+ }
advance();
diff --git a/modules/api/src/main/java/org/apache/ignite/sql/Session.java b/modules/api/src/main/java/org/apache/ignite/sql/Session.java
index 24fbbc11e4..8cab66cc22 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/Session.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/Session.java
@@ -215,12 +215,23 @@ public interface Session extends AutoCloseable {
CompletableFuture<Void> executeScriptAsync(String query, @Nullable Object... arguments);
/**
- * Return default query timeout.
+ * Returns the default query timeout, which bounds query execution time. If a query takes longer, it will be interrupted.
*
* @param timeUnit Timeunit to convert timeout to.
* @return Default query timeout in the given timeunit.
*/
- long defaultTimeout(TimeUnit timeUnit);
+ long defaultQueryTimeout(TimeUnit timeUnit);
+
+ /**
+ * Return default idle session timeout.
+ *
+ * <p>The maximum idle time (that is, the time during which no requests are performed on behalf of the session), after which this
+ * session will be considered expired.
+ *
+ * @param timeUnit Timeunit to convert timeout to.
+ * @return Session timeout in the given timeunit.
+ */
+ long idleTimeout(TimeUnit timeUnit);
/**
* Returns session default schema.
@@ -281,7 +292,7 @@ public interface Session extends AutoCloseable {
* @param timeUnit Timeunit to convert timeout to.
* @return Default query timeout in the given timeunit.
*/
- long defaultTimeout(TimeUnit timeUnit);
+ long defaultQueryTimeout(TimeUnit timeUnit);
/**
* Sets default query timeout.
@@ -290,7 +301,30 @@ public interface Session extends AutoCloseable {
* @param timeUnit Timeunit.
* @return {@code this} for chaining.
*/
- SessionBuilder defaultTimeout(long timeout, TimeUnit timeUnit);
+ SessionBuilder defaultQueryTimeout(long timeout, TimeUnit timeUnit);
+
+ /**
+ * Return the idle timeout.
+ *
+ * <p>The maximum idle time (that is, the time during which no requests are performed on behalf of the session), after which this
+ * session will be considered expired.
+ *
+ * @param timeUnit Timeunit to convert timeout to.
+ * @return Session timeout in the given timeunit.
+ */
+ long idleTimeout(TimeUnit timeUnit);
+
+ /**
+ * Sets idle timeout.
+ *
+ * <p>The maximum idle time (that is, the time during which no requests are performed on behalf of the session), after which this
+ * session will be considered expired.
+ *
+ * @param timeout Session timeout value.
+ * @param timeUnit Timeunit.
+ * @return {@code this} for chaining.
+ */
+ SessionBuilder idleTimeout(long timeout, TimeUnit timeUnit);
/**
* Returns session default schema.
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorCloseRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorCloseRequest.java
index 8ce318c847..35876043cc 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorCloseRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorCloseRequest.java
@@ -21,7 +21,6 @@ import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientResourceRegistry;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.lang.IgniteInternalCheckedException;
-import org.apache.ignite.sql.async.AsyncResultSet;
/**
* Client SQL cursor close request.
@@ -37,8 +36,8 @@ public class ClientSqlCursorCloseRequest {
throws IgniteInternalCheckedException {
long resourceId = in.unpackLong();
- AsyncResultSet asyncResultSet = resources.remove(resourceId).get(AsyncResultSet.class);
+ ClientSqlResultSet asyncResultSet = resources.remove(resourceId).get(ClientSqlResultSet.class);
- return asyncResultSet.closeAsync().toCompletableFuture();
+ return asyncResultSet.closeAsync();
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextPageRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextPageRequest.java
index 24c0bf7ce8..37ffbf3c71 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextPageRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextPageRequest.java
@@ -24,7 +24,6 @@ import org.apache.ignite.client.handler.ClientResourceRegistry;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.lang.IgniteInternalCheckedException;
-import org.apache.ignite.sql.async.AsyncResultSet;
/**
* Client SQL cursor next page request.
@@ -44,9 +43,9 @@ public class ClientSqlCursorNextPageRequest {
throws IgniteInternalCheckedException {
long resourceId = in.unpackLong();
- AsyncResultSet asyncResultSet = resources.get(resourceId).get(AsyncResultSet.class);
+ var resultSet = resources.get(resourceId).get(ClientSqlResultSet.class);
- return asyncResultSet.fetchNextPage()
+ return resultSet.resultSet().fetchNextPage()
.thenCompose(r -> {
packCurrentPage(out, r);
out.packBoolean(r.hasMorePages());
@@ -58,7 +57,7 @@ public class ClientSqlCursorNextPageRequest {
// Ignore: either resource already removed, or registry is closing.
}
- return r.closeAsync();
+ return resultSet.closeAsync();
} else {
return CompletableFuture.completedFuture(null);
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
index b213f87d0a..ad106c35ef 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
@@ -68,18 +68,23 @@ public class ClientSqlExecuteRequest {
return session
.executeAsync(tx, statement, arguments)
- .thenCompose(asyncResultSet -> writeResultSetAsync(out, resources, asyncResultSet));
+ .thenCompose(asyncResultSet -> writeResultSetAsync(out, resources, asyncResultSet, session));
}
private static CompletionStage<Void> writeResultSetAsync(
ClientMessagePacker out,
ClientResourceRegistry resources,
- AsyncResultSet asyncResultSet) {
- if (asyncResultSet.hasRowSet() && asyncResultSet.hasMorePages()) {
+ AsyncResultSet asyncResultSet,
+ Session session) {
+ boolean hasResource = asyncResultSet.hasRowSet() && asyncResultSet.hasMorePages();
+
+ if (hasResource) {
try {
+ var clientResultSet = new ClientSqlResultSet(asyncResultSet, session);
+
ClientResource resource = new ClientResource(
- asyncResultSet,
- () -> asyncResultSet.closeAsync().toCompletableFuture().join());
+ clientResultSet,
+ () -> clientResultSet.closeAsync().join());
out.packLong(resources.put(resource));
} catch (IgniteInternalCheckedException e) {
@@ -103,11 +108,13 @@ public class ClientSqlExecuteRequest {
// Pack first page.
if (asyncResultSet.hasRowSet()) {
packCurrentPage(out, asyncResultSet);
+
+ return hasResource
+ ? CompletableFuture.completedFuture(null)
+ : asyncResultSet.closeAsync().thenCompose(res -> session.closeAsync());
} else {
- return asyncResultSet.closeAsync();
+ return asyncResultSet.closeAsync().thenCompose(res -> session.closeAsync());
}
-
- return CompletableFuture.completedFuture(null);
}
private static Statement readStatement(ClientMessageUnpacker in, IgniteSql sql) {
@@ -130,7 +137,11 @@ public class ClientSqlExecuteRequest {
}
if (!in.tryUnpackNil()) {
- sessionBuilder.defaultTimeout(in.unpackLong(), TimeUnit.MILLISECONDS);
+ sessionBuilder.defaultQueryTimeout(in.unpackLong(), TimeUnit.MILLISECONDS);
+ }
+
+ if (!in.tryUnpackNil()) {
+ sessionBuilder.idleTimeout(in.unpackLong(), TimeUnit.MILLISECONDS);
}
var propCount = in.unpackMapHeader();
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorCloseRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlResultSet.java
similarity index 50%
copy from modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorCloseRequest.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlResultSet.java
index 8ce318c847..82734f5ce8 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorCloseRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlResultSet.java
@@ -18,27 +18,48 @@
package org.apache.ignite.client.handler.requests.sql;
import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.client.handler.ClientResourceRegistry;
-import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
-import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.sql.Session;
import org.apache.ignite.sql.async.AsyncResultSet;
/**
- * Client SQL cursor close request.
+ * Client result set wrapper.
*/
-public class ClientSqlCursorCloseRequest {
+class ClientSqlResultSet {
+ /** Result set. */
+ private final AsyncResultSet resultSet;
+
+ /** Session. */
+ private final Session session;
+
/**
- * Processes the request.
+ * Constructor.
*
- * @param in Unpacker.
- * @param resources Resources.
+ * @param resultSet Result set.
+ * @param session Session.
*/
- public static CompletableFuture<Void> process(ClientMessageUnpacker in, ClientResourceRegistry resources)
- throws IgniteInternalCheckedException {
- long resourceId = in.unpackLong();
+ public ClientSqlResultSet(AsyncResultSet resultSet, Session session) {
+ assert resultSet != null;
+ assert session != null;
- AsyncResultSet asyncResultSet = resources.remove(resourceId).get(AsyncResultSet.class);
+ this.resultSet = resultSet;
+ this.session = session;
+ }
- return asyncResultSet.closeAsync().toCompletableFuture();
+ /**
+ * Gets the result set.
+ *
+ * @return Result set.
+ */
+ public AsyncResultSet resultSet() {
+ return resultSet;
+ }
+
+ /**
+ * Closes underlying result set and session.
+ *
+ * @return Future representing pending completion of the operation.
+ */
+ public CompletableFuture<Void> closeAsync() {
+ return resultSet.closeAsync().thenCompose(res -> session.closeAsync()).toCompletableFuture();
}
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
index 2198809383..8c19fb211c 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
@@ -50,7 +50,10 @@ public class ClientSession implements Session {
private final String defaultSchema;
@Nullable
- private final Long defaultTimeout;
+ private final Long defaultQueryTimeout;
+
+ @Nullable
+ private final Long defaultSessionTimeout;
@Nullable
private final Map<String, Object> properties;
@@ -61,7 +64,8 @@ public class ClientSession implements Session {
* @param ch Channel.
* @param defaultPageSize Default page size.
* @param defaultSchema Default schema.
- * @param defaultTimeout Default timeout.
+ * @param defaultQueryTimeout Default query timeout.
+ * @param defaultSessionTimeout Default session timeout.
* @param properties Properties.
*/
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
@@ -69,12 +73,14 @@ public class ClientSession implements Session {
ReliableChannel ch,
@Nullable Integer defaultPageSize,
@Nullable String defaultSchema,
- @Nullable Long defaultTimeout,
+ @Nullable Long defaultQueryTimeout,
+ @Nullable Long defaultSessionTimeout,
@Nullable Map<String, Object> properties) {
this.ch = ch;
this.defaultPageSize = defaultPageSize;
this.defaultSchema = defaultSchema;
- this.defaultTimeout = defaultTimeout;
+ this.defaultQueryTimeout = defaultQueryTimeout;
+ this.defaultSessionTimeout = defaultSessionTimeout;
this.properties = properties;
}
@@ -107,7 +113,9 @@ public class ClientSession implements Session {
w.out().packObject(oneOf(clientStatement.defaultSchema(), defaultSchema));
w.out().packObject(oneOf(clientStatement.pageSizeNullable(), defaultPageSize));
- w.out().packObject(oneOf(clientStatement.queryTimeoutNullable(), defaultTimeout));
+ w.out().packObject(oneOf(clientStatement.queryTimeoutNullable(), defaultQueryTimeout));
+
+ w.out().packObject(defaultSessionTimeout);
packProperties(w, clientStatement.properties());
@@ -183,10 +191,18 @@ public class ClientSession implements Session {
/** {@inheritDoc} */
@Override
- public long defaultTimeout(TimeUnit timeUnit) {
+ public long defaultQueryTimeout(TimeUnit timeUnit) {
+ Objects.requireNonNull(timeUnit);
+
+ return defaultQueryTimeout == null ? 0 : timeUnit.convert(defaultQueryTimeout, TimeUnit.MILLISECONDS);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long idleTimeout(TimeUnit timeUnit) {
Objects.requireNonNull(timeUnit);
- return defaultTimeout == null ? 0 : timeUnit.convert(defaultTimeout, TimeUnit.MILLISECONDS);
+ return defaultSessionTimeout == null ? 0 : timeUnit.convert(defaultSessionTimeout, TimeUnit.MILLISECONDS);
}
/** {@inheritDoc} */
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSessionBuilder.java b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSessionBuilder.java
index 9120235e67..032dff1448 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSessionBuilder.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSessionBuilder.java
@@ -39,8 +39,11 @@ public class ClientSessionBuilder implements SessionBuilder {
/** Default schema. */
private String defaultSchema;
- /** Default timeout. */
- private Long defaultTimeoutMs;
+ /** Default query timeout. */
+ private Long defaultQueryTimeoutMs;
+
+ /** Default session timeout. */
+ private Long defaultSessionTimeoutMs;
/** Page size. */
private Integer pageSize;
@@ -55,17 +58,33 @@ public class ClientSessionBuilder implements SessionBuilder {
}
@Override
- public long defaultTimeout(TimeUnit timeUnit) {
+ public long defaultQueryTimeout(TimeUnit timeUnit) {
+ Objects.requireNonNull(timeUnit);
+
+ return timeUnit.convert(defaultQueryTimeoutMs == null ? 0 : defaultQueryTimeoutMs, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public SessionBuilder defaultQueryTimeout(long timeout, TimeUnit timeUnit) {
+ Objects.requireNonNull(timeUnit);
+
+ defaultQueryTimeoutMs = TimeUnit.MILLISECONDS.convert(timeout, timeUnit);
+
+ return this;
+ }
+
+ @Override
+ public long idleTimeout(TimeUnit timeUnit) {
Objects.requireNonNull(timeUnit);
- return timeUnit.convert(defaultTimeoutMs == null ? 0 : defaultTimeoutMs, TimeUnit.MILLISECONDS);
+ return timeUnit.convert(defaultSessionTimeoutMs == null ? 0 : defaultSessionTimeoutMs, TimeUnit.MILLISECONDS);
}
@Override
- public SessionBuilder defaultTimeout(long timeout, TimeUnit timeUnit) {
+ public SessionBuilder idleTimeout(long timeout, TimeUnit timeUnit) {
Objects.requireNonNull(timeUnit);
- defaultTimeoutMs = TimeUnit.MILLISECONDS.convert(timeout, timeUnit);
+ defaultSessionTimeoutMs = timeUnit.toMillis(timeout);
return this;
}
@@ -108,6 +127,6 @@ public class ClientSessionBuilder implements SessionBuilder {
@Override
public Session build() {
- return new ClientSession(ch, pageSize, defaultSchema, defaultTimeoutMs, new HashMap<>(properties));
+ return new ClientSession(ch, pageSize, defaultSchema, defaultQueryTimeoutMs, defaultSessionTimeoutMs, new HashMap<>(properties));
}
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
index 8d2abbdfd9..88dbbd15fa 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
@@ -43,7 +43,7 @@ public class ClientSql implements IgniteSql {
/** {@inheritDoc} */
@Override
public Session createSession() {
- return new ClientSession(ch, null, null, null, null);
+ return new ClientSession(ch, null, null, null, null, null);
}
/** {@inheritDoc} */
diff --git a/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java b/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java
index 660ba32aae..1adf3f40cf 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java
@@ -80,7 +80,7 @@ public class ClientSqlTest extends AbstractClientTableTest {
public void testSessionPropertiesPropagation() {
Session session = client.sql().sessionBuilder()
.defaultSchema("SCHEMA1")
- .defaultTimeout(123, TimeUnit.SECONDS)
+ .defaultQueryTimeout(123, TimeUnit.SECONDS)
.defaultPageSize(234)
.property("prop1", "1")
.property("prop2", "2")
@@ -102,7 +102,7 @@ public class ClientSqlTest extends AbstractClientTableTest {
public void testStatementPropertiesOverrideSessionProperties() {
Session session = client.sql().sessionBuilder()
.defaultSchema("SCHEMA1")
- .defaultTimeout(123, TimeUnit.SECONDS)
+ .defaultQueryTimeout(123, TimeUnit.SECONDS)
.defaultPageSize(234)
.property("prop1", "1")
.property("prop2", "2")
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeAsyncResultSet.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeAsyncResultSet.java
index 0087b071c9..2ae86dde43 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeAsyncResultSet.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeAsyncResultSet.java
@@ -29,6 +29,7 @@ import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.client.sql.ClientSqlRow;
@@ -81,7 +82,7 @@ public class FakeAsyncResultSet implements AsyncResultSet {
rows = new ArrayList<>();
rows.add(getRow("schema", session.defaultSchema()));
- rows.add(getRow("timeout", String.valueOf(session.defaultTimeout(TimeUnit.MILLISECONDS))));
+ rows.add(getRow("timeout", String.valueOf(session.defaultQueryTimeout(TimeUnit.MILLISECONDS))));
rows.add(getRow("pageSize", String.valueOf(session.defaultPageSize())));
var props = ((FakeSession) session).properties();
@@ -205,7 +206,7 @@ public class FakeAsyncResultSet implements AsyncResultSet {
/** {@inheritDoc} */
@Override
public CompletionStage<Void> closeAsync() {
- return null;
+ return CompletableFuture.completedFuture(null);
}
@NotNull
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
index f5f4d60daa..fbac649e26 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
@@ -31,7 +31,7 @@ import org.apache.ignite.internal.sql.engine.session.SessionId;
*/
public class FakeIgniteQueryProcessor implements QueryProcessor {
@Override
- public SessionId createSession(PropertiesHolder queryProperties) {
+ public SessionId createSession(long sessionTimeoutMs, PropertiesHolder queryProperties) {
return new SessionId(UUID.randomUUID());
}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java
index 809f6a8d28..6a7eb00272 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java
@@ -43,7 +43,10 @@ public class FakeSession implements Session {
private final String defaultSchema;
@Nullable
- private final Long defaultTimeout;
+ private final Long defaultQueryTimeout;
+
+ @Nullable
+ private final Long defaultSessionTimeout;
@Nullable
private final Map<String, Object> properties;
@@ -53,18 +56,20 @@ public class FakeSession implements Session {
*
* @param defaultPageSize Default page size.
* @param defaultSchema Default schema.
- * @param defaultTimeout Default timeout.
+ * @param defaultQueryTimeout Default timeout.
* @param properties Properties.
*/
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
public FakeSession(
@Nullable Integer defaultPageSize,
@Nullable String defaultSchema,
- @Nullable Long defaultTimeout,
+ @Nullable Long defaultQueryTimeout,
+ @Nullable Long defaultSessionTimeout,
@Nullable Map<String, Object> properties) {
this.defaultPageSize = defaultPageSize;
this.defaultSchema = defaultSchema;
- this.defaultTimeout = defaultTimeout;
+ this.defaultQueryTimeout = defaultQueryTimeout;
+ this.defaultSessionTimeout = defaultSessionTimeout;
this.properties = properties;
}
@@ -147,8 +152,14 @@ public class FakeSession implements Session {
/** {@inheritDoc} */
@Override
- public long defaultTimeout(TimeUnit timeUnit) {
- return defaultTimeout;
+ public long defaultQueryTimeout(TimeUnit timeUnit) {
+ return defaultQueryTimeout;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long idleTimeout(TimeUnit timeUnit) {
+ return defaultSessionTimeout;
}
/** {@inheritDoc} */
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSessionBuilder.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSessionBuilder.java
index 178eb354be..7984d2fa09 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSessionBuilder.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSessionBuilder.java
@@ -33,24 +33,42 @@ public class FakeSessionBuilder implements SessionBuilder {
private String defaultSchema;
- private Long defaultTimeoutMs;
+ private Long defaultQueryTimeoutMs;
+
+ private Long defaultSessionTimeoutMs;
private Integer pageSize;
/** {@inheritDoc} */
@Override
- public long defaultTimeout(TimeUnit timeUnit) {
+ public long defaultQueryTimeout(TimeUnit timeUnit) {
Objects.requireNonNull(timeUnit);
- return timeUnit.convert(defaultTimeoutMs == null ? 0 : defaultTimeoutMs, TimeUnit.MILLISECONDS);
+ return timeUnit.convert(defaultQueryTimeoutMs == null ? 0 : defaultQueryTimeoutMs, TimeUnit.MILLISECONDS);
}
/** {@inheritDoc} */
@Override
- public SessionBuilder defaultTimeout(long timeout, TimeUnit timeUnit) {
+ public SessionBuilder defaultQueryTimeout(long timeout, TimeUnit timeUnit) {
+ Objects.requireNonNull(timeUnit);
+
+ defaultQueryTimeoutMs = TimeUnit.MILLISECONDS.convert(timeout, timeUnit);
+
+ return this;
+ }
+
+ @Override
+ public long idleTimeout(TimeUnit timeUnit) {
+ Objects.requireNonNull(timeUnit);
+
+ return timeUnit.convert(defaultSessionTimeoutMs == null ? 0 : defaultSessionTimeoutMs, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public SessionBuilder idleTimeout(long timeout, TimeUnit timeUnit) {
Objects.requireNonNull(timeUnit);
- defaultTimeoutMs = TimeUnit.MILLISECONDS.convert(timeout, timeUnit);
+ defaultSessionTimeoutMs = TimeUnit.MILLISECONDS.convert(timeout, timeUnit);
return this;
}
@@ -100,6 +118,6 @@ public class FakeSessionBuilder implements SessionBuilder {
/** {@inheritDoc} */
@Override
public Session build() {
- return new FakeSession(pageSize, defaultSchema, defaultTimeoutMs, new HashMap<>(properties));
+ return new FakeSession(pageSize, defaultSchema, defaultQueryTimeoutMs, defaultSessionTimeoutMs, new HashMap<>(properties));
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index 97b9d64d42..24794213c4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -177,6 +177,7 @@ namespace Apache.Ignite.Tests
props["schema"] = reader.TryReadNil() ? null : reader.ReadString();
props["pageSize"] = reader.TryReadNil() ? (int?)null : reader.ReadInt32();
props["timeoutMs"] = reader.TryReadNil() ? (long?)null : reader.ReadInt64();
+ props["sessionTimeoutMs"] = reader.TryReadNil() ? (long?)null : reader.ReadInt64();
// ReSharper restore RedundantCast
var propCount = reader.ReadMapHeader();
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs
index 41461004a0..ddfec19d2e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs
@@ -263,7 +263,7 @@ namespace Apache.Ignite.Tests.Sql
var props = rows.ToDictionary(x => (string)x["NAME"]!, x => (string)x["VAL"]!);
Assert.IsTrue(res.HasRowSet);
- Assert.AreEqual(8, props.Count);
+ Assert.AreEqual(9, props.Count);
Assert.AreEqual("schema-1", props["schema"]);
Assert.AreEqual("987", props["pageSize"]);
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewBinaryTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewBinaryTests.cs
index 4c863c0164..816b5adc6d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewBinaryTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewBinaryTests.cs
@@ -510,7 +510,6 @@ namespace Apache.Ignite.Tests.Table
byte.MinValue + 1,
-1,
0,
- 1,
byte.MaxValue - 1,
byte.MaxValue,
(long)byte.MaxValue + 1,
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewPocoTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewPocoTests.cs
index cf59f074e6..e9fbea0db8 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewPocoTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewPocoTests.cs
@@ -501,7 +501,6 @@ namespace Apache.Ignite.Tests.Table
byte.MinValue + 1,
-1,
0,
- 1,
byte.MaxValue - 1,
byte.MaxValue,
(long)byte.MaxValue + 1,
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
index 4b37753b41..bfe0124c48 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
@@ -66,6 +66,7 @@ namespace Apache.Ignite.Internal.Sql
w.Write(statement.Schema);
w.Write(statement.PageSize);
w.Write((long)statement.Timeout.TotalMilliseconds);
+ w.WriteNil(); // Session timeout (unused, session is closed by the server immediately).
w.WriteMapHeader(statement.Properties.Count);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
new file mode 100644
index 0000000000..33fade1473
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.api;
+
+import static org.apache.ignite.lang.ErrorGroups.Sql;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.sql.engine.AbstractBasicIntegrationTest;
+import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
+import org.apache.ignite.sql.CursorClosedException;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlException;
+import org.junit.jupiter.api.Test;
+
+/** Test common SQL API. */
+public class ItCommonApiTest extends AbstractBasicIntegrationTest {
+ protected SqlQueryProcessor queryProcessor() {
+ return (SqlQueryProcessor) ((IgniteImpl) CLUSTER_NODES.get(0)).queryEngine();
+ }
+
+ /**
+ * Gets the SQL API.
+ *
+ * @return SQL API.
+ */
+ protected IgniteSql igniteSql() {
+ return CLUSTER_NODES.get(0).sql();
+ }
+
+ @Override
+ protected int nodes() {
+ return 1;
+ }
+
+ /** Check correctness of session expiration. */
+ @Test
+ public void testSessionExpiration() throws Exception {
+ long timeout = TimeUnit.SECONDS.toMillis(2); // twice SqlQueryProcessor.SESSION_EXPIRE_CHECK_PERIOD
+
+ IgniteSql sql = igniteSql();
+
+ sql("CREATE TABLE TST(id INTEGER PRIMARY KEY, val INTEGER)");
+ sql("INSERT INTO TST VALUES (1,1), (2,2), (3,3), (4,4)");
+
+ Session ses1 = sql.sessionBuilder().defaultPageSize(1).idleTimeout(1, TimeUnit.MILLISECONDS).build();
+ Session ses2 = sql.sessionBuilder().defaultPageSize(1).idleTimeout(100, TimeUnit.SECONDS).build();
+
+ ResultSet rs1 = ses1.execute(null, "SELECT id FROM TST");
+ ResultSet rs2 = ses2.execute(null, "SELECT id FROM TST");
+
+ // Wait for the session cleanup thread to run.
+ Thread.sleep(timeout);
+
+ // The first session should have expired by this moment.
+ SqlException ex = assertThrows(SqlException.class, () -> ses1.execute(null, "SELECT 1 + 1"));
+ assertEquals(Sql.SESSION_NOT_FOUND_ERR, ex.code());
+
+ // An already started query should fail because its session has expired.
+ ex = assertThrows(CursorClosedException.class, () -> {
+ while (rs1.hasNext()) {
+ rs1.next();
+ }
+ });
+
+ rs1.close();
+
+ assertEquals(Sql.CURSOR_CLOSED_ERR, ex.code());
+
+ // second session could proceed with execution
+ while (rs2.hasNext()) {
+ rs2.next();
+ }
+
+ // second session could start new query
+ ses2.execute(null, "SELECT 2 + 2").close();
+ }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItPublicApiColocationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItPublicApiColocationTest.java
index d9589d60f4..168e0e43d0 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItPublicApiColocationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItPublicApiColocationTest.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
@@ -129,6 +130,7 @@ public class ItPublicApiColocationTest extends AbstractBasicIntegrationTest {
* Check colocation by one column for all types.
* TODO: https://issues.apache.org/jira/browse/IGNITE-16711 - supports DECIMAL
*/
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17557")
@ParameterizedTest(name = "types=" + ARGUMENTS_PLACEHOLDER)
@MethodSource("twoColumnsParameters")
public void colocationTwoColumns(NativeTypeSpec t0, NativeTypeSpec t1) throws ExecutionException, InterruptedException {
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java
index 5f605aaa7b..ca8a894370 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java
@@ -32,11 +32,14 @@ import org.jetbrains.annotations.Nullable;
*/
public class SessionBuilderImpl implements SessionBuilder {
- public static final long DEFAULT_TIMEOUT = 0;
+ public static final long DEFAULT_QUERY_TIMEOUT = 0;
+ public static final long DEFAULT_SESSION_TIMEOUT = TimeUnit.MINUTES.toMillis(5);
private final QueryProcessor qryProc;
- private long timeout = DEFAULT_TIMEOUT;
+ private long queryTimeout = DEFAULT_QUERY_TIMEOUT;
+
+ private long sessionTimeout = DEFAULT_SESSION_TIMEOUT;
private String schema = Session.DEFAULT_SCHEMA;
@@ -57,14 +60,28 @@ public class SessionBuilderImpl implements SessionBuilder {
/** {@inheritDoc} */
@Override
- public long defaultTimeout(TimeUnit timeUnit) {
- return timeUnit.convert(timeout, TimeUnit.NANOSECONDS);
+ public long defaultQueryTimeout(TimeUnit timeUnit) {
+ return timeUnit.convert(queryTimeout, TimeUnit.MILLISECONDS);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public SessionBuilder defaultQueryTimeout(long timeout, TimeUnit timeUnit) {
+ this.queryTimeout = timeUnit.toMillis(timeout);
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long idleTimeout(TimeUnit timeUnit) {
+ return timeUnit.convert(sessionTimeout, TimeUnit.MILLISECONDS);
}
/** {@inheritDoc} */
@Override
- public SessionBuilder defaultTimeout(long timeout, TimeUnit timeUnit) {
- this.timeout = timeUnit.toNanos(timeout);
+ public SessionBuilder idleTimeout(long timeout, TimeUnit timeUnit) {
+ this.sessionTimeout = timeUnit.toMillis(timeout);
return this;
}
@@ -120,17 +137,18 @@ public class SessionBuilderImpl implements SessionBuilder {
public Session build() {
var propsHolder = PropertiesHolder.holderFor(
Map.of(
- QueryProperty.QUERY_TIMEOUT, timeout,
+ QueryProperty.QUERY_TIMEOUT, queryTimeout,
QueryProperty.DEFAULT_SCHEMA, schema
)
);
- var sessionId = qryProc.createSession(propsHolder);
+ var sessionId = qryProc.createSession(sessionTimeout, propsHolder);
return new SessionImpl(
sessionId,
qryProc,
pageSize,
+ sessionTimeout,
propsHolder
);
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
index e2f0428a91..8a380c5740 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
@@ -70,6 +70,8 @@ public class SessionImpl implements Session {
private final int pageSize;
+ private final long sessionTimeout;
+
private final PropertiesHolder props;
/**
@@ -77,17 +79,20 @@ public class SessionImpl implements Session {
*
* @param qryProc Query processor.
* @param pageSize Query fetch page size.
+ * @param sessionTimeoutMs Session timeout in milliseconds.
* @param props Session's properties.
*/
SessionImpl(
SessionId sessionId,
QueryProcessor qryProc,
int pageSize,
+ long sessionTimeoutMs,
PropertiesHolder props
) {
this.qryProc = qryProc;
this.sessionId = sessionId;
this.pageSize = pageSize;
+ this.sessionTimeout = sessionTimeoutMs;
this.props = props;
}
@@ -105,8 +110,14 @@ public class SessionImpl implements Session {
/** {@inheritDoc} */
@Override
- public long defaultTimeout(TimeUnit timeUnit) {
- return timeUnit.convert(props.get(QueryProperty.QUERY_TIMEOUT), TimeUnit.NANOSECONDS);
+ public long defaultQueryTimeout(TimeUnit timeUnit) {
+ return timeUnit.convert(props.get(QueryProperty.QUERY_TIMEOUT), TimeUnit.MILLISECONDS);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long idleTimeout(TimeUnit timeUnit) {
+ return timeUnit.convert(sessionTimeout, TimeUnit.MILLISECONDS);
}
/** {@inheritDoc} */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
index 804aa55375..e5fb1950fd 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
@@ -31,10 +31,11 @@ public interface QueryProcessor extends IgniteComponent {
/**
* Creates a session with given properties.
*
+ * @param sessionTimeoutMs Session timeout in milliseconds.
* @param queryProperties Properties to store within a new session.
* @return An identifier of a created session.
*/
- SessionId createSession(PropertiesHolder queryProperties);
+ SessionId createSession(long sessionTimeoutMs, PropertiesHolder queryProperties);
/**
* Closes the session with given id.
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index be7b065a45..e8aa8f3152 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -90,6 +90,11 @@ public class SqlQueryProcessor implements QueryProcessor {
/** Size of the cache for query plans. */
public static final int PLAN_CACHE_SIZE = 1024;
+ /** Session expiration check period in milliseconds. */
+ public static final long SESSION_EXPIRE_CHECK_PERIOD = TimeUnit.SECONDS.toMillis(1);
+
+ private final List<LifecycleAware> services = new ArrayList<>();
+
private final ClusterService clusterSrvc;
private final TableManager tableManager;
@@ -102,8 +107,6 @@ public class SqlQueryProcessor implements QueryProcessor {
private final DataStorageManager dataStorageManager;
- private final SessionManager sessionManager = new SessionManager(System::currentTimeMillis);
-
private final Supplier<Map<String, Map<String, Class<?>>>> dataStorageFieldsSupplier;
/** Busy lock for stop synchronisation. */
@@ -112,7 +115,7 @@ public class SqlQueryProcessor implements QueryProcessor {
/** Event listeners to close. */
private final List<Pair<Event, EventListener>> evtLsnrs = new ArrayList<>();
- private final List<LifecycleAware> services = new ArrayList<>();
+ private volatile SessionManager sessionManager;
private volatile QueryTaskExecutor taskExecutor;
@@ -146,6 +149,8 @@ public class SqlQueryProcessor implements QueryProcessor {
public synchronized void start() {
var nodeName = clusterSrvc.topologyService().localMember().name();
+ sessionManager = registerService(new SessionManager(nodeName, SESSION_EXPIRE_CHECK_PERIOD, System::currentTimeMillis));
+
taskExecutor = registerService(new QueryTaskExecutorImpl(nodeName));
var mailboxRegistry = registerService(new MailboxRegistryImpl());
@@ -208,9 +213,9 @@ public class SqlQueryProcessor implements QueryProcessor {
/** {@inheritDoc} */
@Override
- public SessionId createSession(PropertiesHolder queryProperties) {
+ public SessionId createSession(long sessionTimeoutMs, PropertiesHolder queryProperties) {
return sessionManager.createSession(
- TimeUnit.MINUTES.toMillis(5),
+ sessionTimeoutMs,
queryProperties
);
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/session/Session.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/session/Session.java
index 9a444a2987..d07df020bd 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/session/Session.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/session/Session.java
@@ -88,6 +88,11 @@ public class Session implements AsyncCloseable {
return sessionId;
}
+ /** Returns the duration in milliseconds after which the session will be considered expired if it has not been touched in the meantime. */
+ public long idleTimeoutMs() {
+ return idleTimeoutMs;
+ }
+
/** Checks whether the given session has expired or not. */
public boolean expired() {
var last = lastTouched.get();
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/session/SessionManager.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/session/SessionManager.java
index 1e8b800c1c..6df7a9e1c9 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/session/SessionManager.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/session/SessionManager.java
@@ -21,31 +21,65 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.sql.engine.CurrentTimeProvider;
+import org.apache.ignite.internal.sql.engine.exec.LifecycleAware;
import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
+import org.apache.ignite.internal.thread.IgniteThread;
+import org.apache.ignite.internal.util.worker.IgniteWorker;
import org.jetbrains.annotations.Nullable;
/**
* A manager of a server side sql sessions.
*/
-public class SessionManager {
+public class SessionManager implements LifecycleAware {
+ private static final IgniteLogger LOG = Loggers.forClass(SessionManager.class);
+
+ /** Active sessions. */
private final Map<SessionId, Session> activeSessions = new ConcurrentHashMap<>();
+
private final CurrentTimeProvider timeProvider;
+ /** Session expiration worker. */
+ private final IgniteWorker expirationWorker;
+
+ private final AtomicBoolean startedFlag = new AtomicBoolean(false);
+
/**
* Constructor.
*
+ * @param igniteInstanceName The name of the current node.
+ * @param expirationCheckPeriod Time period in milliseconds to check sessions expiration.
* @param timeProvider A time provider to use for session management.
*/
- public SessionManager(CurrentTimeProvider timeProvider) {
+ public SessionManager(String igniteInstanceName, long expirationCheckPeriod, CurrentTimeProvider timeProvider) {
this.timeProvider = timeProvider;
+
+ expirationWorker = new IgniteWorker(LOG, igniteInstanceName, "session_cleanup-thread", null) {
+ @Override
+ protected void body() throws InterruptedException {
+ while (!isCancelled()) {
+ blockingSectionBegin();
+ try {
+ Thread.sleep(expirationCheckPeriod);
+ } finally {
+ blockingSectionEnd();
+ }
+
+ activeSessions.values().stream().filter(Session::expired).forEach((s) -> destroySession(s));
+
+ LOG.debug("Expired SQL sessions has been cleaned up. Active sessions [count={}]", activeSessions.size());
+ }
+ }
+ };
}
/**
* Creates a new session.
*
- * @param idleTimeoutMs Duration in milliseconds after which the session will be considered expired if no action have been
- * performed on behalf of this session during this period.
+ * @param idleTimeoutMs Duration in milliseconds after which the session will be considered expired if no action has been performed on
+ * behalf of this session during this period.
* @param queryProperties Properties to keep within the session.
* @return A new session.
*/
@@ -56,6 +90,7 @@ public class SessionManager {
var applied = new AtomicBoolean(false);
SessionId sessionId;
+
do {
sessionId = nextSessionId();
@@ -79,13 +114,43 @@ public class SessionManager {
var session = activeSessions.get(sessionId);
if (session != null && !session.touch()) {
+ destroySession(session);
session = null;
}
return session;
}
+ /**
+ * Destroys the given session.
+ *
+ * @param session Session which should be destroyed.
+ */
+ private void destroySession(Session session) {
+ activeSessions.remove(session.sessionId());
+ session.closeAsync();
+ }
+
private SessionId nextSessionId() {
return new SessionId(UUID.randomUUID());
}
+
+ /**
+ * Initializes the service by starting the session expiration thread.
+ */
+ @Override
+ public void start() {
+ if (startedFlag.compareAndSet(false, true)) {
+ IgniteThread expirationThread = new IgniteThread(expirationWorker);
+
+ expirationThread.setDaemon(true);
+ expirationThread.start();
+ }
+ }
+
+ /** Stops the service by stopping the session expiration thread. */
+ @Override
+ public void stop() {
+ expirationWorker.cancel();
+ }
}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
index cdfbb80af3..a91d14476a 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
@@ -101,7 +101,7 @@ public class IgniteSqlApiTest {
// Create session with params.
Session sessionWithParams = igniteSql.sessionBuilder()
- .defaultTimeout(10_000, TimeUnit.MILLISECONDS) // Set default timeout.
+ .defaultQueryTimeout(10_000, TimeUnit.MILLISECONDS) // Set default timeout.
.property("memoryQuota", 10 * Constants.MiB) // Set default quota.
.build();
@@ -410,7 +410,7 @@ public class IgniteSqlApiTest {
SessionBuilder sessionBuilder = Mockito.mock(SessionBuilder.class);
Mockito.when(sessionBuilder.defaultSchema(Mockito.anyString())).thenAnswer(Answers.RETURNS_SELF);
- Mockito.when(sessionBuilder.defaultTimeout(Mockito.anyLong(), Mockito.any(TimeUnit.class))).thenAnswer(Answers.RETURNS_SELF);
+ Mockito.when(sessionBuilder.defaultQueryTimeout(Mockito.anyLong(), Mockito.any(TimeUnit.class))).thenAnswer(Answers.RETURNS_SELF);
Mockito.when(sessionBuilder.property(Mockito.anyString(), Mockito.any())).thenAnswer(Answers.RETURNS_SELF);
Mockito.when(sessionBuilder.build()).thenReturn(session);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 0221dc4e1b..f2747c9e23 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -297,7 +297,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
*/
@Test
public void testInnerTxInitiated() throws Exception {
- SessionId sesId = queryProc.createSession(PropertiesHolder.holderFor(Map.of()));
+ SessionId sesId = queryProc.createSession(1000, PropertiesHolder.holderFor(Map.of()));
InternalTransaction tx = mock(InternalTransaction.class);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/session/SessionManagerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/session/SessionManagerTest.java
new file mode 100644
index 0000000000..992dbc670d
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/session/SessionManagerTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.session;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * SessionManager tests.
+ */
+class SessionManagerTest {
+
+ private SessionManager sessionMgr;
+ private AtomicLong clock = new AtomicLong(System.currentTimeMillis());
+
+ @BeforeEach
+ void beforeEach() {
+ sessionMgr = new SessionManager("test", 20, () -> clock.get());
+ }
+
+ @AfterEach
+ void afterEach() {
+ sessionMgr.stop();
+ }
+
+ @Test
+ void sessionGet() {
+ PropertiesHolder propHldr = PropertiesHolder.holderFor(Map.of());
+
+ SessionId sessionId = sessionMgr.createSession(12345, propHldr);
+
+ Session session = sessionMgr.session(sessionId);
+ assertNotNull(session);
+ assertSame(propHldr, session.queryProperties());
+ assertEquals(12345, session.idleTimeoutMs());
+
+ SessionId unknownSessionId = new SessionId(UUID.randomUUID());
+ assertNull(sessionMgr.session(unknownSessionId));
+ }
+
+ @Test
+ void sessionExpiration() throws InterruptedException {
+ clock.set(1);
+ SessionId sessionId = sessionMgr.createSession(2, null);
+
+ Session session = sessionMgr.session(sessionId);
+ assertFalse(session.expired());
+
+ // The elapsed period is too short for the session to expire.
+ clock.set(2);
+ assertFalse(session.expired());
+
+ // The elapsed period is long enough for the session to expire, but touching the session prolongs its lifetime.
+ clock.set(4);
+ assertNotNull(sessionMgr.session(sessionId));
+ assertFalse(session.expired());
+
+ clock.set(7);
+ assertTrue(session.expired());
+ assertNull(sessionMgr.session(sessionId));
+ // Touching the session doesn't change its already expired state.
+ assertTrue(session.expired());
+ }
+
+ @Test
+ void expirationThreadTests() throws InterruptedException {
+ long idleTimeout = 20;
+
+ SessionManager sesMgr = new SessionManager("test", 20, System::currentTimeMillis);
+ sesMgr.start();
+
+ SessionId sessionId1 = sesMgr.createSession(idleTimeout, null);
+ SessionId sessionId2 = sesMgr.createSession(idleTimeout, null);
+
+ AtomicBoolean signal1 = new AtomicBoolean(false);
+ AtomicBoolean signal2 = new AtomicBoolean(false);
+
+ Session session1 = sesMgr.session(sessionId1);
+ session1.registerResource(() -> {
+ signal1.set(true);
+ return CompletableFuture.completedFuture(null);
+ }
+ );
+
+ Session session2 = sesMgr.session(sessionId2);
+ session2.registerResource(() -> {
+ signal2.set(true);
+ return CompletableFuture.completedFuture(null);
+ }
+ );
+
+ // Wait for the first session to expire while touching the second session.
+ IgniteTestUtils.waitForCondition(
+ () -> {
+ sesMgr.session(sessionId2);
+ return signal1.get();
+ },
+ 10,
+ 50);
+
+ // The first session should be expired.
+ assertNull(sesMgr.session(sessionId1));
+ assertTrue(session1.expired());
+ // The second session is alive because it has been touched.
+ assertNotNull(sesMgr.session(sessionId2));
+ assertFalse(session2.expired());
+
+ IgniteTestUtils.waitForCondition(
+ () -> signal2.get(),
+ 10,
+ 50);
+
+ assertNull(sesMgr.session(sessionId2));
+ assertTrue(session2.expired());
+
+ sesMgr.stop();
+ }
+}