You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/09 10:42:26 UTC
[10/25] ignite git commit: IGNITE-6358: JDBC thick: support multiple
statements. This closes #2777.
IGNITE-6358: JDBC thick: support multiple statements. This closes #2777.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/df3c407f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/df3c407f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/df3c407f
Branch: refs/heads/ignite-3478
Commit: df3c407f8c40d4dcd603ee35215199cd1d60c38a
Parents: c116bfc
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Thu Oct 5 16:32:33 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Thu Oct 5 16:32:33 2017 +0300
----------------------------------------------------------------------
.../internal/jdbc2/JdbcStatementSelfTest.java | 130 ++++++++-
.../org/apache/ignite/IgniteJdbcDriver.java | 9 +-
.../ignite/internal/jdbc2/JdbcConnection.java | 13 +
.../internal/jdbc2/JdbcDatabaseMetadata.java | 54 ++--
.../jdbc2/JdbcQueryMultipleStatementsTask.java | 167 ++++++++++++
.../ignite/internal/jdbc2/JdbcQueryTask.java | 154 +++--------
.../internal/jdbc2/JdbcQueryTaskResult.java | 120 +++++++++
.../ignite/internal/jdbc2/JdbcQueryTaskV3.java | 94 +++++++
.../ignite/internal/jdbc2/JdbcResultSet.java | 175 +++++++++---
.../ignite/internal/jdbc2/JdbcStatement.java | 270 ++++++++++---------
.../internal/jdbc2/JdbcStatementResultInfo.java | 73 +++++
.../jdbc2/JdbcStreamedPreparedStatement.java | 19 +-
12 files changed, 966 insertions(+), 312 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/df3c407f/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStatementSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStatementSelfTest.java
index 138eef5..d3f77e0 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStatementSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStatementSelfTest.java
@@ -45,7 +45,8 @@ public class JdbcStatementSelfTest extends GridCommonAbstractTest {
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
/** JDBC URL. */
- private static final String BASE_URL = CFG_URL_PREFIX + "cache=default@modules/clients/src/test/config/jdbc-config.xml";
+ private static final String BASE_URL = CFG_URL_PREFIX
+ + "cache=default:multipleStatementsAllowed=true@modules/clients/src/test/config/jdbc-config.xml";
/** SQL query. */
private static final String SQL = "select * from Person where age > 30";
@@ -250,6 +251,133 @@ public class JdbcStatementSelfTest extends GridCommonAbstractTest {
}
/**
+ * Executes ten {@code select <i>;} statements as one multi-statement query and checks that
+ * {@code execute()} reports a result set, that every {@code getMoreResults()} call is followed by a
+ * result set holding exactly one row with the expected value, and that no results remain afterwards.
+ *
+ * @throws Exception If failed.
+ */
+ public void testExecuteQueryMultipleOnlyResultSets() throws Exception {
+ assert conn.getMetaData().supportsMultipleResultSets();
+
+ int stmtCnt = 10;
+
+ StringBuilder sql = new StringBuilder();
+
+ for (int i = 0; i < stmtCnt; ++i)
+ sql.append("select ").append(i).append("; ");
+
+ assert stmt.execute(sql.toString());
+
+ for (int i = 0; i < stmtCnt; ++i) {
+ assert stmt.getMoreResults();
+
+ ResultSet rs = stmt.getResultSet();
+
+ assert rs.next();
+ assert rs.getInt(1) == i;
+ assert !rs.next();
+ }
+
+ assert !stmt.getMoreResults();
+ }
+
+ /**
+ * Executes {@code DROP TABLE}, {@code CREATE TABLE} and ten {@code INSERT} statements as one
+ * multi-statement query and checks that {@code execute()} reports no result set, that the DDL checks
+ * see a {@code null} result set with update count {@code 0}, that every insert reports update
+ * count {@code 1}, and that no results remain afterwards.
+ *
+ * @throws Exception If failed.
+ */
+ public void testExecuteQueryMultipleOnlyDml() throws Exception {
+ assert conn.getMetaData().supportsMultipleResultSets();
+
+ conn.setSchema(null);
+
+ int stmtCnt = 10;
+
+ StringBuilder sql = new StringBuilder(
+ "drop table if exists test; create table test(ID int primary key, NAME varchar(20)); ");
+
+ for (int i = 0; i < stmtCnt; ++i)
+ sql.append("insert into test (ID, NAME) values (" + i + ", 'name_" + i +"'); ");
+
+ assert !stmt.execute(sql.toString());
+
+ // DROP TABLE statement
+ assert stmt.getResultSet() == null;
+ assert stmt.getUpdateCount() == 0;
+
+ // CREATE TABLE statement. NOTE(review): no getMoreResults() call precedes these checks - confirm the getters advance to it.
+ assert stmt.getResultSet() == null;
+ assert stmt.getUpdateCount() == 0;
+
+ for (int i = 0; i < stmtCnt; ++i) {
+ assert stmt.getMoreResults();
+
+ assert stmt.getResultSet() == null;
+ assert stmt.getUpdateCount() == 1;
+ }
+
+ assert !stmt.getMoreResults();
+ }
+
+ /**
+ * Executes {@code DROP TABLE}, {@code CREATE TABLE} and then ten alternating {@code INSERT}
+ * (even index) and {@code SELECT} (odd index) statements as one multi-statement query.
+ * Inserts must report a {@code null} result set and update count {@code 1}; selects must report
+ * update count {@code -1}, two columns and at most as many rows as inserts precede them.
+ * At least one select has to return all rows inserted before it.
+ *
+ * @throws Exception If failed.
+ */
+ public void testExecuteQueryMultipleMixed() throws Exception {
+ assert conn.getMetaData().supportsMultipleResultSets();
+
+ conn.setSchema(null);
+
+ int stmtCnt = 10;
+
+ StringBuilder sql = new StringBuilder(
+ "drop table if exists test; create table test(ID int primary key, NAME varchar(20)); ");
+
+ for (int i = 0; i < stmtCnt; ++i) {
+ if (i % 2 == 0)
+ sql.append(" insert into test (ID, NAME) values (" + i + ", 'name_" + i + "'); ");
+ else
+ sql.append(" select * from test where id < " + i + "; ");
+ }
+
+ assert !stmt.execute(sql.toString());
+
+ // DROP TABLE statement
+ assert stmt.getResultSet() == null;
+ assert stmt.getUpdateCount() == 0;
+
+ // CREATE TABLE statement. NOTE(review): no getMoreResults() call precedes these checks - confirm the getters advance to it.
+ assert stmt.getResultSet() == null;
+ assert stmt.getUpdateCount() == 0;
+
+ boolean notEmptyResult = false;
+
+ for (int i = 0; i < stmtCnt; ++i) {
+ assert stmt.getMoreResults();
+
+ if (i % 2 == 0) {
+ assert stmt.getResultSet() == null;
+ assert stmt.getUpdateCount() == 1;
+ }
+ else {
+ assert stmt.getUpdateCount() == -1;
+
+ ResultSet rs = stmt.getResultSet();
+
+ assert rs.getMetaData().getColumnCount() == 2;
+
+ int rowsCnt = 0;
+
+ while(rs.next())
+ rowsCnt++;
+
+ assert rowsCnt <= (i + 1) / 2;
+
+ if (rowsCnt == (i + 1) / 2)
+ notEmptyResult = true;
+ }
+ }
+
+ assert notEmptyResult;
+
+ assert !stmt.getMoreResults();
+ }
+
+ /**
* Person.
*/
@SuppressWarnings("UnusedDeclaration")
http://git-wip-us.apache.org/repos/asf/ignite/blob/df3c407f/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java
index f519589..b03e387 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java
@@ -331,6 +331,9 @@ public class IgniteJdbcDriver implements Driver {
/** Whether DML streaming will overwrite existing cache entries. */
private static final String PARAM_STREAMING_ALLOW_OVERWRITE = "streamingAllowOverwrite";
+ /** Allow queries with multiple statements. */
+ private static final String PARAM_MULTIPLE_STMTS = "multipleStatementsAllowed";
+
/** Hostname property name. */
public static final String PROP_HOST = PROP_PREFIX + "host";
@@ -376,6 +379,9 @@ public class IgniteJdbcDriver implements Driver {
/** Whether DML streaming will overwrite existing cache entries. */
public static final String PROP_STREAMING_ALLOW_OVERWRITE = PROP_PREFIX + PARAM_STREAMING_ALLOW_OVERWRITE;
+ /** Allow queries with multiple statements. */
+ public static final String PROP_MULTIPLE_STMTS = PROP_PREFIX + PARAM_MULTIPLE_STMTS;
+
/** Cache name property name. */
public static final String PROP_CFG = PROP_PREFIX + "cfg";
@@ -447,7 +453,8 @@ public class IgniteJdbcDriver implements Driver {
new JdbcDriverPropertyInfo("Distributed Joins", info.getProperty(PROP_DISTRIBUTED_JOINS), ""),
new JdbcDriverPropertyInfo("Enforce Join Order", info.getProperty(JdbcThinUtils.PROP_ENFORCE_JOIN_ORDER), ""),
new JdbcDriverPropertyInfo("Lazy query execution", info.getProperty(JdbcThinUtils.PROP_LAZY), ""),
- new JdbcDriverPropertyInfo("Transactions Allowed", info.getProperty(PROP_TX_ALLOWED), "")
+ new JdbcDriverPropertyInfo("Transactions Allowed", info.getProperty(PROP_TX_ALLOWED), ""),
+ new JdbcDriverPropertyInfo("Queries with multiple statements allowed", info.getProperty(PROP_MULTIPLE_STMTS), "")
);
if (info.getProperty(PROP_CFG) != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/df3c407f/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
index fde16ff..ccc09ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
@@ -80,6 +80,7 @@ import static org.apache.ignite.IgniteJdbcDriver.PROP_DISTRIBUTED_JOINS;
import static org.apache.ignite.IgniteJdbcDriver.PROP_ENFORCE_JOIN_ORDER;
import static org.apache.ignite.IgniteJdbcDriver.PROP_LAZY;
import static org.apache.ignite.IgniteJdbcDriver.PROP_LOCAL;
+import static org.apache.ignite.IgniteJdbcDriver.PROP_MULTIPLE_STMTS;
import static org.apache.ignite.IgniteJdbcDriver.PROP_NODE_ID;
import static org.apache.ignite.IgniteJdbcDriver.PROP_TX_ALLOWED;
import static org.apache.ignite.IgniteJdbcDriver.PROP_STREAMING;
@@ -164,6 +165,9 @@ public class JdbcConnection implements Connection {
/** Allow overwrites for duplicate keys on streamed {@code INSERT}s. */
private final boolean streamAllowOverwrite;
+ /** Allow queries with multiple statements. */
+ private final boolean multipleStmts;
+
/** Statements. */
final Set<JdbcStatement> statements = new HashSet<>();
@@ -204,6 +208,8 @@ public class JdbcConnection implements Connection {
// by IgniteDataStreamer.DFLT_PARALLEL_OPS_MULTIPLIER will be used
streamNodeParOps = Integer.parseInt(props.getProperty(PROP_STREAMING_PER_NODE_PAR_OPS, "0"));
+ multipleStmts = Boolean.parseBoolean(props.getProperty(PROP_MULTIPLE_STMTS));
+
String nodeIdProp = props.getProperty(PROP_NODE_ID);
if (nodeIdProp != null)
@@ -841,6 +847,13 @@ public class JdbcConnection implements Connection {
}
/**
+ * @return {@code true} if queries with multiple statements are allowed on this connection (the value parsed from the {@code PROP_MULTIPLE_STMTS} property), {@code false} otherwise.
+ */
+ boolean isMultipleStatementsAllowed() {
+ return multipleStmts;
+ }
+
+ /**
* @return Local query flag.
*/
boolean isLocalQuery() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/df3c407f/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcDatabaseMetadata.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcDatabaseMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcDatabaseMetadata.java
index 03fde79..2fe24bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcDatabaseMetadata.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcDatabaseMetadata.java
@@ -319,7 +319,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
/** {@inheritDoc} */
@Override public boolean supportsMultipleResultSets() {
- return false;
+ return conn.isMultipleStatementsAllowed();
}
/** {@inheritDoc} */
@@ -675,7 +675,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
/** {@inheritDoc} */
@Override public ResultSet getProcedures(String catalog, String schemaPtrn,
String procedureNamePtrn) throws SQLException {
- return new JdbcResultSet(null,
+ return new JdbcResultSet(true, null,
conn.createStatement0(),
Collections.<String>emptyList(),
Arrays.asList("PROCEDURE_CAT", "PROCEDURE_SCHEM", "PROCEDURE_NAME",
@@ -689,7 +689,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
/** {@inheritDoc} */
@Override public ResultSet getProcedureColumns(String catalog, String schemaPtrn, String procedureNamePtrn,
String colNamePtrn) throws SQLException {
- return new JdbcResultSet(null,
+ return new JdbcResultSet(true, null,
conn.createStatement0(),
Collections.<String>emptyList(),
Arrays.asList("PROCEDURE_CAT", "PROCEDURE_SCHEM", "PROCEDURE_NAME",
@@ -725,7 +725,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
}
}
- return new JdbcResultSet(null,
+ return new JdbcResultSet(true, null,
conn.createStatement0(),
Collections.<String>emptyList(),
Arrays.asList("TABLE_CAT", "TABLE_SCHEM", "TABLE_NAME", "TABLE_TYPE", "REMARKS", "TYPE_CAT",
@@ -766,7 +766,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
/** {@inheritDoc} */
@Override public ResultSet getCatalogs() throws SQLException {
- return new JdbcResultSet(null,
+ return new JdbcResultSet(true, null,
conn.createStatement0(),
Collections.<String>emptyList(),
Collections.singletonList("TABLE_CAT"),
@@ -778,7 +778,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
/** {@inheritDoc} */
@Override public ResultSet getTableTypes() throws SQLException {
- return new JdbcResultSet(null,
+ return new JdbcResultSet(true, null,
conn.createStatement0(),
Collections.<String>emptyList(),
Collections.singletonList("TABLE_TYPE"),
@@ -812,7 +812,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
}
}
- return new JdbcResultSet(null,
+ return new JdbcResultSet(true, null,
conn.createStatement0(),
Collections.<String>emptyList(),
Arrays.asList("TABLE_CAT", "TABLE_SCHEM", "TABLE_NAME", "COLUMN_NAME", "DATA_TYPE",
@@ -870,7 +870,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
/** {@inheritDoc} */
@Override public ResultSet getColumnPrivileges(String catalog, String schema, String tbl,
String colNamePtrn) throws SQLException {
- return new JdbcResultSet(null,
+ return new JdbcResultSet(true, null,
conn.createStatement0(),
Collections.<String>emptyList(),
Collections.<String>emptyList(),
@@ -883,7 +883,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
/** {@inheritDoc} */
@Override public ResultSet getTablePrivileges(String catalog, String schemaPtrn,
String tblNamePtrn) throws SQLException {
- return new JdbcResultSet(null,
+ return new JdbcResultSet(true, null,
conn.createStatement0(),
Collections.<String>emptyList(),
Collections.<String>emptyList(),
@@ -896,7 +896,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
/** {@inheritDoc} */
@Override public ResultSet getBestRowIdentifier(String catalog, String schema, String tbl, int scope,
boolean nullable) throws SQLException {
- return new JdbcResultSet(null,
+ return new JdbcResultSet(true, null,
conn.createStatement0(),
Collections.<String>emptyList(),
Collections.<String>emptyList(),
@@ -908,7 +908,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
/** {@inheritDoc} */
@Override public ResultSet getVersionColumns(String catalog, String schema, String tbl) throws SQLException {
- return new JdbcResultSet(null,
+ return new JdbcResultSet(true, null,
conn.createStatement0(),
Collections.<String>emptyList(),
Collections.<String>emptyList(),
@@ -936,7 +936,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
}
}
- return new JdbcResultSet(null,
+ return new JdbcResultSet(true, null,
conn.createStatement0(),
Collections.<String>emptyList(),
Arrays.asList("TABLE_CAT", "TABLE_SCHEM", "TABLE_NAME", "COLUMN_NAME", "KEY_SEQ", "PK_NAME"),
@@ -948,7 +948,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
/** {@inheritDoc} */
@Override public ResultSet getImportedKeys(String catalog, String schema, String tbl) throws SQLException {
- return new JdbcResultSet(null,
+ return new JdbcResultSet(true, null,
conn.createStatement0(),
Collections.<String>emptyList(),
Collections.<String>emptyList(),
@@ -960,7 +960,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
/** {@inheritDoc} */
@Override public ResultSet getExportedKeys(String catalog, String schema, String tbl) throws SQLException {
- return new JdbcResultSet(null,
+ return new JdbcResultSet(true, null,
conn.createStatement0(),
Collections.<String>emptyList(),
Collections.<String>emptyList(),
@@ -973,7 +973,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
/** {@inheritDoc} */
@Override public ResultSet getCrossReference(String parentCatalog, String parentSchema, String parentTbl,
String foreignCatalog, String foreignSchema, String foreignTbl) throws SQLException {
- return new JdbcResultSet(null,
+ return new JdbcResultSet(true, null,
conn.createStatement0(),
Collections.<String>emptyList(),
Collections.<String>emptyList(),
@@ -985,7 +985,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
/** {@inheritDoc} */
@Override public ResultSet getTypeInfo() throws SQLException {
- return new JdbcResultSet(null,
+ return new JdbcResultSet(true, null,
conn.createStatement0(),
Collections.<String>emptyList(),
Collections.<String>emptyList(),
@@ -1000,7 +1000,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
boolean approximate) throws SQLException {
updateMetaData();
- Collection<List<?>> rows = new ArrayList<>(indexes.size());
+ List<List<?>> rows = new ArrayList<>(indexes.size());
if (validCatalogPattern(catalog)) {
for (List<Object> idx : indexes) {
@@ -1029,7 +1029,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
}
}
- return new JdbcResultSet(null,
+ return new JdbcResultSet(true, null,
conn.createStatement0(),
Collections.<String>emptyList(),
Arrays.asList("TABLE_CAT", "TABLE_SCHEM", "TABLE_NAME", "NON_UNIQUE", "INDEX_QUALIFIER",
@@ -1106,7 +1106,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
/** {@inheritDoc} */
@Override public ResultSet getUDTs(String catalog, String schemaPtrn, String typeNamePtrn,
int[] types) throws SQLException {
- return new JdbcResultSet(null,
+ return new JdbcResultSet(true, null,
conn.createStatement0(),
Collections.<String>emptyList(),
Collections.<String>emptyList(),
@@ -1144,7 +1144,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
/** {@inheritDoc} */
@Override public ResultSet getSuperTypes(String catalog, String schemaPtrn,
String typeNamePtrn) throws SQLException {
- return new JdbcResultSet(null,
+ return new JdbcResultSet(true, null,
conn.createStatement0(),
Collections.<String>emptyList(),
Collections.<String>emptyList(),
@@ -1157,7 +1157,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
/** {@inheritDoc} */
@Override public ResultSet getSuperTables(String catalog, String schemaPtrn,
String tblNamePtrn) throws SQLException {
- return new JdbcResultSet(null,
+ return new JdbcResultSet(true, null,
conn.createStatement0(),
Collections.<String>emptyList(),
Collections.<String>emptyList(),
@@ -1170,7 +1170,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
/** {@inheritDoc} */
@Override public ResultSet getAttributes(String catalog, String schemaPtrn, String typeNamePtrn,
String attributeNamePtrn) throws SQLException {
- return new JdbcResultSet(null,
+ return new JdbcResultSet(true, null,
conn.createStatement0(),
Collections.<String>emptyList(),
Collections.<String>emptyList(),
@@ -1233,7 +1233,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
}
}
- return new JdbcResultSet(null,
+ return new JdbcResultSet(true, null,
conn.createStatement0(),
Collections.<String>emptyList(),
Arrays.asList("TABLE_SCHEM", "TABLE_CATALOG"),
@@ -1259,7 +1259,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
/** {@inheritDoc} */
@Override public ResultSet getClientInfoProperties() throws SQLException {
- return new JdbcResultSet(null,
+ return new JdbcResultSet(true, null,
conn.createStatement0(),
Collections.<String>emptyList(),
Collections.<String>emptyList(),
@@ -1272,7 +1272,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
/** {@inheritDoc} */
@Override public ResultSet getFunctions(String catalog, String schemaPtrn,
String functionNamePtrn) throws SQLException {
- return new JdbcResultSet(null,
+ return new JdbcResultSet(true, null,
conn.createStatement0(),
Collections.<String>emptyList(),
Arrays.asList("FUNCTION_CAT", "FUNCTION_SCHEM", "FUNCTION_NAME",
@@ -1286,7 +1286,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
/** {@inheritDoc} */
@Override public ResultSet getFunctionColumns(String catalog, String schemaPtrn, String functionNamePtrn,
String colNamePtrn) throws SQLException {
- return new JdbcResultSet(null,
+ return new JdbcResultSet(true, null,
conn.createStatement0(),
Collections.<String>emptyList(),
Arrays.asList("FUNCTION_CAT", "FUNCTION_SCHEM", "FUNCTION_NAME",
@@ -1305,7 +1305,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
/** {@inheritDoc} */
@Override public ResultSet getPseudoColumns(String catalog, String schemaPtrn, String tblNamePtrn,
String colNamePtrn) throws SQLException {
- return new JdbcResultSet(null,
+ return new JdbcResultSet(true, null,
conn.createStatement0(),
Collections.<String>emptyList(),
Collections.<String>emptyList(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/df3c407f/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryMultipleStatementsTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryMultipleStatementsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryMultipleStatementsTask.java
new file mode 100644
index 0000000..bf7c24e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryMultipleStatementsTask.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteJdbcDriver;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.resources.IgniteInstanceResource;
+
+/**
+ * Task for SQL queries execution through {@link IgniteJdbcDriver}.
+ * The query can contain several SQL statements; one {@link JdbcStatementResultInfo} is returned per statement cursor.
+ */
+class JdbcQueryMultipleStatementsTask implements IgniteCallable<List<JdbcStatementResultInfo>> {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Ignite instance; assigned in the constructor and annotated for resource injection. */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** Schema name set on the query. */
+ private final String schemaName;
+
+ /** SQL text; may contain several statements. */
+ private final String sql;
+
+ /** Operation type flag - query or not; when {@code null} a plain {@link SqlFieldsQuery} is built instead of {@code JdbcSqlFieldsQuery}. */
+ private Boolean isQry;
+
+ /** Query arguments. */
+ private final Object[] args;
+
+ /** Fetch size; used as the query page size. */
+ private final int fetchSize;
+
+ /** Local execution flag; when {@code false}, removal of every registered cursor is scheduled. */
+ private final boolean loc;
+
+ /** Local query flag. */
+ private final boolean locQry;
+
+ /** Collocated query flag. */
+ private final boolean collocatedQry;
+
+ /** Distributed joins flag. */
+ private final boolean distributedJoins;
+
+ /** Enforce join order flag. */
+ private final boolean enforceJoinOrder;
+
+ /** Lazy query execution flag. */
+ private final boolean lazy;
+
+ /**
+ * @param ignite Ignite.
+ * @param schemaName Schema name.
+ * @param sql Sql query.
+ * @param isQry Operation type flag - query or not - to enforce query type check; may be {@code null}.
+ * @param loc Local execution flag.
+ * @param args Args.
+ * @param fetchSize Fetch size.
+ * @param locQry Local query flag.
+ * @param collocatedQry Collocated query flag.
+ * @param distributedJoins Distributed joins flag.
+ * @param enforceJoinOrder Enforce join order flag.
+ * @param lazy Lazy query execution flag.
+ */
+ public JdbcQueryMultipleStatementsTask(Ignite ignite, String schemaName, String sql, Boolean isQry, boolean loc,
+ Object[] args, int fetchSize, boolean locQry, boolean collocatedQry, boolean distributedJoins,
+ boolean enforceJoinOrder, boolean lazy) {
+ this.ignite = ignite;
+ this.args = args;
+ this.schemaName = schemaName;
+ this.sql = sql;
+ this.isQry = isQry;
+ this.fetchSize = fetchSize;
+ this.loc = loc;
+ this.locQry = locQry;
+ this.collocatedQry = collocatedQry;
+ this.distributedJoins = distributedJoins;
+ this.enforceJoinOrder = enforceJoinOrder;
+ this.lazy = lazy;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Builds a fields query from the task settings, runs it via {@code querySqlFieldsNoCache} and converts
+ * every returned cursor into a {@link JdbcStatementResultInfo}. For a non-query cursor the single
+ * {@code Long} update counter is read and the cursor is closed. For a query cursor a random UUID is
+ * generated, the cursor is registered in {@link JdbcQueryTask} under that UUID and, unless the
+ * local execution flag is set, its removal is scheduled.
+ *
+ * @return One result descriptor per returned cursor, in cursor order.
+ */
+ @Override public List<JdbcStatementResultInfo> call() throws Exception {
+ SqlFieldsQuery qry = (isQry != null ? new JdbcSqlFieldsQuery(sql, isQry) : new SqlFieldsQuery(sql))
+ .setArgs(args);
+
+ qry.setPageSize(fetchSize);
+ qry.setLocal(locQry);
+ qry.setCollocated(collocatedQry);
+ qry.setDistributedJoins(distributedJoins);
+ qry.setEnforceJoinOrder(enforceJoinOrder);
+ qry.setLazy(lazy);
+ qry.setSchema(schemaName);
+
+ GridKernalContext ctx = ((IgniteKernal)ignite).context();
+
+ List<FieldsQueryCursor<List<?>>> curs = ctx.query().querySqlFieldsNoCache(qry, true, false);
+
+ List<JdbcStatementResultInfo> resultsInfo = new ArrayList<>(curs.size());
+
+ for (FieldsQueryCursor<List<?>> cur0 : curs) {
+ QueryCursorImpl<List<?>> cur = (QueryCursorImpl<List<?>>)cur0;
+
+ long updCnt = -1;
+
+ UUID qryId = null;
+
+ if (!cur.isQuery()) {
+ List<List<?>> items = cur.getAll();
+
+ assert items != null && items.size() == 1 && items.get(0).size() == 1
+ && items.get(0).get(0) instanceof Long :
+ "Invalid result set for not-SELECT query. [qry=" + sql +
+ ", res=" + S.toString(List.class, items) + ']';
+
+ updCnt = (Long)items.get(0).get(0);
+
+ cur.close(); // NOTE(review): not reached if the assertion above fails - consider try/finally.
+ }
+ else {
+ qryId = UUID.randomUUID();
+
+ JdbcQueryTask.Cursor jdbcCur = new JdbcQueryTask.Cursor(cur, cur.iterator());
+
+ JdbcQueryTask.addCursor(qryId, jdbcCur);
+
+ if (!loc)
+ JdbcQueryTask.scheduleRemoval(qryId);
+ }
+
+ JdbcStatementResultInfo resInfo = new JdbcStatementResultInfo(cur.isQuery(), qryId, updCnt);
+
+ resultsInfo.add(resInfo);
+ }
+
+ return resultsInfo;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/df3c407f/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java
index 4854129..ecbfb71 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.jdbc2;
-import java.io.Serializable;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
@@ -50,7 +49,7 @@ import org.apache.ignite.resources.IgniteInstanceResource;
* This parameter can be configured via {@link IgniteSystemProperties#IGNITE_JDBC_DRIVER_CURSOR_REMOVE_DELAY}
* system property.
*/
-class JdbcQueryTask implements IgniteCallable<JdbcQueryTask.QueryResult> {
+class JdbcQueryTask implements IgniteCallable<JdbcQueryTaskResult> {
/** Serial version uid. */
private static final long serialVersionUID = 0L;
@@ -132,7 +131,7 @@ class JdbcQueryTask implements IgniteCallable<JdbcQueryTask.QueryResult> {
}
/** {@inheritDoc} */
- @Override public JdbcQueryTask.QueryResult call() throws Exception {
+ @Override public JdbcQueryTaskResult call() throws Exception {
Cursor cursor = CURSORS.get(uuid);
List<String> tbls = null;
@@ -173,7 +172,11 @@ class JdbcQueryTask implements IgniteCallable<JdbcQueryTask.QueryResult> {
if (isQry == null)
isQry = qryCursor.isQuery();
- Collection<GridQueryFieldMetadata> meta = qryCursor.fieldsMeta();
+ CURSORS.put(uuid, cursor = new Cursor(qryCursor, qryCursor.iterator()));
+ }
+
+ if (first || updateMetadata()) {
+ Collection<GridQueryFieldMetadata> meta = cursor.queryCursor().fieldsMeta();
tbls = new ArrayList<>(meta.size());
cols = new ArrayList<>(meta.size());
@@ -184,8 +187,6 @@ class JdbcQueryTask implements IgniteCallable<JdbcQueryTask.QueryResult> {
cols.add(desc.fieldName().toUpperCase());
types.add(desc.fieldTypeName());
}
-
- CURSORS.put(uuid, cursor = new Cursor(qryCursor, qryCursor.iterator()));
}
List<List<?>> rows = new ArrayList<>();
@@ -208,14 +209,14 @@ class JdbcQueryTask implements IgniteCallable<JdbcQueryTask.QueryResult> {
remove(uuid, cursor);
else if (first) {
if (!loc)
- scheduleRemoval(uuid, RMV_DELAY);
+ scheduleRemoval(uuid);
}
else if (!loc && !CURSORS.replace(uuid, cursor, new Cursor(cursor.cursor, cursor.iter)))
assert !CURSORS.containsKey(uuid) : "Concurrent cursor modification.";
assert isQry != null : "Query flag must be set prior to returning result";
- return new QueryResult(uuid, finished, isQry, rows, cols, tbls, types);
+ return new JdbcQueryTaskResult(uuid, finished, isQry, rows, cols, tbls, types);
}
/**
@@ -233,14 +234,28 @@ class JdbcQueryTask implements IgniteCallable<JdbcQueryTask.QueryResult> {
}
/**
+ * Tells {@code call()} whether result metadata must be collected even when the request is not the first one
+ * for the cursor. This implementation always returns {@code false}.
+ * NOTE(review): presumably meant to be overridden by a subclass - no override is visible here, confirm.
+ *
+ * @return Flag to update metadata on demand.
+ */
+ protected boolean updateMetadata() {
+ return false;
+ }
+
+ /**
* Schedules removal of stored cursor in case of remote query execution.
*
* @param uuid Cursor UUID.
- * @param delay Delay in milliseconds.
*/
- private void scheduleRemoval(final UUID uuid, long delay) {
- assert !loc;
+ static void scheduleRemoval(final UUID uuid) {
+ scheduleRemoval(uuid, RMV_DELAY);
+ }
+ /**
+ * Schedules removal of stored cursor in case of remote query execution.
+ *
+ * @param uuid Cursor UUID.
+ * @param delay Delay in milliseconds.
+ */
+ private static void scheduleRemoval(final UUID uuid, long delay) {
SCHEDULER.schedule(new CAX() {
@Override public void applyx() {
while (true) {
@@ -279,6 +294,14 @@ class JdbcQueryTask implements IgniteCallable<JdbcQueryTask.QueryResult> {
}
/**
+ * Stores the cursor in {@code CURSORS} under the given UUID unless a cursor is already stored for that UUID.
+ *
+ * @param uuid Cursor UUID.
+ * @param c Cursor.
+ */
+ static void addCursor(UUID uuid, Cursor c) {
+ CURSORS.putIfAbsent(uuid, c);
+ }
+ }
+
+ /**
* Closes and removes cursor.
*
* @param uuid Cursor UUID.
@@ -291,107 +314,9 @@ class JdbcQueryTask implements IgniteCallable<JdbcQueryTask.QueryResult> {
}
/**
- * Result of query execution.
- */
- static class QueryResult implements Serializable {
- /** Serial version uid. */
- private static final long serialVersionUID = 0L;
-
- /** Uuid. */
- private final UUID uuid;
-
- /** Finished. */
- private final boolean finished;
-
- /** Result type - query or update. */
- private final boolean isQry;
-
- /** Rows. */
- private final List<List<?>> rows;
-
- /** Tables. */
- private final List<String> tbls;
-
- /** Columns. */
- private final List<String> cols;
-
- /** Types. */
- private final List<String> types;
-
- /**
- * @param uuid UUID..
- * @param finished Finished.
- * @param isQry
- * @param rows Rows.
- * @param cols Columns.
- * @param tbls Tables.
- * @param types Types.
- */
- public QueryResult(UUID uuid, boolean finished, boolean isQry, List<List<?>> rows, List<String> cols,
- List<String> tbls, List<String> types) {
- this.isQry = isQry;
- this.cols = cols;
- this.uuid = uuid;
- this.finished = finished;
- this.rows = rows;
- this.tbls = tbls;
- this.types = types;
- }
-
- /**
- * @return Query result rows.
- */
- public List<List<?>> getRows() {
- return rows;
- }
-
- /**
- * @return Tables metadata.
- */
- public List<String> getTbls() {
- return tbls;
- }
-
- /**
- * @return Columns metadata.
- */
- public List<String> getCols() {
- return cols;
- }
-
- /**
- * @return Types metadata.
- */
- public List<String> getTypes() {
- return types;
- }
-
- /**
- * @return Query UUID.
- */
- public UUID getUuid() {
- return uuid;
- }
-
- /**
- * @return {@code True} if it is finished query.
- */
- public boolean isFinished() {
- return finished;
- }
-
- /**
- * @return {@code true} if it is result of a query operation, not update; {@code false} otherwise.
- */
- public boolean isQuery() {
- return isQry;
- }
- }
-
- /**
* Cursor.
*/
- private static final class Cursor implements Iterable<List<?>> {
+ static final class Cursor implements Iterable<List<?>> {
/** Cursor. */
final QueryCursor<List<?>> cursor;
@@ -405,7 +330,7 @@ class JdbcQueryTask implements IgniteCallable<JdbcQueryTask.QueryResult> {
* @param cursor Cursor.
* @param iter Iterator.
*/
- private Cursor(QueryCursor<List<?>> cursor, Iterator<List<?>> iter) {
+ Cursor(QueryCursor<List<?>> cursor, Iterator<List<?>> iter) {
this.cursor = cursor;
this.iter = iter;
this.lastAccessTime = U.currentTimeMillis();
@@ -422,5 +347,12 @@ class JdbcQueryTask implements IgniteCallable<JdbcQueryTask.QueryResult> {
public boolean hasNext() {
return iter.hasNext();
}
+
+ /**
+ * @return Underlying cursor cast to {@code QueryCursorImpl}.
+ */
+ public QueryCursorImpl<List<?>> queryCursor() {
+ return (QueryCursorImpl<List<?>>)cursor;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/df3c407f/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTaskResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTaskResult.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTaskResult.java
new file mode 100644
index 0000000..607bb38
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTaskResult.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Result of query execution: rows together with table, column and type metadata.
+ */
+class JdbcQueryTaskResult implements Serializable {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Query UUID. */
+ private final UUID uuid;
+
+ /** Whether the query is finished. */
+ private final boolean finished;
+
+ /** Result type - query ({@code true}) or update ({@code false}). */
+ private final boolean isQry;
+
+ /** Query result rows. */
+ private final List<List<?>> rows;
+
+ /** Tables metadata. */
+ private final List<String> tbls;
+
+ /** Columns metadata. */
+ private final List<String> cols;
+
+ /** Types metadata. */
+ private final List<String> types;
+
+ /**
+ * @param uuid Query UUID.
+ * @param finished {@code true} if the query is finished.
+ * @param isQry {@code true} for a query result, {@code false} for an update.
+ * @param rows Query result rows.
+ * @param cols Columns metadata.
+ * @param tbls Tables metadata.
+ * @param types Types metadata.
+ */
+ public JdbcQueryTaskResult(UUID uuid, boolean finished, boolean isQry, List<List<?>> rows, List<String> cols,
+ List<String> tbls, List<String> types) {
+ this.isQry = isQry;
+ this.cols = cols;
+ this.uuid = uuid;
+ this.finished = finished;
+ this.rows = rows;
+ this.tbls = tbls;
+ this.types = types;
+ }
+
+ /**
+ * @return Query result rows.
+ */
+ public List<List<?>> getRows() {
+ return rows;
+ }
+
+ /**
+ * @return Tables metadata.
+ */
+ public List<String> getTbls() {
+ return tbls;
+ }
+
+ /**
+ * @return Columns metadata.
+ */
+ public List<String> getCols() {
+ return cols;
+ }
+
+ /**
+ * @return Types metadata.
+ */
+ public List<String> getTypes() {
+ return types;
+ }
+
+ /**
+ * @return Query UUID.
+ */
+ public UUID getUuid() {
+ return uuid;
+ }
+
+ /**
+ * @return {@code true} if the query is finished.
+ */
+ public boolean isFinished() {
+ return finished;
+ }
+
+ /**
+ * @return {@code true} if it is the result of a query operation, not an update; {@code false} otherwise.
+ */
+ public boolean isQuery() {
+ return isQry;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/df3c407f/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTaskV3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTaskV3.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTaskV3.java
new file mode 100644
index 0000000..cb2d452
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTaskV3.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+
+/**
+ * Query task that can also update result metadata on demand; used to fetch results of a multi-statement query.
+ */
+class JdbcQueryTaskV3 extends JdbcQueryTaskV2 {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Update metadata on demand flag. */
+ private final boolean updateMeta;
+
+ /**
+ * @param ignite Ignite.
+ * @param cacheName Cache name.
+ * @param schemaName Schema name.
+ * @param sql SQL query.
+ * @param isQry Operation type flag - query or not - to enforce query type check.
+ * @param loc Local execution flag.
+ * @param args Args.
+ * @param fetchSize Fetch size.
+ * @param uuid UUID.
+ * @param locQry Local query flag.
+ * @param collocatedQry Collocated query flag.
+ * @param distributedJoins Distributed joins flag.
+ * @param enforceJoinOrder Enforce join order flag.
+ * @param lazy Lazy query execution flag.
+ * @param updateMeta Update metadata on demand.
+ */
+ public JdbcQueryTaskV3(Ignite ignite, String cacheName, String schemaName, String sql, Boolean isQry, boolean loc,
+ Object[] args, int fetchSize, UUID uuid, boolean locQry, boolean collocatedQry, boolean distributedJoins,
+ boolean enforceJoinOrder, boolean lazy, boolean updateMeta) {
+ super(ignite, cacheName, schemaName, sql, isQry, loc, args, fetchSize, uuid, locQry,
+ collocatedQry, distributedJoins, enforceJoinOrder, lazy);
+
+ this.updateMeta = updateMeta;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean updateMetadata() {
+ return updateMeta;
+ }
+
+ /**
+ * @param ignite Ignite.
+ * @param cacheName Cache name.
+ * @param schemaName Schema name.
+ * @param sql SQL query.
+ * @param isQry Operation type flag - query or not - to enforce query type check.
+ * @param loc Local execution flag.
+ * @param args Args.
+ * @param fetchSize Fetch size.
+ * @param uuid UUID.
+ * @param locQry Local query flag.
+ * @param collocatedQry Collocated query flag.
+ * @param distributedJoins Distributed joins flag.
+ * @param enforceJoinOrder Enforce join order flag.
+ * @param lazy Lazy query execution flag.
+ * @param updateMeta Update metadata on demand.
+ * @return {@code JdbcQueryTaskV3} if {@code updateMeta} is set, else the task from {@code JdbcQueryTaskV2.createTask}.
+ */
+ public static JdbcQueryTask createTask(Ignite ignite, String cacheName, String schemaName, String sql,
+ Boolean isQry, boolean loc, Object[] args, int fetchSize, UUID uuid, boolean locQry,
+ boolean collocatedQry, boolean distributedJoins,
+ boolean enforceJoinOrder, boolean lazy, boolean updateMeta) {
+
+ if (updateMeta)
+ return new JdbcQueryTaskV3(ignite, cacheName, schemaName, sql, isQry, loc, args, fetchSize,
+ uuid, locQry, collocatedQry, distributedJoins, enforceJoinOrder, lazy, true);
+ else
+ return JdbcQueryTaskV2.createTask(ignite, cacheName, schemaName, sql, isQry, loc, args, fetchSize,
+ uuid, locQry, collocatedQry, distributedJoins, enforceJoinOrder, lazy);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/df3c407f/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
index 04b4041..69d4252 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
@@ -39,13 +39,14 @@ import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Calendar;
-import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.processors.odbc.SqlStateCode;
+import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.jdbc2.JdbcUtils.convertToSqlException;
@@ -54,6 +55,12 @@ import static org.apache.ignite.internal.jdbc2.JdbcUtils.convertToSqlException;
* JDBC result set implementation.
*/
public class JdbcResultSet implements ResultSet {
+ /** Is query. */
+ private final boolean isQry;
+
+ /** Update count. */
+ private final long updCnt;
+
/** Uuid. */
private final UUID uuid;
@@ -61,13 +68,13 @@ public class JdbcResultSet implements ResultSet {
private final JdbcStatement stmt;
/** Table names. */
- private final List<String> tbls;
+ private List<String> tbls;
/** Column names. */
- private final List<String> cols;
+ private List<String> cols;
/** Class names. */
- private final List<String> types;
+ private List<String> types;
/** Rows cursor iterator. */
private Iterator<List<?>> it;
@@ -93,6 +100,7 @@ public class JdbcResultSet implements ResultSet {
/**
* Creates new result set.
*
+ * @param isQry Is query flag.
* @param uuid Query UUID.
* @param stmt Statement.
* @param tbls Table names.
@@ -100,26 +108,56 @@ public class JdbcResultSet implements ResultSet {
* @param types Types.
* @param fields Fields.
* @param finished Result set finished flag (the last result set).
+ * @throws SQLException On error.
*/
- JdbcResultSet(@Nullable UUID uuid, JdbcStatement stmt, List<String> tbls, List<String> cols,
- List<String> types, Collection<List<?>> fields, boolean finished) {
- assert stmt != null;
- assert tbls != null;
- assert cols != null;
- assert types != null;
- assert fields != null;
-
- this.uuid = uuid;
+ JdbcResultSet(boolean isQry, @Nullable UUID uuid, JdbcStatement stmt, List<String> tbls, List<String> cols,
+ List<String> types, List<List<?>> fields, boolean finished) throws SQLException {
+ this.isQry = isQry;
this.stmt = stmt;
- this.tbls = tbls;
- this.cols = cols;
- this.types = types;
- this.finished = finished;
- this.it = fields.iterator();
+ if (isQry) {
+ this.uuid = uuid;
+ updCnt = -1;
+ this.tbls = tbls;
+ this.cols = cols;
+ this.types = types;
+ this.finished = finished;
+
+ if (fields != null)
+ it = fields.iterator();
+ else
+ it = Collections.emptyIterator();
+ }
+ else {
+ updCnt = updateCounterFromQueryResult(fields);
+
+ this.uuid = null;
+ this.tbls = null;
+ this.cols = null;
+ this.types = null;
+ this.finished = true;
+ it = null;
+ }
}
- /** {@inheritDoc} */
+ /**
+ * @param stmt Statement.
+ * @param updCnt Update count; the result is marked as a finished non-query result with no rows or metadata.
+ */
+ JdbcResultSet(JdbcStatement stmt, long updCnt) {
+ isQry = false;
+ this.updCnt = updCnt;
+ this.stmt = stmt;
+
+ uuid = null;
+ tbls = null;
+ cols = null;
+ types = null;
+ finished = true;
+ it = null;
+ }
+
+ /** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public boolean next() throws SQLException {
ensureNotClosed();
@@ -140,37 +178,52 @@ public class JdbcResultSet implements ResultSet {
return true;
}
else if (!finished) {
- JdbcConnection conn = (JdbcConnection)stmt.getConnection();
+ fetchPage();
- Ignite ignite = conn.ignite();
+ return next();
+ }
- UUID nodeId = conn.nodeId();
+ it = null;
- boolean loc = nodeId == null;
+ return false;
+ }
- // Connections from new clients send queries with new tasks, so we have to continue in the same manner
- JdbcQueryTask qryTask = JdbcQueryTaskV2.createTask(loc ? ignite : null, conn.cacheName(), conn.schemaName(),
- null,true, loc, null, fetchSize, uuid, conn.isLocalQuery(), conn.isCollocatedQuery(),
- conn.isDistributedJoins(), conn.isEnforceJoinOrder(), conn.isLazy());
+ /**
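+ * Runs the query task to fetch rows for this result set and, if table metadata is not yet known, the metadata too.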
+ */
+ private void fetchPage() throws SQLException {
+ JdbcConnection conn = (JdbcConnection)stmt.getConnection();
- try {
- JdbcQueryTask.QueryResult res =
- loc ? qryTask.call() : ignite.compute(ignite.cluster().forNodeId(nodeId)).call(qryTask);
+ Ignite ignite = conn.ignite();
- finished = res.isFinished();
+ UUID nodeId = conn.nodeId();
- it = res.getRows().iterator();
+ boolean loc = nodeId == null;
- return next();
- }
- catch (Exception e) {
- throw convertToSqlException(e, "Failed to query Ignite.");
- }
- }
+ boolean updateMetadata = tbls == null;
- it = null;
+ // Connections from new clients send queries with new tasks, so we have to continue in the same manner
+ JdbcQueryTask qryTask = JdbcQueryTaskV3.createTask(loc ? ignite : null, conn.cacheName(), conn.schemaName(),
+ null,true, loc, null, fetchSize, uuid, conn.isLocalQuery(), conn.isCollocatedQuery(),
+ conn.isDistributedJoins(), conn.isEnforceJoinOrder(), conn.isLazy(), updateMetadata);
- return false;
+ try {
+ JdbcQueryTaskResult res =
+ loc ? qryTask.call() : ignite.compute(ignite.cluster().forNodeId(nodeId)).call(qryTask);
+
+ finished = res.isFinished();
+
+ it = res.getRows().iterator();
+
+ if (updateMetadata) {
+ tbls = res.getTbls();
+ cols = res.getCols();
+ types = res.getTypes();
+ }
+ }
+ catch (Exception e) {
+ throw convertToSqlException(e, "Failed to query Ignite.");
+ }
}
/** {@inheritDoc} */
@@ -421,6 +474,9 @@ public class JdbcResultSet implements ResultSet {
@Override public ResultSetMetaData getMetaData() throws SQLException {
ensureNotClosed();
+ if (tbls == null)
+ fetchPage();
+
return new JdbcResultSetMetadata(tbls, cols, types);
}
@@ -1523,4 +1579,43 @@ public class JdbcResultSet implements ResultSet {
if (curr == null)
throw new SQLException("Result set is not positioned on a row.");
}
+
+ /**
+ * @return {@code true} if this result was produced by a query, {@code false} if it carries an update count.
+ */
+ public boolean isQuery() {
+ return isQry;
+ }
+
+ /**
+ * @return Update count; always {@code -1} when this is a query result.
+ */
+ public long updateCount() {
+ return updCnt;
+ }
+
+ /**
+ * @param rows Query result rows.
+ * @return Update counter taken from the single row, or {@code -1} if {@code F.isEmpty(rows)}.
+ * @throws SQLException If there is not exactly one row with exactly one value, or that value is not a {@code Long}.
+ */
+ private static long updateCounterFromQueryResult(List<List<?>> rows) throws SQLException {
+ if (F.isEmpty(rows))
+ return -1;
+
+ if (rows.size() != 1)
+ throw new SQLException("Expected fetch size of 1 for update operation.");
+
+ List<?> row = rows.get(0);
+
+ if (row.size() != 1)
+ throw new SQLException("Expected row size of 1 for update operation.");
+
+ Object objRes = row.get(0);
+
+ if (!(objRes instanceof Long))
+ throw new SQLException("Unexpected update result type.");
+
+ return (Long)objRes;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/df3c407f/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
index a94b8fd..acac123 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
@@ -24,6 +24,7 @@ import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -57,9 +58,6 @@ public class JdbcStatement implements Statement {
/** Rows limit. */
private int maxRows;
- /** Current result set. */
- protected ResultSet rs;
-
/** Query arguments. */
protected ArrayList<Object> args;
@@ -72,12 +70,15 @@ public class JdbcStatement implements Statement {
/** Fields indexes. */
Map<String, Integer> fieldsIdxs = new HashMap<>();
- /** Current updated items count. */
- long updateCnt = -1;
-
/** Batch of statements. */
private List<String> batch;
+ /** Results. */
+ protected List<JdbcResultSet> results;
+
+ /** Current result set index. */
+ protected int curRes = 0;
+
/**
* Creates new statement.
*
@@ -92,11 +93,20 @@ public class JdbcStatement implements Statement {
/** {@inheritDoc} */
@SuppressWarnings("deprecation")
@Override public ResultSet executeQuery(String sql) throws SQLException {
- ensureNotClosed();
+ execute0(sql, true);
- rs = null;
+ return getResultSet();
+ }
- updateCnt = -1;
+ /**
+ * @param sql SQL query.
+ * @param isQuery Expected type of the statements contained in the query.
+ * @throws SQLException On error.
+ */
+ private void executeMultipleStatement(String sql, Boolean isQuery) throws SQLException {
+ ensureNotClosed();
+
+ closeResults();
if (F.isEmpty(sql))
throw new SQLException("SQL query is empty");
@@ -105,53 +115,37 @@ public class JdbcStatement implements Statement {
UUID nodeId = conn.nodeId();
- UUID uuid = UUID.randomUUID();
-
boolean loc = nodeId == null;
- JdbcQueryTask qryTask = JdbcQueryTaskV2.createTask(loc ? ignite : null, conn.cacheName(), conn.schemaName(),
- sql, true, loc, getArgs(), fetchSize, uuid, conn.isLocalQuery(), conn.isCollocatedQuery(),
+ JdbcQueryMultipleStatementsTask qryTask = new JdbcQueryMultipleStatementsTask(loc ? ignite : null, conn.schemaName(),
+ sql, isQuery, loc, getArgs(), fetchSize, conn.isLocalQuery(), conn.isCollocatedQuery(),
conn.isDistributedJoins(), conn.isEnforceJoinOrder(), conn.isLazy());
try {
- JdbcQueryTask.QueryResult res =
+ List<JdbcStatementResultInfo> rsInfos =
loc ? qryTask.call() : ignite.compute(ignite.cluster().forNodeId(nodeId)).call(qryTask);
- JdbcResultSet rs = new JdbcResultSet(uuid, this, res.getTbls(), res.getCols(), res.getTypes(),
- res.getRows(), res.isFinished());
+ results = new ArrayList<>(rsInfos.size());
- rs.setFetchSize(fetchSize);
-
- resSets.add(rs);
-
- return rs;
+ for (JdbcStatementResultInfo rsInfo : rsInfos) {
+ if (rsInfo.isQuery())
+ results.add(new JdbcResultSet(true, rsInfo.queryId(), this, null, null, null, null, false));
+ else
+ results.add(new JdbcResultSet(this, rsInfo.updateCount()));
+ }
}
catch (Exception e) {
throw convertToSqlException(e, "Failed to query Ignite.");
}
}
- /** {@inheritDoc} */
- @Override public int executeUpdate(String sql) throws SQLException {
- ensureNotClosed();
-
- rs = null;
-
- updateCnt = -1;
-
- return Long.valueOf(doUpdate(sql, getArgs())).intValue();
- }
-
/**
- * Run update query.
* @param sql SQL query.
- * @param args Update arguments.
- * @return Number of affected items.
- * @throws SQLException If failed.
+ * @param isQuery Expected type of the statements contained in the query.
+ * @throws SQLException On error.
*/
- long doUpdate(String sql, Object[] args) throws SQLException {
- if (F.isEmpty(sql))
- throw new SQLException("SQL query is empty");
+ private void executeSingle(String sql, Boolean isQuery) throws SQLException {
+ ensureNotClosed();
Ignite ignite = conn.ignite();
@@ -162,46 +156,50 @@ public class JdbcStatement implements Statement {
boolean loc = nodeId == null;
if (!conn.isDmlSupported())
- throw new SQLException("Failed to query Ignite: DML operations are supported in versions 1.8.0 and newer");
+ if(isQuery != null && !isQuery)
+ throw new SQLException("Failed to query Ignite: DML operations are supported in versions 1.8.0 and newer");
+ else
+ isQuery = true;
JdbcQueryTask qryTask = JdbcQueryTaskV2.createTask(loc ? ignite : null, conn.cacheName(), conn.schemaName(),
- sql, false, loc, args, fetchSize, uuid, conn.isLocalQuery(), conn.isCollocatedQuery(),
+ sql, isQuery, loc, getArgs(), fetchSize, uuid, conn.isLocalQuery(), conn.isCollocatedQuery(),
conn.isDistributedJoins(), conn.isEnforceJoinOrder(), conn.isLazy());
try {
- JdbcQueryTask.QueryResult qryRes =
+ JdbcQueryTaskResult qryRes =
loc ? qryTask.call() : ignite.compute(ignite.cluster().forNodeId(nodeId)).call(qryTask);
- return updateCnt = updateCounterFromQueryResult(qryRes.getRows());
+ JdbcResultSet rs = new JdbcResultSet(qryRes.isQuery(), uuid, this, qryRes.getTbls(), qryRes.getCols(),
+ qryRes.getTypes(), qryRes.getRows(), qryRes.isFinished());
+
+ rs.setFetchSize(fetchSize);
+
+ results = Collections.singletonList(rs);
+ curRes = 0;
}
catch (Exception e) {
throw convertToSqlException(e, "Failed to query Ignite.");
}
+
}
/**
- * @param rows query result.
- * @return update counter, if found.
- * @throws SQLException if getting an update counter from result proved to be impossible.
+ * @param sql SQL query.
+ * @param isQuery Expected type of the statements contained in the query.
+ * @throws SQLException On error.
*/
- private static long updateCounterFromQueryResult(List<List<?>> rows) throws SQLException {
- if (F.isEmpty(rows))
- return -1;
-
- if (rows.size() != 1)
- throw new SQLException("Expected fetch size of 1 for update operation");
-
- List<?> row = rows.get(0);
-
- if (row.size() != 1)
- throw new SQLException("Expected row size of 1 for update operation");
-
- Object objRes = row.get(0);
+ protected void execute0(String sql, Boolean isQuery) throws SQLException {
+ if (conn.isMultipleStatementsAllowed())
+ executeMultipleStatement(sql, isQuery);
+ else
+ executeSingle(sql, isQuery);
+ }
- if (!(objRes instanceof Long))
- throw new SQLException("Unexpected update result type");
+ /** {@inheritDoc} */
+ @Override public int executeUpdate(String sql) throws SQLException {
+ execute0(sql, false);
- return (Long)objRes;
+ return getUpdateCount();
}
/** {@inheritDoc} */
@@ -302,86 +300,48 @@ public class JdbcStatement implements Statement {
/** {@inheritDoc} */
@Override public boolean execute(String sql) throws SQLException {
- if (!conn.isDmlSupported()) {
- // We attempt to run a query without any checks as long as server does not support DML anyway,
- // so it simply will throw an exception when given a DML statement instead of a query.
- rs = executeQuery(sql);
-
- return true;
- }
-
- ensureNotClosed();
-
- rs = null;
-
- updateCnt = -1;
-
- if (F.isEmpty(sql))
- throw new SQLException("SQL query is empty");
-
- Ignite ignite = conn.ignite();
+ execute0(sql, null);
- UUID nodeId = conn.nodeId();
-
- UUID uuid = UUID.randomUUID();
-
- boolean loc = nodeId == null;
-
- JdbcQueryTask qryTask = JdbcQueryTaskV2.createTask(loc ? ignite : null, conn.cacheName(), conn.schemaName(),
- sql, null, loc, getArgs(), fetchSize, uuid, conn.isLocalQuery(), conn.isCollocatedQuery(),
- conn.isDistributedJoins(), conn.isEnforceJoinOrder(), conn.isLazy());
-
- try {
- JdbcQueryTask.QueryResult res =
- loc ? qryTask.call() : ignite.compute(ignite.cluster().forNodeId(nodeId)).call(qryTask);
-
- if (res.isQuery()) {
- JdbcResultSet rs = new JdbcResultSet(uuid, this, res.getTbls(), res.getCols(),
- res.getTypes(), res.getRows(), res.isFinished());
-
- rs.setFetchSize(fetchSize);
-
- resSets.add(rs);
-
- this.rs = rs;
- }
- else
- updateCnt = updateCounterFromQueryResult(res.getRows());
-
- return res.isQuery();
- }
- catch (Exception e) {
- throw convertToSqlException(e, "Failed to query Ignite.");
- }
+ return results.get(0).isQuery();
}
/** {@inheritDoc} */
@Override public ResultSet getResultSet() throws SQLException {
- ensureNotClosed();
+ JdbcResultSet rs = nextResultSet();
- ResultSet rs0 = rs;
+ if (rs == null)
+ return null;
- rs = null;
+ if (!rs.isQuery()) {
+ curRes--;
- return rs0;
+ return null;
+ }
+
+ return rs;
}
/** {@inheritDoc} */
@Override public int getUpdateCount() throws SQLException {
- ensureNotClosed();
+ JdbcResultSet rs = nextResultSet();
- long res = updateCnt;
+ if (rs == null)
+ return -1;
- updateCnt = -1;
+ if (rs.isQuery()) {
+ curRes--;
+
+ return -1;
+ }
- return Long.valueOf(res).intValue();
+ return (int)rs.updateCount();
}
/** {@inheritDoc} */
@Override public boolean getMoreResults() throws SQLException {
ensureNotClosed();
- return false;
+ return getMoreResults(CLOSE_CURRENT_RESULT);
}
/** {@inheritDoc} */
@@ -472,9 +432,8 @@ public class JdbcStatement implements Statement {
*/
protected int[] doBatchUpdate(String command, List<String> batch, List<List<Object>> batchArgs)
throws SQLException {
- rs = null;
- updateCnt = -1;
+ closeResults();
if ((F.isEmpty(command) || F.isEmpty(batchArgs)) && F.isEmpty(batch))
throw new SQLException("Batch is empty.");
@@ -495,7 +454,11 @@ public class JdbcStatement implements Statement {
try {
int[] res = loc ? task.call() : ignite.compute(ignite.cluster().forNodeId(nodeId)).call(task);
- updateCnt = F.isEmpty(res)? -1 : res[res.length - 1];
+ long updateCnt = F.isEmpty(res)? -1 : res[res.length - 1];
+
+ results = Collections.singletonList(new JdbcResultSet(this, updateCnt));
+
+ curRes = 0;
return res;
}
@@ -515,10 +478,32 @@ public class JdbcStatement implements Statement {
@Override public boolean getMoreResults(int curr) throws SQLException {
ensureNotClosed();
- if (curr == KEEP_CURRENT_RESULT || curr == CLOSE_ALL_RESULTS)
- throw new SQLFeatureNotSupportedException("Multiple open results are not supported.");
+ if (results != null) {
+ assert curRes <= results.size() : "Invalid results state: [resultsCount=" + results.size() +
+ ", curRes=" + curRes + ']';
- return false;
+ switch (curr) {
+ case CLOSE_CURRENT_RESULT:
+ if (curRes > 0)
+ results.get(curRes - 1).close();
+
+ break;
+
+ case CLOSE_ALL_RESULTS:
+ for (int i = 0; i < curRes; ++i)
+ results.get(i).close();
+
+ break;
+
+ case KEEP_CURRENT_RESULT:
+ break;
+
+ default:
+ throw new SQLException("Invalid 'current' parameter.");
+ }
+ }
+
+ return (results != null && curRes < results.size());
}
/** {@inheritDoc} */
@@ -657,4 +642,35 @@ public class JdbcStatement implements Statement {
if (closed)
throw new SQLException("Connection is closed.", SqlStateCode.CONNECTION_CLOSED);
}
+
+ /**
+ * Returns the result at the current index and advances the index.
+ *
+ * @return Result set, or {@code null} if there are no results or the index is past the last one.
+ * @throws SQLException If {@code ensureNotClosed()} fails.
+ */
+ private JdbcResultSet nextResultSet() throws SQLException {
+ ensureNotClosed();
+
+ if (results == null || curRes >= results.size())
+ return null;
+ else
+ return results.get(curRes++);
+ }
+
+ /**
+ * Closes every stored result, then drops the results list and resets the current index to zero.
+ *
+ * @throws SQLException If closing a result set fails.
+ */
+ private void closeResults() throws SQLException {
+ if (results != null) {
+ for (JdbcResultSet rs : results)
+ rs.close();
+
+ results = null;
+ curRes = 0;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/df3c407f/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatementResultInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatementResultInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatementResultInfo.java
new file mode 100644
index 0000000..8aa02f1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatementResultInfo.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * JDBC statement result information. Keeps the statement type (SELECT or UPDATE) and
+ * either the query ID or the update count, depending on the statement type.
+ */
+public class JdbcStatementResultInfo {
+ /** Query flag. */
+ private boolean isQuery;
+
+ /** Update count. */
+ private long updCnt;
+
+ /** Query ID. */
+ private UUID qryId;
+
+ /**
+ * @param isQuery Query flag.
+ * @param qryId Query ID.
+ * @param updCnt Update count.
+ */
+ public JdbcStatementResultInfo(boolean isQuery, UUID qryId, long updCnt) {
+ this.isQuery = isQuery;
+ this.updCnt = updCnt;
+ this.qryId = qryId;
+ }
+
+ /**
+ * @return Query flag.
+ */
+ public boolean isQuery() {
+ return isQuery;
+ }
+
+ /**
+ * @return Query ID.
+ */
+ public UUID queryId() {
+ return qryId;
+ }
+
+ /**
+ * @return Update count.
+ */
+ public long updateCount() {
+ return updCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(JdbcStatementResultInfo.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/df3c407f/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java
index 9f76700..408f089 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java
@@ -19,8 +19,8 @@ package org.apache.ignite.internal.jdbc2;
import java.sql.PreparedStatement;
import java.sql.SQLException;
+import java.util.Collections;
import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.internal.IgniteEx;
/**
* Prepared statement associated with a data streamer.
@@ -33,8 +33,9 @@ class JdbcStreamedPreparedStatement extends JdbcPreparedStatement {
* Creates new prepared statement.
*
* @param conn Connection.
- * @param sql SQL query.
+ * @param sql SQL query.
* @param streamer Data streamer to use with this statement. Will be closed on statement close.
+ * @param nativeStmt Native statement.
*/
JdbcStreamedPreparedStatement(JdbcConnection conn, String sql, IgniteDataStreamer<?, ?> streamer,
PreparedStatement nativeStmt) {
@@ -53,8 +54,16 @@ class JdbcStreamedPreparedStatement extends JdbcPreparedStatement {
}
/** {@inheritDoc} */
- @Override long doUpdate(String sql, Object[] args) throws SQLException {
- return conn.ignite().context().query().streamUpdateQuery(conn.cacheName(), conn.schemaName(),
- streamer, sql, args);
+ @Override protected void execute0(String sql, Boolean isQuery) throws SQLException {
+ assert isQuery != null && !isQuery;
+
+ long updCnt = conn.ignite().context().query().streamUpdateQuery(conn.cacheName(), conn.schemaName(),
+ streamer, sql, getArgs());
+
+ JdbcResultSet rs = new JdbcResultSet(this, updCnt);
+
+ results = Collections.singletonList(rs);
+
+ curRes = 0;
}
}