You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/10/03 14:03:21 UTC
[07/37] ignite git commit: IGNITE-6046: JDBC thin driver: allowed
multiple statements execution (e.g. "SELECT something;
SELECT something_else"). This closes #2618.
IGNITE-6046: JDBC thin driver: allowed multiple statements execution (e.g. "SELECT something; SELECT something_else"). This closes #2618.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/405749a7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/405749a7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/405749a7
Branch: refs/heads/ignite-2.3
Commit: 405749a736c85c054e2c4ac1dc197ea903e8d75f
Parents: 507ac67
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Thu Sep 28 15:59:12 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Thu Sep 28 15:59:12 2017 +0300
----------------------------------------------------------------------
.../jdbc/thin/JdbcThinStatementSelfTest.java | 138 +++++++++++--
.../ignite/cache/query/SqlFieldsQuery.java | 18 ++
.../jdbc/thin/JdbcThinDatabaseMetadata.java | 2 +-
.../jdbc/thin/JdbcThinPreparedStatement.java | 2 +-
.../internal/jdbc/thin/JdbcThinResultSet.java | 8 +-
.../internal/jdbc/thin/JdbcThinStatement.java | 196 +++++++++++++------
.../internal/jdbc/thin/JdbcThinTcpIo.java | 9 +-
.../odbc/jdbc/JdbcConnectionContext.java | 10 +-
...dbcQueryExecuteMultipleStatementsResult.java | 134 +++++++++++++
.../odbc/jdbc/JdbcRequestHandler.java | 82 ++++++--
.../processors/odbc/jdbc/JdbcResult.java | 8 +
.../processors/odbc/jdbc/JdbcResultInfo.java | 95 +++++++++
.../processors/query/GridQueryIndexing.java | 7 +-
.../processors/query/GridQueryProcessor.java | 28 ++-
...IgniteClientCacheInitializationFailTest.java | 6 +-
.../query/h2/DmlStatementsProcessor.java | 35 ++--
.../processors/query/h2/IgniteH2Indexing.java | 181 +++++++++++++----
.../query/h2/ddl/DdlStatementsProcessor.java | 11 +-
.../query/h2/sql/GridSqlQueryParser.java | 81 ++++++++
.../query/h2/sql/GridSqlQuerySplitter.java | 17 +-
.../MultipleStatementsSqlQuerySelfTest.java | 154 +++++++++++++++
.../IgniteCacheQuerySelfTestSuite.java | 2 +
22 files changed, 1037 insertions(+), 187 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementSelfTest.java
index 16f118c..5309465 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementSelfTest.java
@@ -92,7 +92,6 @@ public class JdbcThinStatementSelfTest extends JdbcThinAbstractSelfTest {
startGridsMultiThreaded(3);
-
fillCache();
}
@@ -414,22 +413,120 @@ public class JdbcThinStatementSelfTest extends JdbcThinAbstractSelfTest {
/**
* @throws Exception If failed.
*/
- public void testExecuteQueryMultipleResultSets() throws Exception {
- assert !conn.getMetaData().supportsMultipleResultSets();
+ public void testExecuteQueryMultipleOnlyResultSets() throws Exception {
+ assert conn.getMetaData().supportsMultipleResultSets();
- fail("https://issues.apache.org/jira/browse/IGNITE-6046");
+ int stmtCnt = 10;
- final String sqlText = "select 1; select 1";
+ StringBuilder sql = new StringBuilder();
- GridTestUtils.assertThrows(log,
- new Callable<Object>() {
- @Override public Object call() throws Exception {
- return stmt.executeQuery(sqlText);
- }
- },
- SQLException.class,
- "Multiple result sets"
- );
+ for (int i = 0; i < stmtCnt; ++i)
+ sql.append("select ").append(i).append("; ");
+
+ assert stmt.execute(sql.toString());
+
+ for (int i = 0; i < stmtCnt; ++i) {
+ assert stmt.getMoreResults();
+
+ ResultSet rs = stmt.getResultSet();
+
+ assert rs.next();
+ assert rs.getInt(1) == i;
+ assert !rs.next();
+ }
+
+ assert !stmt.getMoreResults();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testExecuteQueryMultipleOnlyDml() throws Exception {
+ conn.setSchema(null);
+
+ int stmtCnt = 10;
+
+ StringBuilder sql = new StringBuilder("drop table if exists test; create table test(ID int primary key, NAME varchar(20)); ");
+
+ for (int i = 0; i < stmtCnt; ++i)
+ sql.append("insert into test (ID, NAME) values (" + i + ", 'name_" + i +"'); ");
+
+ assert !stmt.execute(sql.toString());
+
+ // DROP TABLE statement
+ assert stmt.getResultSet() == null;
+ assert stmt.getUpdateCount() == 0;
+
+ // CREATE TABLE statement
+ assert stmt.getResultSet() == null;
+ assert stmt.getUpdateCount() == 0;
+
+ for (int i = 0; i < stmtCnt; ++i) {
+ assert stmt.getMoreResults();
+
+ assert stmt.getResultSet() == null;
+ assert stmt.getUpdateCount() == 1;
+ }
+
+ assert !stmt.getMoreResults();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testExecuteQueryMultipleMixed() throws Exception {
+ conn.setSchema(null);
+
+ int stmtCnt = 10;
+
+ StringBuilder sql = new StringBuilder("drop table if exists test; create table test(ID int primary key, NAME varchar(20)); ");
+
+ for (int i = 0; i < stmtCnt; ++i) {
+ if (i % 2 == 0)
+ sql.append(" insert into test (ID, NAME) values (" + i + ", 'name_" + i + "'); ");
+ else
+ sql.append(" select * from test where id < " + i + "; ");
+ }
+
+ assert !stmt.execute(sql.toString());
+
+ // DROP TABLE statement
+ assert stmt.getResultSet() == null;
+ assert stmt.getUpdateCount() == 0;
+
+ // CREATE TABLE statement
+ assert stmt.getResultSet() == null;
+ assert stmt.getUpdateCount() == 0;
+
+ boolean notEmptyResult = false;
+
+ for (int i = 0; i < stmtCnt; ++i) {
+ assert stmt.getMoreResults();
+
+ if (i % 2 == 0) {
+ assert stmt.getResultSet() == null;
+ assert stmt.getUpdateCount() == 1;
+ }
+ else {
+ assert stmt.getUpdateCount() == -1;
+
+ ResultSet rs = stmt.getResultSet();
+
+ int rowsCnt = 0;
+
+ while(rs.next())
+ rowsCnt++;
+
+ assert rowsCnt <= (i + 1) / 2;
+
+ if (rowsCnt == (i + 1) / 2)
+ notEmptyResult = true;
+ }
+ }
+
+ assert notEmptyResult;
+
+ assert !stmt.getMoreResults();
}
/**
@@ -462,7 +559,7 @@ public class JdbcThinStatementSelfTest extends JdbcThinAbstractSelfTest {
}
},
SQLException.class,
- "The query is not DML"
+ "Given statement type does not match that declared by JDBC driver"
);
}
@@ -776,12 +873,14 @@ public class JdbcThinStatementSelfTest extends JdbcThinAbstractSelfTest {
public void testGetMoreResults() throws Exception {
assert !stmt.getMoreResults();
- stmt.execute("select 1");
+ stmt.execute("select 1; ");
ResultSet rs = stmt.getResultSet();
assert !stmt.getMoreResults();
+ assert stmt.getResultSet() == null;
+
assert rs.isClosed();
stmt.close();
@@ -801,7 +900,7 @@ public class JdbcThinStatementSelfTest extends JdbcThinAbstractSelfTest {
assert !stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT);
assert !stmt.getMoreResults(Statement.CLOSE_ALL_RESULTS);
- stmt.execute("select 1");
+ stmt.execute("select 1; ");
ResultSet rs = stmt.getResultSet();
@@ -997,7 +1096,10 @@ public class JdbcThinStatementSelfTest extends JdbcThinAbstractSelfTest {
/**
* @throws Exception If failed.
*/
- public void testStatementTypeMismatchSelect() throws Exception {
+ public void testStatementTypeMismatchSelectForCachedQuery() throws Exception {
+ // Put query to cache.
+ stmt.executeQuery("select 1;");
+
GridTestUtils.assertThrows(log,
new Callable<Object>() {
@Override public Object call() throws Exception {
http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
index 54f8396..2d128d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
@@ -81,6 +81,24 @@ public class SqlFieldsQuery extends Query<List<?>> {
private String schema;
/**
+ * Copy constructs SQL fields query.
+ *
+ * @param qry SQL query.
+ */
+ public SqlFieldsQuery(SqlFieldsQuery qry) {
+ sql = qry.sql;
+ args = qry.args;
+ collocated = qry.collocated;
+ timeout = qry.timeout;
+ enforceJoinOrder = qry.enforceJoinOrder;
+ distributedJoins = qry.distributedJoins;
+ replicatedOnly = qry.replicatedOnly;
+ lazy = qry.lazy;
+ parts = qry.parts;
+ schema = qry.schema;
+ }
+
+ /**
* Constructs SQL fields query.
*
* @param sql SQL query.
http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinDatabaseMetadata.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinDatabaseMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinDatabaseMetadata.java
index d13ef68..2ce7983 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinDatabaseMetadata.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinDatabaseMetadata.java
@@ -311,7 +311,7 @@ public class JdbcThinDatabaseMetadata implements DatabaseMetaData {
/** {@inheritDoc} */
@Override public boolean supportsMultipleResultSets() throws SQLException {
- return false;
+ return true;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java
index ce1b65c..fb2810d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java
@@ -238,7 +238,7 @@ public class JdbcThinPreparedStatement extends JdbcThinStatement implements Prep
@Override public boolean execute() throws SQLException {
executeWithArguments(JdbcStatementType.ANY_STATEMENT_TYPE);
- return rs.isQuery();
+ return resultSets.get(0).isQuery();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java
index 189175b..ff93274 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java
@@ -187,7 +187,7 @@ public class JdbcThinResultSet implements ResultSet {
this.fetchSize = fetchSize;
this.rows = rows;
- rowsIter = rows.iterator();
+ rowsIter = rows != null ? rows.iterator() : null;
}
else
this.updCnt = updCnt;
@@ -228,10 +228,10 @@ public class JdbcThinResultSet implements ResultSet {
/** {@inheritDoc} */
@Override public void close() throws SQLException {
- if (closeStmt)
- stmt.close();
-
close0();
+
+ if (closeStmt)
+ stmt.closeIfAllResultsClosed();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
index 8e096c8..603545b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
@@ -25,6 +25,7 @@ import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
@@ -33,9 +34,12 @@ import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteRequest;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteResult;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteMultipleStatementsResult;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteRequest;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteResult;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcStatementType;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResult;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResultInfo;
import static java.sql.ResultSet.CONCUR_READ_ONLY;
import static java.sql.ResultSet.FETCH_FORWARD;
@@ -60,15 +64,9 @@ public class JdbcThinStatement implements Statement {
/** Query timeout. */
private int timeout;
- /** Current result set. */
- protected JdbcThinResultSet rs;
-
/** Fetch size. */
private int pageSize = DFLT_PAGE_SIZE;
- /** Result set or update count has been already read. */
- private boolean alreadyRead;
-
/** Result set holdability*/
private final int resHoldability;
@@ -78,6 +76,12 @@ public class JdbcThinStatement implements Statement {
/** Close this statement on result set close. */
private boolean closeOnCompletion;
+ /** Result sets. */
+ protected List<JdbcThinResultSet> resultSets;
+
+ /** Current result index. */
+ protected int curRes;
+
/**
* Creates new statement.
*
@@ -113,24 +117,58 @@ public class JdbcThinStatement implements Statement {
protected void execute0(JdbcStatementType stmtType, String sql, List<Object> args) throws SQLException {
ensureNotClosed();
- if (rs != null) {
- rs.close0();
-
- rs = null;
- }
-
- alreadyRead = false;
+ closeResults();
if (sql == null || sql.isEmpty())
throw new SQLException("SQL query is empty.");
- JdbcQueryExecuteResult res = conn.sendRequest(new JdbcQueryExecuteRequest(stmtType, conn.getSchema(), pageSize,
+ JdbcResult res0 = conn.sendRequest(new JdbcQueryExecuteRequest(stmtType, conn.getSchema(), pageSize,
maxRows, sql, args == null ? null : args.toArray(new Object[args.size()])));
- assert res != null;
+ assert res0 != null;
- rs = new JdbcThinResultSet(this, res.getQueryId(), pageSize, res.last(), res.items(),
- res.isQuery(), conn.autoCloseServerCursor(), res.updateCount(), closeOnCompletion);
+ if (res0 instanceof JdbcQueryExecuteResult) {
+ JdbcQueryExecuteResult res = (JdbcQueryExecuteResult)res0;
+
+ resultSets = Collections.singletonList(new JdbcThinResultSet(this, res.getQueryId(), pageSize,
+ res.last(), res.items(), res.isQuery(), conn.autoCloseServerCursor(), res.updateCount(),
+ closeOnCompletion));
+ }
+ else if (res0 instanceof JdbcQueryExecuteMultipleStatementsResult) {
+ JdbcQueryExecuteMultipleStatementsResult res = (JdbcQueryExecuteMultipleStatementsResult)res0;
+
+ List<JdbcResultInfo> resInfos = res.results();
+
+ resultSets = new ArrayList<>(resInfos.size());
+
+ boolean firstRes = true;
+
+ for(JdbcResultInfo rsInfo : resInfos) {
+ if (!rsInfo.isQuery()) {
+ resultSets.add(new JdbcThinResultSet(this, -1, pageSize,
+ true, Collections.<List<Object>>emptyList(), false,
+ conn.autoCloseServerCursor(), rsInfo.updateCount(), closeOnCompletion));
+ }
+ else {
+ if (firstRes) {
+ firstRes = false;
+
+ resultSets.add(new JdbcThinResultSet(this, rsInfo.queryId(), pageSize,
+ res.isLast(), res.items(), true,
+ conn.autoCloseServerCursor(), -1, closeOnCompletion));
+ }
+ else {
+ resultSets.add(new JdbcThinResultSet(this, rsInfo.queryId(), pageSize,
+ false, null, true,
+ conn.autoCloseServerCursor(), -1, closeOnCompletion));
+ }
+ }
+ }
+ }
+ else
+ throw new SQLException("Unexpected result [res=" + res0 + ']');
+
+ assert resultSets.size() > 0 : "At least one results set is expected";
}
/** {@inheritDoc} */
@@ -140,7 +178,7 @@ public class JdbcThinStatement implements Statement {
int res = getUpdateCount();
if (res == -1)
- throw new SQLException("The query is not DML statement: " + sql, SqlStateCode.PARSING_EXCEPTION);
+ throw new SQLException("The query is not DML statememt: " + sql);
return res;
}
@@ -150,12 +188,25 @@ public class JdbcThinStatement implements Statement {
if (isClosed())
return;
- if (rs != null)
- rs.close0();
+ closeResults();
closed = true;
}
+ /**
+ * Close results.
+ * @throws SQLException On error.
+ */
+ private void closeResults() throws SQLException {
+ if (resultSets != null) {
+ for (JdbcThinResultSet rs : resultSets)
+ rs.close0();
+
+ resultSets = null;
+ curRes = 0;
+ }
+ }
+
/** {@inheritDoc} */
@Override public int getMaxFieldSize() throws SQLException {
ensureNotClosed();
@@ -242,31 +293,39 @@ public class JdbcThinStatement implements Statement {
execute0(JdbcStatementType.ANY_STATEMENT_TYPE, sql, null);
- return rs.isQuery();
+ return resultSets.get(0).isQuery();
}
/** {@inheritDoc} */
@Override public ResultSet getResultSet() throws SQLException {
- JdbcThinResultSet rs = lastResultSet();
+ JdbcThinResultSet rs = nextResultSet();
- ResultSet res = rs == null || !rs.isQuery() ? null : rs;
+ if (rs == null)
+ return null;
- if (res != null)
- alreadyRead = true;
+ if (!rs.isQuery()) {
+ curRes--;
- return res;
+ return null;
+ }
+
+ return rs;
}
/** {@inheritDoc} */
@Override public int getUpdateCount() throws SQLException {
- JdbcThinResultSet rs = lastResultSet();
+ JdbcThinResultSet rs = nextResultSet();
+
+ if (rs == null)
+ return -1;
- int res = rs == null || rs.isQuery() ? -1 : (int)rs.updatedCount();
+ if (rs.isQuery()) {
+ curRes--;
- if (res != -1)
- alreadyRead = true;
+ return -1;
+ }
- return res;
+ return (int)rs.updatedCount();
}
/**
@@ -275,13 +334,13 @@ public class JdbcThinStatement implements Statement {
* @return Result set or null.
* @throws SQLException If failed.
*/
- private JdbcThinResultSet lastResultSet() throws SQLException {
+ private JdbcThinResultSet nextResultSet() throws SQLException {
ensureNotClosed();
- if (rs == null || alreadyRead)
+ if (resultSets == null || curRes >= resultSets.size())
return null;
-
- return rs;
+ else
+ return resultSets.get(curRes++);
}
/** {@inheritDoc} */
@@ -358,13 +417,7 @@ public class JdbcThinStatement implements Statement {
@Override public int[] executeBatch() throws SQLException {
ensureNotClosed();
- if (rs != null) {
- rs.close0();
-
- rs = null;
- }
-
- alreadyRead = false;
+ closeResults();
if (batch == null || batch.isEmpty())
throw new SQLException("Batch is empty.");
@@ -395,22 +448,32 @@ public class JdbcThinStatement implements Statement {
@Override public boolean getMoreResults(int curr) throws SQLException {
ensureNotClosed();
- switch (curr) {
- case CLOSE_CURRENT_RESULT:
- case CLOSE_ALL_RESULTS:
- if (rs != null)
- rs.close();
+ if (resultSets != null) {
+ assert curRes <= resultSets.size() : "Invalid results state: [resultsCount=" + resultSets.size() +
+ ", curRes=" + curRes + ']';
- break;
+ switch (curr) {
+ case CLOSE_CURRENT_RESULT:
+ if (curRes > 0)
+ resultSets.get(curRes - 1).close0();
- case KEEP_CURRENT_RESULT:
- break;
+ break;
- default:
- throw new SQLException("Invalid 'current' parameter.");
+ case CLOSE_ALL_RESULTS:
+ for (int i = 0; i < curRes; ++i)
+ resultSets.get(i).close0();
+
+ break;
+
+ case KEEP_CURRENT_RESULT:
+ break;
+
+ default:
+ throw new SQLException("Invalid 'current' parameter.");
+ }
}
- return false;
+ return (resultSets != null && curRes < resultSets.size());
}
/** {@inheritDoc} */
@@ -532,8 +595,10 @@ public class JdbcThinStatement implements Statement {
closeOnCompletion = true;
- if (rs != null)
- rs.closeStatement(true);
+ if (resultSets != null) {
+ for (JdbcThinResultSet rs : resultSets)
+ rs.closeStatement(true);
+ }
}
/** {@inheritDoc} */
@@ -568,4 +633,25 @@ public class JdbcThinStatement implements Statement {
if (isClosed())
throw new SQLException("Statement is closed.");
}
+
+ /**
+ * Used by statement on closeOnCompletion mode.
+ * @throws SQLException On error.
+ */
+ void closeIfAllResultsClosed() throws SQLException {
+ if (isClosed())
+ return;
+
+ boolean allRsClosed = true;
+
+ if (resultSets != null) {
+ for (JdbcThinResultSet rs : resultSets) {
+ if (!rs.isClosed())
+ allRsClosed = false;
+ }
+ }
+
+ if (allRsClosed)
+ close();
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
index d7fa9d0..7ac9c2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
@@ -46,12 +46,15 @@ import org.apache.ignite.lang.IgniteProductVersion;
* JDBC IO layer implementation based on blocking IPC streams.
*/
public class JdbcThinTcpIo {
- /** Current version. */
- private static final ClientListenerProtocolVersion CURRENT_VER = ClientListenerProtocolVersion.create(2, 1, 5);
-
/** Version 2.1.0. */
private static final ClientListenerProtocolVersion VER_2_1_0 = ClientListenerProtocolVersion.create(2, 1, 0);
+ /** Version 2.3.1. */
+ private static final ClientListenerProtocolVersion VER_2_3_1 = ClientListenerProtocolVersion.create(2, 3, 1);
+
+ /** Current version. */
+ private static final ClientListenerProtocolVersion CURRENT_VER = VER_2_3_1;
+
/** Initial output stream capacity for handshake. */
private static final int HANDSHAKE_MSG_SIZE = 13;
http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
index 12be361..38d1972 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
@@ -37,8 +37,11 @@ public class JdbcConnectionContext implements ClientListenerConnectionContext {
/** Version 2.1.5: added "lazy" flag. */
private static final ClientListenerProtocolVersion VER_2_1_5 = ClientListenerProtocolVersion.create(2, 1, 5);
+ /** Version 2.3.1: added "multiple statements query" feature. */
+ public static final ClientListenerProtocolVersion VER_2_3_1 = ClientListenerProtocolVersion.create(2, 3, 1);
+
/** Current version. */
- private static final ClientListenerProtocolVersion CURRENT_VER = VER_2_1_5;
+ private static final ClientListenerProtocolVersion CURRENT_VER = VER_2_3_1;
/** Supported versions. */
private static final Set<ClientListenerProtocolVersion> SUPPORTED_VERS = new HashSet<>();
@@ -60,6 +63,7 @@ public class JdbcConnectionContext implements ClientListenerConnectionContext {
static {
SUPPORTED_VERS.add(CURRENT_VER);
+ SUPPORTED_VERS.add(VER_2_1_5);
SUPPORTED_VERS.add(VER_2_1_0);
}
@@ -100,8 +104,8 @@ public class JdbcConnectionContext implements ClientListenerConnectionContext {
if (ver.compareTo(VER_2_1_5) >= 0)
lazyExec = reader.readBoolean();
- handler = new JdbcRequestHandler(ctx, busyLock, maxCursors, distributedJoins,
- enforceJoinOrder, collocated, replicatedOnly, autoCloseCursors, lazyExec);
+ handler = new JdbcRequestHandler(ctx, busyLock, maxCursors, distributedJoins, enforceJoinOrder,
+ collocated, replicatedOnly, autoCloseCursors, lazyExec, ver);
parser = new JdbcMessageParser(ctx);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryExecuteMultipleStatementsResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryExecuteMultipleStatementsResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryExecuteMultipleStatementsResult.java
new file mode 100644
index 0000000..9bbdd59
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryExecuteMultipleStatementsResult.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * JDBC query execute result for query with multiple SQL statements.
+ */
+public class JdbcQueryExecuteMultipleStatementsResult extends JdbcResult {
+ /** Statements results. */
+ private List<JdbcResultInfo> results;
+
+ /** Query result rows for the first query. */
+ private List<List<Object>> items;
+
+ /** Flag indicating the query has no unfetched results for the first query. */
+ private boolean last;
+
+ /**
+ * Default constructor.
+ */
+ JdbcQueryExecuteMultipleStatementsResult() {
+ super(QRY_EXEC_MULT);
+ }
+
+ /**
+ * @param results Statements results.
+ * @param items Query result rows for the first query.
+ * @param last Flag indicating the query has no unfetched results for the first query.
+ */
+ public JdbcQueryExecuteMultipleStatementsResult(List<JdbcResultInfo> results,
+ List<List<Object>> items, boolean last) {
+ super(QRY_EXEC_MULT);
+ this.results = results;
+ this.items = items;
+ this.last = last;
+ }
+
+ /**
+ * @return Update counts of query IDs.
+ */
+ public List<JdbcResultInfo> results() {
+ return results;
+ }
+
+ /**
+ * @return Query result rows for the first query.
+ */
+ public List<List<Object>> items() {
+ return items;
+ }
+
+ /**
+ * @return Flag indicating the query has no unfetched results for the first query.
+ */
+ public boolean isLast() {
+ return last;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriterExImpl writer) throws BinaryObjectException {
+ super.writeBinary(writer);
+
+ if (results != null && results.size() > 0) {
+ writer.writeInt(results.size());
+
+ for (JdbcResultInfo r : results)
+ r.writeBinary(writer);
+
+ if (results.get(0).isQuery()) {
+ writer.writeBoolean(last);
+
+ JdbcUtils.writeItems(writer, items);
+ }
+ }
+ else
+ writer.writeInt(0);
+ }
+
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReaderExImpl reader) throws BinaryObjectException {
+ super.readBinary(reader);
+
+ int cnt = reader.readInt();
+
+ if (cnt == 0)
+ results = Collections.emptyList();
+ else {
+ results = new ArrayList<>(cnt);
+
+ for (int i = 0; i < cnt; ++i) {
+ JdbcResultInfo r = new JdbcResultInfo();
+
+ r.readBinary(reader);
+
+ results.add(r);
+ }
+
+ if (results.get(0).isQuery()) {
+ last = reader.readBoolean();
+
+ items = JdbcUtils.readItems(reader);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(JdbcQueryExecuteMultipleStatementsResult.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
index ea25b11..202f813 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.jdbc2.JdbcSqlFieldsQuery;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequestHandler;
import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
@@ -102,6 +103,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
/** Automatic close of cursors. */
private final boolean autoCloseCursors;
+ /** Protocol version. */
+ private ClientListenerProtocolVersion protocolVer;
+
/**
* Constructor.
*
@@ -114,10 +118,11 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
* @param replicatedOnly Replicated only flag.
* @param autoCloseCursors Flag to automatically close server cursors.
* @param lazy Lazy query execution flag.
+ * @param protocolVer Protocol version.
*/
public JdbcRequestHandler(GridKernalContext ctx, GridSpinBusyLock busyLock, int maxCursors,
boolean distributedJoins, boolean enforceJoinOrder, boolean collocated, boolean replicatedOnly,
- boolean autoCloseCursors, boolean lazy) {
+ boolean autoCloseCursors, boolean lazy, ClientListenerProtocolVersion protocolVer) {
this.ctx = ctx;
this.busyLock = busyLock;
this.maxCursors = maxCursors;
@@ -127,6 +132,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
this.replicatedOnly = replicatedOnly;
this.autoCloseCursors = autoCloseCursors;
this.lazy = lazy;
+ this.protocolVer = protocolVer;
log = ctx.log(getClass());
}
@@ -284,31 +290,71 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
qry.setSchema(schemaName);
- FieldsQueryCursor<List<?>> qryCur = ctx.query().querySqlFieldsNoCache(qry, true);
+ List<FieldsQueryCursor<List<?>>> results = ctx.query().querySqlFieldsNoCache(qry, true,
+ protocolVer.compareTo(JdbcConnectionContext.VER_2_3_1) < 0);
+
+ if (results.size() == 1) {
+ FieldsQueryCursor<List<?>> qryCur = results.get(0);
+
+ JdbcQueryCursor cur = new JdbcQueryCursor(qryId, req.pageSize(), req.maxRows(), (QueryCursorImpl)qryCur);
+
+ JdbcQueryExecuteResult res;
+
+ if (cur.isQuery())
+ res = new JdbcQueryExecuteResult(qryId, cur.fetchRows(), !cur.hasNext());
+ else {
+ List<List<Object>> items = cur.fetchRows();
- JdbcQueryCursor cur = new JdbcQueryCursor(qryId, req.pageSize(), req.maxRows(), (QueryCursorImpl)qryCur);
+ assert items != null && items.size() == 1 && items.get(0).size() == 1
+ && items.get(0).get(0) instanceof Long :
+ "Invalid result set for not-SELECT query. [qry=" + sql +
+ ", res=" + S.toString(List.class, items) + ']';
- JdbcQueryExecuteResult res;
+ res = new JdbcQueryExecuteResult(qryId, (Long)items.get(0).get(0));
+ }
+
+ if (res.last() && (!res.isQuery() || autoCloseCursors))
+ cur.close();
+ else
+ qryCursors.put(qryId, cur);
- if (cur.isQuery())
- res = new JdbcQueryExecuteResult(qryId, cur.fetchRows(), !cur.hasNext());
+ return new JdbcResponse(res);
+ }
else {
- List<List<Object>> items = cur.fetchRows();
+ List<JdbcResultInfo> jdbcResults = new ArrayList<>(results.size());
+ List<List<Object>> items = null;
+ boolean last = true;
+
+ for (FieldsQueryCursor<List<?>> c : results) {
+ QueryCursorImpl qryCur = (QueryCursorImpl)c;
+
+ JdbcResultInfo jdbcRes;
- assert items != null && items.size() == 1 && items.get(0).size() == 1
- && items.get(0).get(0) instanceof Long :
- "Invalid result set for not-SELECT query. [qry=" + sql +
- ", res=" + S.toString(List.class, items) + ']';
+ if (qryCur.isQuery()) {
+ jdbcRes = new JdbcResultInfo(true, -1, qryId);
- res = new JdbcQueryExecuteResult(qryId, (Long)items.get(0).get(0));
+ JdbcQueryCursor cur = new JdbcQueryCursor(qryId, req.pageSize(), req.maxRows(),
+ (QueryCursorImpl)qryCur);
+
+ qryCursors.put(qryId, cur);
+
+ qryId = QRY_ID_GEN.getAndIncrement();
+
+ if (items == null) {
+ items = cur.fetchRows();
+ last = cur.hasNext();
+ }
+ }
+ else
+ jdbcRes = new JdbcResultInfo(false, (Long)((List<?>)qryCur.getAll().get(0)).get(0), -1);
+
+ jdbcResults.add(jdbcRes);
+ }
+
+ return new JdbcResponse(new JdbcQueryExecuteMultipleStatementsResult(jdbcResults, items, last));
}
- if (res.last() && (!res.isQuery() || autoCloseCursors))
- cur.close();
- else
- qryCursors.put(qryId, cur);
- return new JdbcResponse(res);
}
catch (Exception e) {
qryCursors.remove(qryId);
@@ -440,7 +486,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
qry.setSchema(schemaName);
QueryCursorImpl<List<?>> qryCur = (QueryCursorImpl<List<?>>)ctx.query()
- .querySqlFieldsNoCache(qry, true);
+ .querySqlFieldsNoCache(qry, true, true).get(0);
assert !qryCur.isQuery();
http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java
index 202905b..c6c7438 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java
@@ -56,6 +56,9 @@ public class JdbcResult implements JdbcRawBinarylizable {
/** Database schemas metadata result. */
static final byte META_SCHEMAS = 12;
+ /** Multiple statements query results. */
+ static final byte QRY_EXEC_MULT = 13;
+
/** Success status. */
private byte type;
@@ -139,6 +142,11 @@ public class JdbcResult implements JdbcRawBinarylizable {
break;
+ case QRY_EXEC_MULT:
+ res = new JdbcQueryExecuteMultipleStatementsResult();
+
+ break;
+
default:
throw new IgniteException("Unknown SQL listener request ID: [request ID=" + resId + ']');
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResultInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResultInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResultInfo.java
new file mode 100644
index 0000000..f0706e4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResultInfo.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * JDBC statement result information. Keeps the statement type (SELECT or UPDATE) and either the
+ * query ID or the update count, depending on that type. The three fields are written by
+ * {@link #writeBinary(BinaryWriterExImpl)} and restored by {@link #readBinary(BinaryReaderExImpl)}.
+ */
+public class JdbcResultInfo implements JdbcRawBinarylizable {
+ /** Query flag: {@code true} for a SELECT statement, {@code false} for an UPDATE statement. */
+ private boolean isQuery;
+
+ /** Update count (relevant for UPDATE statements). */
+ private long updCnt;
+
+ /** Query ID (relevant for SELECT statements). */
+ private long qryId;
+
+ /**
+ * Default constructor is used for serialization; it leaves all fields at their defaults so that
+ * {@link #readBinary(BinaryReaderExImpl)} can populate them.
+ */
+ JdbcResultInfo() {
+ // No-op.
+ }
+
+ /**
+ * Creates a fully initialized result descriptor. The values are stored as given, without validation.
+ *
+ * @param isQuery Query flag.
+ * @param updCnt Update count.
+ * @param qryId Query ID.
+ */
+ public JdbcResultInfo(boolean isQuery, long updCnt, long qryId) {
+ this.isQuery = isQuery;
+ this.updCnt = updCnt;
+ this.qryId = qryId;
+ }
+
+ /**
+ * @return Query flag.
+ */
+ public boolean isQuery() {
+ return isQuery;
+ }
+
+ /**
+ * @return Query ID.
+ */
+ public long queryId() {
+ return qryId;
+ }
+
+ /**
+ * @return Update count.
+ */
+ public long updateCount() {
+ return updCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriterExImpl writer) {
+ // Write order (flag, update count, query ID) must stay in sync with readBinary().
+ writer.writeBoolean(isQuery);
+ writer.writeLong(updCnt);
+ writer.writeLong(qryId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReaderExImpl reader) {
+ // Read order mirrors writeBinary(): flag, update count, query ID.
+ isQuery = reader.readBoolean();
+ updCnt = reader.readLong();
+ qryId = reader.readLong();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(JdbcResultInfo.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index cecc5dd..b8445ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -83,11 +83,14 @@ public interface GridQueryIndexing {
* @param keepBinary Keep binary flag.
* @param cancel Query cancel.
* @param mainCacheId Main cache ID.
+ * @param failOnMultipleStmts If {@code true}, the method must throw an exception when the query contains
+ * more than one SQL statement.
* @return Cursor.
* @throws IgniteCheckedException If failed.
*/
- public FieldsQueryCursor<List<?>> queryDistributedSqlFields(String schemaName, SqlFieldsQuery qry,
- boolean keepBinary, GridQueryCancel cancel, @Nullable Integer mainCacheId) throws IgniteCheckedException;
+ public List<FieldsQueryCursor<List<?>>> queryDistributedSqlFields(String schemaName, SqlFieldsQuery qry,
+ boolean keepBinary, GridQueryCancel cancel, @Nullable Integer mainCacheId, boolean failOnMultipleStmts)
+ throws IgniteCheckedException;
/**
* Perform a MERGE statement using data streamer as receiver.
http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index e8cc852..58c3ce9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -1867,7 +1867,8 @@ public class GridQueryProcessor extends GridProcessorAdapter {
if (cctx.config().getQueryParallelism() > 1) {
qry.setDistributedJoins(true);
- cur = idx.queryDistributedSqlFields(schemaName, qry, keepBinary, cancel, mainCacheId);
+ cur = idx.queryDistributedSqlFields(schemaName, qry,
+ keepBinary, cancel, mainCacheId, true).get(0);
}
else {
IndexingQueryFilter filter = idx.backupFilter(requestTopVer.get(), qry.getPartitions());
@@ -1884,7 +1885,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
else {
clo = new IgniteOutClosureX<FieldsQueryCursor<List<?>>>() {
@Override public FieldsQueryCursor<List<?>> applyx() throws IgniteCheckedException {
- return idx.queryDistributedSqlFields(schemaName, qry, keepBinary, null, mainCacheId);
+ return idx.queryDistributedSqlFields(schemaName, qry, keepBinary, null, mainCacheId, true).get(0);
}
};
}
@@ -1906,7 +1907,20 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @param keepBinary Keep binary flag.
* @return Cursor.
*/
- public FieldsQueryCursor<List<?>> querySqlFieldsNoCache(final SqlFieldsQuery qry, final boolean keepBinary) {
+ public FieldsQueryCursor<List<?>> querySqlFieldsNoCache(final SqlFieldsQuery qry,
+ final boolean keepBinary) {
+ return querySqlFieldsNoCache(qry, keepBinary, true).get(0);
+ }
+
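+ /**
+ * Query SQL fields without strict dependency on concrete cache.
+ *
+ * @param qry Query.
+ * @param keepBinary Keep binary flag.
+ * @param failOnMultipleStmts If {@code true}, an exception is thrown when the query contains more than
+ * one SQL statement.
+ * @return List of cursors.
+ */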
+ public List<FieldsQueryCursor<List<?>>> querySqlFieldsNoCache(final SqlFieldsQuery qry,
+ final boolean keepBinary, final boolean failOnMultipleStmts) {
checkxEnabled();
validateSqlFieldsQuery(qry);
@@ -1921,11 +1935,13 @@ public class GridQueryProcessor extends GridProcessorAdapter {
throw new IllegalStateException("Failed to execute query (grid is stopping).");
try {
- IgniteOutClosureX<FieldsQueryCursor<List<?>>> clo = new IgniteOutClosureX<FieldsQueryCursor<List<?>>>() {
- @Override public FieldsQueryCursor<List<?>> applyx() throws IgniteCheckedException {
+ IgniteOutClosureX<List<FieldsQueryCursor<List<?>>>> clo =
+ new IgniteOutClosureX<List<FieldsQueryCursor<List<?>>>>() {
+ @Override public List<FieldsQueryCursor<List<?>>> applyx() throws IgniteCheckedException {
GridQueryCancel cancel = new GridQueryCancel();
- return idx.queryDistributedSqlFields(qry.getSchema(), qry, keepBinary, cancel, null);
+ return idx.queryDistributedSqlFields(qry.getSchema(), qry, keepBinary, cancel, null,
+ failOnMultipleStmts);
}
};
http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
index c745d8a..1ebf556 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
@@ -242,12 +242,12 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT
}
/** {@inheritDoc} */
- @Override public FieldsQueryCursor<List<?>> queryDistributedSqlFields(String schemaName, SqlFieldsQuery qry,
- boolean keepBinary, GridQueryCancel cancel, @Nullable Integer mainCacheId) throws IgniteCheckedException {
+ @Override public List<FieldsQueryCursor<List<?>>> queryDistributedSqlFields(String schemaName, SqlFieldsQuery qry,
+ boolean keepBinary, GridQueryCancel cancel,
+ @Nullable Integer mainCacheId, boolean failOnMultipleStmts) throws IgniteCheckedException {
return null;
}
-
/** {@inheritDoc} */
@Override public long streamUpdateQuery(String spaceName, String qry, @Nullable Object[] params,
IgniteDataStreamer<?, ?> streamer) throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index f2f2fd4..ee1875f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -150,7 +150,7 @@ public class DmlStatementsProcessor {
* Execute DML statement, possibly with few re-attempts in case of concurrent data modifications.
*
* @param schemaName Schema.
- * @param stmt JDBC statement.
+ * @param prepared Prepared JDBC statement.
* @param fieldsQry Original query.
* @param loc Query locality flag.
* @param filters Cache name and key filter.
@@ -158,13 +158,13 @@ public class DmlStatementsProcessor {
* @return Update result (modified items count and failed keys).
* @throws IgniteCheckedException if failed.
*/
- private UpdateResult updateSqlFields(String schemaName, PreparedStatement stmt, SqlFieldsQuery fieldsQry,
+ private UpdateResult updateSqlFields(String schemaName, Prepared prepared, SqlFieldsQuery fieldsQry,
boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel) throws IgniteCheckedException {
Object[] errKeys = null;
long items = 0;
- UpdatePlan plan = getPlanForStatement(schemaName, stmt, null);
+ UpdatePlan plan = getPlanForStatement(schemaName, prepared, null);
GridCacheContext<?, ?> cctx = plan.tbl.rowDescriptor().context();
@@ -188,7 +188,7 @@ public class DmlStatementsProcessor {
UpdateResult r;
try {
- r = executeUpdateStatement(schemaName, cctx, stmt, fieldsQry, loc, filters, cancel, errKeys);
+ r = executeUpdateStatement(schemaName, cctx, prepared, fieldsQry, loc, filters, cancel, errKeys);
}
finally {
cctx.operationContextPerCall(opCtx);
@@ -213,16 +213,16 @@ public class DmlStatementsProcessor {
/**
* @param schemaName Schema.
- * @param stmt Prepared statement.
+ * @param p Prepared.
* @param fieldsQry Initial query
* @param cancel Query cancel.
* @return Update result wrapped into {@link GridQueryFieldsResult}
* @throws IgniteCheckedException if failed.
*/
@SuppressWarnings("unchecked")
- QueryCursorImpl<List<?>> updateSqlFieldsDistributed(String schemaName, PreparedStatement stmt,
+ QueryCursorImpl<List<?>> updateSqlFieldsDistributed(String schemaName, Prepared p,
SqlFieldsQuery fieldsQry, GridQueryCancel cancel) throws IgniteCheckedException {
- UpdateResult res = updateSqlFields(schemaName, stmt, fieldsQry, false, null, cancel);
+ UpdateResult res = updateSqlFields(schemaName, p, fieldsQry, false, null, cancel);
QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList
(Collections.singletonList(res.cnt)), cancel, false);
@@ -247,7 +247,8 @@ public class DmlStatementsProcessor {
GridQueryFieldsResult updateSqlFieldsLocal(String schemaName, PreparedStatement stmt,
SqlFieldsQuery fieldsQry, IndexingQueryFilter filters, GridQueryCancel cancel)
throws IgniteCheckedException {
- UpdateResult res = updateSqlFields(schemaName, stmt, fieldsQry, true, filters, cancel);
+ UpdateResult res = updateSqlFields(schemaName, GridSqlQueryParser.prepared(stmt), fieldsQry, true,
+ filters, cancel);
return new GridQueryFieldsResultAdapter(UPDATE_RESULT_META,
new IgniteSingletonIterator(Collections.singletonList(res.cnt)));
@@ -340,22 +341,24 @@ public class DmlStatementsProcessor {
*
* @param schemaName Schema name.
* @param cctx Cache context.
- * @param prepStmt Prepared statement for DML query.
+ * @param prepared Prepared statement for DML query.
* @param fieldsQry Fields query.
+ * @param loc Local query flag.
* @param filters Cache name and key filter.
+ * @param cancel Query cancel state holder.
* @param failedKeys Keys to restrict UPDATE and DELETE operations with. Null or empty array means no restriction.
* @return Pair [number of successfully processed items; keys that have failed to be processed]
* @throws IgniteCheckedException if failed.
*/
@SuppressWarnings({"ConstantConditions", "unchecked"})
private UpdateResult executeUpdateStatement(String schemaName, final GridCacheContext cctx,
- PreparedStatement prepStmt, SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters,
+ Prepared prepared, SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters,
GridQueryCancel cancel, Object[] failedKeys) throws IgniteCheckedException {
int mainCacheId = CU.cacheId(cctx.name());
Integer errKeysPos = null;
- UpdatePlan plan = getPlanForStatement(schemaName, prepStmt, errKeysPos);
+ UpdatePlan plan = getPlanForStatement(schemaName, prepared, errKeysPos);
if (plan.fastUpdateArgs != null) {
assert F.isEmpty(failedKeys) && errKeysPos == null;
@@ -378,8 +381,8 @@ public class DmlStatementsProcessor {
.setPageSize(fieldsQry.getPageSize())
.setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS);
- cur = (QueryCursorImpl<List<?>>) idx.queryDistributedSqlFields(schemaName, newFieldsQry, true, cancel,
- mainCacheId);
+ cur = (QueryCursorImpl<List<?>>)idx.queryDistributedSqlFields(schemaName, newFieldsQry, true,
+ cancel, mainCacheId, true).get(0);
}
else {
final GridQueryFieldsResult res = idx.queryLocalSqlFields(schemaName, plan.selectQry,
@@ -423,14 +426,12 @@ public class DmlStatementsProcessor {
* if available.
*
* @param schema Schema.
- * @param prepStmt JDBC statement.
+ * @param p Prepared JDBC statement.
* @return Update plan.
*/
@SuppressWarnings({"unchecked", "ConstantConditions"})
- private UpdatePlan getPlanForStatement(String schema, PreparedStatement prepStmt, @Nullable Integer errKeysPos)
+ private UpdatePlan getPlanForStatement(String schema, Prepared p, @Nullable Integer errKeysPos)
throws IgniteCheckedException {
- Prepared p = GridSqlQueryParser.prepared(prepStmt);
-
H2DmlPlanKey planKey = new H2DmlPlanKey(schema, p.getSQL());
UpdatePlan res = (errKeysPos == null ? planCache.get(planKey) : null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 00a37ce..9e6a1fa 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -145,7 +145,6 @@ import org.h2.command.dml.Insert;
import org.h2.engine.Session;
import org.h2.engine.SysProperties;
import org.h2.index.Index;
-import org.h2.jdbc.JdbcPreparedStatement;
import org.h2.jdbc.JdbcStatement;
import org.h2.server.web.WebServer;
import org.h2.table.IndexColumn;
@@ -1244,7 +1243,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (qry.getTimeout() > 0)
fqry.setTimeout(qry.getTimeout(), TimeUnit.MILLISECONDS);
- final QueryCursor<List<?>> res = queryDistributedSqlFields(schemaName, fqry, keepBinary, null, mainCacheId);
+ final QueryCursor<List<?>> res =
+ queryDistributedSqlFields(schemaName, fqry, keepBinary, null, mainCacheId, true).get(0);
final Iterable<Cache.Entry<K, V>> converted = new Iterable<Cache.Entry<K, V>>() {
@Override public Iterator<Cache.Entry<K, V>> iterator() {
@@ -1277,10 +1277,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/** {@inheritDoc} */
- @Override public FieldsQueryCursor<List<?>> queryDistributedSqlFields(String schemaName,
- SqlFieldsQuery qry, boolean keepBinary, GridQueryCancel cancel, @Nullable Integer mainCacheId) {
- final String sqlQry = qry.getSql();
-
+ @Override public List<FieldsQueryCursor<List<?>>> queryDistributedSqlFields(String schemaName, SqlFieldsQuery qry,
+ boolean keepBinary, GridQueryCancel cancel, @Nullable Integer mainCacheId, boolean failOnMultipleStmts) {
Connection c = connectionForSchema(schemaName);
final boolean enforceJoinOrder = qry.isEnforceJoinOrder();
@@ -1289,18 +1287,38 @@ public class IgniteH2Indexing implements GridQueryIndexing {
final DistributedJoinMode distributedJoinMode = distributedJoinMode(qry.isLocal(), distributedJoins);
- GridCacheTwoStepQuery twoStepQry = null;
- List<GridQueryFieldMetadata> meta;
+ String sqlQry = qry.getSql();
- final H2TwoStepCachedQueryKey cachedQryKey = new H2TwoStepCachedQueryKey(schemaName, sqlQry, grpByCollocated,
+ H2TwoStepCachedQueryKey cachedQryKey = new H2TwoStepCachedQueryKey(schemaName, sqlQry, grpByCollocated,
distributedJoins, enforceJoinOrder, qry.isLocal());
H2TwoStepCachedQuery cachedQry = twoStepCache.get(cachedQryKey);
if (cachedQry != null) {
- twoStepQry = cachedQry.query().copy();
- meta = cachedQry.meta();
+ checkQueryType(qry, true);
+
+ GridCacheTwoStepQuery twoStepQry = cachedQry.query().copy();
+
+ List<GridQueryFieldMetadata> meta = cachedQry.meta();
+
+ List<FieldsQueryCursor<List<?>>> res = Collections.singletonList(executeTwoStepsQuery(schemaName, qry.getPageSize(), qry.getPartitions(),
+ qry.getArgs(), keepBinary, qry.isLazy(), qry.getTimeout(), cancel, sqlQry, enforceJoinOrder,
+ twoStepQry, meta));
+
+ return res;
}
- else {
+
+ List<FieldsQueryCursor<List<?>>> res = new ArrayList<>(1);
+
+ Object[] argsOrig = qry.getArgs();
+ int firstArg = 0;
+ Object[] args;
+ String remainingSql = sqlQry;
+
+ while (remainingSql != null) {
+ args = null;
+ GridCacheTwoStepQuery twoStepQry = null;
+ List<GridQueryFieldMetadata> meta;
+
final UUID locNodeId = ctx.localNodeId();
// Here we will just parse the statement, no need to optimize it at all.
@@ -1319,16 +1337,16 @@ public class IgniteH2Indexing implements GridQueryIndexing {
while (true) {
try {
// Do not cache this statement because the whole query object will be cached later on.
- stmt = prepareStatement(c, sqlQry, false);
+ stmt = prepareStatement(c, remainingSql, false);
break;
}
catch (SQLException e) {
if (!cachesCreated && (
e.getErrorCode() == ErrorCode.SCHEMA_NOT_FOUND_1 ||
- e.getErrorCode() == ErrorCode.TABLE_OR_VIEW_NOT_FOUND_1 ||
- e.getErrorCode() == ErrorCode.INDEX_NOT_FOUND_1)
- ) {
+ e.getErrorCode() == ErrorCode.TABLE_OR_VIEW_NOT_FOUND_1 ||
+ e.getErrorCode() == ErrorCode.INDEX_NOT_FOUND_1)
+ ) {
try {
ctx.cache().createMissingQueryCaches();
}
@@ -1344,19 +1362,59 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
}
- prepared = GridSqlQueryParser.prepared(stmt);
+ GridSqlQueryParser.PreparedWithRemaining prep = GridSqlQueryParser.preparedWithRemaining(stmt);
+
+ // remainingSql == null if the query string contains a single SQL statement.
+ remainingSql = prep.remainingSql();
+
+ if (remainingSql != null && failOnMultipleStmts)
+ throw new IgniteSQLException("Multiple statements queries are not supported");
+
+ sqlQry = prep.prepared().getSQL();
+
+ prepared = prep.prepared();
+
+ int paramsCnt = prepared.getParameters().size();
+
+ if (paramsCnt > 0) {
+ if (argsOrig == null || argsOrig.length < firstArg + paramsCnt) {
+ throw new IgniteException("Invalid number of query parameters. " +
+ "Cannot find " + (argsOrig.length + 1 - firstArg) + " parameter.");
+ }
+
+ args = Arrays.copyOfRange(argsOrig, firstArg, firstArg + paramsCnt);
+
+ firstArg += paramsCnt;
+ }
+
+ cachedQryKey = new H2TwoStepCachedQueryKey(schemaName, sqlQry, grpByCollocated,
+ distributedJoins, enforceJoinOrder, qry.isLocal());
+
+ cachedQry = twoStepCache.get(cachedQryKey);
+
+ if (cachedQry != null) {
+ checkQueryType(qry, true);
+
+ twoStepQry = cachedQry.query().copy();
+ meta = cachedQry.meta();
- if (qry instanceof JdbcSqlFieldsQuery && ((JdbcSqlFieldsQuery) qry).isQuery() != prepared.isQuery())
- throw new IgniteSQLException("Given statement type does not match that declared by JDBC driver",
- IgniteQueryErrorCode.STMT_TYPE_MISMATCH);
+ res.add(executeTwoStepsQuery(schemaName, qry.getPageSize(), qry.getPartitions(), args, keepBinary,
+ qry.isLazy(), qry.getTimeout(), cancel, sqlQry, enforceJoinOrder,
+ twoStepQry, meta));
- if (prepared.isQuery()) {
- bindParameters(stmt, F.asList(qry.getArgs()));
+ continue;
+ }
+ else {
+ checkQueryType(qry, prepared.isQuery());
+
+ if (prepared.isQuery()) {
+ bindParameters(stmt, F.asList(args));
- twoStepQry = GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, qry.getArgs(),
- grpByCollocated, distributedJoins, enforceJoinOrder, this);
+ twoStepQry = GridSqlQuerySplitter.split(c, prepared, args,
+ grpByCollocated, distributedJoins, enforceJoinOrder, this);
- assert twoStepQry != null;
+ assert twoStepQry != null;
+ }
}
}
finally {
@@ -1367,17 +1425,22 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (twoStepQry == null) {
if (DmlStatementsProcessor.isDmlStatement(prepared)) {
try {
- return dmlProc.updateSqlFieldsDistributed(schemaName, stmt, qry, cancel);
+ res.add(dmlProc.updateSqlFieldsDistributed(schemaName, prepared,
+ new SqlFieldsQuery(qry).setSql(sqlQry).setArgs(args), cancel));
+
+ continue;
}
catch (IgniteCheckedException e) {
throw new IgniteSQLException("Failed to execute DML statement [stmt=" + sqlQry +
- ", params=" + Arrays.deepToString(qry.getArgs()) + "]", e);
+ ", params=" + Arrays.deepToString(args) + "]", e);
}
}
if (DdlStatementsProcessor.isDdlStatement(prepared)) {
try {
- return ddlProc.runDdlStatement(sqlQry, stmt);
+ res.add(ddlProc.runDdlStatement(sqlQry, prepared));
+
+ continue;
}
catch (IgniteCheckedException e) {
throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + sqlQry + ']', e);
@@ -1428,39 +1491,75 @@ public class IgniteH2Indexing implements GridQueryIndexing {
finally {
U.close(stmt, log);
}
+
+ res.add(executeTwoStepsQuery(schemaName, qry.getPageSize(), qry.getPartitions(), args, keepBinary,
+ qry.isLazy(), qry.getTimeout(), cancel, sqlQry, enforceJoinOrder,
+ twoStepQry, meta));
+
+ if (cachedQry == null && !twoStepQry.explain()) {
+ cachedQry = new H2TwoStepCachedQuery(meta, twoStepQry.copy());
+
+ twoStepCache.putIfAbsent(cachedQryKey, cachedQry);
+ }
}
+ return res;
+ }
+
+ /**
+ * Check expected statement type (when it is set by JDBC) and given statement type. The check is
+ * performed only when {@code qry} is a {@link JdbcSqlFieldsQuery}; any other query passes unchecked.
+ *
+ * @param qry Query.
+ * @param isQry {@code true} for select queries, otherwise (DML/DDL queries) {@code false}.
+ * @throws IgniteSQLException With code {@link IgniteQueryErrorCode#STMT_TYPE_MISMATCH} if the type
+ * declared by the JDBC query differs from {@code isQry}.
+ */
+ private void checkQueryType(SqlFieldsQuery qry, boolean isQry) {
+ if (qry instanceof JdbcSqlFieldsQuery && ((JdbcSqlFieldsQuery)qry).isQuery() != isQry)
+ throw new IgniteSQLException("Given statement type does not match that declared by JDBC driver",
+ IgniteQueryErrorCode.STMT_TYPE_MISMATCH);
+ }
+
+ /**
+ * @param schemaName Schema name.
+ * @param pageSize Page size.
+ * @param partitions Partitions.
+ * @param args Arguments.
+ * @param keepBinary Keep binary flag.
+ * @param lazy Lazy flag.
+ * @param timeout Timeout.
+ * @param cancel Cancel.
+ * @param sqlQry SQL query string.
+ * @param enforceJoinOrder Enforce join order flag.
+ * @param twoStepQry Two-steps query.
+ * @param meta Metadata.
+ * @return Cursor.
+ */
+ private FieldsQueryCursor<List<?>> executeTwoStepsQuery(String schemaName, int pageSize, int partitions[],
+ Object[] args, boolean keepBinary, boolean lazy, int timeout,
+ GridQueryCancel cancel, String sqlQry, boolean enforceJoinOrder, GridCacheTwoStepQuery twoStepQry,
+ List<GridQueryFieldMetadata> meta) {
if (log.isDebugEnabled())
log.debug("Parsed query: `" + sqlQry + "` into two step query: " + twoStepQry);
- twoStepQry.pageSize(qry.getPageSize());
+ twoStepQry.pageSize(pageSize);
if (cancel == null)
cancel = new GridQueryCancel();
- int partitions[] = qry.getPartitions();
-
if (partitions == null && twoStepQry.derivedPartitions() != null) {
try {
- partitions = calculateQueryPartitions(twoStepQry.derivedPartitions(), qry.getArgs());
+ partitions = calculateQueryPartitions(twoStepQry.derivedPartitions(), args);
} catch (IgniteCheckedException e) {
throw new CacheException("Failed to calculate derived partitions: [qry=" + sqlQry + ", params=" +
- Arrays.deepToString(qry.getArgs()) + "]", e);
+ Arrays.deepToString(args) + "]", e);
}
}
QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(
- runQueryTwoStep(schemaName, twoStepQry, keepBinary, enforceJoinOrder, qry.getTimeout(), cancel,
- qry.getArgs(), partitions, qry.isLazy()), cancel);
+ runQueryTwoStep(schemaName, twoStepQry, keepBinary, enforceJoinOrder, timeout, cancel,
+ args, partitions, lazy), cancel);
cursor.fieldsMeta(meta);
- if (cachedQry == null && !twoStepQry.explain()) {
- cachedQry = new H2TwoStepCachedQuery(meta, twoStepQry.copy());
-
- twoStepCache.putIfAbsent(cachedQryKey, cachedQry);
- }
-
return cursor;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
index 4c3264c..8a901dc 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.query.h2.ddl;
-import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -59,7 +58,6 @@ import org.h2.command.ddl.CreateIndex;
import org.h2.command.ddl.CreateTable;
import org.h2.command.ddl.DropIndex;
import org.h2.command.ddl.DropTable;
-import org.h2.jdbc.JdbcPreparedStatement;
import org.h2.table.Column;
import org.h2.value.DataType;
@@ -91,17 +89,18 @@ public class DdlStatementsProcessor {
* Execute DDL statement.
*
* @param sql SQL.
- * @param stmt H2 statement to parse and execute.
+ * @param prepared Prepared.
+ * @return Cursor on query results.
+ * @throws IgniteCheckedException On error.
*/
@SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
- public FieldsQueryCursor<List<?>> runDdlStatement(String sql, PreparedStatement stmt)
+ public FieldsQueryCursor<List<?>> runDdlStatement(String sql, Prepared prepared)
throws IgniteCheckedException {
- assert stmt instanceof JdbcPreparedStatement;
IgniteInternalFuture fut = null;
try {
- GridSqlStatement stmt0 = new GridSqlQueryParser(false).parse(GridSqlQueryParser.prepared(stmt));
+ GridSqlStatement stmt0 = new GridSqlQueryParser(false).parse(prepared);
if (stmt0 instanceof GridSqlCreateIndex) {
GridSqlCreateIndex cmd = (GridSqlCreateIndex)stmt0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
index a379a91..3d7a1a0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
@@ -397,6 +397,29 @@ public class GridSqlQueryParser {
/** */
private static final Getter<Column, Expression> COLUMN_CHECK_CONSTRAINT = getter(Column.class, "checkConstraint");
+ /** Class for private class: 'org.h2.command.CommandList'. */
+ private static final Class<? extends Command> CLS_COMMAND_LIST;
+
+ /** Getter for the 'command' field of 'org.h2.command.CommandList'. */
+ private static final Getter<Command, Command> LIST_COMMAND;
+
+ /** Getter for the 'remaining' field of 'org.h2.command.CommandList'. */
+ private static final Getter<Command, String> REMAINING;
+
+ static {
+ try {
+ CLS_COMMAND_LIST = (Class<? extends Command>)CommandContainer.class.getClassLoader()
+ .loadClass("org.h2.command.CommandList");
+
+ LIST_COMMAND = getter(CLS_COMMAND_LIST, "command");
+
+ REMAINING = getter(CLS_COMMAND_LIST, "remaining");
+ }
+ catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
/** */
private static final Getter<AlterTableAlterColumn, String> ALTER_COLUMN_TBL_NAME =
getter(AlterTableAlterColumn.class, "tableName");
@@ -485,6 +508,26 @@ public class GridSqlQueryParser {
}
/**
+ * @param stmt Prepared statement.
+ * @return Prepared statement paired with the remaining SQL ({@code null} remaining SQL if there is none).
+ */
+ public static PreparedWithRemaining preparedWithRemaining(PreparedStatement stmt) {
+ Command cmd = COMMAND.get((JdbcPreparedStatement)stmt);
+
+ if (cmd instanceof CommandContainer)
+ return new PreparedWithRemaining(PREPARED.get(cmd), null);
+ else {
+ Class<?> cmdCls = cmd.getClass();
+
+ if (cmdCls.getName().equals("org.h2.command.CommandList")) {
+ return new PreparedWithRemaining(PREPARED.get(LIST_COMMAND.get(cmd)), REMAINING.get(cmd));
+ }
+ else
+ throw new IgniteSQLException("Unexpected statement command");
+ }
+ }
+
+ /**
* @param qry Query expression to parse.
* @return Subquery AST.
*/
@@ -1798,4 +1841,42 @@ public class GridSqlQueryParser {
}
}
}
+
+ /**
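+ * Pairs a prepared statement with the remaining SQL string, which is trimmed and kept as {@code null} when empty.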
+ */
+ public static class PreparedWithRemaining {
+ /** Prepared. */
+ private Prepared prepared;
+
+ /** Remaining sql. */
+ private String remainingSql;
+
+ /**
+ * @param prepared Prepared.
+ * @param sql Remaining SQL.
+ */
+ public PreparedWithRemaining(Prepared prepared, String sql) {
+ this.prepared = prepared;
+
+ if (sql != null)
+ sql = sql.trim();
+
+ remainingSql = !F.isEmpty(sql) ? sql : null;
+ }
+
+ /**
+ * @return Prepared.
+ */
+ public Prepared prepared() {
+ return prepared;
+ }
+
+ /**
+ * @return Remaining SQL (trimmed), or {@code null} if there is none.
+ */
+ public String remainingSql() {
+ return remainingSql;
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index b20cbd5..7f28203 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -51,7 +51,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.h2.command.Prepared;
import org.h2.command.dml.Query;
import org.h2.command.dml.SelectUnion;
-import org.h2.jdbc.JdbcPreparedStatement;
import org.h2.table.IndexColumn;
import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
@@ -172,7 +171,8 @@ public class GridSqlQuerySplitter {
}
/**
- * @param stmt Prepared statement.
+ * @param conn Connection.
+ * @param prepared Prepared statement of the query to parse and split.
* @param params Parameters.
* @param collocatedGrpBy Whether the query has collocated GROUP BY keys.
* @param distributedJoins If distributed joins enabled.
@@ -183,7 +183,8 @@ public class GridSqlQuerySplitter {
* @throws IgniteCheckedException If failed.
*/
public static GridCacheTwoStepQuery split(
- JdbcPreparedStatement stmt,
+ Connection conn,
+ Prepared prepared,
Object[] params,
boolean collocatedGrpBy,
boolean distributedJoins,
@@ -195,7 +196,7 @@ public class GridSqlQuerySplitter {
// Here we will just do initial query parsing. Do not use optimized
// subqueries because we do not have unique FROM aliases yet.
- GridSqlQuery qry = parse(prepared(stmt), false);
+ GridSqlQuery qry = parse(prepared, false);
String originalSql = qry.getSQL();
@@ -213,8 +214,6 @@ public class GridSqlQuerySplitter {
// debug("NORMALIZED", qry.getSQL());
- Connection conn = stmt.getConnection();
-
// Here we will have correct normalized AST with optimized join order.
// The distributedJoins parameter is ignored because it is not relevant for
// the REDUCE query optimization.
@@ -234,12 +233,12 @@ public class GridSqlQuerySplitter {
boolean allCollocated = true;
for (GridCacheSqlQuery mapSqlQry : splitter.mapSqlQrys) {
- Prepared prepared = optimize(h2, conn, mapSqlQry.query(), mapSqlQry.parameters(params),
+ Prepared prepared0 = optimize(h2, conn, mapSqlQry.query(), mapSqlQry.parameters(params),
true, enforceJoinOrder);
- allCollocated &= isCollocated((Query)prepared);
+ allCollocated &= isCollocated((Query)prepared0);
- mapSqlQry.query(parse(prepared, true).getSQL());
+ mapSqlQry.query(parse(prepared0, true).getSQL());
}
// We do not need distributed joins if all MAP queries are collocated.
http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/MultipleStatementsSqlQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/MultipleStatementsSqlQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/MultipleStatementsSqlQuerySelfTest.java
new file mode 100644
index 0000000..8b9bf40
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/MultipleStatementsSqlQuerySelfTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests for execution of multiple SQL statements passed as a single query.
+ */
+public class MultipleStatementsSqlQuerySelfTest extends GridCommonAbstractTest {
+ /** Node. */
+ private IgniteEx node;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ node = (IgniteEx)startGrid();
+
+ startGrid(2);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * Test query without caches.
+ *
+ * @throws Exception If failed.
+ */
+ public void testQuery() throws Exception {
+ GridQueryProcessor qryProc = node.context().query();
+
+ SqlFieldsQuery qry = new SqlFieldsQuery(
+ "create table test(ID int primary key, NAME varchar(20)); " +
+ "insert into test (ID, NAME) values (1, 'name_1');" +
+ "insert into test (ID, NAME) values (2, 'name_2'), (3, 'name_3');" +
+ "select * from test;")
+ .setSchema("PUBLIC");
+
+ List<FieldsQueryCursor<List<?>>> res = qryProc.querySqlFieldsNoCache(qry, true, false);
+
+ assert res.size() == 4 : "Unexpected cursors count: " + res.size();
+
+ assert !((QueryCursorImpl)res.get(0)).isQuery() : "Results of DDL statement is expected ";
+
+ List<List<?>> rows = res.get(1).getAll();
+
+ assert !((QueryCursorImpl)res.get(1)).isQuery() : "Results of DDL statement is expected ";
+ assert Long.valueOf(1).equals(rows.get(0).get(0)) : "1 row must be updated. [actual=" + rows.get(0).get(0) + ']';
+
+ rows = res.get(2).getAll();
+
+ assert !((QueryCursorImpl)res.get(2)).isQuery() : "Results of DML statement is expected ";
+ assert Long.valueOf(2).equals(rows.get(0).get(0)) : "2 row must be updated";
+
+ rows = res.get(3).getAll();
+
+ assert ((QueryCursorImpl)res.get(3)).isQuery() : "Results of SELECT statement is expected ";
+
+ assert rows.size() == 3 : "Invalid rows count: " + rows.size();
+
+ for (int i = 0; i < rows.size(); ++i) {
+ assert Integer.valueOf(1).equals(rows.get(i).get(0))
+ || Integer.valueOf(2).equals(rows.get(i).get(0))
+ || Integer.valueOf(3).equals(rows.get(i).get(0))
+ : "Invalid ID: " + rows.get(i).get(0);
+ }
+ }
+
+ /**
+ * Test multiple statements query with parameters.
+ *
+ * @throws Exception If failed.
+ */
+ public void testQueryWithParameters() throws Exception {
+ GridQueryProcessor qryProc = node.context().query();
+
+ SqlFieldsQuery qry = new SqlFieldsQuery(
+ "create table test(ID int primary key, NAME varchar(20)); " +
+ "insert into test (ID, NAME) values (?, ?);" +
+ "insert into test (ID, NAME) values (?, ?), (?, ?);" +
+ "select * from test;")
+ .setSchema("PUBLIC")
+ .setArgs(1, "name_1", 2, "name2", 3, "name_3");
+
+ List<FieldsQueryCursor<List<?>>> res = qryProc.querySqlFieldsNoCache(qry, true, false);
+
+ assert res.size() == 4 : "Unexpected cursors count: " + res.size();
+
+ assert !((QueryCursorImpl)res.get(0)).isQuery() : "Results of DDL statement is expected ";
+
+ List<List<?>> rows = res.get(1).getAll();
+
+ assert !((QueryCursorImpl)res.get(1)).isQuery() : "Results of DDL statement is expected ";
+ assert Long.valueOf(1).equals(rows.get(0).get(0)) : "1 row must be updated. [actual=" + rows.get(0).get(0) + ']';
+
+ rows = res.get(2).getAll();
+
+ assert !((QueryCursorImpl)res.get(2)).isQuery() : "Results of DML statement is expected ";
+ assert Long.valueOf(2).equals(rows.get(0).get(0)) : "2 row must be updated";
+
+ rows = res.get(3).getAll();
+
+ assert ((QueryCursorImpl)res.get(3)).isQuery() : "Results of SELECT statement is expected ";
+
+ assert rows.size() == 3 : "Invalid rows count: " + rows.size();
+
+ for (int i = 0; i < rows.size(); ++i) {
+ assert Integer.valueOf(1).equals(rows.get(i).get(0))
+ || Integer.valueOf(2).equals(rows.get(i).get(0))
+ || Integer.valueOf(3).equals(rows.get(i).get(0))
+ : "Invalid ID: " + rows.get(i).get(0);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testQueryMultipleStatementsFailed() throws Exception {
+ final SqlFieldsQuery qry = new SqlFieldsQuery("select 1; select 1;").setSchema("PUBLIC");
+
+ GridTestUtils.assertThrows(log,
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ node.context().query().querySqlFieldsNoCache(qry, true);
+
+ return null;
+ }
+ }, IgniteSQLException.class, "Multiple statements queries are not supported");
+ }
+}
\ No newline at end of file