You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2017/08/15 18:00:06 UTC
[05/29] ignite git commit: IGNITE-5126: Batch support for this JDBC
driver. This closes #2162.
IGNITE-5126: Batch support for this JDBC driver. This closes #2162.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0f22223b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0f22223b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0f22223b
Branch: refs/heads/ignite-5947
Commit: 0f22223b7ca25313083e4dc35e7842931a655abd
Parents: 3fdf453
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Fri Aug 4 11:46:14 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Aug 4 12:04:07 2017 +0300
----------------------------------------------------------------------
.../jdbc/suite/IgniteJdbcDriverTestSuite.java | 2 +
.../ignite/jdbc/thin/JdbcThinBatchSelfTest.java | 333 +++++++++++++++++++
.../jdbc/thin/JdbcThinPreparedStatement.java | 16 +-
.../internal/jdbc/thin/JdbcThinStatement.java | 46 ++-
.../internal/jdbc/thin/JdbcThinTcpIo.java | 20 ++
.../odbc/jdbc/JdbcBatchExecuteRequest.java | 109 ++++++
.../odbc/jdbc/JdbcBatchExecuteResult.java | 96 ++++++
.../processors/odbc/jdbc/JdbcQuery.java | 95 ++++++
.../processors/odbc/jdbc/JdbcRequest.java | 8 +
.../odbc/jdbc/JdbcRequestHandler.java | 66 +++-
.../processors/odbc/jdbc/JdbcResult.java | 11 +
11 files changed, 794 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0f22223b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
index 8ca3d45..cf7ee8f 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
@@ -33,6 +33,7 @@ import org.apache.ignite.jdbc.JdbcPreparedStatementSelfTest;
import org.apache.ignite.jdbc.JdbcResultSetSelfTest;
import org.apache.ignite.jdbc.JdbcStatementSelfTest;
import org.apache.ignite.jdbc.thin.JdbcThinAutoCloseServerCursorTest;
+import org.apache.ignite.jdbc.thin.JdbcThinBatchSelfTest;
import org.apache.ignite.jdbc.thin.JdbcThinComplexQuerySelfTest;
import org.apache.ignite.jdbc.thin.JdbcThinConnectionSelfTest;
import org.apache.ignite.jdbc.thin.JdbcThinDeleteStatementSelfTest;
@@ -121,6 +122,7 @@ public class IgniteJdbcDriverTestSuite extends TestSuite {
suite.addTest(new TestSuite(JdbcThinMergeStatementSelfTest.class));
suite.addTest(new TestSuite(JdbcThinDeleteStatementSelfTest.class));
suite.addTest(new TestSuite(JdbcThinAutoCloseServerCursorTest.class));
+ suite.addTest(new TestSuite(JdbcThinBatchSelfTest.class));
// New thin JDBC driver, DDL tests
suite.addTest(new TestSuite(JdbcThinDynamicIndexAtomicPartitionedNearSelfTest.class));
http://git-wip-us.apache.org/repos/asf/ignite/blob/0f22223b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBatchSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBatchSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBatchSelfTest.java
new file mode 100644
index 0000000..5781e00
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBatchSelfTest.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import java.sql.BatchUpdateException;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.Callable;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ * Tests batch execution through the thin JDBC driver's plain and prepared statements.
+ */
+public class JdbcThinBatchSelfTest extends JdbcThinAbstractDmlStatementSelfTest {
+ /** SQL query. */
+ private static final String SQL_PREPARED = "insert into Person(_key, id, firstName, lastName, age) values " +
+ "(?, ?, ?, ?, ?)";
+
+ /** Statement. */
+ private Statement stmt;
+
+ /** Prepared statement. */
+ private PreparedStatement pstmt;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        // Open one plain and one prepared statement for the test to work with.
+        this.stmt = conn.createStatement();
+
+        this.pstmt = conn.prepareStatement(SQL_PREPARED);
+
+        // Both statements must exist and be usable before the test body runs.
+        assertNotNull(this.stmt);
+        assertFalse(this.stmt.isClosed());
+
+        assertNotNull(this.pstmt);
+        assertFalse(this.pstmt.isClosed());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        // Close whatever beforeTest() managed to open.
+        if (stmt != null && !stmt.isClosed())
+            stmt.close();
+
+        if (pstmt != null && !pstmt.isClosed())
+            pstmt.close();
+
+        // Either field is still null when beforeTest() failed before assigning it. Keep the
+        // checks null-safe, like the close calls above, so that an NPE here does not hide the
+        // original failure or skip the parent cleanup.
+        assertTrue(pstmt == null || pstmt.isClosed());
+        assertTrue(stmt == null || stmt.isClosed());
+
+        super.afterTest();
+    }
+
+    /**
+     * Adds {@code BATCH_SIZE} multi-row inserts (the n-th one inserts n rows) to a plain statement
+     * and checks that the reported update counts are 1, 2, ..., {@code BATCH_SIZE}.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testBatch() throws SQLException {
+        final int BATCH_SIZE = 10;
+
+        // Index of the first row of the next insert; advances by the number of rows just added.
+        int firstRow = 0;
+
+        for (int rows = 1; rows <= BATCH_SIZE; rows++) {
+            stmt.addBatch("insert into Person (_key, id, firstName, lastName, age) values "
+                + generateValues(firstRow, rows));
+
+            firstRow += rows;
+        }
+
+        int[] cnts = stmt.executeBatch();
+
+        assertEquals("Invalid update counts size", BATCH_SIZE, cnts.length);
+
+        int expCnt = 0;
+
+        for (int cnt : cnts)
+            assertEquals("Invalid update count", ++expCnt, cnt);
+    }
+
+    /**
+     * Checks that all batch methods of a closed plain statement and a closed prepared statement
+     * fail with the "Statement is closed." error.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testBatchOnClosedStatement() throws SQLException {
+        final Statement closedStmt = conn.createStatement();
+        final PreparedStatement closedPstmt = conn.prepareStatement("");
+
+        closedStmt.close();
+        closedPstmt.close();
+
+        // Plain statement: addBatch(String), clearBatch(), executeBatch().
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                closedStmt.addBatch("");
+
+                return null;
+            }
+        }, SQLException.class, "Statement is closed.");
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                closedStmt.clearBatch();
+
+                return null;
+            }
+        }, SQLException.class, "Statement is closed.");
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                closedStmt.executeBatch();
+
+                return null;
+            }
+        }, SQLException.class, "Statement is closed.");
+
+        // Prepared statement: addBatch(), clearBatch(), executeBatch().
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                closedPstmt.addBatch();
+
+                return null;
+            }
+        }, SQLException.class, "Statement is closed.");
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                closedPstmt.clearBatch();
+
+                return null;
+            }
+        }, SQLException.class, "Statement is closed.");
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                closedPstmt.executeBatch();
+
+                return null;
+            }
+        }, SQLException.class, "Statement is closed.");
+    }
+
+ /**
+ * @throws SQLException If failed.
+ */
+ public void testBatchException() throws SQLException {
+ final int BATCH_SIZE = 7;
+
+ for (int idx = 0, i = 0; i < BATCH_SIZE; ++i, idx += i) {
+ stmt.addBatch("insert into Person (_key, id, firstName, lastName, age) values "
+ + generateValues(idx, i + 1));
+ }
+
+ stmt.addBatch("select * from Person");
+
+ stmt.addBatch("insert into Person (_key, id, firstName, lastName, age) values "
+ + generateValues(100, 1));
+
+ try {
+ stmt.executeBatch();
+
+ fail("BatchUpdateException must be thrown");
+ } catch(BatchUpdateException e) {
+ int [] updCnts = e.getUpdateCounts();
+
+ assertEquals("Invalid update counts size", BATCH_SIZE, updCnts.length);
+
+ for (int i = 0; i < BATCH_SIZE; ++i)
+ assertEquals("Invalid update count",i + 1, updCnts[i]);
+
+ if (!e.getMessage().contains("Query produced result set [qry=select * from Person, args=[]]")) {
+ log.error("Invalid exception: ", e);
+
+ fail();
+ }
+ }
+ }
+
+ /**
+ * @throws SQLException If failed.
+ */
+ public void testBatchClear() throws SQLException {
+ final int BATCH_SIZE = 7;
+
+ for (int idx = 0, i = 0; i < BATCH_SIZE; ++i, idx += i) {
+ stmt.addBatch("insert into Person (_key, id, firstName, lastName, age) values "
+ + generateValues(idx, i + 1));
+ }
+
+ stmt.clearBatch();
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ stmt.executeBatch();
+
+ return null;
+ }
+ }, SQLException.class, "Batch is empty.");
+ }
+
+    /**
+     * Adds {@code BATCH_SIZE} single-row parameter sets to the prepared insert and checks that
+     * every reported update count is 1.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testBatchPrepared() throws SQLException {
+        final int BATCH_SIZE = 10;
+
+        for (int row = 0; row < BATCH_SIZE; row++) {
+            // Parameter order follows SQL_PREPARED: _key, id, firstName, lastName, age.
+            pstmt.setString(1, "p" + row);
+            pstmt.setInt(2, row);
+            pstmt.setString(3, "Name" + row);
+            pstmt.setString(4, "Lastname" + row);
+            pstmt.setInt(5, 20 + row);
+
+            pstmt.addBatch();
+        }
+
+        int[] cnts = pstmt.executeBatch();
+
+        assertEquals("Invalid update counts size", BATCH_SIZE, cnts.length);
+
+        for (int cnt : cnts)
+            assertEquals("Invalid update count", 1, cnt);
+    }
+
+ /**
+ * @throws SQLException If failed.
+ */
+ public void testBatchExceptionPrepared() throws SQLException {
+ final int BATCH_SIZE = 7;
+
+ for (int i = 0; i < BATCH_SIZE; ++i) {
+ int paramCnt = 1;
+
+ pstmt.setString(paramCnt++, "p" + i);
+ pstmt.setInt(paramCnt++, i);
+ pstmt.setString(paramCnt++, "Name" + i);
+ pstmt.setString(paramCnt++, "Lastname" + i);
+ pstmt.setInt(paramCnt++, 20 + i);
+
+ pstmt.addBatch();
+ }
+
+ int paramCnt = 1;
+ pstmt.setString(paramCnt++, "p" + 100);
+ pstmt.setString(paramCnt++, "x");
+ pstmt.setString(paramCnt++, "Name" + 100);
+ pstmt.setString(paramCnt++, "Lastname" + 100);
+ pstmt.setInt(paramCnt++, 20 + 100);
+
+ pstmt.addBatch();
+
+ try {
+ pstmt.executeBatch();
+
+ fail("BatchUpdateException must be thrown");
+ } catch(BatchUpdateException e) {
+ int [] updCnts = e.getUpdateCounts();
+
+ assertEquals("Invalid update counts size", BATCH_SIZE, updCnts.length);
+
+ for (int i = 0; i < BATCH_SIZE; ++i)
+ assertEquals("Invalid update count",1, updCnts[i]);
+
+ if (!e.getMessage().contains("Failed to execute SQL query.")) {
+ log.error("Invalid exception: ", e);
+
+ fail();
+ }
+ }
+ }
+
+    /**
+     * Checks that {@code clearBatch()} on the prepared statement discards the added parameter
+     * sets, so that a following {@code executeBatch()} fails with "Batch is empty.".
+     *
+     * @throws SQLException If failed.
+     */
+    public void testBatchClearPrepared() throws SQLException {
+        final int BATCH_SIZE = 10;
+
+        for (int row = 0; row < BATCH_SIZE; row++) {
+            // Parameter order follows SQL_PREPARED: _key, id, firstName, lastName, age.
+            pstmt.setString(1, "p" + row);
+            pstmt.setInt(2, row);
+            pstmt.setString(3, "Name" + row);
+            pstmt.setString(4, "Lastname" + row);
+            pstmt.setInt(5, 20 + row);
+
+            pstmt.addBatch();
+        }
+
+        pstmt.clearBatch();
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                pstmt.executeBatch();
+
+                return null;
+            }
+        }, SQLException.class, "Batch is empty.");
+    }
+
+ /**
+ * @param beginIndex Begin row index.
+ * @param cnt Count of rows.
+ * @return String contains values for 'cnt' rows.
+ */
+ private String generateValues(int beginIndex, int cnt) {
+ StringBuilder sb = new StringBuilder();
+
+ int lastIdx = beginIndex + cnt - 1;
+
+ for (int i = beginIndex; i < lastIdx; ++i)
+ sb.append(valuesRow(i)).append(',');
+
+ sb.append(valuesRow(lastIdx));
+
+ return sb.toString();
+ }
+
+ /**
+ * @param idx Index of the row.
+ * @return String with row values.
+ */
+ private String valuesRow(int idx) {
+ return String.format("('p%d', %d, 'Name%d', 'Lastname%d', %d)", idx, idx, idx, idx, 20 + idx);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0f22223b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java
index 0c78a13..455c80f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java
@@ -40,6 +40,7 @@ import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Calendar;
import org.apache.ignite.internal.processors.odbc.SqlListenerUtils;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery;
/**
* JDBC prepared statement implementation.
@@ -230,7 +231,20 @@ public class JdbcThinPreparedStatement extends JdbcThinStatement implements Prep
@Override public void addBatch() throws SQLException {
ensureNotClosed();
- throw new SQLFeatureNotSupportedException("Updates are not supported.");
+        // 'args' is reset to null at the end of every call, so it is null when addBatch() is
+        // invoked again without re-binding parameters. Pass null through instead of
+        // dereferencing it; JdbcQuery serializes null args as an empty argument list.
+        Object[] params = args == null ? null : args.toArray(new Object[args.size()]);
+
+        if (batch == null) {
+            batch = new ArrayList<>();
+
+            // Only the first entry of the batch carries the SQL text.
+            batch.add(new JdbcQuery(sql, params));
+        }
+        else
+            batch.add(new JdbcQuery(null, params));
+
+        args = null;
+    }
+
+ /** {@inheritDoc} */
+ @Override public void addBatch(String sql) throws SQLException {
+ // Batch entries of a prepared statement are added via addBatch() with its own SQL; the SQL-string variant is always rejected.
+ throw new SQLException("The method 'addBatch(String)' is called on PreparedStatement instance.");
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/0f22223b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
index 2cad223..b01350a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
@@ -18,15 +18,20 @@
package org.apache.ignite.internal.jdbc.thin;
import java.io.IOException;
+import java.sql.BatchUpdateException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLWarning;
import java.sql.Statement;
+import java.util.ArrayList;
import java.util.List;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.internal.processors.odbc.SqlListenerResponse;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteResult;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteResult;
import static java.sql.ResultSet.CONCUR_READ_ONLY;
@@ -62,6 +67,9 @@ public class JdbcThinStatement implements Statement {
/** */
private boolean alreadyRead;
+ /** Batch. */
+ protected List<JdbcQuery> batch;
+
/**
* Creates new statement.
*
@@ -323,21 +331,53 @@ public class JdbcThinStatement implements Statement {
@Override public void addBatch(String sql) throws SQLException {
ensureNotClosed();
- throw new SQLFeatureNotSupportedException("Updates are not supported.");
+ // The batch list is created lazily on the first added query.
+ if (batch == null)
+ batch = new ArrayList<>();
+
+ // No arguments are attached to a query added as a SQL string.
+ batch.add(new JdbcQuery(sql, null));
}
/** {@inheritDoc} */
@Override public void clearBatch() throws SQLException {
ensureNotClosed();
- throw new SQLFeatureNotSupportedException("Updates are not supported.");
+ // Dropping the list empties the batch; executeBatch() treats a null batch as empty.
+ batch = null;
}
/** {@inheritDoc} */
@Override public int[] executeBatch() throws SQLException {
ensureNotClosed();
- throw new SQLFeatureNotSupportedException("Updates are not supported.");
+ // Discard the result set of any previous execution before running the batch.
+ if (rs != null) {
+ rs.close();
+
+ rs = null;
+ }
+
+ alreadyRead = false;
+
+ if (batch == null || batch.isEmpty())
+ throw new SQLException("Batch is empty.");
+
+ try {
+ // The whole batch is sent in one request.
+ JdbcBatchExecuteResult res = conn.io().batchExecute(conn.getSchema(), batch);
+
+ // A failed batch is reported together with the update counts returned by the server.
+ if (res.errorCode() != SqlListenerResponse.STATUS_SUCCESS)
+ throw new BatchUpdateException(res.errorMessage(), null, res.errorCode(), res.updateCounts());
+
+ return res.updateCounts();
+ }
+ catch (IOException e) {
+ // An I/O failure closes the connection before the error is rethrown.
+ conn.close();
+
+ throw new SQLException("Failed to query Ignite.", e);
+ }
+ catch (IgniteCheckedException e) {
+ throw new SQLException("Failed to query Ignite.", e);
+ }
+ finally {
+ // The batch is cleared whether execution succeeded or failed.
+ batch = null;
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/0f22223b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
index be62a8d..f54d5fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
@@ -32,6 +32,9 @@ import org.apache.ignite.internal.processors.odbc.SqlListenerNioListener;
import org.apache.ignite.internal.processors.odbc.SqlListenerProtocolVersion;
import org.apache.ignite.internal.processors.odbc.SqlListenerRequest;
import org.apache.ignite.internal.processors.odbc.SqlListenerResponse;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteRequest;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteResult;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryCloseRequest;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteRequest;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteResult;
@@ -58,6 +61,9 @@ public class JdbcThinTcpIo {
/** Initial output for query message. */
private static final int QUERY_EXEC_MSG_INIT_CAP = 256;
+ /** Maximum batch query count. */
+ private static final int MAX_BATCH_QRY_CNT = 32;
+
/** Initial output for query fetch message. */
private static final int QUERY_FETCH_MSG_SIZE = 13;
@@ -289,6 +295,20 @@ public class JdbcThinTcpIo {
}
/**
+     * Sends all queries of the batch to the server in a single request.
+     *
+     * @param schema Schema.
+     * @param batch Batch queries.
+     * @return Result.
+     * @throws IOException On error.
+     * @throws IgniteCheckedException On error.
+     */
+    public JdbcBatchExecuteResult batchExecute(String schema, List<JdbcQuery> batch)
+        throws IOException, IgniteCheckedException {
+        // Initial output size scales with the batch size, counting at most MAX_BATCH_QRY_CNT queries.
+        int initCap = QUERY_EXEC_MSG_INIT_CAP * Math.min(MAX_BATCH_QRY_CNT, batch.size());
+
+        JdbcBatchExecuteRequest req = new JdbcBatchExecuteRequest(schema, batch);
+
+        return sendRequest(req, initCap);
+    }
+
+ /**
* @param req ODBC request.
* @throws IOException On error.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/0f22223b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java
new file mode 100644
index 0000000..9f71bff
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * JDBC batch execute request: a schema name plus the ordered list of queries to execute.
+ */
+public class JdbcBatchExecuteRequest extends JdbcRequest {
+ /** Schema name. */
+ private String schema;
+
+ /** Batch queries. */
+ @GridToStringInclude(sensitive = true)
+ private List<JdbcQuery> queries;
+
+ /**
+ * Default constructor; the fields are filled in by {@link #readBinary(BinaryReaderExImpl)}.
+ */
+ public JdbcBatchExecuteRequest() {
+ super(BATCH_EXEC);
+ }
+
+ /**
+ * @param schema Schema.
+ * @param queries Queries; must not be empty (checked by assertion).
+ */
+ public JdbcBatchExecuteRequest(String schema, List<JdbcQuery> queries) {
+ super(BATCH_EXEC);
+
+ assert !F.isEmpty(queries);
+
+ this.schema = schema;
+ this.queries = queries;
+ }
+
+ /**
+ * @return Schema.
+ */
+ @Nullable public String schema() {
+ return schema;
+ }
+
+ /**
+ * @return Queries.
+ */
+ public List<JdbcQuery> queries() {
+ return queries;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriterExImpl writer) throws BinaryObjectException {
+ super.writeBinary(writer);
+
+ // Layout after the parent's data: schema, query count, then each query in order.
+ writer.writeString(schema);
+ writer.writeInt(queries.size());
+
+ for (JdbcQuery q : queries)
+ q.writeBinary(writer);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReaderExImpl reader) throws BinaryObjectException {
+ super.readBinary(reader);
+
+ // Must read the fields in the same order writeBinary() writes them.
+ schema = reader.readString();
+
+ int n = reader.readInt();
+
+ queries = new ArrayList<>(n);
+
+ for (int i = 0; i < n; ++i) {
+ JdbcQuery qry = new JdbcQuery();
+
+ qry.readBinary(reader);
+
+ queries.add(qry);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(JdbcBatchExecuteRequest.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0f22223b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteResult.java
new file mode 100644
index 0000000..7977c22
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteResult.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+
+/**
+ * JDBC batch execute result: the update counts of the batch plus an error code and message.
+ */
+public class JdbcBatchExecuteResult extends JdbcResult {
+ /** Update counts. */
+ private int [] updateCnts;
+
+ /** Batch update error code. */
+ private int errCode;
+
+ /** Batch update error message. */
+ private String errMsg;
+
+ /**
+ * Constructor; the fields are filled in by {@link #readBinary(BinaryReaderExImpl)}.
+ */
+ public JdbcBatchExecuteResult() {
+ super(BATCH_EXEC);
+ }
+
+ /**
+ * @param updateCnts Update counts for batch.
+ * @param errCode Error code.
+ * @param errMsg Error message.
+ */
+ public JdbcBatchExecuteResult(int [] updateCnts, int errCode, String errMsg) {
+ super(BATCH_EXEC);
+
+ this.updateCnts = updateCnts;
+ this.errCode = errCode;
+ this.errMsg = errMsg;
+ }
+
+ /**
+ * @return Update counts of the batch queries.
+ */
+ public int[] updateCounts() {
+ return updateCnts;
+ }
+
+ /**
+ * @return Batch error code.
+ */
+ public int errorCode() {
+ return errCode;
+ }
+
+ /**
+ * @return Batch error message.
+ */
+ public String errorMessage() {
+ return errMsg;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriterExImpl writer) throws BinaryObjectException {
+ super.writeBinary(writer);
+
+ // Layout after the parent's data: error code, error message, update counts.
+ writer.writeInt(errCode);
+ writer.writeString(errMsg);
+ writer.writeIntArray(updateCnts);
+ }
+
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReaderExImpl reader) throws BinaryObjectException {
+ super.readBinary(reader);
+
+ // Must read the fields in the same order writeBinary() writes them.
+ errCode = reader.readInt();
+ errMsg = reader.readString();
+ updateCnts = reader.readIntArray();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0f22223b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQuery.java
new file mode 100644
index 0000000..f7ffb99
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQuery.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.processors.odbc.SqlListenerUtils;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * JDBC SQL query with parameters; one entry of a batch.
+ */
+public class JdbcQuery implements JdbcRawBinarylizable {
+ /** Query SQL. May be {@code null}: the batch handler then reuses the SQL of a preceding entry. */
+ private String sql;
+
+ /** Arguments. May be {@code null} on the sending side. */
+ private Object[] args;
+
+ /**
+ * Default constructor is used for serialization.
+ */
+ public JdbcQuery() {
+ // No-op.
+ }
+
+ /**
+ * @param sql Query SQL.
+ * @param args Arguments.
+ */
+ public JdbcQuery(String sql, Object[] args) {
+ this.sql = sql;
+ this.args = args;
+ }
+
+ /**
+ * @return Query SQL string.
+ */
+ public String sql() {
+ return sql;
+ }
+
+ /**
+ * @return Query arguments.
+ */
+ public Object[] args() {
+ return args;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriterExImpl writer) {
+ writer.writeString(sql);
+
+ // A null or empty argument array is written as a zero count with no elements.
+ if (args == null || args.length == 0)
+ writer.writeInt(0);
+ else {
+ writer.writeInt(args.length);
+
+ for (Object arg : args)
+ SqlListenerUtils.writeObject(writer, arg, false);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReaderExImpl reader) {
+ sql = reader.readString();
+
+ int argsNum = reader.readInt();
+
+ // A zero count yields an empty array, so args is never null after reading.
+ args = new Object[argsNum];
+
+ for (int i = 0; i < argsNum; ++i)
+ args[i] = SqlListenerUtils.readObject(reader, false);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(JdbcQuery.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0f22223b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java
index d6f8fd3..0e144cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java
@@ -39,6 +39,9 @@ public class JdbcRequest extends SqlListenerRequest implements JdbcRawBinaryliza
/** Get columns meta query. */
public static final byte QRY_META = 5;
+ /** Batch queries. */
+ public static final byte BATCH_EXEC = 6;
+
/** Request type. */
private byte type;
@@ -97,6 +100,11 @@ public class JdbcRequest extends SqlListenerRequest implements JdbcRawBinaryliza
break;
+ case BATCH_EXEC:
+ req = new JdbcBatchExecuteRequest();
+
+ break;
+
default:
throw new IgniteException("Unknown SQL listener request ID: [request ID=" + reqType + ']');
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0f22223b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
index 94ac433..60c08f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
@@ -17,6 +17,11 @@
package org.apache.ignite.internal.processors.odbc.jdbc;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
@@ -31,10 +36,7 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
+import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BATCH_EXEC;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_CLOSE;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_EXEC;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_FETCH;
@@ -129,6 +131,9 @@ public class JdbcRequestHandler implements SqlListenerRequestHandler {
case QRY_META:
return getQueryMeta((JdbcQueryMetadataRequest)req);
+
+ case BATCH_EXEC:
+ return executeBatch((JdbcBatchExecuteRequest)req);
}
return new JdbcResponse(SqlListenerResponse.STATUS_FAILED, "Unsupported JDBC request [req=" + req + ']');
@@ -307,4 +312,57 @@ public class JdbcRequestHandler implements SqlListenerRequestHandler {
return new JdbcResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
}
}
+
+    /**
+     * Executes the queries of a batch request one by one, in request order, collecting the update
+     * count of each. Processing stops at the first failure; the response then carries the update
+     * counts of the queries that succeeded before it, together with the failed status and the
+     * error text.
+     *
+     * @param req Batch execute request.
+     * @return Response wrapping a {@link JdbcBatchExecuteResult}.
+     */
+    private SqlListenerResponse executeBatch(JdbcBatchExecuteRequest req) {
+        String schemaName = req.schema();
+
+        if (F.isEmpty(schemaName))
+            schemaName = QueryUtils.DFLT_SCHEMA;
+
+        int successQueries = 0;
+        int[] updCnts = new int[req.queries().size()];
+
+        try {
+            // SQL of the most recent entry that carried one; entries with null SQL reuse it.
+            String sql = null;
+
+            for (JdbcQuery q : req.queries()) {
+                if (q.sql() != null)
+                    sql = q.sql();
+
+                SqlFieldsQuery qry = new SqlFieldsQuery(sql);
+
+                qry.setArgs(q.args());
+
+                qry.setDistributedJoins(distributedJoins);
+                qry.setEnforceJoinOrder(enforceJoinOrder);
+                qry.setCollocated(collocated);
+                qry.setReplicatedOnly(replicatedOnly);
+
+                qry.setSchema(schemaName);
+
+                QueryCursorImpl<List<?>> qryCur = (QueryCursorImpl<List<?>>)ctx.query()
+                    .querySqlFieldsNoCache(qry, true);
+
+                // Report the effective SQL: q.sql() is null for entries that reuse the previous
+                // text, which would otherwise show up as "qry=null" in the error.
+                if (qryCur.isQuery())
+                    throw new IgniteCheckedException("Query produced result set [qry=" + sql + ", args=" +
+                        Arrays.toString(q.args()) + ']');
+
+                List<List<?>> items = qryCur.getAll();
+
+                // The update count is read from the first column of the first returned row.
+                updCnts[successQueries++] = ((Long)items.get(0).get(0)).intValue();
+            }
+
+            return new JdbcResponse(new JdbcBatchExecuteResult(updCnts, SqlListenerResponse.STATUS_SUCCESS, null));
+        }
+        catch (Exception e) {
+            U.error(log, "Failed to execute batch query [reqId=" + req.requestId() + ", req=" + req + ']', e);
+
+            // Only the counts of the queries completed before the failure are returned.
+            return new JdbcResponse(new JdbcBatchExecuteResult(Arrays.copyOf(updCnts, successQueries),
+                SqlListenerResponse.STATUS_FAILED, e.toString()));
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0f22223b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java
index 2d7666e..48affe9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java
@@ -35,6 +35,9 @@ public class JdbcResult implements JdbcRawBinarylizable {
/** Get columns meta query result. */
public static final byte QRY_META = 4;
+ /** Batch queries. */
+ public static final byte BATCH_EXEC = 6;
+
/** Success status. */
private byte type;
@@ -70,14 +73,22 @@ public class JdbcResult implements JdbcRawBinarylizable {
switch(resId) {
case QRY_EXEC:
res = new JdbcQueryExecuteResult();
+
break;
case QRY_FETCH:
res = new JdbcQueryFetchResult();
+
break;
case QRY_META:
res = new JdbcQueryMetadataResult();
+
+ break;
+
+ case BATCH_EXEC:
+ res = new JdbcBatchExecuteResult();
+
break;
default: