You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2022/05/30 13:10:52 UTC
[ignite-3] branch ignite-14972 updated: wip ClientSession.executeAsync
This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch ignite-14972
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-14972 by this push:
new a2af5cf30 wip ClientSession.executeAsync
a2af5cf30 is described below
commit a2af5cf306f6111651244b3d3a4585b0ba3d61e4
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Mon May 30 16:10:42 2022 +0300
wip ClientSession.executeAsync
---
.../main/java/org/apache/ignite/sql/Statement.java | 7 +++
.../requests/sql/ClientSqlExecuteRequest.java | 2 +-
.../ignite/internal/client/sql/ClientSession.java | 63 +++++++++++++++++++++-
3 files changed, 69 insertions(+), 3 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/sql/Statement.java b/modules/api/src/main/java/org/apache/ignite/sql/Statement.java
index fbde3f8b5..ec30821a7 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/Statement.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/Statement.java
@@ -61,6 +61,13 @@ public interface Statement extends AutoCloseable {
*/
int pageSize();
+ /**
+ * Returns a value indicating whether this is a prepared statement.
+ *
+ * @return {@code true} if this is a prepared statement, {@code false} otherwise.
+ */
+ boolean prepared();
+
/**
* Returns statement property value that overrides the session property value or {@code null} if session property value should be used.
*
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
index affad42a4..1b00750fd 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
@@ -60,7 +60,7 @@ public class ClientSqlExecuteRequest {
.defaultSchema(in.unpackString())
.defaultTimeout(in.unpackLong(), TimeUnit.MILLISECONDS);
- var propCount = in.unpackInt();
+ var propCount = in.unpackMapHeader();
for (int i = 0; i < propCount; i++) {
sessionBuilder.property(in.unpackString(), in.unpackObjectWithType());
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
index 1ce06e51e..c6907296e 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
@@ -18,11 +18,16 @@
package org.apache.ignite.internal.client.sql;
import static org.apache.ignite.internal.client.ClientUtils.sync;
+import static org.apache.ignite.internal.client.table.ClientTable.writeTx;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.client.ReliableChannel;
+import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.sql.BatchedArguments;
import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.Session;
@@ -39,13 +44,37 @@ public class ClientSession implements Session {
/** Channel. */
private final ReliableChannel ch;
+ private final int defaultPageSize;
+
+ @Nullable
+ private final String defaultSchema;
+
+ private final long defaultTimeout;
+
+ @Nullable
+ private final Map<String, Object> properties;
+
/**
* Constructor.
*
* @param ch Channel.
+ * @param defaultPageSize Default page size, sent to the server with every execute request.
+ * @param defaultSchema Default schema, may be {@code null}.
+ * @param defaultTimeout Default timeout; the server-side request handler reads this value as milliseconds.
+ * @param properties Session properties, may be {@code null}; the map is stored by reference, without a defensive copy.
*/
- public ClientSession(ReliableChannel ch) {
+ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+ public ClientSession(
+ ReliableChannel ch,
+ int defaultPageSize,
+ @Nullable String defaultSchema,
+ long defaultTimeout,
+ @Nullable Map<String, Object> properties) {
this.ch = ch;
+ this.defaultPageSize = defaultPageSize;
+ this.defaultSchema = defaultSchema;
+ this.defaultTimeout = defaultTimeout;
+ this.properties = properties;
}
/** {@inheritDoc} */
@@ -72,7 +101,37 @@ public class ClientSession implements Session {
@Override
public CompletableFuture<AsyncResultSet> executeAsync(@Nullable Transaction transaction, Statement statement,
@Nullable Object... arguments) {
- return null;
+ Objects.requireNonNull(statement);
+
+ return ch.serviceAsync(ClientOp.SQL_EXEC, w -> {
+ writeTx(transaction, w);
+
+ w.out().packInt(defaultPageSize);
+ w.out().packString(defaultSchema);
+ w.out().packLong(defaultTimeout);
+
+ if (properties == null) { // No session properties: write an empty map so the server reads a zero count.
+ w.out().packMapHeader(0);
+ } else {
+ w.out().packMapHeader(properties.size());
+
+ for (Entry<String, Object> entry : properties.entrySet()) {
+ w.out().packString(entry.getKey());
+ w.out().packObjectWithType(entry.getValue());
+ }
+ }
+
+ w.out().packString(statement.defaultSchema());
+ w.out().packInt(statement.pageSize());
+ w.out().packString(statement.query());
+ w.out().packLong(statement.queryTimeout(TimeUnit.MILLISECONDS));
+ w.out().packBoolean(statement.prepared());
+
+ // TODO: Pack statement properties.
+ w.out().packMapHeader(0);
+ }, r -> {
+ // TODO(review): response reader is empty - the AsyncResultSet is not read from the response yet.
+ });
}
/** {@inheritDoc} */