You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by zs...@apache.org on 2023/12/29 06:48:31 UTC
(ignite-3) branch main updated: IGNITE-20661 JDBC. Support multi statement queries by JDBC (#2906)
This is an automated email from the ASF dual-hosted git repository.
zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 704b71d999 IGNITE-20661 JDBC. Support multi statement queries by JDBC (#2906)
704b71d999 is described below
commit 704b71d9995bddecda7ea8f4fea2c3eb2ee530ad
Author: Evgeniy Stanilovskiy <st...@gmail.com>
AuthorDate: Fri Dec 29 09:48:24 2023 +0300
IGNITE-20661 JDBC. Support multi statement queries by JDBC (#2906)
---
.../ignite/internal/client/proto/ClientOp.java | 3 +
.../jdbc/proto/JdbcQueryCursorHandler.java | 16 +-
...uest.java => JdbcFetchQueryResultsRequest.java} | 28 +-
.../jdbc/proto/event/JdbcQueryCloseRequest.java | 13 +-
.../jdbc/proto/event/JdbcQueryExecuteRequest.java | 16 +-
.../jdbc/proto/event/JdbcQueryExecuteResult.java | 123 ----
.../jdbc/proto/event/JdbcQueryFetchResult.java | 5 +-
.../jdbc/proto/event/JdbcQuerySingleResult.java | 72 ++-
.../ignite/client/handler/ClientHandlerModule.java | 7 +
.../handler/ClientInboundMessageHandler.java | 4 +
.../client/handler/JdbcQueryCursorHandlerImpl.java | 108 +++-
.../client/handler/JdbcQueryEventHandlerImpl.java | 113 ++--
.../requests/jdbc/ClientJdbcFetchRequest.java | 4 +-
...hRequest.java => ClientJdbcHasMoreRequest.java} | 12 +-
.../handler/requests/jdbc/JdbcQueryCursor.java | 9 +-
.../handler/JdbcQueryCursorHandlerImplTest.java | 132 +++++
.../handler/JdbcQueryEventHandlerImplTest.java | 4 +-
.../apache/ignite/internal/client/ClientUtils.java | 3 +
.../org/apache/ignite/client/RetryPolicyTest.java | 2 +-
modules/jdbc/build.gradle | 1 +
.../apache/ignite/jdbc/AbstractJdbcSelfTest.java | 22 +
.../ignite/jdbc/ItJdbcMultiStatementSelfTest.java | 623 ++++++++++++++++++---
.../ignite/jdbc/ItJdbcStatementSelfTest.java | 54 +-
.../jdbc/JdbcClientQueryCursorHandler.java | 19 +-
.../internal/jdbc/JdbcPreparedStatement.java | 10 +-
.../internal/jdbc/JdbcQueryExecuteResponse.java | 12 +-
.../apache/ignite/internal/jdbc/JdbcResultSet.java | 99 +++-
.../apache/ignite/internal/jdbc/JdbcStatement.java | 164 +++---
.../ignite/internal/jdbc/JdbcResultSetTest.java | 96 ++++
29 files changed, 1351 insertions(+), 423 deletions(-)
diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
index 0eddf79098..829670ab89 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
@@ -152,4 +152,7 @@ public class ClientOp {
/** SQL parameter metadata. */
public static final int SQL_PARAM_META = 57;
+
+ /** JDBC get more results command. */
+ public static final int JDBC_MORE_RESULTS = 58;
}
diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/JdbcQueryCursorHandler.java b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/JdbcQueryCursorHandler.java
index 7c08f9b711..69eae806e4 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/JdbcQueryCursorHandler.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/JdbcQueryCursorHandler.java
@@ -17,25 +17,35 @@
package org.apache.ignite.internal.jdbc.proto;
+import java.sql.Statement;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.jdbc.proto.event.JdbcFetchQueryResultsRequest;
import org.apache.ignite.internal.jdbc.proto.event.JdbcMetaColumnsResult;
import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryCloseRequest;
import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryCloseResult;
-import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryFetchRequest;
import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryFetchResult;
import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryMetadataRequest;
+import org.apache.ignite.internal.jdbc.proto.event.JdbcQuerySingleResult;
/**
* Jdbc QUERY cursor operations handler interface.
*/
public interface JdbcQueryCursorHandler {
/**
- * {@link JdbcQueryFetchRequest} command handler.
+ * {@link JdbcFetchQueryResultsRequest} command handler.
*
* @param req Fetch query request.
* @return Result future.
*/
- CompletableFuture<JdbcQueryFetchResult> fetchAsync(JdbcQueryFetchRequest req);
+ CompletableFuture<JdbcQueryFetchResult> fetchAsync(JdbcFetchQueryResultsRequest req);
+
+ /**
+ * {@link Statement#getMoreResults()} command implementor.
+ *
+ * @param req Results request.
+ * @return Result future.
+ */
+ CompletableFuture<JdbcQuerySingleResult> getMoreResultsAsync(JdbcFetchQueryResultsRequest req);
/**
* {@link JdbcQueryCloseRequest} command handler.
diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryFetchRequest.java b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcFetchQueryResultsRequest.java
similarity index 76%
rename from modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryFetchRequest.java
rename to modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcFetchQueryResultsRequest.java
index c71ec90a6a..c73b557fa5 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryFetchRequest.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcFetchQueryResultsRequest.java
@@ -23,30 +23,30 @@ import org.apache.ignite.internal.jdbc.proto.ClientMessage;
import org.apache.ignite.internal.tostring.S;
/**
- * JDBC query fetch request.
+ * JDBC query fetch results request.
*/
-public class JdbcQueryFetchRequest implements ClientMessage {
+public class JdbcFetchQueryResultsRequest implements ClientMessage {
/** Cursor ID. */
private long cursorId;
/** Fetch size. */
- private int pageSize;
+ private int fetchSize;
/**
* Constructor.
*/
- public JdbcQueryFetchRequest() {
+ public JdbcFetchQueryResultsRequest() {
}
/**
* Constructor.
*
* @param cursorId Cursor ID.
- * @param pageSize Fetch size.
+ * @param fetchSize Fetch size.
*/
- public JdbcQueryFetchRequest(long cursorId, int pageSize) {
+ public JdbcFetchQueryResultsRequest(long cursorId, int fetchSize) {
this.cursorId = cursorId;
- this.pageSize = pageSize;
+ this.fetchSize = fetchSize;
}
/**
@@ -59,31 +59,31 @@ public class JdbcQueryFetchRequest implements ClientMessage {
}
/**
- * Get the fetch page size.
+ * Get the fetch size.
*
- * @return Fetch page size.
+ * @return Fetch size.
*/
- public int pageSize() {
- return pageSize;
+ public int fetchSize() {
+ return fetchSize;
}
/** {@inheritDoc} */
@Override
public void writeBinary(ClientMessagePacker packer) {
packer.packLong(cursorId);
- packer.packInt(pageSize);
+ packer.packInt(fetchSize);
}
/** {@inheritDoc} */
@Override
public void readBinary(ClientMessageUnpacker unpacker) {
cursorId = unpacker.unpackLong();
- pageSize = unpacker.unpackInt();
+ fetchSize = unpacker.unpackInt();
}
/** {@inheritDoc} */
@Override
public String toString() {
- return S.toString(JdbcQueryFetchRequest.class, this);
+ return S.toString(JdbcFetchQueryResultsRequest.class, this);
}
}
diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryCloseRequest.java b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryCloseRequest.java
index e695bad296..0c006c2ee5 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryCloseRequest.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryCloseRequest.java
@@ -29,6 +29,9 @@ public class JdbcQueryCloseRequest implements ClientMessage {
/** Cursor ID. */
private long cursorId;
+ /** Delete from resources flag. */
+ private boolean removeFromResources;
+
/**
* Default constructor.
*/
@@ -39,9 +42,11 @@ public class JdbcQueryCloseRequest implements ClientMessage {
* Constructor.
*
* @param cursorId Cursor ID.
+ * @param removeFromResources If {@code true}, the cursor needs to be removed from resources.
*/
- public JdbcQueryCloseRequest(long cursorId) {
+ public JdbcQueryCloseRequest(long cursorId, boolean removeFromResources) {
this.cursorId = cursorId;
+ this.removeFromResources = removeFromResources;
}
/**
@@ -53,16 +58,22 @@ public class JdbcQueryCloseRequest implements ClientMessage {
return cursorId;
}
+ public boolean removeFromResources() {
+ return removeFromResources;
+ }
+
/** {@inheritDoc} */
@Override
public void writeBinary(ClientMessagePacker packer) {
packer.packLong(cursorId);
+ packer.packBoolean(removeFromResources);
}
/** {@inheritDoc} */
@Override
public void readBinary(ClientMessageUnpacker unpacker) {
cursorId = unpacker.unpackLong();
+ removeFromResources = unpacker.unpackBoolean();
}
/** {@inheritDoc} */
diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryExecuteRequest.java b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryExecuteRequest.java
index 82ad551d45..de926c0a6c 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryExecuteRequest.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryExecuteRequest.java
@@ -49,6 +49,9 @@ public class JdbcQueryExecuteRequest implements ClientMessage {
/** Flag indicating whether auto-commit mode is enabled. */
private boolean autoCommit;
+ /** Multiple statement flag. */
+ private boolean multiStatement;
+
/**
* Default constructor. For deserialization purposes.
*/
@@ -65,9 +68,10 @@ public class JdbcQueryExecuteRequest implements ClientMessage {
* @param sqlQry SQL query.
* @param args Arguments list.
* @param autoCommit Flag indicating whether auto-commit mode is enabled.
+ * @param multiStatement Multiple statement flag.
*/
public JdbcQueryExecuteRequest(JdbcStatementType stmtType, String schemaName,
- int pageSize, int maxRows, String sqlQry, Object[] args, boolean autoCommit) {
+ int pageSize, int maxRows, String sqlQry, Object[] args, boolean autoCommit, boolean multiStatement) {
Objects.requireNonNull(stmtType);
this.autoCommit = autoCommit;
@@ -77,6 +81,7 @@ public class JdbcQueryExecuteRequest implements ClientMessage {
this.maxRows = maxRows;
this.sqlQry = sqlQry;
this.args = args;
+ this.multiStatement = multiStatement;
}
/**
@@ -97,6 +102,13 @@ public class JdbcQueryExecuteRequest implements ClientMessage {
return maxRows;
}
+ /**
+ * Returns multiple statement flag.
+ */
+ public boolean multiStatement() {
+ return multiStatement;
+ }
+
/**
* Returns the sql query.
*
@@ -151,6 +163,7 @@ public class JdbcQueryExecuteRequest implements ClientMessage {
packer.packInt(pageSize);
packer.packInt(maxRows);
packer.packString(sqlQry);
+ packer.packBoolean(multiStatement);
packer.packObjectArrayAsBinaryTuple(args);
}
@@ -164,6 +177,7 @@ public class JdbcQueryExecuteRequest implements ClientMessage {
pageSize = unpacker.unpackInt();
maxRows = unpacker.unpackInt();
sqlQry = unpacker.unpackString();
+ multiStatement = unpacker.unpackBoolean();
args = unpacker.unpackObjectArrayFromBinaryTuple();
}
diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryExecuteResult.java b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryExecuteResult.java
deleted file mode 100644
index 7bb2b10126..0000000000
--- a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryExecuteResult.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.jdbc.proto.event;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import org.apache.ignite.internal.client.proto.ClientMessagePacker;
-import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
-import org.apache.ignite.internal.tostring.S;
-
-/**
- * JDBC query execute result.
- */
-public class JdbcQueryExecuteResult extends Response {
- /** Query result rows. */
- private List<JdbcQuerySingleResult> results;
-
- /**
- * Constructor. For deserialization purposes only.
- */
- public JdbcQueryExecuteResult() {
- }
-
- /**
- * Constructor.
- *
- * @param status Status code.
- * @param err Error message.
- */
- public JdbcQueryExecuteResult(int status, String err) {
- super(status, err);
- }
-
- /**
- * Constructor.
- *
- * @param results Results.
- */
- public JdbcQueryExecuteResult(List<JdbcQuerySingleResult> results) {
- super();
-
- Objects.requireNonNull(results);
-
- this.results = results;
-
- this.hasResults = true;
- }
-
- /** {@inheritDoc} */
- @Override
- public void writeBinary(ClientMessagePacker packer) {
- super.writeBinary(packer);
-
- if (!hasResults) {
- return;
- }
-
- packer.packInt(results.size());
-
- for (JdbcQuerySingleResult result : results) {
- result.writeBinary(packer);
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public void readBinary(ClientMessageUnpacker unpacker) {
- super.readBinary(unpacker);
-
- if (!hasResults) {
- return;
- }
-
- int size = unpacker.unpackInt();
-
- if (size == 0) {
- results = Collections.emptyList();
-
- return;
- }
-
- results = new ArrayList<>(size);
-
- for (int i = 0; i < size; i++) {
- var res = new JdbcQuerySingleResult();
- res.readBinary(unpacker);
-
- results.add(res);
- }
- }
-
- /**
- * Get the query results.
- *
- * @return Query result rows.
- */
- public List<JdbcQuerySingleResult> results() {
- return results;
- }
-
- /** {@inheritDoc} */
- @Override
- public String toString() {
- return S.toString(JdbcQueryExecuteResult.class, this);
- }
-}
diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryFetchResult.java b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryFetchResult.java
index 462f714c6c..dca2f1e2e1 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryFetchResult.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryFetchResult.java
@@ -17,8 +17,9 @@
package org.apache.ignite.internal.jdbc.proto.event;
+import static org.apache.ignite.internal.binarytuple.BinaryTupleParser.ORDER;
+
import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
@@ -119,7 +120,7 @@ public class JdbcQueryFetchResult extends Response {
rowTuples = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
- rowTuples.add(ByteBuffer.wrap(unpacker.readBinary()).order(ByteOrder.LITTLE_ENDIAN));
+ rowTuples.add(ByteBuffer.wrap(unpacker.readBinary()).order(ORDER));
}
}
diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQuerySingleResult.java b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQuerySingleResult.java
index 7743daacc0..eae6474c4d 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQuerySingleResult.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQuerySingleResult.java
@@ -18,14 +18,12 @@
package org.apache.ignite.internal.jdbc.proto.event;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.tostring.S;
-import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.sql.ColumnType;
/**
@@ -41,7 +39,7 @@ public class JdbcQuerySingleResult extends Response {
/** Flag indicating the query has no unfetched results. */
private boolean last;
- /** Flag indicating the query is SELECT query. {@code false} for DML/DDL queries. */
+ /** Flag indicating the query is SELECT/EXPLAIN query. {@code false} for DML/DDL/TX queries. */
private boolean isQuery;
/** Update count. */
@@ -53,10 +51,14 @@ public class JdbcQuerySingleResult extends Response {
/** Decimal scales in appearance order. Can be empty in case no any decimal columns. */
private int[] decimalScales;
+ /** {@code true} if results are available, {@code false} otherwise. */
+ private boolean resultsAvailable;
+
/**
- * Constructor. For deserialization purposes only.
+ * Constructor.
*/
public JdbcQuerySingleResult() {
+ resultsAvailable = false;
}
/**
@@ -67,6 +69,8 @@ public class JdbcQuerySingleResult extends Response {
*/
public JdbcQuerySingleResult(int status, String err) {
super(status, err);
+
+ resultsAvailable = false;
}
/**
@@ -74,6 +78,8 @@ public class JdbcQuerySingleResult extends Response {
*
* @param cursorId Cursor ID.
* @param rowTuples Serialized SQL result rows.
+ * @param columnTypes Ordered list of types of columns in serialized rows.
+ * @param decimalScales Decimal scales in appearance order.
* @param last Flag indicates the query has no unfetched results.
*/
public JdbcQuerySingleResult(long cursorId, List<BinaryTupleReader> rowTuples, List<ColumnType> columnTypes, int[] decimalScales,
@@ -91,6 +97,7 @@ public class JdbcQuerySingleResult extends Response {
this.isQuery = true;
hasResults = true;
+ resultsAvailable = true;
assert decimalScales != null;
}
@@ -100,16 +107,14 @@ public class JdbcQuerySingleResult extends Response {
*
* @param updateCnt Update count for DML queries.
*/
- public JdbcQuerySingleResult(long updateCnt) {
+ public JdbcQuerySingleResult(long cursorId, long updateCnt) {
super();
- this.last = true;
- this.isQuery = false;
this.updateCnt = updateCnt;
- this.rowTuples = Collections.emptyList();
- columnTypes = Collections.emptyList();
- this.decimalScales = ArrayUtils.INT_EMPTY_ARRAY;
- hasResults = true;
+ this.cursorId = cursorId;
+
+ hasResults = false;
+ resultsAvailable = true;
}
/**
@@ -143,7 +148,7 @@ public class JdbcQuerySingleResult extends Response {
* Decimal scales.
*
* @return Decimal scales in appearance order in columns. Can be empty in case no any decimal columns.
- * */
+ */
public int[] decimalScales() {
return decimalScales;
}
@@ -166,6 +171,13 @@ public class JdbcQuerySingleResult extends Response {
return isQuery;
}
+ /** Results availability flag.
+ * Returns {@code false} if no more results are available.
+ */
+ public boolean resultAvailable() {
+ return resultsAvailable;
+ }
+
/**
* Get the update count.
*
@@ -180,18 +192,22 @@ public class JdbcQuerySingleResult extends Response {
public void writeBinary(ClientMessagePacker packer) {
super.writeBinary(packer);
- if (!hasResults) {
- return;
+ packer.packBoolean(resultsAvailable);
+ if (resultsAvailable) {
+ packer.packLong(updateCnt);
+
+ if (cursorId != null) {
+ packer.packLong(cursorId);
+ } else {
+ packer.packNil();
+ }
}
- if (cursorId != null) {
- packer.packLong(cursorId);
- } else {
- packer.packNil();
+ if (!hasResults) {
+ return;
}
packer.packBoolean(isQuery);
- packer.packLong(updateCnt);
packer.packBoolean(last);
packer.packIntArray(decimalScales);
@@ -212,18 +228,22 @@ public class JdbcQuerySingleResult extends Response {
@Override
public void readBinary(ClientMessageUnpacker unpacker) {
super.readBinary(unpacker);
+ resultsAvailable = unpacker.unpackBoolean();
+ if (resultsAvailable) {
+ updateCnt = unpacker.unpackLong();
+
+ if (unpacker.tryUnpackNil()) {
+ cursorId = null;
+ } else {
+ cursorId = unpacker.unpackLong();
+ }
+ }
if (!hasResults) {
return;
}
- if (unpacker.tryUnpackNil()) {
- cursorId = null;
- } else {
- cursorId = unpacker.unpackLong();
- }
isQuery = unpacker.unpackBoolean();
- updateCnt = unpacker.unpackLong();
last = unpacker.unpackBoolean();
decimalScales = unpacker.unpackIntArray();
@@ -239,7 +259,7 @@ public class JdbcQuerySingleResult extends Response {
rowTuples = new ArrayList<>(size);
for (int rowIdx = 0; rowIdx < size; rowIdx++) {
- rowTuples.add(new BinaryTupleReader(columnTypes.size(), unpacker.readBinary()));
+ rowTuples.add(new BinaryTupleReader(count, unpacker.readBinary()));
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index b15a7ae99a..04ccbdc430 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -22,6 +22,7 @@ import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.SslContext;
@@ -60,6 +61,7 @@ import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.sql.IgniteSql;
import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
/**
* Client handler module maintains TCP endpoint for thin client connections.
@@ -123,6 +125,9 @@ public class ClientHandlerModule implements IgniteComponent {
private final AtomicBoolean stopGuard = new AtomicBoolean();
+ @TestOnly
+ private volatile ChannelHandler handler;
+
/**
* Constructor.
*
@@ -295,6 +300,8 @@ public class ClientHandlerModule implements IgniteComponent {
ClientInboundMessageHandler messageHandler = createInboundMessageHandler(
configuration, clusterId, connectionId);
+ handler = messageHandler;
+
ch.pipeline().addLast(
new ClientMessageDecoder(),
messageHandler
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index b22114a7ce..7b436b85ff 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -51,6 +51,7 @@ import org.apache.ignite.client.handler.requests.jdbc.ClientJdbcExecuteBatchRequ
import org.apache.ignite.client.handler.requests.jdbc.ClientJdbcExecuteRequest;
import org.apache.ignite.client.handler.requests.jdbc.ClientJdbcFetchRequest;
import org.apache.ignite.client.handler.requests.jdbc.ClientJdbcFinishTxRequest;
+import org.apache.ignite.client.handler.requests.jdbc.ClientJdbcHasMoreRequest;
import org.apache.ignite.client.handler.requests.jdbc.ClientJdbcPreparedStmntBatchRequest;
import org.apache.ignite.client.handler.requests.jdbc.ClientJdbcPrimaryKeyMetadataRequest;
import org.apache.ignite.client.handler.requests.jdbc.ClientJdbcQueryMetadataRequest;
@@ -681,6 +682,9 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter im
case ClientOp.JDBC_NEXT:
return ClientJdbcFetchRequest.process(in, out, jdbcQueryCursorHandler);
+ case ClientOp.JDBC_MORE_RESULTS:
+ return ClientJdbcHasMoreRequest.process(in, out, jdbcQueryCursorHandler);
+
case ClientOp.JDBC_CURSOR_CLOSE:
return ClientJdbcCloseRequest.process(in, out, jdbcQueryCursorHandler);
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryCursorHandlerImpl.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryCursorHandlerImpl.java
index 5b67f73d40..71f62f8b69 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryCursorHandlerImpl.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryCursorHandlerImpl.java
@@ -17,26 +17,33 @@
package org.apache.ignite.client.handler;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.client.handler.JdbcQueryEventHandlerImpl.buildSingleRequest;
+import static org.apache.ignite.internal.jdbc.proto.IgniteQueryErrorCode.UNSUPPORTED_OPERATION;
+
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.internal.jdbc.JdbcConverterUtils;
import org.apache.ignite.internal.jdbc.proto.JdbcQueryCursorHandler;
import org.apache.ignite.internal.jdbc.proto.event.JdbcColumnMeta;
+import org.apache.ignite.internal.jdbc.proto.event.JdbcFetchQueryResultsRequest;
import org.apache.ignite.internal.jdbc.proto.event.JdbcMetaColumnsResult;
import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryCloseRequest;
import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryCloseResult;
-import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryFetchRequest;
import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryFetchResult;
import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryMetadataRequest;
+import org.apache.ignite.internal.jdbc.proto.event.JdbcQuerySingleResult;
import org.apache.ignite.internal.jdbc.proto.event.Response;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
+import org.apache.ignite.internal.sql.engine.SqlQueryType;
import org.apache.ignite.sql.ColumnMetadata;
import org.apache.ignite.sql.ResultSetMetadata;
@@ -52,13 +59,13 @@ public class JdbcQueryCursorHandlerImpl implements JdbcQueryCursorHandler {
*
* @param resources Client resources.
*/
- public JdbcQueryCursorHandlerImpl(ClientResourceRegistry resources) {
+ JdbcQueryCursorHandlerImpl(ClientResourceRegistry resources) {
this.resources = resources;
}
/** {@inheritDoc} */
@Override
- public CompletableFuture<JdbcQueryFetchResult> fetchAsync(JdbcQueryFetchRequest req) {
+ public CompletableFuture<JdbcQueryFetchResult> fetchAsync(JdbcFetchQueryResultsRequest req) {
AsyncSqlCursor<InternalSqlRow> asyncSqlCursor;
try {
asyncSqlCursor = resources.get(req.cursorId()).get(AsyncSqlCursor.class);
@@ -69,12 +76,12 @@ public class JdbcQueryCursorHandlerImpl implements JdbcQueryCursorHandler {
"Failed to find query cursor [curId=" + req.cursorId() + "]. Error message:" + sw));
}
- if (req.pageSize() <= 0) {
+ if (req.fetchSize() <= 0) {
return CompletableFuture.completedFuture(new JdbcQueryFetchResult(Response.STATUS_FAILED,
- "Invalid fetch size [fetchSize=" + req.pageSize() + ']'));
+ "Invalid fetch size [fetchSize=" + req.fetchSize() + ']'));
}
- return asyncSqlCursor.requestNextAsync(req.pageSize()).handle((batch, t) -> {
+ return asyncSqlCursor.requestNextAsync(req.fetchSize()).handle((batch, t) -> {
if (t != null) {
StringWriter sw = getWriterWithStackTrace(t);
@@ -91,12 +98,97 @@ public class JdbcQueryCursorHandlerImpl implements JdbcQueryCursorHandler {
}).toCompletableFuture();
}
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<JdbcQuerySingleResult> getMoreResultsAsync(JdbcFetchQueryResultsRequest req) {
+ AsyncSqlCursor<InternalSqlRow> asyncSqlCursor;
+ try {
+ asyncSqlCursor = resources.get(req.cursorId()).get(AsyncSqlCursor.class);
+ } catch (IgniteInternalCheckedException e) {
+ StringWriter sw = getWriterWithStackTrace(e);
+
+ return CompletableFuture.completedFuture(new JdbcQuerySingleResult(Response.STATUS_FAILED,
+ "Failed to find query cursor [curId=" + req.cursorId() + "]. Error message:" + sw));
+ }
+
+ if (!asyncSqlCursor.hasNextResult()) {
+ return CompletableFuture.completedFuture(new JdbcQuerySingleResult());
+ }
+
+ return asyncSqlCursor.closeAsync().thenCompose(c -> asyncSqlCursor.nextResult())
+ .thenCompose(cur -> cur.requestNextAsync(req.fetchSize())
+ .thenApply(batch -> {
+ try {
+ SqlQueryType queryType = cur.queryType();
+
+ long cursorId = resources.put(new ClientResource(cur, cur::closeAsync));
+
+ switch (queryType) {
+ case EXPLAIN:
+ case QUERY: {
+ List<ColumnMetadata> columns = cur.metadata().columns();
+
+ return buildSingleRequest(batch, columns, cursorId, !batch.hasMore());
+ }
+ case DML: {
+ long updCount = (long) batch.items().get(0).get(0);
+
+ return new JdbcQuerySingleResult(cursorId, updCount);
+ }
+ case DDL:
+ case TX_CONTROL:
+ return new JdbcQuerySingleResult(cursorId, 0);
+ default:
+ return new JdbcQuerySingleResult(UNSUPPORTED_OPERATION,
+ "Query type is not supported yet [queryType=" + cur.queryType() + ']');
+ }
+ } catch (IgniteInternalCheckedException e) {
+ return new JdbcQuerySingleResult(Response.STATUS_FAILED,
+ "Unable to store query cursor.");
+ }
+ })
+ ).handle((res, t) -> {
+ if (t != null) {
+ iterateThroughResultsAndCloseThem(asyncSqlCursor);
+
+ StringWriter sw = getWriterWithStackTrace(t);
+
+ return new JdbcQuerySingleResult(Response.STATUS_FAILED,
+ "Failed to fetch query results [curId=" + req.cursorId() + "]. Error message:" + sw);
+ }
+
+ return res;
+ });
+ }
+
+ static void iterateThroughResultsAndCloseThem(AsyncSqlCursor<InternalSqlRow> cursor) {
+ Function<AsyncSqlCursor<InternalSqlRow>, CompletableFuture<AsyncSqlCursor<InternalSqlRow>>> traverser = new Function<>() {
+ @Override
+ public CompletableFuture<AsyncSqlCursor<InternalSqlRow>> apply(AsyncSqlCursor<InternalSqlRow> cur) {
+ return cur.closeAsync()
+ .thenCompose(none -> {
+ if (cur.hasNextResult()) {
+ return cur.nextResult().thenCompose(this);
+ } else {
+ return completedFuture(cur);
+ }
+ });
+ }
+ };
+
+ completedFuture(cursor).thenCompose(traverser);
+ }
+
/** {@inheritDoc} */
@Override
public CompletableFuture<JdbcQueryCloseResult> closeAsync(JdbcQueryCloseRequest req) {
AsyncSqlCursor<List<Object>> asyncSqlCursor;
try {
- asyncSqlCursor = resources.remove(req.cursorId()).get(AsyncSqlCursor.class);
+ if (req.removeFromResources()) {
+ asyncSqlCursor = resources.remove(req.cursorId()).get(AsyncSqlCursor.class);
+ } else {
+ asyncSqlCursor = resources.get(req.cursorId()).get(AsyncSqlCursor.class);
+ }
} catch (IgniteInternalCheckedException e) {
StringWriter sw = getWriterWithStackTrace(e);
@@ -119,7 +211,7 @@ public class JdbcQueryCursorHandlerImpl implements JdbcQueryCursorHandler {
/** {@inheritDoc} */
@Override
public CompletableFuture<JdbcMetaColumnsResult> queryMetadataAsync(JdbcQueryMetadataRequest req) {
- AsyncSqlCursor<List<Object>> asyncSqlCursor = null;
+ AsyncSqlCursor<List<Object>> asyncSqlCursor;
try {
asyncSqlCursor = resources.get(req.cursorId()).get(AsyncSqlCursor.class);
} catch (IgniteInternalCheckedException e) {
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
index d82396e8bd..5f74fe6819 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.client.handler;
import static org.apache.ignite.internal.jdbc.proto.IgniteQueryErrorCode.UNKNOWN;
import static org.apache.ignite.internal.jdbc.proto.IgniteQueryErrorCode.UNSUPPORTED_OPERATION;
+import static org.apache.ignite.internal.sql.engine.SqlQueryType.DML;
import static org.apache.ignite.internal.util.ArrayUtils.OBJECT_EMPTY_ARRAY;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.lang.ErrorGroups.Client.CONNECTION_ERR;
@@ -51,7 +52,6 @@ import org.apache.ignite.internal.jdbc.proto.event.JdbcMetaSchemasResult;
import org.apache.ignite.internal.jdbc.proto.event.JdbcMetaTablesRequest;
import org.apache.ignite.internal.jdbc.proto.event.JdbcMetaTablesResult;
import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryExecuteRequest;
-import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryExecuteResult;
import org.apache.ignite.internal.jdbc.proto.event.JdbcQuerySingleResult;
import org.apache.ignite.internal.jdbc.proto.event.Response;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
@@ -66,6 +66,7 @@ import org.apache.ignite.internal.sql.engine.SqlQueryType;
import org.apache.ignite.internal.sql.engine.property.SqlProperties;
import org.apache.ignite.internal.sql.engine.property.SqlPropertiesHelper;
import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.sql.ColumnMetadata;
import org.apache.ignite.sql.ColumnType;
@@ -84,7 +85,7 @@ public class JdbcQueryEventHandlerImpl implements JdbcQueryEventHandler {
private static final Set<SqlQueryType> SELECT_STATEMENT_QUERIES = Set.of(SqlQueryType.QUERY, SqlQueryType.EXPLAIN);
/** {@link SqlQueryType}s allowed in JDBC update statements. **/
- private static final Set<SqlQueryType> UPDATE_STATEMENT_QUERIES = Set.of(SqlQueryType.DML, SqlQueryType.DDL);
+ private static final Set<SqlQueryType> UPDATE_STATEMENT_QUERIES = Set.of(DML, SqlQueryType.DDL);
/** Sql query processor. */
private final QueryProcessor processor;
@@ -143,7 +144,7 @@ public class JdbcQueryEventHandlerImpl implements JdbcQueryEventHandler {
@Override
public CompletableFuture<? extends Response> queryAsync(long connectionId, JdbcQueryExecuteRequest req) {
if (req.pageSize() <= 0) {
- return CompletableFuture.completedFuture(new JdbcQueryExecuteResult(Response.STATUS_FAILED,
+ return CompletableFuture.completedFuture(new JdbcQuerySingleResult(Response.STATUS_FAILED,
"Invalid fetch size [fetchSize=" + req.pageSize() + ']'));
}
@@ -151,29 +152,40 @@ public class JdbcQueryEventHandlerImpl implements JdbcQueryEventHandler {
try {
connectionContext = resources.get(connectionId).get(JdbcConnectionContext.class);
} catch (IgniteInternalCheckedException exception) {
- return CompletableFuture.completedFuture(new JdbcQueryExecuteResult(Response.STATUS_FAILED,
+ return CompletableFuture.completedFuture(new JdbcQuerySingleResult(Response.STATUS_FAILED,
"Connection is broken"));
}
InternalTransaction tx = req.autoCommit() ? null : connectionContext.getOrStartTransaction();
SqlProperties properties = createProperties(req.getStmtType());
- CompletableFuture<AsyncSqlCursor<InternalSqlRow>> result = processor.querySingleAsync(
- properties,
- igniteTransactions,
- tx,
- req.sqlQuery(),
- req.arguments() == null ? OBJECT_EMPTY_ARRAY : req.arguments()
- );
+ CompletableFuture<AsyncSqlCursor<InternalSqlRow>> result;
+
+ if (req.multiStatement()) {
+ result = processor.queryScriptAsync(
+ properties,
+ igniteTransactions,
+ tx,
+ req.sqlQuery(),
+ req.arguments() == null ? OBJECT_EMPTY_ARRAY : req.arguments()
+ );
+ } else {
+ result = processor.querySingleAsync(
+ properties,
+ igniteTransactions,
+ tx,
+ req.sqlQuery(),
+ req.arguments() == null ? OBJECT_EMPTY_ARRAY : req.arguments()
+ );
+ }
return result.thenCompose(cursor -> createJdbcResult(new JdbcQueryCursor<>(req.maxRows(), cursor), req))
- .thenApply(jdbcResult -> new JdbcQueryExecuteResult(List.of(jdbcResult)))
.exceptionally(t -> {
LOG.info("Exception while executing query [query=" + req.sqlQuery() + "]", ExceptionUtils.unwrapCause(t));
String msg = getErrorMessage(t);
- return new JdbcQueryExecuteResult(Response.STATUS_FAILED, msg);
+ return new JdbcQuerySingleResult(Response.STATUS_FAILED, msg);
});
}
@@ -369,49 +381,36 @@ public class JdbcQueryEventHandlerImpl implements JdbcQueryEventHandler {
return cur.requestNextAsync(req.pageSize()).thenApply(batch -> {
boolean hasNext = batch.hasMore();
+ long cursorId;
+ try {
+ cursorId = resources.put(new ClientResource(cur, cur::closeAsync));
+ } catch (IgniteInternalCheckedException e) {
+ cur.closeAsync();
+
+ return new JdbcQuerySingleResult(Response.STATUS_FAILED,
+ "Unable to store query cursor.");
+ }
+
switch (cur.queryType()) {
case EXPLAIN:
case QUERY: {
- long cursorId;
- try {
- cursorId = resources.put(new ClientResource(cur, cur::closeAsync));
- } catch (IgniteInternalCheckedException e) {
- cur.closeAsync();
-
- return new JdbcQuerySingleResult(Response.STATUS_FAILED,
- "Unable to store query cursor.");
- }
-
- List<BinaryTupleReader> rows = new ArrayList<>(batch.items().size());
- for (InternalSqlRow item : batch.items()) {
- rows.add(item.asBinaryTuple());
- }
-
List<ColumnMetadata> columns = cur.metadata().columns();
- int[] decimalScales = new int[columns.size()];
- List<ColumnType> schema = new ArrayList<>(columns.size());
-
- int countOfDecimal = 0;
- for (ColumnMetadata column : columns) {
- schema.add(column.type());
- if (column.type() == ColumnType.DECIMAL) {
- decimalScales[countOfDecimal++] = column.scale();
- }
- }
- decimalScales = Arrays.copyOf(decimalScales, countOfDecimal);
-
- return new JdbcQuerySingleResult(cursorId, rows, schema, decimalScales, !hasNext);
+ return buildSingleRequest(batch, columns, cursorId, !hasNext);
}
- case DML:
+ case DML: {
if (!validateDmlResult(cur.metadata(), hasNext)) {
return new JdbcQuerySingleResult(Response.STATUS_FAILED,
"Unexpected result for DML [query=" + req.sqlQuery() + ']');
}
- return new JdbcQuerySingleResult((Long) batch.items().get(0).get(0));
+ long updCount = (long) batch.items().get(0).get(0);
+
+ return new JdbcQuerySingleResult(cursorId, updCount);
+ }
case DDL:
- return new JdbcQuerySingleResult(0);
+ case TX_CONTROL:
+ return new JdbcQuerySingleResult(cursorId, 0);
default:
return new JdbcQuerySingleResult(UNSUPPORTED_OPERATION,
"Query type is not supported yet [queryType=" + cur.queryType() + ']');
@@ -419,6 +418,32 @@ public class JdbcQueryEventHandlerImpl implements JdbcQueryEventHandler {
});
}
+    /**
+     * Converts one fetched batch of rows into a {@link JdbcQuerySingleResult} that carries the rows as binary tuples
+     * together with the column types and the scales of the DECIMAL columns.
+     *
+     * @param batch Fetched batch whose items are converted to binary tuples.
+     * @param columns Column metadata of the result set.
+     * @param cursorId Cursor identifier, forwarded to the result.
+     * @param hasNext Flag forwarded unchanged as the last constructor argument of the result.
+     *      NOTE(review): the visible call site in this class passes {@code !hasNext}, so the value presumably means
+     *      "this is the last batch" rather than "more rows follow" - confirm and consider renaming.
+     * @return Result built from the batch.
+     */
+    static JdbcQuerySingleResult buildSingleRequest(
+            BatchedResult<InternalSqlRow> batch,
+            List<ColumnMetadata> columns,
+            long cursorId,
+            boolean hasNext
+    ) {
+        List<InternalSqlRow> fetched = batch.items();
+        List<BinaryTupleReader> tuples = new ArrayList<>(fetched.size());
+        fetched.forEach(row -> tuples.add(row.asBinaryTuple()));
+
+        List<ColumnType> types = new ArrayList<>(columns.size());
+        columns.forEach(column -> types.add(column.type()));
+
+        // Only DECIMAL columns contribute a scale; the scales keep the column order.
+        int[] scales = columns.stream()
+                .filter(column -> column.type() == ColumnType.DECIMAL)
+                .mapToInt(ColumnMetadata::scale)
+                .toArray();
+
+        return new JdbcQuerySingleResult(cursorId, tuples, types, scales, hasNext);
+    }
+
/**
* Validate dml result. Check if it stores only one value of Long type.
*
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcFetchRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcFetchRequest.java
index becdf20287..aa33f9fe71 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcFetchRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcFetchRequest.java
@@ -21,7 +21,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.jdbc.proto.JdbcQueryCursorHandler;
-import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryFetchRequest;
+import org.apache.ignite.internal.jdbc.proto.event.JdbcFetchQueryResultsRequest;
/**
* Client jdbc request handler.
@@ -40,7 +40,7 @@ public class ClientJdbcFetchRequest {
ClientMessagePacker out,
JdbcQueryCursorHandler handler
) {
- var req = new JdbcQueryFetchRequest();
+ var req = new JdbcFetchQueryResultsRequest();
req.readBinary(in);
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcFetchRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcHasMoreRequest.java
similarity index 80%
copy from modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcFetchRequest.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcHasMoreRequest.java
index becdf20287..f3af35a033 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcFetchRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcHasMoreRequest.java
@@ -21,14 +21,14 @@ import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.jdbc.proto.JdbcQueryCursorHandler;
-import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryFetchRequest;
+import org.apache.ignite.internal.jdbc.proto.event.JdbcFetchQueryResultsRequest;
/**
- * Client jdbc request handler.
+ * Client jdbc more results request.
*/
-public class ClientJdbcFetchRequest {
+public class ClientJdbcHasMoreRequest {
/**
- * Processes remote {@code JdbcQueryFetchRequest}.
+ * Processes remote {@code JdbcFetchQueryResultsRequest} asking for the next result of a multi statement query.
*
* @param in Client message unpacker.
* @param out Client message packer.
@@ -40,10 +40,10 @@ public class ClientJdbcFetchRequest {
ClientMessagePacker out,
JdbcQueryCursorHandler handler
) {
- var req = new JdbcQueryFetchRequest();
+ var req = new JdbcFetchQueryResultsRequest();
req.readBinary(in);
- return handler.fetchAsync(req).thenAccept(res -> res.writeBinary(out));
+ return handler.getMoreResultsAsync(req).thenAccept(res -> res.writeBinary(out));
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursor.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursor.java
index cb26891c43..2356200da9 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursor.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursor.java
@@ -56,6 +56,9 @@ public class JdbcQueryCursor<T> implements AsyncSqlCursor<T> {
@Override
public CompletableFuture<BatchedResult<T>> requestNextAsync(int rows) {
long fetched0 = fetched.addAndGet(rows);
+
+ assert cur != null : "non initialized cursor";
+
return cur.requestNextAsync(rows).thenApply(batch -> {
if (maxRows == 0 || fetched0 < maxRows) {
return batch;
@@ -104,8 +107,7 @@ public class JdbcQueryCursor<T> implements AsyncSqlCursor<T> {
/** {@inheritDoc} */
@Override
public boolean hasNextResult() {
- // TODO https://issues.apache.org/jira/browse/IGNITE-20661
- return false;
+ return cur.hasNextResult();
}
/** {@inheritDoc} */
@@ -115,7 +117,6 @@ public class JdbcQueryCursor<T> implements AsyncSqlCursor<T> {
throw new NoSuchElementException("Query has no more results");
}
- // TODO https://issues.apache.org/jira/browse/IGNITE-20661
- return null;
+ return cur.nextResult();
}
}
diff --git a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryCursorHandlerImplTest.java b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryCursorHandlerImplTest.java
new file mode 100644
index 0000000000..96c225fb68
--- /dev/null
+++ b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryCursorHandlerImplTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.lang.ErrorGroups.Catalog.VALIDATION_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_PARSE_ERR;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.jdbc.proto.JdbcQueryCursorHandler;
+import org.apache.ignite.internal.jdbc.proto.event.JdbcFetchQueryResultsRequest;
+import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
+import org.apache.ignite.internal.sql.engine.AsyncSqlCursorImpl;
+import org.apache.ignite.internal.sql.engine.InternalSqlRow;
+import org.apache.ignite.internal.sql.engine.SqlQueryType;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.SqlException;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Test to verify how {@link JdbcQueryCursorHandlerImpl} deals with cursors that fail while the next result is requested.
+ */
+@ExtendWith(MockitoExtension.class)
+public class JdbcQueryCursorHandlerImplTest extends BaseIgniteAbstractTest {
+
+    /**
+     * Requests more results from a cursor that either throws from {@code nextResult()} or hands out a cursor whose
+     * {@code requestNextAsync} throws, and awaits the handler's future for up to 5 seconds.
+     */
+    @ParameterizedTest(name = "throw exception from nextResult = {0}")
+    @ValueSource(booleans = {true, false})
+    void testGetMoreResultsProcessExceptions(boolean nextResultThrow) throws IgniteInternalCheckedException {
+        ClientResourceRegistry registry = mock(ClientResourceRegistry.class);
+        ClientResource resource = mock(ClientResource.class);
+
+        JdbcQueryCursorHandler handler = new JdbcQueryCursorHandlerImpl(registry);
+
+        // Any cursor id resolves to the same mocked resource, which hands out a fresh faulty cursor on every call.
+        when(registry.get(anyLong())).thenAnswer((InvocationOnMock invocation) -> resource);
+        when(resource.get(AsyncSqlCursor.class)).thenAnswer((InvocationOnMock invocation) -> faultyCursor(nextResultThrow));
+
+        await(handler.getMoreResultsAsync(new JdbcFetchQueryResultsRequest(1, 100)), 5, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Creates a cursor stub that always reports a next result.
+     *
+     * @param nextResultThrow If {@code true}, {@code nextResult()} itself throws; otherwise it returns a mocked cursor
+     *      whose {@code requestNextAsync} throws.
+     * @return Cursor stub.
+     */
+    private static AsyncSqlCursor<InternalSqlRow> faultyCursor(boolean nextResultThrow) {
+        return new AsyncSqlCursor<>() {
+            @Override
+            public SqlQueryType queryType() {
+                throw new UnsupportedOperationException("queryType");
+            }
+
+            @Override
+            public ResultSetMetadata metadata() {
+                throw new UnsupportedOperationException("metadata");
+            }
+
+            @Override
+            public boolean hasNextResult() {
+                return true;
+            }
+
+            @Override
+            public CompletableFuture<Void> onClose() {
+                return null;
+            }
+
+            @Override
+            public CompletableFuture<Void> onFirstPageReady() {
+                return null;
+            }
+
+            @Override
+            public CompletableFuture<AsyncSqlCursor<InternalSqlRow>> nextResult() {
+                if (nextResultThrow) {
+                    throw new SqlException(STMT_PARSE_ERR, new Exception("nextResult exception"));
+                }
+
+                AsyncSqlCursorImpl<InternalSqlRow> failingCursor = mock(AsyncSqlCursorImpl.class);
+
+                lenient().when(failingCursor.requestNextAsync(anyInt()))
+                        .thenAnswer((Answer<BatchedResult<InternalSqlRow>>) invocation -> {
+                            throw new IgniteInternalException(VALIDATION_ERR, "requestNextAsync error");
+                        });
+
+                return CompletableFuture.completedFuture(failingCursor);
+            }
+
+            @Override
+            public CompletableFuture<BatchedResult<InternalSqlRow>> requestNextAsync(int rows) {
+                return nullCompletedFuture();
+            }
+
+            @Override
+            public CompletableFuture<Void> closeAsync() {
+                return nullCompletedFuture();
+            }
+        };
+    }
+}
diff --git a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java
index bb5113e613..d2fdf603c1 100644
--- a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java
+++ b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java
@@ -175,7 +175,7 @@ class JdbcQueryEventHandlerImplTest extends BaseIgniteAbstractTest {
String schema = "schema";
JdbcStatementType type = JdbcStatementType.SELECT_STATEMENT_TYPE;
- await(eventHandler.queryAsync(connectionId, new JdbcQueryExecuteRequest(type, schema, 1024, 1, "SELECT 1", null, false)));
+ await(eventHandler.queryAsync(connectionId, new JdbcQueryExecuteRequest(type, schema, 1024, 1, "SELECT 1", null, false, false)));
verify(igniteTransactions, times(1)).begin();
await(eventHandler.batchAsync(connectionId, new JdbcBatchExecuteRequest("schema", List.of("UPDATE 1", "UPDATE 2"), false)));
verify(igniteTransactions, times(1)).begin();
@@ -185,7 +185,7 @@ class JdbcQueryEventHandlerImplTest extends BaseIgniteAbstractTest {
await(eventHandler.batchAsync(connectionId, new JdbcBatchExecuteRequest("schema", List.of("UPDATE 1", "UPDATE 2"), false)));
verify(igniteTransactions, times(2)).begin();
- await(eventHandler.queryAsync(connectionId, new JdbcQueryExecuteRequest(type, schema, 1024, 1, "SELECT 2", null, false)));
+ await(eventHandler.queryAsync(connectionId, new JdbcQueryExecuteRequest(type, schema, 1024, 1, "SELECT 2", null, false, false)));
verify(igniteTransactions, times(2)).begin();
await(eventHandler.batchAsync(connectionId, new JdbcBatchExecuteRequest("schema", List.of("UPDATE 3", "UPDATE 4"), false)));
verify(igniteTransactions, times(2)).begin();
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
index 8e4ae595c8..88a8d9924a 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
@@ -179,6 +179,9 @@ public class ClientUtils {
case ClientOp.JDBC_NEXT:
return null;
+ case ClientOp.JDBC_MORE_RESULTS:
+ return null;
+
case ClientOp.JDBC_EXEC_BATCH:
return null;
diff --git a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
index e7a7914132..7b1d3188f4 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
@@ -217,7 +217,7 @@ public class RetryPolicyTest extends BaseIgniteAbstractTest {
}
}
- long expectedNullCount = 21;
+ long expectedNullCount = 22;
String msg = nullOpFields.size()
+ " operation codes do not have public equivalent. When adding new codes, update ClientOperationType too. Missing ops: "
diff --git a/modules/jdbc/build.gradle b/modules/jdbc/build.gradle
index 41c4e4d68b..e6d1aac97f 100644
--- a/modules/jdbc/build.gradle
+++ b/modules/jdbc/build.gradle
@@ -37,6 +37,7 @@ dependencies {
testImplementation testFixtures(project(":ignite-core"))
testImplementation libs.mockito.core
+ testImplementation libs.mockito.junit
testImplementation libs.hamcrest.core
integrationTestImplementation testFixtures(project(":ignite-api"))
diff --git a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/AbstractJdbcSelfTest.java b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/AbstractJdbcSelfTest.java
index 8be4cbe169..f5e19c52a7 100644
--- a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/AbstractJdbcSelfTest.java
+++ b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/AbstractJdbcSelfTest.java
@@ -29,11 +29,16 @@ import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgnitionManager;
import org.apache.ignite.InitParameters;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
@@ -168,4 +173,21 @@ public class AbstractJdbcSelfTest extends BaseIgniteAbstractTest {
protected void checkNotSupported(Executable ex) {
assertThrows(SQLFeatureNotSupportedException.class, ex);
}
+
+    /**
+     * Returns the number of resources currently stored in the client resource registry of the first cluster node.
+     *
+     * <p>The registry is reached through a chain of fields read via reflection, so this helper breaks as soon as one
+     * of the field names changes; it needs to be refactored.
+     *
+     * @return Number of stored resources.
+     */
+    int openResources() {
+        IgniteImpl ignite = (IgniteImpl) clusterNodes.get(0);
+        IgniteComponent cliHnd = IgniteTestUtils.getFieldValue(ignite, "clientHandlerModule");
+        Object clientInboundHandler = IgniteTestUtils.getFieldValue(cliHnd, "handler");
+        Object rsrc = IgniteTestUtils.getFieldValue(clientInboundHandler, "resources");
+        // Wildcards instead of a raw Map: only the size is needed, the key and value types do not matter here.
+        Map<?, ?> resources = IgniteTestUtils.getFieldValue(rsrc, "res");
+        return resources.size();
+    }
+
+    /** Returns the number of cursors that the SQL query processor of the first cluster node reports as opened. */
+    int openCursors() {
+        IgniteImpl firstNode = (IgniteImpl) clusterNodes.get(0);
+
+        return ((SqlQueryProcessor) firstNode.queryEngine()).openedCursors();
+    }
}
diff --git a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcMultiStatementSelfTest.java b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcMultiStatementSelfTest.java
index 53f03b5bcf..37ac75c087 100644
--- a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcMultiStatementSelfTest.java
+++ b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcMultiStatementSelfTest.java
@@ -17,21 +17,29 @@
package org.apache.ignite.jdbc;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.jdbc.util.JdbcTestUtils.assertThrowsSqlException;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
/**
- * Tests for ddl queries that contain multiply sql statements, separated by ";".
+ * Tests for queries containing multiple sql statements, separated by ";".
*/
-@Disabled("https://issues.apache.org/jira/browse/IGNITE-16204")
public class ItJdbcMultiStatementSelfTest extends AbstractJdbcSelfTest {
/**
* Setup tables.
@@ -52,48 +60,510 @@ public class ItJdbcMultiStatementSelfTest extends AbstractJdbcSelfTest {
+ "(4, 19, 'Nick');");
}
- /**
- * Execute sql script using thin driver.
- */
- private void execute(String sql) throws Exception {
- stmt.executeUpdate(sql);
+    /**
+     * Checks after every test that the test left no more than the expected resources behind, closes the statement and
+     * waits until no cursor is reported as open.
+     */
+    @AfterEach
+    void tearDown() throws Exception {
+        // Read the counter once, so the value in the failure message is exactly the value that was checked.
+        int openCursorResources = openResources();
+        // connection + not closed result set
+        assertTrue(openCursorResources <= 2, "Open cursors: " + openCursorResources);
+
+        stmt.close();
+
+        openCursorResources = openResources();
+        // only connection context or 0 if already closed.
+        assertTrue(openCursorResources <= 1, "Open cursors: " + openCursorResources);
+        assertTrue(waitForCondition(() -> openCursors() == 0, 5_000));
+    }
+
+ /**
+ * Runs a transactional script whose INSERT violates the primary key and checks that TEST_TX still holds 4 rows.
+ * The failure is expected to show up on the following {@code execute} call as "Failed to fetch query results".
+ */
+ @Test
+ public void testAllStatementsAppliedIfExecutedWithFailure() throws Exception {
+ stmt.execute("SELECT COUNT(*) FROM TEST_TX");
+ try (ResultSet rs = stmt.getResultSet()) {
+ assertTrue(rs.next());
+ assertEquals(4, rs.getInt(1));
+ }
+
+ // pk violation exception
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-21133
+ stmt.execute("START TRANSACTION; INSERT INTO TEST_TX VALUES (1, 1, '1'); COMMIT");
+ assertThrowsSqlException("Failed to fetch query results", () -> stmt.execute("SELECT COUNT(*) FROM TEST_TX"));
+ stmt.execute("SELECT COUNT(*) FROM TEST_TX");
+ try (ResultSet rs = stmt.getResultSet()) {
+ assertTrue(rs.next());
+ assertEquals(4, rs.getInt(1));
+ }
+ }
+
+ /**
+ * Runs a START TRANSACTION / INSERT / COMMIT script with a new key and checks that TEST_TX holds 5 rows afterwards.
+ */
+ @Test
+ public void testAllStatementsAppliedIfExecutedWithoutFailure() throws Exception {
+ // no pk violation
+ stmt.execute("START TRANSACTION; INSERT INTO TEST_TX VALUES (5, 5, '5'); COMMIT");
+ stmt.execute("SELECT COUNT(*) FROM TEST_TX");
+ try (ResultSet rs = stmt.getResultSet()) {
+ assertTrue(rs.next());
+ assertEquals(5, rs.getInt(1));
+ }
+ }
+
+ /**
+ * Executes two SELECT statements, the second one over an empty range, and checks that both are reported as
+ * result sets: {@code execute} returns true, the first {@code getMoreResults()} returns true, the next one false.
+ */
+ @Test
+ public void testEmptyResults() throws Exception {
+ boolean res = stmt.execute("SELECT 1; SELECT 1 FROM table(system_range(1, 0))");
+ assertTrue(res);
+ assertEquals(-1, stmt.getUpdateCount());
+ assertTrue(stmt.getMoreResults());
+ assertFalse(stmt.getMoreResults());
+ }
+
+ /**
+ * Checks the statement state for single statements: an INSERT yields no result set, a second INSERT reports
+ * update count 1, and a SELECT matching no rows still yields a non-null result set.
+ */
+ @Test
+ public void testSimpleQueryExecute() throws Exception {
+ boolean res = stmt.execute("INSERT INTO TEST_TX VALUES (5, 5, '5');");
+ assertFalse(res);
+ assertNull(stmt.getResultSet());
+ assertFalse(stmt.getMoreResults());
+ assertNull(stmt.getResultSet());
+ // the update count is -1 once getMoreResults() has moved past the only result
+ assertEquals(-1, stmt.getUpdateCount());
+
+ stmt.execute("INSERT INTO TEST_TX VALUES (6, 5, '5');");
+ assertEquals(1, stmt.getUpdateCount());
+ assertTrue(checkNoMoreResults());
+
+ // empty result
+ res = stmt.execute("SELECT ID FROM TEST_TX WHERE ID=1000;");
+ assertTrue(res);
+ assertNotNull(stmt.getResultSet());
+ }
+
+ /**
+ * Executes a script whose second statement divides by zero: {@code getMoreResults()} must fail with
+ * "Failed to fetch query results" and the call after the failure must return false.
+ */
+ @Test
+ public void testSimpleQueryError() throws Exception {
+ boolean res = stmt.execute("SELECT 1; SELECT 1/0; SELECT 2");
+ assertTrue(res);
+ assertThrowsSqlException("Failed to fetch query results", () -> stmt.getMoreResults());
+ //next after exception
+ assertFalse(stmt.getMoreResults());
+
+ stmt.closeOnCompletion();
+ }
+
+ /**
+ * Executes a script whose second statement divides by zero and checks that the first result set can still be
+ * closed after {@code getMoreResults()} failed and {@code closeOnCompletion()} was requested.
+ */
+ @Test
+ public void testSimpleQueryErrorCloseRs() throws Exception {
+ stmt.execute("SELECT 1; SELECT 1/0; SELECT 2");
+ ResultSet rs = stmt.getResultSet();
+ assertThrowsSqlException("Failed to fetch query results", () -> stmt.getMoreResults());
+ stmt.closeOnCompletion();
+
+ rs.close();
+ }
+
+    /**
+     * Closes the first result set of a three-statement script, walks through the remaining results and closes the
+     * last result set; the statement must end up closed only when {@code closeOnCompletion()} was requested.
+     */
+    @ParameterizedTest(name = "closeOnCompletion = {0}")
+    @ValueSource(booleans = {true, false})
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21129")
+    public void testCloseOnCompletionFirstRsClosed(boolean closeOnCompletion) throws Exception {
+        stmt.execute("SELECT 1; DROP TABLE IF EXISTS TEST_TX; SELECT 1; ");
+        ResultSet firstResult = stmt.getResultSet();
+
+        if (closeOnCompletion) {
+            stmt.closeOnCompletion();
+        }
+
+        // Closing the first result set must not close the statement: two more results are pending.
+        firstResult.close();
+        assertFalse(stmt.isClosed());
+
+        // Result of the DROP TABLE statement.
+        stmt.getMoreResults();
+        stmt.getResultSet();
+
+        // Result of the second SELECT statement.
+        stmt.getMoreResults();
+        ResultSet lastResult = stmt.getResultSet();
+
+        lastResult.close();
+
+        assertEquals(closeOnCompletion, stmt.isClosed());
+    }
+
+    /**
+     * Closes the result set of the SELECT in a "SELECT; DROP TABLE" script without fetching further results; the
+     * statement must end up closed only when {@code closeOnCompletion()} was requested.
+     */
+    @ParameterizedTest(name = "closeOnCompletion = {0}")
+    @ValueSource(booleans = {true, false})
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21129")
+    public void testCloseOnCompletionFirstRsClosed2(boolean closeOnCompletion) throws Exception {
+        stmt.execute("SELECT 1; DROP TABLE IF EXISTS TEST_TX;");
+        ResultSet selectResult = stmt.getResultSet();
+
+        if (closeOnCompletion) {
+            stmt.closeOnCompletion();
+        }
+
+        selectResult.close();
+
+        assertEquals(closeOnCompletion, stmt.isClosed());
+    }
+
+ /**
+ * Requests {@code closeOnCompletion()} in the middle of a three-statement script: the statement stays open while
+ * results remain, and once {@code getMoreResults()} has returned false it is closed, so a further call fails with
+ * "Statement is closed".
+ */
+ @Test
+ public void noMoreResultsArePossibleAfterCloseOnCompletion() throws Exception {
+ stmt.execute("SELECT 1; SELECT 2; SELECT 3");
+ // SELECT 2;
+ assertTrue(stmt.getMoreResults());
+
+ stmt.closeOnCompletion();
+
+ // SELECT 3;
+ assertTrue(stmt.getMoreResults());
+ assertFalse(stmt.isClosed());
+
+ // no more results, auto close statement
+ assertFalse(stmt.getMoreResults());
+ assertThrowsSqlException("Statement is closed", () -> stmt.getMoreResults());
+ assertTrue(stmt.isClosed());
+ }
+
+ /**
+ * Runs a SELECT returning more rows than the statement fetch size inside a transaction script and checks the
+ * total row count; then checks that a second script can be walked to its end after the SELECT's result set was
+ * closed early.
+ */
+ @Test
+ public void requestMoreThanOneFetch() throws Exception {
+ int range = stmt.getFetchSize() + 100;
+ stmt.execute(format("START TRANSACTION; SELECT * FROM TABLE(system_range(0, {})); COMMIT;", range));
+ // system_range(0, range) is expected to yield range + 1 rows
+ assertEquals(range + 1, getResultSetSize());
+
+ stmt.execute("START TRANSACTION; SELECT * FROM TABLE(system_range(0, 2000)); COMMIT;");
+ stmt.getMoreResults();
+ ResultSet rs = stmt.getResultSet();
+ rs.close();
+ stmt.getMoreResults();
+ assertTrue(checkNoMoreResults());
+ }
+
+ /**
+ * Closes the result set of the first SELECT of a script and checks that {@code getMoreResults()} still advances
+ * to the second SELECT, whose single value is 2.
+ */
+ @Test
+ public void moreResultsAfterClosedRs() throws Exception {
+ stmt.execute("START TRANSACTION; SELECT 1; SELECT 2; COMMIT;");
+ stmt.getMoreResults();
+ ResultSet rs = stmt.getResultSet();
+ rs.close();
+ assertTrue(stmt.getMoreResults());
+ stmt.getResultSet().next();
+ assertEquals(2, stmt.getResultSet().getInt(1));
+ }
+
+    /**
+     * Executes scripts that mix DML statements and queries and checks both the value returned by {@code execute}
+     * (true only when the first statement is a query) and the number of rows counted by {@code getResultSetSize()}.
+     */
+    @Test
+    public void testMixedDmlQueryExecute() throws Exception {
+        boolean firstIsQuery = stmt.execute("INSERT INTO TEST_TX VALUES (6, 5, '5'); DELETE FROM TEST_TX WHERE ID=6; SELECT 1;");
+        assertFalse(firstIsQuery);
+        assertEquals(1, getResultSetSize());
+
+        firstIsQuery = stmt.execute("SELECT 1; INSERT INTO TEST_TX VALUES (7, 5, '5'); DELETE FROM TEST_TX WHERE ID=6;");
+        assertTrue(firstIsQuery);
+        assertEquals(1, getResultSetSize());
+
+        // A statement without a result set (the INSERT) sits between the two queries.
+        firstIsQuery = stmt.execute("SELECT * FROM TEST_TX; INSERT INTO TEST_TX VALUES (6, 6, '6'); SELECT * FROM TEST_TX;");
+        assertTrue(firstIsQuery);
+        assertEquals(11, getResultSetSize());
+    }
+
+ /**
+ * Executes DDL and DML statements, alone and combined in scripts, and checks that {@code execute} and
+ * {@code getMoreResults()} return false and that the expected update count is reported for each statement
+ * (0 for DDL, the number of inserted rows for INSERT).
+ */
+ @Test
+ public void testMiscDmlExecute() throws Exception {
+ boolean res = stmt.execute("DROP TABLE IF EXISTS TEST_TX; DROP TABLE IF EXISTS SOME_UNEXISTING_TBL;");
+ assertFalse(res);
+ assertEquals(0, stmt.getUpdateCount());
+ assertFalse(stmt.getMoreResults());
+ assertEquals(0, stmt.getUpdateCount());
+
+ res = stmt.execute("CREATE TABLE TEST_TX (ID INT PRIMARY KEY, AGE INT, NAME VARCHAR) ");
+ assertFalse(res);
+ assertEquals(0, stmt.getUpdateCount());
+ assertFalse(stmt.getMoreResults());
+ assertNull(stmt.getResultSet());
+
+ res = stmt.execute("INSERT INTO TEST_TX VALUES (1, 17, 'James'), (2, 43, 'Valery');");
+ assertFalse(res);
+ assertEquals(2, stmt.getUpdateCount());
+ assertFalse(stmt.getMoreResults());
+ assertNull(stmt.getResultSet());
+
+ res = stmt.execute("DROP TABLE IF EXISTS PUBLIC.TRANSACTIONS; INSERT INTO TEST_TX VALUES (3, 25, 'Michel');");
+ assertFalse(res);
+ assertEquals(0, stmt.getUpdateCount());
+ assertFalse(stmt.getMoreResults());
+ assertEquals(1, stmt.getUpdateCount());
+ }
+
+ /**
+ * Executes a script that consists only of START TRANSACTION and COMMIT: each statement reports update count 0
+ * and no result set, and after the last statement the update count becomes -1.
+ */
+ @Test
+ public void testPureTransaction() throws Exception {
+ boolean res = stmt.execute("START TRANSACTION; COMMIT");
+ assertFalse(res);
+ assertNull(stmt.getResultSet());
+ assertEquals(0, stmt.getUpdateCount());
+ assertFalse(stmt.getMoreResults());
+ assertEquals(0, stmt.getUpdateCount());
+ assertFalse(stmt.getMoreResults());
+ assertEquals(-1, stmt.getUpdateCount());
+ }
+
+ /**
+ * Executes START TRANSACTION and COMMIT as two separate {@code execute} calls; each of them reports update
+ * count 0 and no result set, and -1 once {@code getMoreResults()} has been called.
+ */
+ @Test
+ public void testBrokenTransaction() throws Exception {
+ boolean res = stmt.execute("START TRANSACTION;");
+ assertFalse(res);
+ assertNull(stmt.getResultSet());
+ assertEquals(0, stmt.getUpdateCount());
+ assertFalse(stmt.getMoreResults());
+ assertEquals(-1, stmt.getUpdateCount());
+
+ res = stmt.execute("COMMIT;");
+ assertFalse(res);
+ assertNull(stmt.getResultSet());
+ assertEquals(0, stmt.getUpdateCount());
+ assertFalse(stmt.getMoreResults());
+ assertEquals(-1, stmt.getUpdateCount());
+ }
+
+ /**
+ * Walks through the results of "START TRANSACTION; SELECT 1; COMMIT": the transaction control statements report
+ * update count 0 without a result set, the SELECT reports a result set, and after the last statement the update
+ * count is -1.
+ */
+ @Test
+ public void testTransactionQueryInside() throws Exception {
+ stmt.execute("START TRANSACTION; SELECT 1; COMMIT");
+ ResultSet resultSet = stmt.getResultSet();
+ assertNull(resultSet);
+ assertEquals(0, stmt.getUpdateCount());
+
+ // SELECT 1
+ assertTrue(stmt.getMoreResults());
+ resultSet = stmt.getResultSet();
+ assertNotNull(resultSet);
+
+ // COMMIT
+ assertFalse(stmt.getMoreResults());
+ resultSet = stmt.getResultSet();
+ assertNull(resultSet);
+ assertEquals(0, stmt.getUpdateCount());
+
+ // after commit
+ assertFalse(stmt.getMoreResults());
+ assertEquals(-1, stmt.getUpdateCount());
+ }
+
+ /**
+ * Walks through the results of a script with one SELECT inside a START TRANSACTION / COMMIT pair and one SELECT
+ * after it, checking for every statement whether a result set or an update count is reported.
+ */
+ @Test
+ public void testTransactionQueryInsideOutside() throws Exception {
+ stmt.execute("START TRANSACTION; SELECT 1; COMMIT; SELECT 2;");
+ ResultSet resultSet = stmt.getResultSet();
+ assertNull(resultSet);
+ assertEquals(0, stmt.getUpdateCount());
+
+ // SELECT 1;
+ assertTrue(stmt.getMoreResults());
+ resultSet = stmt.getResultSet();
+ assertNotNull(resultSet);
+
+ // COMMIT;
+ assertFalse(stmt.getMoreResults());
+ resultSet = stmt.getResultSet();
+ assertNull(resultSet);
+ assertEquals(0, stmt.getUpdateCount());
+
+ // SELECT 2;
+ assertTrue(stmt.getMoreResults());
+ resultSet = stmt.getResultSet();
+ assertNotNull(resultSet);
+ assertEquals(-1, stmt.getUpdateCount());
+
+ // after
+ assertFalse(stmt.getMoreResults());
+ assertEquals(-1, stmt.getUpdateCount());
+ }
+
+ /**
+ * Checks the update counts reported for "START TRANSACTION; INSERT; COMMIT": 0, 1 and 0, with no results left
+ * afterwards.
+ */
+ @Test
+ public void testDmlInsideTransaction() throws Exception {
+ stmt.execute("START TRANSACTION; INSERT INTO TEST_TX VALUES (5, 19, 'Nick'); COMMIT");
+ assertEquals(0, stmt.getUpdateCount());
+ stmt.getMoreResults();
+ assertEquals(1, stmt.getUpdateCount());
+ stmt.getMoreResults();
+ assertEquals(0, stmt.getUpdateCount());
+ assertTrue(checkNoMoreResults());
+ }
- /**
- * Assert that script containing both h2 and non h2 (native) sql statements is handled correctly.
- */
@Test
- public void testMixedCommands() throws Exception {
- execute("CREATE TABLE public.transactions (pk INT, id INT, k VARCHAR, v VARCHAR, PRIMARY KEY (pk, id)); "
- + "CREATE INDEX transactions_id_k_v ON public.transactions (id, k, v) INLINE_SIZE 150; "
- + "INSERT INTO public.transactions VALUES (1,2,'some', 'word') ; "
- + "CREATE INDEX transactions_k_v_id ON public.transactions (k, v, id) INLINE_SIZE 150; "
- + "CREATE INDEX transactions_pk_id ON public.transactions (pk, id) INLINE_SIZE 20;");
+ public void testAutoCommitFalseNonCompleted() throws Exception {
+ String txErrMsg = "Transaction control statement cannot be executed within an external transaction";
+ conn.setAutoCommit(false);
+ assertThrowsSqlException(txErrMsg, () -> stmt.execute("COMMIT"));
+
+ boolean res = stmt.execute("SELECT 1;COMMIT");
+ assertTrue(res);
+ assertNotNull(stmt.getResultSet());
+ assertThrowsSqlException(txErrMsg, () -> stmt.getMoreResults());
+
+ assertThrowsSqlException(txErrMsg, () -> stmt.execute("START TRANSACTION; SELECT 1;"));
+ }
+
+ @Test
+ public void testAutoCommitFalse() throws Exception {
+ conn.setAutoCommit(false);
+
+ stmt.execute("SELECT 1;");
+ ResultSet rs = stmt.getResultSet();
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+
+ stmt.execute("INSERT INTO TEST_TX VALUES (5, 19, 'Nick');");
+ conn.rollback();
+
+ stmt.execute("SELECT COUNT(ID) FROM TEST_TX WHERE ID=5;");
+ rs = stmt.getResultSet();
+ assertTrue(rs.next());
+ assertEquals(0, rs.getInt(1));
+
+
+ stmt.execute("INSERT INTO TEST_TX VALUES (5, 19, 'Nick');");
+ conn.commit();
+
+ stmt.execute("SELECT COUNT(ID) FROM TEST_TX WHERE ID=5;");
+ rs = stmt.getResultSet();
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ }
+
+ @Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-21167")
+ public void testAutoCommitFalseWithEmptyTx() throws Exception {
+ String txErrMsg = "Transaction control statement cannot be executed within an external transaction";
+ conn.setAutoCommit(false);
+ assertThrowsSqlException(txErrMsg, () -> stmt.execute("START TRANSACTION; SELECT 1; COMMIT;"));
+ }
+
+ @Test
+ public void testPreviousResultSetIsClosedExecute() throws Exception {
+ boolean res = stmt.execute("SELECT ID FROM TEST_TX; SELECT 1;");
+ assertEquals(true, res);
+ stmt.getResultSet();
+ ResultSet rs = stmt.getResultSet();
+
+ res = stmt.getMoreResults();
+ assertEquals(true, res);
+
+ assertTrue(rs.isClosed());
+ assertNotNull(stmt.getResultSet());
+ assertTrue(checkNoMoreResults());
+
+ stmt.execute("SELECT 1; SELECT 2; SELECT 3;");
+ ResultSet rs1 = stmt.getResultSet();
+ stmt.getMoreResults();
+ assertTrue(rs1.isClosed());
+ ResultSet rs2 = stmt.getResultSet();
+ stmt.getMoreResults();
+ assertTrue(rs2.isClosed());
+ rs = stmt.getResultSet();
+ assertTrue(rs.next());
+ assertEquals(3, rs.getObject(1));
+ rs.close();
+ }
+
+ @Test
+ public void testPreviousResultsNotInvolvedExecute() throws Exception {
+ boolean res = stmt.execute("SELECT ID FROM TEST_TX; SELECT 1;");
+ assertEquals(true, res);
+ assertEquals(5, getResultSetSize());
+
+ res = stmt.execute("SELECT 1; SELECT 1;");
+ assertEquals(true, res);
+ assertEquals(2, getResultSetSize());
+ }
+
+ /** Check update count invariants. */
+ @Test
+ public void testUpdCountMisc() throws Exception {
+ // pure select case
+ stmt.execute("SELECT 1; SELECT 1;");
+ assertEquals(-1, stmt.getUpdateCount());
+
+ ResultSet rs = stmt.getResultSet();
+ assertEquals(-1, stmt.getUpdateCount());
+
+ rs.next();
+ assertEquals(-1, stmt.getUpdateCount());
+
+ stmt.getMoreResults();
+ assertTrue(rs.isClosed());
+ rs = stmt.getResultSet();
+ assertEquals(-1, stmt.getUpdateCount());
+
+ rs.next();
+ assertEquals(-1, stmt.getUpdateCount());
+
+ // empty result
+ stmt.execute("SELECT ID FROM TEST_TX WHERE ID=1000;");
+ assertEquals(-1, stmt.getUpdateCount());
+ }
+
+ @Test
+ public void testUpdCountNoMoreResults() throws Exception {
+ stmt.execute("INSERT INTO TEST_TX VALUES (5, 5, '5'), (6, 5, '5');");
+ assertEquals(2, stmt.getUpdateCount());
+ stmt.getMoreResults();
+ assertEquals(-1, stmt.getUpdateCount());
+ stmt.getResultSet();
+ assertEquals(-1, stmt.getUpdateCount());
+ }
+
+ @Test
+ public void testMixedQueriesUpdCount() throws Exception {
+ stmt.execute("SELECT 1; INSERT INTO TEST_TX VALUES (7, 5, '5');");
+ stmt.getMoreResults();
+ assertEquals(1, stmt.getUpdateCount());
+ assertEquals(1, stmt.getUpdateCount());
+ stmt.getMoreResults();
+ assertEquals(-1, stmt.getUpdateCount());
+
+ stmt.execute("DROP TABLE IF EXISTS TEST_TX; DROP TABLE IF EXISTS PUBLIC.TRANSACTIONS;");
+ assertEquals(0, stmt.getUpdateCount());
+
+ stmt.execute("CREATE TABLE TEST_TX (ID INT PRIMARY KEY, AGE INT, NAME VARCHAR) ");
+ assertEquals(0, stmt.getUpdateCount());
+ }
+
+ @Test
+ public void testUpdCountAfterError() throws Exception {
+ try {
+ stmt.execute("INSERT INTO NOT_EXIST VALUES (3, 17, 'James');");
+ } catch (Throwable ignored) {
+ // No op.
+ }
+ ResultSet rs = stmt.getResultSet();
+ assertNull(rs);
+ assertEquals(-1, stmt.getUpdateCount());
+ }
+
+ @Test
+ public void testResultsFromExecuteBatch() throws Exception {
+ stmt.addBatch("INSERT INTO TEST_TX VALUES (7, 25, 'Michel');");
+ stmt.addBatch("INSERT INTO TEST_TX VALUES (8, 25, 'Michel');");
+ int[] arr = stmt.executeBatch();
+
+ assertEquals(2, arr.length);
+ assertArrayEquals(new int[]{1, 1}, arr);
+ assertEquals(-1, stmt.getUpdateCount());
+ assertFalse(stmt.getMoreResults());
}
/**
* Sanity test for scripts, containing empty statements are handled correctly.
*/
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-21081")
public void testEmptyStatements() throws Exception {
- execute(";; ;;;;");
+ execute(";;;SELECT 1 + 2");
execute(" ;; ;;;; ");
execute("CREATE TABLE ONE (id INT PRIMARY KEY, VAL VARCHAR);;"
+ "CREATE INDEX T_IDX ON ONE(val)"
+ ";;UPDATE ONE SET VAL = 'SOME';;; ");
- // TODO: IGNITE-19150 We are waiting for schema synchronization to avoid races to create and destroy indexes
- // org.apache.ignite.internal.sql.engine.ClusterPerClassIntegrationTest.waitForIndexBuild
-
execute("DROP INDEX T_IDX ;; ;;"
+ "UPDATE ONE SET VAL = 'SOME'");
}
/**
- * Check multi-statement containing both h2 and native parser statements (having "?" args) works well.
+ * Check multiple statements execution through prepared statement.
*/
@Test
- public void testMultiStatementTxWithParams() throws Exception {
+ public void testMultiStatementPreparedStatement() throws Exception {
int leoAge = 28;
String nickolas = "Nickolas";
@@ -104,12 +574,11 @@ public class ItJdbcMultiStatementSelfTest extends AbstractJdbcSelfTest {
int delYounger = 19;
String complexQuery =
- "INSERT INTO TEST_TX VALUES (5, ?, 'Leo'); " // 1
- + ";;;;"
- + "BEGIN ; "
- + "UPDATE TEST_TX SET name = ? WHERE name = 'Nick' ;" // 2
- + "INSERT INTO TEST_TX VALUES (6, ?, ?); " // 3, 4
- + "DELETE FROM TEST_TX WHERE age < ?; " // 5
+ "INSERT INTO TEST_TX VALUES (5, ?, 'Leo'); "
+ + "START TRANSACTION ; "
+ + "UPDATE TEST_TX SET name = ? WHERE name = 'Nick' ;"
+ + "INSERT INTO TEST_TX VALUES (6, ?, ?); "
+ + "DELETE FROM TEST_TX WHERE age < ?; "
+ "COMMIT;";
try (PreparedStatement p = conn.prepareStatement(complexQuery)) {
@@ -118,56 +587,21 @@ public class ItJdbcMultiStatementSelfTest extends AbstractJdbcSelfTest {
p.setInt(3, gabAge);
p.setString(4, gabName);
p.setInt(5, delYounger);
-
- assertFalse(p.execute(), "Expected, that first result is an update count.");
-
- assertTrue(p.getUpdateCount() != -1, "Expected update count of the INSERT.");
- assertTrue(p.getMoreResults(), "More results are expected.");
-
- assertTrue(p.getUpdateCount() != -1, "Expected update count of an empty statement.");
- assertTrue(p.getMoreResults(), "More results are expected.");
- assertTrue(p.getUpdateCount() != -1, "Expected update count of an empty statement.");
- assertTrue(p.getMoreResults(), "More results are expected.");
- assertTrue(p.getUpdateCount() != -1, "Expected update count of an empty statement.");
- assertTrue(p.getMoreResults(), "More results are expected.");
- assertTrue(p.getUpdateCount() != -1, "Expected update count of an empty statement.");
- assertTrue(p.getMoreResults(), "More results are expected.");
-
- assertTrue(p.getUpdateCount() != -1, "Expected update count of the BEGIN");
- assertTrue(p.getMoreResults(), "More results are expected.");
- assertTrue(p.getUpdateCount() != -1, "Expected update count of the UPDATE");
- assertTrue(p.getMoreResults(), "More results are expected.");
- assertTrue(p.getUpdateCount() != -1, "Expected update count of the INSERT");
- assertTrue(p.getMoreResults(), "More results are expected.");
- assertTrue(p.getUpdateCount() != -1, "Expected update count of the DELETE");
- assertTrue(p.getMoreResults(), "More results are expected.");
- assertTrue(p.getUpdateCount() != -1, "Expected update count of the COMMIT");
-
- assertFalse(p.getMoreResults(), "There should have been no results.");
- assertFalse(p.getUpdateCount() != -1, "There should have been no update results.");
}
- try (PreparedStatement sel = conn.prepareStatement("SELECT * FROM TEST_TX ORDER BY ID;")) {
- try (ResultSet pers = sel.executeQuery()) {
- assertTrue(pers.next());
- assertEquals(43, age(pers));
- assertEquals("Valery", name(pers));
-
- assertTrue(pers.next());
- assertEquals(25, age(pers));
- assertEquals("Michel", name(pers));
+ complexQuery = "UPDATE TEST_TX SET name = ? WHERE name = 'James';";
- assertTrue(pers.next());
- assertEquals(19, age(pers));
- assertEquals("Nickolas", name(pers));
+ try (PreparedStatement p = conn.prepareStatement(complexQuery)) {
+ p.setString(1, nickolas);
- assertTrue(pers.next());
- assertEquals(28, age(pers));
- assertEquals("Leo", name(pers));
+ p.execute();
+ }
+ try (PreparedStatement sel = conn.prepareStatement("SELECT * FROM TEST_TX ORDER BY ID LIMIT 1;")) {
+ try (ResultSet pers = sel.executeQuery()) {
assertTrue(pers.next());
- assertEquals(84, age(pers));
- assertEquals("Gab", name(pers));
+ assertEquals(17, age(pers));
+ assertEquals(nickolas, name(pers));
assertFalse(pers.next());
}
@@ -188,4 +622,49 @@ public class ItJdbcMultiStatementSelfTest extends AbstractJdbcSelfTest {
return rs.getInt("AGE");
}
+ /**
+ * Execute sql script using thin driver.
+ */
+ private boolean execute(String sql) throws Exception {
+ return stmt.execute(sql);
+ }
+
+ /**
+ * Check summary results returned from statement.
+ *
+ * @throws SQLException If failed.
+ */
+ private int getResultSetSize() throws SQLException {
+ ResultSet rs = stmt.getResultSet();
+ int size = -1;
+ boolean more;
+ int updCount;
+
+ do {
+ if (rs != null) {
+ if (size == -1) {
+ size = 0;
+ }
+ while (rs.next()) {
+ ++size;
+ }
+ }
+
+ if (stmt.getMoreResults()) {
+ rs = stmt.getResultSet();
+ more = true;
+ } else {
+ rs = null;
+ more = false;
+ }
+ updCount = stmt.getUpdateCount();
+ } while (more || updCount != -1);
+ return size;
+ }
+
+ private boolean checkNoMoreResults() throws SQLException {
+ boolean more = stmt.getMoreResults();
+ int updCnt = stmt.getUpdateCount();
+ return !more && updCnt == -1;
+ }
}
diff --git a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcStatementSelfTest.java b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcStatementSelfTest.java
index 59642f2e0d..dcb939a066 100644
--- a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcStatementSelfTest.java
+++ b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcStatementSelfTest.java
@@ -37,7 +37,6 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
@@ -47,6 +46,8 @@ public class ItJdbcStatementSelfTest extends ItJdbcAbstractStatementSelfTest {
/** SQL query. */
private static final String SQL = "select * from PERSON where age > 30";
+ private int populateStmtCnt = 10;
+
@BeforeAll
public static void beforeClass() throws Exception {
try (Statement statement = conn.createStatement()) {
@@ -57,9 +58,7 @@ public class ItJdbcStatementSelfTest extends ItJdbcAbstractStatementSelfTest {
@BeforeEach
public void beforeEach() throws Exception {
try (Statement statement = conn.createStatement()) {
- int stmtCnt = 10;
-
- for (int i = 0; i < stmtCnt; ++i) {
+ for (int i = 0; i < populateStmtCnt; ++i) {
statement.executeUpdate("insert into TEST (ID, NAME) values (" + i + ", 'name_" + i + "'); ");
}
}
@@ -347,7 +346,6 @@ public class ItJdbcStatementSelfTest extends ItJdbcAbstractStatementSelfTest {
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-16960")
public void testExecuteQueryMultipleOnlyResultSets() throws Exception {
assertTrue(conn.getMetaData().supportsMultipleResultSets());
@@ -381,7 +379,6 @@ public class ItJdbcStatementSelfTest extends ItJdbcAbstractStatementSelfTest {
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-16276")
public void testExecuteQueryMultipleOnlyDml() throws Exception {
Statement stmt0 = conn.createStatement();
@@ -406,7 +403,7 @@ public class ItJdbcStatementSelfTest extends ItJdbcAbstractStatementSelfTest {
assertEquals(0, stmt0.getUpdateCount());
for (int i = 0; i < stmtCnt; ++i) {
- assertTrue(stmt0.getMoreResults());
+ assertFalse(stmt0.getMoreResults());
assertNull(stmt0.getResultSet());
assertEquals(1, stmt0.getUpdateCount());
@@ -416,7 +413,6 @@ public class ItJdbcStatementSelfTest extends ItJdbcAbstractStatementSelfTest {
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-16276")
public void testExecuteQueryMultipleMixed() throws Exception {
int stmtCnt = 10;
@@ -436,19 +432,20 @@ public class ItJdbcStatementSelfTest extends ItJdbcAbstractStatementSelfTest {
assertNull(stmt.getResultSet());
assertEquals(0, stmt.getUpdateCount());
- assertTrue(stmt.getMoreResults(), "Result set doesn't have more results.");
+ // DROP TABLE
+ assertFalse(stmt.getMoreResults(), "Result set doesn't have more results.");
// CREATE TABLE statement
assertNull(stmt.getResultSet());
assertEquals(0, stmt.getUpdateCount());
for (int i = 0; i < stmtCnt; ++i) {
- assertTrue(stmt.getMoreResults());
-
if (i % 2 == 0) {
+ assertFalse(stmt.getMoreResults());
assertNull(stmt.getResultSet());
assertEquals(1, stmt.getUpdateCount());
} else {
+ assertTrue(stmt.getMoreResults());
assertEquals(-1, stmt.getUpdateCount());
ResultSet rs = stmt.getResultSet();
@@ -462,8 +459,6 @@ public class ItJdbcStatementSelfTest extends ItJdbcAbstractStatementSelfTest {
assertEquals((i + 1) / 2, rowsCnt);
}
}
-
- assertFalse(stmt.getMoreResults());
}
@Test
@@ -783,4 +778,37 @@ public class ItJdbcStatementSelfTest extends ItJdbcAbstractStatementSelfTest {
+ "Because update statement is executed via 'executeQuery' method."
+ " Data [val=" + rs.getString(1) + ']');
}
+
+ @Test
+ public void testOpenCursorsPureQuery() throws Exception {
+ int initial = openResources();
+
+ stmt.execute("SELECT 1; SELECT 2;");
+ ResultSet rs = stmt.getResultSet();
+ stmt.execute("SELECT 3;");
+ assertTrue(rs.isClosed());
+
+ assertTrue(populateStmtCnt < 100);
+ //more than one fetch request
+ for (int i = populateStmtCnt; i < stmt.getMaxRows() + 100; ++i) {
+ stmt.execute(String.format("INSERT INTO TEST VALUES (%d, '1')", i));
+ }
+
+ stmt.close();
+ assertEquals(0, openResources() - initial);
+ assertEquals(0, openCursors());
+ }
+
+ @Test
+ public void testOpenCursorsWithDdl() throws Exception {
+ int initial = openResources();
+
+ stmt.execute("CREATE TABLE T1(ID INT PRIMARY KEY, AGE INT, NAME VARCHAR)");
+ stmt.getResultSet();
+ stmt.execute("SELECT 3;");
+ stmt.execute("DROP TABLE T1");
+ stmt.getResultSet();
+
+ assertEquals(0, openResources() - initial);
+ }
}
diff --git a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcClientQueryCursorHandler.java b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcClientQueryCursorHandler.java
index 2095eb38d8..dc6a10baa1 100644
--- a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcClientQueryCursorHandler.java
+++ b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcClientQueryCursorHandler.java
@@ -21,12 +21,13 @@ import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.client.ClientChannel;
import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.internal.jdbc.proto.JdbcQueryCursorHandler;
+import org.apache.ignite.internal.jdbc.proto.event.JdbcFetchQueryResultsRequest;
import org.apache.ignite.internal.jdbc.proto.event.JdbcMetaColumnsResult;
import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryCloseRequest;
import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryCloseResult;
-import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryFetchRequest;
import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryFetchResult;
import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryMetadataRequest;
+import org.apache.ignite.internal.jdbc.proto.event.JdbcQuerySingleResult;
/**
* Jdbc client cursor events handler implementation.
@@ -40,13 +41,13 @@ public class JdbcClientQueryCursorHandler implements JdbcQueryCursorHandler {
*
* @param channel Client channel.
*/
- public JdbcClientQueryCursorHandler(ClientChannel channel) {
+ JdbcClientQueryCursorHandler(ClientChannel channel) {
this.channel = channel;
}
/** {@inheritDoc} */
@Override
- public CompletableFuture<JdbcQueryFetchResult> fetchAsync(JdbcQueryFetchRequest req) {
+ public CompletableFuture<JdbcQueryFetchResult> fetchAsync(JdbcFetchQueryResultsRequest req) {
return channel.serviceAsync(ClientOp.JDBC_NEXT, w -> req.writeBinary(w.out()), r -> {
JdbcQueryFetchResult res = new JdbcQueryFetchResult();
@@ -56,6 +57,18 @@ public class JdbcClientQueryCursorHandler implements JdbcQueryCursorHandler {
});
}
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<JdbcQuerySingleResult> getMoreResultsAsync(JdbcFetchQueryResultsRequest req) {
+ return channel.serviceAsync(ClientOp.JDBC_MORE_RESULTS, w -> req.writeBinary(w.out()), r -> {
+ JdbcQuerySingleResult res = new JdbcQuerySingleResult();
+
+ res.readBinary(r.in());
+
+ return res;
+ });
+ }
+
/** {@inheritDoc} */
@Override
public CompletableFuture<JdbcQueryCloseResult> closeAsync(JdbcQueryCloseRequest req) {
diff --git a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcPreparedStatement.java b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcPreparedStatement.java
index 408657d215..7105a83c5c 100644
--- a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcPreparedStatement.java
+++ b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcPreparedStatement.java
@@ -113,7 +113,7 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
/** {@inheritDoc} */
@Override
public ResultSet executeQuery() throws SQLException {
- executeWithArguments(JdbcStatementType.SELECT_STATEMENT_TYPE);
+ executeWithArguments(JdbcStatementType.SELECT_STATEMENT_TYPE, false);
ResultSet rs = getResultSet();
@@ -170,7 +170,7 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
/** {@inheritDoc} */
@Override
public int executeUpdate() throws SQLException {
- executeWithArguments(JdbcStatementType.UPDATE_STATEMENT_TYPE);
+ executeWithArguments(JdbcStatementType.UPDATE_STATEMENT_TYPE, false);
int res = getUpdateCount();
@@ -212,7 +212,7 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
/** {@inheritDoc} */
@Override
public boolean execute() throws SQLException {
- executeWithArguments(JdbcStatementType.ANY_STATEMENT_TYPE);
+ executeWithArguments(JdbcStatementType.ANY_STATEMENT_TYPE, true);
return isQuery();
}
@@ -758,11 +758,11 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
* @param statementType Expected statement type.
* @throws SQLException If failed.
*/
- private void executeWithArguments(JdbcStatementType statementType) throws SQLException {
+ private void executeWithArguments(JdbcStatementType statementType, boolean multiStatement) throws SQLException {
Object[] args = currentArgs == null ? ArrayUtils.OBJECT_EMPTY_ARRAY :
currentArgs.stream().map(this::convertJdbcTypeToInternal).toArray();
- execute0(statementType, sql, args);
+ execute0(statementType, sql, multiStatement, args);
}
private static void checkType(int sqlType) throws SQLException {
diff --git a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcQueryExecuteResponse.java b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcQueryExecuteResponse.java
index cb2a5e5448..a3202921a7 100644
--- a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcQueryExecuteResponse.java
+++ b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcQueryExecuteResponse.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.jdbc;
import org.apache.ignite.internal.client.ClientChannel;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
-import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryExecuteResult;
+import org.apache.ignite.internal.jdbc.proto.event.JdbcQuerySingleResult;
import org.apache.ignite.internal.jdbc.proto.event.Response;
import org.apache.ignite.internal.tostring.S;
@@ -29,7 +29,7 @@ import org.apache.ignite.internal.tostring.S;
*/
public class JdbcQueryExecuteResponse extends Response {
/** Query result. */
- private JdbcQueryExecuteResult result;
+ private JdbcQuerySingleResult result;
/** Client channel. */
private final ClientChannel channel;
@@ -52,7 +52,7 @@ public class JdbcQueryExecuteResponse extends Response {
/** {@inheritDoc} */
@Override
public void readBinary(ClientMessageUnpacker unpacker) {
- result = new JdbcQueryExecuteResult();
+ result = new JdbcQuerySingleResult();
result.readBinary(unpacker);
}
@@ -75,12 +75,16 @@ public class JdbcQueryExecuteResponse extends Response {
return result.hasResults();
}
+ boolean hasResult() {
+ return result.resultAvailable();
+ }
+
/**
* Get the query results.
*
* @return Query result.
*/
- public JdbcQueryExecuteResult result() {
+ public JdbcQuerySingleResult result() {
return result;
}
diff --git a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcResultSet.java b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcResultSet.java
index 688fef2eba..75892545a6 100644
--- a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcResultSet.java
+++ b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcResultSet.java
@@ -65,13 +65,17 @@ import org.apache.ignite.internal.jdbc.proto.IgniteQueryErrorCode;
import org.apache.ignite.internal.jdbc.proto.JdbcQueryCursorHandler;
import org.apache.ignite.internal.jdbc.proto.SqlStateCode;
import org.apache.ignite.internal.jdbc.proto.event.JdbcColumnMeta;
+import org.apache.ignite.internal.jdbc.proto.event.JdbcFetchQueryResultsRequest;
import org.apache.ignite.internal.jdbc.proto.event.JdbcMetaColumnsResult;
import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryCloseRequest;
import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryCloseResult;
-import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryFetchRequest;
import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryFetchResult;
import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryMetadataRequest;
+import org.apache.ignite.internal.jdbc.proto.event.JdbcQuerySingleResult;
+import org.apache.ignite.internal.jdbc.proto.event.Response;
import org.apache.ignite.internal.util.TransformingIterator;
+import org.apache.ignite.sql.ColumnType;
+import org.jetbrains.annotations.Nullable;
/**
* Jdbc result set implementation.
@@ -131,9 +135,6 @@ public class JdbcResultSet implements ResultSet {
/** Is query flag. */
private boolean isQuery;
- /** Auto close server cursors flag. */
- private boolean autoClose;
-
/** Update count. */
private long updCnt;
@@ -152,6 +153,9 @@ public class JdbcResultSet implements ResultSet {
/** Function to deserialize raw rows to list of objects. */
private Function<BinaryTupleReader, List<Object>> transformer;
+ /** If {@code true}, indicates that the handler still holds the cursor in resources. */
+ private boolean holdsResource = true;
+
/**
* Creates new result set.
*
@@ -162,14 +166,13 @@ public class JdbcResultSet implements ResultSet {
* @param finished Finished flag.
* @param rows Rows.
* @param isQry Is Result ser for Select query.
- * @param autoClose Is automatic close of server cursors enabled.
* @param updCnt Update count.
* @param closeStmt Close statement on the result set close.
* @param columnCount Count of columns in resultSet row.
* @param transformer Function to deserialize raw rows to list of objects.
*/
JdbcResultSet(JdbcQueryCursorHandler handler, JdbcStatement stmt, Long cursorId, int fetchSize, boolean finished,
- List<BinaryTupleReader> rows, boolean isQry, boolean autoClose, long updCnt, boolean closeStmt, int columnCount,
+ List<BinaryTupleReader> rows, boolean isQry, long updCnt, boolean closeStmt, int columnCount,
Function<BinaryTupleReader, List<Object>> transformer) {
assert stmt != null;
assert fetchSize > 0;
@@ -180,7 +183,6 @@ public class JdbcResultSet implements ResultSet {
this.fetchSize = fetchSize;
this.finished = finished;
this.isQuery = isQry;
- this.autoClose = autoClose;
this.closeStmt = closeStmt;
this.columnCount = columnCount;
this.transformer = transformer;
@@ -215,13 +217,54 @@ public class JdbcResultSet implements ResultSet {
initColumnOrder();
}
+ boolean holdResults() {
+ return rows != null;
+ }
+
+ @Nullable JdbcResultSet getNextResultSet() throws SQLException {
+ try {
+ JdbcFetchQueryResultsRequest req = new JdbcFetchQueryResultsRequest(cursorId, fetchSize);
+ JdbcQuerySingleResult res = cursorHandler.getMoreResultsAsync(req).get();
+
+ close0(true);
+
+ if (!res.resultAvailable()) {
+ if (res.status() == Response.STATUS_FAILED) {
+ throw IgniteQueryErrorCode.createJdbcSqlException(res.err(), res.status());
+ }
+
+ return null;
+ }
+
+ long cursorId0 = res.cursorId();
+
+ List<ColumnType> columnTypes = res.columnTypes();
+ int[] decimalScales = res.decimalScales();
+
+ rows = List.of();
+
+ Function<BinaryTupleReader, List<Object>> transformer = createTransformer(columnTypes, decimalScales);
+
+ int colCount = columnTypes == null ? 0 : columnTypes.size();
+
+ return new JdbcResultSet(cursorHandler, stmt, cursorId0, fetchSize, res.last(), res.items(),
+ res.isQuery(), res.updateCount(), closeStmt, colCount, transformer);
+ } catch (InterruptedException e) {
+ throw new SQLException("Thread was interrupted.", e);
+ } catch (ExecutionException e) {
+ throw new SQLException("Fetch request failed.", e);
+ } catch (CancellationException e) {
+ throw new SQLException("Fetch request canceled.", SqlStateCode.QUERY_CANCELLED);
+ }
+ }
+
/** {@inheritDoc} */
@Override
public boolean next() throws SQLException {
ensureNotClosed();
if ((rowsIter == null || !rowsIter.hasNext()) && !finished) {
try {
- JdbcQueryFetchResult res = cursorHandler.fetchAsync(new JdbcQueryFetchRequest(cursorId, fetchSize)).get();
+ JdbcQueryFetchResult res = cursorHandler.fetchAsync(new JdbcFetchQueryResultsRequest(cursorId, fetchSize)).get();
if (!res.hasResults()) {
throw IgniteQueryErrorCode.createJdbcSqlException(res.err(), res.status());
@@ -265,7 +308,7 @@ public class JdbcResultSet implements ResultSet {
/** {@inheritDoc} */
@Override
public void close() throws SQLException {
- close0();
+ close0(false);
if (closeStmt) {
stmt.closeIfAllResultsClosed();
@@ -275,16 +318,20 @@ public class JdbcResultSet implements ResultSet {
/**
* Close result set.
*
+ * @param removeFromResources If {@code true}, the cursor needs to be removed from client resources.
+ *
* @throws SQLException On error.
*/
- void close0() throws SQLException {
- if (isClosed() || cursorId == null) {
+ void close0(boolean removeFromResources) throws SQLException {
+ if (!holdsResource && (isClosed() || cursorId == null)) {
return;
}
+ holdsResource = !removeFromResources;
+
try {
- if (stmt != null && (!finished || (isQuery && !autoClose))) {
- JdbcQueryCloseResult res = cursorHandler.closeAsync(new JdbcQueryCloseRequest(cursorId)).get();
+ if (stmt != null) {
+ JdbcQueryCloseResult res = cursorHandler.closeAsync(new JdbcQueryCloseRequest(cursorId, removeFromResources)).get();
if (!res.hasResults()) {
throw IgniteQueryErrorCode.createJdbcSqlException(res.err(), res.status());
@@ -301,6 +348,10 @@ public class JdbcResultSet implements ResultSet {
}
}
+ boolean holdsResources() {
+ return holdsResource;
+ }
+
/** {@inheritDoc} */
@Override
public boolean wasNull() throws SQLException {
@@ -2217,7 +2268,7 @@ public class JdbcResultSet implements ResultSet {
* @throws SQLException On error.
*/
private void initMeta() throws SQLException {
- if (finished && (!isQuery || autoClose)) {
+ if (finished && !isQuery) {
throw new SQLException("Server cursor is already closed.", SqlStateCode.INVALID_CURSOR_STATE);
}
@@ -2237,4 +2288,24 @@ public class JdbcResultSet implements ResultSet {
throw new SQLException("Metadata request canceled.", SqlStateCode.QUERY_CANCELLED);
}
}
+
+ static Function<BinaryTupleReader, List<Object>> createTransformer(List<ColumnType> columnTypes, int[] decimalScales) {
+ return (tuple) -> {
+ int columnCount = columnTypes.size();
+ List<Object> row = new ArrayList<>(columnCount);
+ int decimalIdx = 0;
+ int currentDecimalScale = -1;
+
+ for (int colIdx = 0; colIdx < columnCount; colIdx++) {
+ ColumnType type = columnTypes.get(colIdx);
+ if (type == ColumnType.DECIMAL) {
+ currentDecimalScale = decimalScales[decimalIdx++];
+ }
+
+ row.add(JdbcConverterUtils.deriveValueFromBinaryTuple(type, tuple, colIdx, currentDecimalScale));
+ }
+
+ return row;
+ };
+ }
}
diff --git a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java
index 9d3d11f188..38836c4928 100644
--- a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java
+++ b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.jdbc;
import static java.sql.ResultSet.CONCUR_READ_ONLY;
import static java.sql.ResultSet.FETCH_FORWARD;
import static java.sql.ResultSet.TYPE_FORWARD_ONLY;
+import static org.apache.ignite.internal.jdbc.JdbcResultSet.createTransformer;
import static org.apache.ignite.internal.util.ArrayUtils.INT_EMPTY_ARRAY;
import java.sql.BatchUpdateException;
@@ -43,12 +44,11 @@ import org.apache.ignite.internal.jdbc.proto.SqlStateCode;
import org.apache.ignite.internal.jdbc.proto.event.JdbcBatchExecuteRequest;
import org.apache.ignite.internal.jdbc.proto.event.JdbcBatchExecuteResult;
import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryExecuteRequest;
-import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryExecuteResult;
import org.apache.ignite.internal.jdbc.proto.event.JdbcQuerySingleResult;
-import org.apache.ignite.internal.jdbc.proto.event.Response;
import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.sql.ColumnType;
+import org.jetbrains.annotations.Nullable;
/**
* Jdbc statement implementation.
@@ -78,8 +78,8 @@ public class JdbcStatement implements Statement {
/** Fetch size. */
private int pageSize = DFLT_PAGE_SIZE;
- /** Result sets. */
- private volatile List<JdbcResultSet> resSets;
+ /** Result sets. {@code null} represents final result set (no more results are available). */
+ private volatile List<@Nullable JdbcResultSet> resSets;
/** Batch. */
private List<String> batch;
@@ -108,7 +108,7 @@ public class JdbcStatement implements Statement {
/** {@inheritDoc} */
@Override
public ResultSet executeQuery(String sql) throws SQLException {
- execute0(JdbcStatementType.SELECT_STATEMENT_TYPE, Objects.requireNonNull(sql), ArrayUtils.OBJECT_EMPTY_ARRAY);
+ execute0(JdbcStatementType.SELECT_STATEMENT_TYPE, Objects.requireNonNull(sql), false, ArrayUtils.OBJECT_EMPTY_ARRAY);
ResultSet rs = getResultSet();
@@ -124,9 +124,10 @@ public class JdbcStatement implements Statement {
*
* @param sql Sql query.
* @param args Query parameters.
+ * @param multiStatement Multiple statement flag.
* @throws SQLException On error.
*/
- protected void execute0(JdbcStatementType stmtType, String sql, Object[] args) throws SQLException {
+ void execute0(JdbcStatementType stmtType, String sql, boolean multiStatement, Object[] args) throws SQLException {
ensureNotClosed();
closeResults();
@@ -135,11 +136,12 @@ public class JdbcStatement implements Statement {
throw new SQLException("SQL query is empty.");
}
- JdbcQueryExecuteRequest req = new JdbcQueryExecuteRequest(stmtType, schema, pageSize, maxRows, sql, args, conn.getAutoCommit());
+ JdbcQueryExecuteRequest req = new JdbcQueryExecuteRequest(stmtType, schema, pageSize, maxRows, sql, args,
+ conn.getAutoCommit(), multiStatement);
- Response res;
+ JdbcQueryExecuteResponse res;
try {
- res = conn.handler().queryAsync(conn.connectionId(), req).get();
+ res = (JdbcQueryExecuteResponse) conn.handler().queryAsync(conn.connectionId(), req).get();
} catch (InterruptedException e) {
throw new SQLException("Thread was interrupted.", e);
} catch (ExecutionException e) {
@@ -148,62 +150,35 @@ public class JdbcStatement implements Statement {
throw new SQLException("Query execution canceled.", SqlStateCode.QUERY_CANCELLED, e);
}
- if (!res.hasResults()) {
+ if (!res.hasResult()) {
throw IgniteQueryErrorCode.createJdbcSqlException(res.err(), res.status());
}
- JdbcQueryExecuteResponse result = (JdbcQueryExecuteResponse) res;
+ JdbcQuerySingleResult executeResult = res.result();
- JdbcQueryExecuteResult executeResult = result.result();
-
- for (JdbcQuerySingleResult jdbcRes : executeResult.results()) {
- if (!jdbcRes.hasResults()) {
- throw IgniteQueryErrorCode.createJdbcSqlException(jdbcRes.err(), jdbcRes.status());
- }
+ if (!executeResult.resultAvailable()) {
+ throw IgniteQueryErrorCode.createJdbcSqlException(executeResult.err(), executeResult.status());
}
- resSets = new ArrayList<>(executeResult.results().size());
-
- JdbcQueryCursorHandler handler = new JdbcClientQueryCursorHandler(result.getChannel());
+ resSets = new ArrayList<>();
- for (JdbcQuerySingleResult jdbcRes : executeResult.results()) {
- List<ColumnType> columnTypes = jdbcRes.columnTypes();
- int[] decimalScales = jdbcRes.decimalScales();
+ JdbcQueryCursorHandler handler = new JdbcClientQueryCursorHandler(res.getChannel());
- Function<BinaryTupleReader, List<Object>> transformer = createTransformer(columnTypes, decimalScales);
+ List<ColumnType> columnTypes = executeResult.columnTypes();
+ columnTypes = columnTypes == null ? List.of() : columnTypes;
+ int[] decimalScales = executeResult.decimalScales();
- resSets.add(new JdbcResultSet(handler, this, jdbcRes.cursorId(), pageSize,
- jdbcRes.last(), jdbcRes.items(), jdbcRes.isQuery(), false, jdbcRes.updateCount(),
- closeOnCompletion, columnTypes.size(), transformer));
- }
-
- assert !resSets.isEmpty() : "At least one results set is expected";
- }
-
- private static Function<BinaryTupleReader, List<Object>> createTransformer(List<ColumnType> columnTypes, int[] decimalScales) {
- return (tuple) -> {
- int columnCount = columnTypes.size();
- List<Object> row = new ArrayList<>(columnCount);
- int decimalIdx = 0;
- int currentDecimalScale = -1;
-
- for (int colIdx = 0; colIdx < columnCount; colIdx++) {
- ColumnType type = columnTypes.get(colIdx);
- if (type == ColumnType.DECIMAL) {
- currentDecimalScale = decimalScales[decimalIdx++];
- }
-
- row.add(JdbcConverterUtils.deriveValueFromBinaryTuple(type, tuple, colIdx, currentDecimalScale));
- }
+ Function<BinaryTupleReader, List<Object>> transformer = createTransformer(columnTypes, decimalScales);
- return row;
- };
+ resSets.add(new JdbcResultSet(handler, this, executeResult.cursorId(), pageSize,
+ executeResult.last(), executeResult.items(), executeResult.isQuery(), executeResult.updateCount(),
+ closeOnCompletion, columnTypes.size(), transformer));
}
/** {@inheritDoc} */
@Override
public int executeUpdate(String sql) throws SQLException {
- execute0(JdbcStatementType.UPDATE_STATEMENT_TYPE, Objects.requireNonNull(sql), ArrayUtils.OBJECT_EMPTY_ARRAY);
+ execute0(JdbcStatementType.UPDATE_STATEMENT_TYPE, Objects.requireNonNull(sql), false, ArrayUtils.OBJECT_EMPTY_ARRAY);
int res = getUpdateCount();
@@ -367,7 +342,7 @@ public class JdbcStatement implements Statement {
public boolean execute(String sql) throws SQLException {
ensureNotClosed();
- execute0(JdbcStatementType.ANY_STATEMENT_TYPE, Objects.requireNonNull(sql), ArrayUtils.OBJECT_EMPTY_ARRAY);
+ execute0(JdbcStatementType.ANY_STATEMENT_TYPE, Objects.requireNonNull(sql), true, ArrayUtils.OBJECT_EMPTY_ARRAY);
return isQuery();
}
@@ -415,16 +390,16 @@ public class JdbcStatement implements Statement {
/** {@inheritDoc} */
@Override
- public ResultSet getResultSet() throws SQLException {
+ public @Nullable ResultSet getResultSet() throws SQLException {
ensureNotClosed();
if (resSets == null || curRes >= resSets.size()) {
return null;
}
- JdbcResultSet rs = resSets.get(curRes);
+ @Nullable JdbcResultSet rs = resSets.get(curRes);
- if (!rs.isQuery()) {
+ if (rs == null || !rs.isQuery()) {
return null;
}
@@ -440,9 +415,9 @@ public class JdbcStatement implements Statement {
return -1;
}
- JdbcResultSet rs = resSets.get(curRes);
+ @Nullable JdbcResultSet rs = resSets.get(curRes);
- if (rs.isQuery()) {
+ if (rs == null || rs.isQuery()) {
return -1;
}
@@ -457,24 +432,14 @@ public class JdbcStatement implements Statement {
/** {@inheritDoc} */
@Override
- public boolean getMoreResults(int curr) throws SQLException {
+ public boolean getMoreResults(int current) throws SQLException {
ensureNotClosed();
- if (resSets == null || curRes >= resSets.size()) {
- return false;
- }
-
- curRes++;
-
if (resSets != null) {
assert curRes <= resSets.size() : "Invalid results state: [resultsCount=" + resSets.size() + ", curRes=" + curRes + ']';
- switch (curr) {
+ switch (current) {
case CLOSE_CURRENT_RESULT:
- if (curRes > 0) {
- resSets.get(curRes - 1).close0();
- }
-
break;
case CLOSE_ALL_RESULTS:
@@ -486,7 +451,38 @@ public class JdbcStatement implements Statement {
}
}
- return (resSets != null && curRes < resSets.size());
+ // No more results are available if last result set is null
+ if (resSets == null || curRes >= resSets.size() || resSets.get(curRes) == null) {
+ return false;
+ }
+
+ JdbcResultSet nextResultSet;
+ SQLException exceptionally = null;
+
+ try {
+ // If fetching the next result fails, a null placeholder is stored and the exception is kept for rethrow;
+ // the remaining statements of the multi-statement query are not processed.
+ nextResultSet = resSets.get(curRes).getNextResultSet();
+ } catch (SQLException ex) {
+ nextResultSet = null;
+ exceptionally = ex;
+ }
+
+ resSets.add(nextResultSet);
+
+ curRes++;
+
+ // all previous results need to be closed at this point.
+ if (nextResultSet == null && isCloseOnCompletion()) {
+ close();
+ return false;
+ }
+
+ if (exceptionally != null) {
+ throw exceptionally;
+ }
+
+ return nextResultSet != null && nextResultSet.holdResults();
}
/** {@inheritDoc} */
@@ -657,7 +653,9 @@ public class JdbcStatement implements Statement {
if (resSets != null) {
for (JdbcResultSet rs : resSets) {
- rs.closeStatement(true);
+ if (rs != null) {
+ rs.closeStatement(true);
+ }
}
}
}
@@ -692,7 +690,7 @@ public class JdbcStatement implements Statement {
* @return isQuery flag.
*/
protected boolean isQuery() {
- return resSets.get(0).isQuery();
+ return Objects.requireNonNull(resSets).get(0).isQuery();
}
/**
@@ -711,10 +709,25 @@ public class JdbcStatement implements Statement {
*
* @throws SQLException On error.
*/
- protected void closeResults() throws SQLException {
+ void closeResults() throws SQLException {
+ @Nullable JdbcResultSet last = null;
+
if (resSets != null) {
- for (JdbcResultSet rs : resSets) {
- rs.close0();
+ JdbcResultSet lastRs = resSets.get(resSets.size() - 1);
+ boolean allFetched = lastRs == null || (lastRs.isClosed() && !lastRs.holdsResources());
+
+ if (allFetched) {
+ for (JdbcResultSet rs : resSets) {
+ if (rs != null) {
+ rs.close0(true);
+ }
+ }
+ } else {
+ last = lastRs.getNextResultSet();
+
+ while (last != null) {
+ last = last.getNextResultSet();
+ }
}
resSets = null;
@@ -736,8 +749,9 @@ public class JdbcStatement implements Statement {
if (resSets != null) {
for (JdbcResultSet rs : resSets) {
- if (!rs.isClosed()) {
+ if (rs != null && !rs.isClosed()) {
allRsClosed = false;
+ break;
}
}
}
diff --git a/modules/jdbc/src/test/java/org/apache/ignite/internal/jdbc/JdbcResultSetTest.java b/modules/jdbc/src/test/java/org/apache/ignite/internal/jdbc/JdbcResultSetTest.java
new file mode 100644
index 0000000000..a8637c397b
--- /dev/null
+++ b/modules/jdbc/src/test/java/org/apache/ignite/internal/jdbc/JdbcResultSetTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc;
+
+import static org.apache.ignite.internal.jdbc.proto.IgniteQueryErrorCode.codeToSqlState;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.jdbc.proto.JdbcQueryCursorHandler;
+import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryCloseRequest;
+import org.apache.ignite.internal.jdbc.proto.event.JdbcQueryCloseResult;
+import org.apache.ignite.internal.jdbc.proto.event.JdbcQuerySingleResult;
+import org.apache.ignite.internal.jdbc.proto.event.Response;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/** Unit test for JdbcResultSet. */
+@ExtendWith(MockitoExtension.class)
+public class JdbcResultSetTest extends BaseIgniteAbstractTest {
+ @Test
+ public void getNextResultSetTest() throws SQLException {
+ String errorStr = "Failed to fetch query results";
+
+ JdbcQueryCursorHandler handler = mock(JdbcQueryCursorHandler.class);
+ JdbcStatement stmt = mock(JdbcStatement.class);
+
+ JdbcResultSet rs = spy(new JdbcResultSet(handler, stmt, 1L, 1, true, List.of(), true, 0, false, 1, null));
+
+ when(handler.getMoreResultsAsync(any())).thenReturn(CompletableFuture.completedFuture(
+ new JdbcQuerySingleResult(Response.STATUS_FAILED, errorStr)));
+
+ JdbcQueryCloseResult closeRes = mock(JdbcQueryCloseResult.class);
+
+ when(handler.closeAsync(any())).thenReturn(CompletableFuture.completedFuture(closeRes));
+ when(closeRes.hasResults()).thenReturn(true);
+
+ SQLException ex = assertThrows(SQLException.class, rs::getNextResultSet);
+
+ String actualMessage = ex.getMessage();
+
+ assertEquals(errorStr, actualMessage);
+ assertEquals(codeToSqlState(Response.STATUS_FAILED), ex.getSQLState());
+
+ verify(rs).close0(anyBoolean());
+ }
+
+ @Test
+ public void checkClose() throws SQLException {
+ JdbcQueryCursorHandler handler = mock(JdbcQueryCursorHandler.class);
+ JdbcStatement stmt = mock(JdbcStatement.class);
+
+ JdbcResultSet rs = spy(new JdbcResultSet(handler, stmt, 1L, 1, true, List.of(), true, 0, false, 1, null));
+
+ JdbcQueryCloseResult closeRequest = mock(JdbcQueryCloseResult.class);
+
+ when(closeRequest.hasResults()).thenReturn(true);
+
+ when(handler.closeAsync(any())).thenReturn(CompletableFuture.completedFuture(closeRequest));
+
+ rs.close();
+
+ ArgumentCaptor<JdbcQueryCloseRequest> argument = ArgumentCaptor.forClass(JdbcQueryCloseRequest.class);
+
+ verify(handler).closeAsync(argument.capture());
+
+ assertFalse(argument.getValue().removeFromResources());
+ }
+}