You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2018/05/11 14:55:44 UTC
[2/2] ignite git commit: IGNITE-7999: JDBC Thin Driver: added
unordered streaming mode. This closes #3789.
IGNITE-7999: JDBC Thin Driver: added unordered streaming mode. This closes #3789.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/01f60542
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/01f60542
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/01f60542
Branch: refs/heads/master
Commit: 01f60542ee1b6e9f4c319eb77be2664daf3ecf43
Parents: 1df5a26
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Fri May 11 17:55:22 2018 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri May 11 17:55:22 2018 +0300
----------------------------------------------------------------------
.../internal/jdbc2/JdbcStreamingSelfTest.java | 3 -
.../jdbc/suite/IgniteJdbcDriverTestSuite.java | 6 +-
...JdbcThinConnectionMultipleAddressesTest.java | 7 +-
.../thin/JdbcThinStreamingAbstractSelfTest.java | 505 +++++++++++++++++++
.../JdbcThinStreamingNotOrderedSelfTest.java | 38 ++
.../thin/JdbcThinStreamingOrderedSelfTest.java | 39 ++
.../jdbc/thin/JdbcThinStreamingSelfTest.java | 486 ------------------
.../internal/jdbc/thin/JdbcThinConnection.java | 312 +++++++++---
.../internal/jdbc/thin/JdbcThinTcpIo.java | 63 ++-
.../odbc/ClientListenerNioListener.java | 23 +-
.../odbc/jdbc/JdbcBatchExecuteRequest.java | 27 +
.../odbc/jdbc/JdbcBatchExecuteResult.java | 26 +-
.../odbc/jdbc/JdbcConnectionContext.java | 34 +-
.../jdbc/JdbcOrderedBatchExecuteRequest.java | 85 ++++
.../jdbc/JdbcOrderedBatchExecuteResult.java | 75 +++
.../processors/odbc/jdbc/JdbcRequest.java | 8 +
.../odbc/jdbc/JdbcRequestHandler.java | 119 ++++-
.../odbc/jdbc/JdbcResponseSender.java | 31 ++
.../processors/odbc/jdbc/JdbcResult.java | 8 +
.../processors/query/SqlClientContext.java | 147 ++++--
.../apache/ignite/internal/sql/SqlKeyword.java | 3 +
.../sql/command/SqlSetStreamingCommand.java | 19 +
.../sql/SqlParserSetStreamingSelfTest.java | 39 +-
.../processors/query/h2/IgniteH2Indexing.java | 2 +-
.../config/ignite-localhost-config.xml | 2 -
.../benchmark-jdbc-thin-streaming.properties | 132 +++++
.../ignite/yardstick/upload/StreamerParams.java | 7 +-
.../upload/UploadBenchmarkArguments.java | 19 +-
.../yardstick/upload/model/QueryFactory.java | 2 +
29 files changed, 1630 insertions(+), 637 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/01f60542/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java
index 10adedc..e302529 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java
@@ -56,9 +56,6 @@ public class JdbcStreamingSelfTest extends GridCommonAbstractTest {
private static final String STREAMING_URL = CFG_URL_PREFIX +
"cache=person@modules/clients/src/test/config/jdbc-config.xml";
- /** */
- protected transient IgniteLogger log;
-
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
return getConfiguration0(gridName);
http://git-wip-us.apache.org/repos/asf/ignite/blob/01f60542/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
index c0378b2..cf6b4f6 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
@@ -71,7 +71,8 @@ import org.apache.ignite.jdbc.thin.JdbcThinResultSetSelfTest;
import org.apache.ignite.jdbc.thin.JdbcThinSchemaCaseTest;
import org.apache.ignite.jdbc.thin.JdbcThinSelectAfterAlterTable;
import org.apache.ignite.jdbc.thin.JdbcThinStatementSelfTest;
-import org.apache.ignite.jdbc.thin.JdbcThinStreamingSelfTest;
+import org.apache.ignite.jdbc.thin.JdbcThinStreamingNotOrderedSelfTest;
+import org.apache.ignite.jdbc.thin.JdbcThinStreamingOrderedSelfTest;
import org.apache.ignite.jdbc.thin.JdbcThinTcpIoTest;
import org.apache.ignite.jdbc.thin.JdbcThinUpdateStatementSelfTest;
import org.apache.ignite.jdbc.thin.JdbcThinUpdateStatementSkipReducerOnUpdateSelfTest;
@@ -129,7 +130,8 @@ public class IgniteJdbcDriverTestSuite extends TestSuite {
suite.addTest(new TestSuite(JdbcBlobTest.class));
suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcStreamingSelfTest.class));
- suite.addTest(new TestSuite(JdbcThinStreamingSelfTest.class));
+ suite.addTest(new TestSuite(JdbcThinStreamingNotOrderedSelfTest.class));
+ suite.addTest(new TestSuite(JdbcThinStreamingOrderedSelfTest.class));
// DDL tests.
suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcDynamicIndexAtomicPartitionedNearSelfTest.class));
http://git-wip-us.apache.org/repos/asf/ignite/blob/01f60542/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionMultipleAddressesTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionMultipleAddressesTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionMultipleAddressesTest.java
index 2c2aba9..e1fb295 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionMultipleAddressesTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionMultipleAddressesTest.java
@@ -372,9 +372,7 @@ public class JdbcThinConnectionMultipleAddressesTest extends JdbcThinAbstractSel
return null;
}
- }, SQLException.class, "Failed to communicate with Ignite cluster");
-
- assertTrue(id[0] > 0);
+ }, SQLException.class, "Failed to communicate with Ignite cluster on JDBC streaming");
int minId = id[0];
@@ -382,6 +380,9 @@ public class JdbcThinConnectionMultipleAddressesTest extends JdbcThinAbstractSel
final Statement stmt1 = conn.createStatement();
+ stmt1.execute("SET STREAMING 1 BATCH_SIZE 10 ALLOW_OVERWRITE 0 " +
+ " PER_NODE_BUFFER_SIZE 1000 FLUSH_FREQUENCY 1000");
+
for (int i = 0; i < 10; ++i, id[0]++)
stmt1.execute("INSERT INTO TEST(id, val) values (" + id[0] + ", " + id[0] + ")");
http://git-wip-us.apache.org/repos/asf/ignite/blob/01f60542/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
new file mode 100644
index 0000000..7004635
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.Callable;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.jdbc2.JdbcStreamingSelfTest;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.processors.query.SqlClientContext;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Tests for streaming via thin driver.
+ */
+public abstract class JdbcThinStreamingAbstractSelfTest extends JdbcStreamingSelfTest {
+ /** */
+ protected int batchSize = 17;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ GridQueryProcessor.idxCls = IndexingWithContext.class;
+
+ super.beforeTestsStarted();
+
+ batchSize = 17;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ // Init IndexingWithContext.cliCtx
+ try (Connection c = createOrdinaryConnection()) {
+ execute(c, "SELECT 1");
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ try (Connection c = createOrdinaryConnection()) {
+ execute(c, "DROP TABLE PUBLIC.T IF EXISTS");
+ }
+
+ IndexingWithContext.cliCtx = null;
+
+ super.afterTest();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Connection createOrdinaryConnection() throws SQLException {
+ return JdbcThinAbstractSelfTest.connect(grid(0), null);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testStreamedBatchedInsert() throws Exception {
+ for (int i = 10; i <= 100; i += 10)
+ put(i, nameForId(i * 100));
+
+ try (Connection conn = createStreamedConnection(false)) {
+ assertStreamingState(true);
+
+ try (PreparedStatement stmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?), " +
+ "(?, ?)")) {
+ for (int i = 1; i <= 100; i+= 2) {
+ stmt.setInt(1, i);
+ stmt.setString(2, nameForId(i));
+ stmt.setInt(3, i + 1);
+ stmt.setString(4, nameForId(i + 1));
+
+ stmt.addBatch();
+ }
+
+ stmt.executeBatch();
+ }
+ }
+
+ U.sleep(500);
+
+ // Now let's check it's all there.
+ for (int i = 1; i <= 100; i++) {
+ if (i % 10 != 0)
+ assertEquals(nameForId(i), nameForIdInCache(i));
+ else // All that divides by 10 evenly should point to numbers 100 times greater - see above
+ assertEquals(nameForId(i * 100), nameForIdInCache(i));
+ }
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testSimultaneousStreaming() throws Exception {
+ try (Connection anotherConn = createOrdinaryConnection()) {
+ execute(anotherConn, "CREATE TABLE PUBLIC.T(x int primary key, y int) WITH " +
+ "\"cache_name=T,wrap_value=false\"");
+ }
+
+ // Timeout to let connection close be handled on server side.
+ U.sleep(500);
+
+ try (Connection conn = createStreamedConnection(false, 10000)) {
+ assertStreamingState(true);
+
+ PreparedStatement firstStmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?)");
+
+ PreparedStatement secondStmt = conn.prepareStatement("insert into PUBLIC.T(x, y) values (?, ?)");
+
+ try {
+ for (int i = 1; i <= 10; i++) {
+ firstStmt.setInt(1, i);
+ firstStmt.setString(2, nameForId(i));
+
+ firstStmt.executeUpdate();
+ }
+
+ for (int i = 51; i <= 67; i++) {
+ secondStmt.setInt(1, i);
+ secondStmt.setInt(2, i);
+
+ secondStmt.executeUpdate();
+ }
+
+ for (int i = 11; i <= 50; i++) {
+ firstStmt.setInt(1, i);
+ firstStmt.setString(2, nameForId(i));
+
+ firstStmt.executeUpdate();
+ }
+
+ for (int i = 68; i <= 100; i++) {
+ secondStmt.setInt(1, i);
+ secondStmt.setInt(2, i);
+
+ secondStmt.executeUpdate();
+ }
+
+ assertCacheEmpty();
+
+ SqlClientContext cliCtx = sqlClientContext();
+
+ final HashMap<String, IgniteDataStreamer<?, ?>> streamers = U.field(cliCtx, "streamers");
+
+ // Wait until the node has processed the requests (the client sends batch requests asynchronously).
+ GridTestUtils.waitForCondition(() -> streamers.size() == 2, 1000);
+
+ assertEquals(2, streamers.size());
+
+ assertEqualsCollections(new HashSet<>(Arrays.asList("person", "T")), streamers.keySet());
+ }
+ finally {
+ U.closeQuiet(firstStmt);
+
+ U.closeQuiet(secondStmt);
+ }
+ }
+
+ // Let's wait a little so that all data arrives at its destination - we can't intercept the streamers' flush
+ // on connection close in any way.
+ U.sleep(1000);
+
+ // Now let's check it's all there.
+ for (int i = 1; i <= 50; i++)
+ assertEquals(nameForId(i), nameForIdInCache(i));
+
+ for (int i = 51; i <= 100; i++)
+ assertEquals(i, grid(0).cache("T").get(i));
+ }
+
+ /**
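+ * Streams inserts through a randomly chosen mix of prepared/plain statements and batched/immediate execution.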
+ */
+ public void testStreamingWithMixedStatementTypes() throws Exception {
+ String prepStmtStr = "insert into Person(\"id\", \"name\") values (?, ?)";
+
+ String stmtStr = "insert into Person(\"id\", \"name\") values (%d, '%s')";
+
+ try (Connection conn = createStreamedConnection(false, 10000)) {
+ assertStreamingState(true);
+
+ PreparedStatement firstStmt = conn.prepareStatement(prepStmtStr);
+
+ Statement secondStmt = conn.createStatement();
+
+ try {
+ for (int i = 1; i <= 100; i++) {
+ boolean usePrep = Math.random() > 0.5;
+
+ boolean useBatch = Math.random() > 0.5;
+
+ if (usePrep) {
+ firstStmt.setInt(1, i);
+ firstStmt.setString(2, nameForId(i));
+
+ if (useBatch)
+ firstStmt.addBatch();
+ else
+ firstStmt.execute();
+ }
+ else {
+ String sql = String.format(stmtStr, i, nameForId(i));
+
+ if (useBatch)
+ secondStmt.addBatch(sql);
+ else
+ secondStmt.execute(sql);
+ }
+ }
+ }
+ finally {
+ U.closeQuiet(firstStmt);
+
+ U.closeQuiet(secondStmt);
+ }
+ }
+
+ // Let's wait a little so that all data arrives at its destination - we can't intercept the streamers' flush
+ // on connection close in any way.
+ U.sleep(1000);
+
+ // Now let's check it's all there.
+ for (int i = 1; i <= 100; i++)
+ assertEquals(nameForId(i), nameForIdInCache(i));
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testStreamingOffToOn() throws Exception {
+ try (Connection conn = createOrdinaryConnection()) {
+ assertStreamingState(false);
+
+ execute(conn, "SET STREAMING 1");
+
+ assertStreamingState(true);
+ }
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testStreamingOffToOff() throws Exception {
+ try (Connection conn = createOrdinaryConnection()) {
+ assertStreamingState(false);
+
+ execute(conn, "SET STREAMING 0");
+
+ assertStreamingState(false);
+ }
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testStreamingOnToOff() throws Exception {
+ try (Connection conn = createStreamedConnection(false)) {
+ assertStreamingState(true);
+
+ execute(conn, "SET STREAMING off");
+
+ assertStreamingState(false);
+ }
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testFlush() throws Exception {
+ try (Connection conn = createStreamedConnection(false, 10000)) {
+ assertStreamingState(true);
+
+ try (PreparedStatement stmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?)")) {
+ for (int i = 1; i <= 100; i++) {
+ stmt.setInt(1, i);
+ stmt.setString(2, nameForId(i));
+
+ stmt.executeUpdate();
+ }
+ }
+
+ assertCacheEmpty();
+
+ execute(conn, "set streaming 0");
+
+ assertStreamingState(false);
+
+ U.sleep(500);
+
+ // Now let's check it's all there.
+ for (int i = 1; i <= 100; i++)
+ assertEquals(nameForId(i), nameForIdInCache(i));
+ }
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testStreamingReEnabled() throws Exception {
+ try (Connection conn = createStreamedConnection(false, 10000)) {
+ assertStreamingState(true);
+
+ try (PreparedStatement stmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?)")) {
+ for (int i = 1; i <= 100; i++) {
+ stmt.setInt(1, i);
+ stmt.setString(2, nameForId(i));
+
+ stmt.executeUpdate();
+ }
+ }
+
+ assertCacheEmpty();
+
+ execute(conn, "set streaming 1 batch_size 111 allow_overwrite 0 per_node_buffer_size 512 " +
+ "per_node_parallel_operations 4 flush_frequency 5000");
+
+ U.sleep(500);
+
+ assertEquals((Integer)111, U.field((Object)U.field(conn, "streamState"), "streamBatchSize"));
+
+ SqlClientContext cliCtx = sqlClientContext();
+
+ assertTrue(cliCtx.isStream());
+
+ assertFalse(U.field(cliCtx, "streamAllowOverwrite"));
+
+ assertEquals((Integer)512, U.field(cliCtx, "streamNodeBufSize"));
+
+ assertEquals((Long)5000L, U.field(cliCtx, "streamFlushTimeout"));
+
+ assertEquals((Integer)4, U.field(cliCtx, "streamNodeParOps"));
+
+ // Now let's check it's all there - SET STREAMING 1 repeated call must also have caused flush.
+ for (int i = 1; i <= 100; i++)
+ assertEquals(nameForId(i), nameForIdInCache(i));
+ }
+ }
+
+ /**
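+ * Checks that enabling streaming while a statement holds a non-empty batch fails with an SQLException.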
+ */
+ @SuppressWarnings("ThrowableNotThrown")
+ public void testNonStreamedBatch() {
+ GridTestUtils.assertThrows(null, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try (Connection conn = createOrdinaryConnection()) {
+ try (Statement s = conn.createStatement()) {
+ for (int i = 1; i <= 10; i++)
+ s.addBatch(String.format("insert into Person(\"id\", \"name\")values (%d, '%s')", i,
+ nameForId(i)));
+
+ execute(conn, "SET STREAMING 1");
+
+ s.addBatch(String.format("insert into Person(\"id\", \"name\")values (%d, '%s')", 11,
+ nameForId(11)));
+ }
+ }
+
+ return null;
+ }
+ }, SQLException.class, "Statement has non-empty batch (call executeBatch() or clearBatch() before " +
+ "enabling streaming).");
+ }
+
+ /**
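+ * Checks that adding a SET STREAMING command to a non-prepared statement batch fails with an SQLException.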
+ */
+ @SuppressWarnings("ThrowableNotThrown")
+ public void testStreamingStatementInTheMiddleOfNonPreparedBatch() {
+ GridTestUtils.assertThrows(null, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try (Connection conn = createOrdinaryConnection()) {
+ try (Statement s = conn.createStatement()) {
+ s.addBatch(String.format("insert into Person(\"id\", \"name\")values (%d, '%s')", 1,
+ nameForId(1)));
+
+ s.addBatch("SET STREAMING 1 FLUSH_FREQUENCY 10000");
+ }
+ }
+
+ return null;
+ }
+ }, SQLException.class, "Streaming control commands must be executed explicitly");
+ }
+
+ /**
+ * Checks that batching a prepared SET STREAMING command fails with an SQLException.
+ */
+ @SuppressWarnings("ThrowableNotThrown")
+ public void testBatchingSetStreamingStatement() {
+ GridTestUtils.assertThrows(null, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try (Connection conn = createOrdinaryConnection()) {
+ try (PreparedStatement s = conn.prepareStatement("SET STREAMING 1 FLUSH_FREQUENCY 10000")) {
+ s.addBatch();
+ }
+ }
+
+ return null;
+ }
+ }, SQLException.class, "Streaming control commands must be executed explicitly");
+ }
+
+ /**
+ * Check that there's nothing in cache.
+ */
+ protected void assertCacheEmpty() {
+ assertEquals(0, cache().size(CachePeekMode.ALL));
+ }
+
+ /**
+ * @param conn Connection.
+ * @param sql Statement.
+ * @throws SQLException if failed.
+ */
+ protected static void execute(Connection conn, String sql) throws SQLException {
+ try (Statement s = conn.createStatement()) {
+ s.execute(sql);
+ }
+ }
+
+ /**
+ * @return Active SQL client context.
+ */
+ private SqlClientContext sqlClientContext() {
+ assertNotNull(IndexingWithContext.cliCtx);
+
+ return IndexingWithContext.cliCtx;
+ }
+
+ /**
+ * Check that streaming state on target node is as expected.
+ *
+ * @param on Expected streaming state.
+ */
+ protected void assertStreamingState(boolean on) throws Exception {
+ SqlClientContext cliCtx = sqlClientContext();
+
+ GridTestUtils.waitForCondition(() -> cliCtx.isStream() == on, 1000);
+
+ assertEquals(on, cliCtx.isStream());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void assertStatementForbidden(String sql) {
+ batchSize = 1;
+
+ super.assertStatementForbidden(sql);
+ }
+
+ /**
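+ * Indexing implementation that remembers the last client context it was called with, for inspection by tests.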
+ */
+ static final class IndexingWithContext extends IgniteH2Indexing {
+ /** Client context. */
+ static SqlClientContext cliCtx;
+
+ /** {@inheritDoc} */
+ @Override public List<Long> streamBatchedUpdateQuery(String schemaName, String qry, List<Object[]> params,
+ SqlClientContext cliCtx) throws IgniteCheckedException {
+ IndexingWithContext.cliCtx = cliCtx;
+
+ return super.streamBatchedUpdateQuery(schemaName, qry, params, cliCtx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry,
+ @Nullable SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts,
+ GridQueryCancel cancel) {
+ IndexingWithContext.cliCtx = cliCtx;
+
+ return super.querySqlFields(schemaName, qry, cliCtx, keepBinary, failOnMultipleStmts, cancel);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/01f60542/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingNotOrderedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingNotOrderedSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingNotOrderedSelfTest.java
new file mode 100644
index 0000000..b91258f
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingNotOrderedSelfTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import java.sql.Connection;
+
+/**
+ * Tests for not ordered streaming via thin driver.
+ */
+public class JdbcThinStreamingNotOrderedSelfTest extends JdbcThinStreamingAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected Connection createStreamedConnection(boolean allowOverwrite, long flushFreq) throws Exception {
+ Connection c = JdbcThinAbstractSelfTest.connect(grid(0), null);
+
+ execute(c, "SET STREAMING 1 BATCH_SIZE " + batchSize
+ + " ALLOW_OVERWRITE " + (allowOverwrite ? 1 : 0)
+ + " PER_NODE_BUFFER_SIZE 1000 "
+ + " FLUSH_FREQUENCY " + flushFreq + ";"
+ );
+
+ return c;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/01f60542/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingOrderedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingOrderedSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingOrderedSelfTest.java
new file mode 100644
index 0000000..b615f8c
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingOrderedSelfTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import java.sql.Connection;
+
+/**
+ * Tests for ordered streaming via thin driver.
+ */
+public class JdbcThinStreamingOrderedSelfTest extends JdbcThinStreamingAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected Connection createStreamedConnection(boolean allowOverwrite, long flushFreq) throws Exception {
+ Connection c = JdbcThinAbstractSelfTest.connect(grid(0), null);
+
+ execute(c, "SET STREAMING 1 BATCH_SIZE " + batchSize
+ + " ALLOW_OVERWRITE " + (allowOverwrite ? 1 : 0)
+ + " PER_NODE_BUFFER_SIZE 1000 "
+ + " FLUSH_FREQUENCY " + flushFreq
+ + " ORDERED;"
+ );
+
+ return c;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/01f60542/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingSelfTest.java
deleted file mode 100644
index 3c36f54..0000000
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingSelfTest.java
+++ /dev/null
@@ -1,486 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.jdbc.thin;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.Callable;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.cache.CachePeekMode;
-import org.apache.ignite.cache.query.FieldsQueryCursor;
-import org.apache.ignite.cache.query.SqlFieldsQuery;
-import org.apache.ignite.internal.jdbc2.JdbcStreamingSelfTest;
-import org.apache.ignite.internal.processors.query.GridQueryCancel;
-import org.apache.ignite.internal.processors.query.GridQueryProcessor;
-import org.apache.ignite.internal.processors.query.SqlClientContext;
-import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Tests for streaming via thin driver.
- */
-public class JdbcThinStreamingSelfTest extends JdbcStreamingSelfTest {
- /** */
- private int batchSize = 17;
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- GridQueryProcessor.idxCls = IndexingWithContext.class;
-
- super.beforeTestsStarted();
-
- batchSize = 17;
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- try (Connection c = createOrdinaryConnection()) {
- execute(c, "DROP TABLE PUBLIC.T IF EXISTS");
- }
-
- IndexingWithContext.cliCtx = null;
-
- super.afterTest();
- }
-
- /** {@inheritDoc} */
- @Override protected Connection createStreamedConnection(boolean allowOverwrite, long flushFreq) throws Exception {
- Connection c = JdbcThinAbstractSelfTest.connect(grid(0), null );
-
- execute(c, "SET STREAMING 1 BATCH_SIZE " + batchSize + " ALLOW_OVERWRITE " + (allowOverwrite ? 1 : 0) +
- " PER_NODE_BUFFER_SIZE 1000 FLUSH_FREQUENCY " + flushFreq);
-
- return c;
- }
-
- /** {@inheritDoc} */
- @Override protected Connection createOrdinaryConnection() throws SQLException {
- return JdbcThinAbstractSelfTest.connect(grid(0), null);
- }
-
- /**
- * @throws Exception if failed.
- */
- public void testStreamedBatchedInsert() throws Exception {
- for (int i = 10; i <= 100; i += 10)
- put(i, nameForId(i * 100));
-
- try (Connection conn = createStreamedConnection(false)) {
- assertStreamingState(true);
-
- try (PreparedStatement stmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?), " +
- "(?, ?)")) {
- for (int i = 1; i <= 100; i+= 2) {
- stmt.setInt(1, i);
- stmt.setString(2, nameForId(i));
- stmt.setInt(3, i + 1);
- stmt.setString(4, nameForId(i + 1));
-
- stmt.addBatch();
- }
-
- stmt.executeBatch();
- }
- }
-
- U.sleep(500);
-
- // Now let's check it's all there.
- for (int i = 1; i <= 100; i++) {
- if (i % 10 != 0)
- assertEquals(nameForId(i), nameForIdInCache(i));
- else // All that divides by 10 evenly should point to numbers 100 times greater - see above
- assertEquals(nameForId(i * 100), nameForIdInCache(i));
- }
- }
-
- /**
- * @throws SQLException if failed.
- */
- public void testSimultaneousStreaming() throws Exception {
- try (Connection anotherConn = createOrdinaryConnection()) {
- execute(anotherConn, "CREATE TABLE PUBLIC.T(x int primary key, y int) WITH " +
- "\"cache_name=T,wrap_value=false\"");
- }
-
- // Timeout to let connection close be handled on server side.
- U.sleep(500);
-
- try (Connection conn = createStreamedConnection(false, 10000)) {
- assertStreamingState(true);
-
- PreparedStatement firstStmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?)");
-
- PreparedStatement secondStmt = conn.prepareStatement("insert into PUBLIC.T(x, y) values (?, ?)");
-
- try {
- for (int i = 1; i <= 10; i++) {
- firstStmt.setInt(1, i);
- firstStmt.setString(2, nameForId(i));
-
- firstStmt.executeUpdate();
- }
-
- for (int i = 51; i <= 67; i++) {
- secondStmt.setInt(1, i);
- secondStmt.setInt(2, i);
-
- secondStmt.executeUpdate();
- }
-
- for (int i = 11; i <= 50; i++) {
- firstStmt.setInt(1, i);
- firstStmt.setString(2, nameForId(i));
-
- firstStmt.executeUpdate();
- }
-
- for (int i = 68; i <= 100; i++) {
- secondStmt.setInt(1, i);
- secondStmt.setInt(2, i);
-
- secondStmt.executeUpdate();
- }
-
- assertCacheEmpty();
-
- SqlClientContext cliCtx = sqlClientContext();
-
- HashMap<String, IgniteDataStreamer<?, ?>> streamers = U.field(cliCtx, "streamers");
-
- assertEquals(2, streamers.size());
-
- assertEqualsCollections(new HashSet<>(Arrays.asList("person", "T")), streamers.keySet());
- }
- finally {
- U.closeQuiet(firstStmt);
-
- U.closeQuiet(secondStmt);
- }
- }
-
- // Let's wait a little so that all data arrives to destination - we can't intercept streamers' flush
- // on connection close in any way.
- U.sleep(1000);
-
- // Now let's check it's all there.
- for (int i = 1; i <= 50; i++)
- assertEquals(nameForId(i), nameForIdInCache(i));
-
- for (int i = 51; i <= 100; i++)
- assertEquals(i, grid(0).cache("T").get(i));
- }
-
- /**
- *
- */
- public void testStreamingWithMixedStatementTypes() throws Exception {
- String prepStmtStr = "insert into Person(\"id\", \"name\") values (?, ?)";
-
- String stmtStr = "insert into Person(\"id\", \"name\") values (%d, '%s')";
-
- try (Connection conn = createStreamedConnection(false, 10000)) {
- assertStreamingState(true);
-
- PreparedStatement firstStmt = conn.prepareStatement(prepStmtStr);
-
- Statement secondStmt = conn.createStatement();
-
- try {
- for (int i = 1; i <= 100; i++) {
- boolean usePrep = Math.random() > 0.5;
-
- boolean useBatch = Math.random() > 0.5;
-
- if (usePrep) {
- firstStmt.setInt(1, i);
- firstStmt.setString(2, nameForId(i));
-
- if (useBatch)
- firstStmt.addBatch();
- else
- firstStmt.execute();
- }
- else {
- String sql = String.format(stmtStr, i, nameForId(i));
-
- if (useBatch)
- secondStmt.addBatch(sql);
- else
- secondStmt.execute(sql);
- }
- }
- }
- finally {
- U.closeQuiet(firstStmt);
-
- U.closeQuiet(secondStmt);
- }
- }
-
- // Let's wait a little so that all data arrives to destination - we can't intercept streamers' flush
- // on connection close in any way.
- U.sleep(1000);
-
- // Now let's check it's all there.
- for (int i = 1; i <= 100; i++)
- assertEquals(nameForId(i), nameForIdInCache(i));
- }
-
- /**
- * @throws SQLException if failed.
- */
- public void testStreamingOffToOn() throws SQLException {
- try (Connection conn = createOrdinaryConnection()) {
- assertStreamingState(false);
-
- execute(conn, "SET STREAMING 1");
-
- assertStreamingState(true);
- }
- }
-
- /**
- * @throws SQLException if failed.
- */
- public void testStreamingOnToOff() throws Exception {
- try (Connection conn = createStreamedConnection(false)) {
- assertStreamingState(true);
-
- execute(conn, "SET STREAMING off");
-
- assertStreamingState(false);
- }
- }
-
- /**
- * @throws SQLException if failed.
- */
- public void testFlush() throws Exception {
- try (Connection conn = createStreamedConnection(false, 10000)) {
- assertStreamingState(true);
-
- try (PreparedStatement stmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?)")) {
- for (int i = 1; i <= 100; i++) {
- stmt.setInt(1, i);
- stmt.setString(2, nameForId(i));
-
- stmt.executeUpdate();
- }
- }
-
- assertCacheEmpty();
-
- execute(conn, "set streaming 0");
-
- assertStreamingState(false);
-
- U.sleep(500);
-
- // Now let's check it's all there.
- for (int i = 1; i <= 100; i++)
- assertEquals(nameForId(i), nameForIdInCache(i));
- }
- }
-
- /**
- * @throws SQLException if failed.
- */
- public void testStreamingReEnabled() throws Exception {
- try (Connection conn = createStreamedConnection(false, 10000)) {
- assertStreamingState(true);
-
- try (PreparedStatement stmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?)")) {
- for (int i = 1; i <= 100; i++) {
- stmt.setInt(1, i);
- stmt.setString(2, nameForId(i));
-
- stmt.executeUpdate();
- }
- }
-
- assertCacheEmpty();
-
- execute(conn, "set streaming 1 batch_size 111 allow_overwrite 0 per_node_buffer_size 512 " +
- "per_node_parallel_operations 4 flush_frequency 5000");
-
- U.sleep(500);
-
- assertEquals((Integer)111, U.field(conn, "streamBatchSize"));
-
- SqlClientContext cliCtx = sqlClientContext();
-
- assertTrue(cliCtx.isStream());
-
- assertFalse(U.field(cliCtx, "streamAllowOverwrite"));
-
- assertEquals((Integer)512, U.field(cliCtx, "streamNodeBufSize"));
-
- assertEquals((Long)5000L, U.field(cliCtx, "streamFlushTimeout"));
-
- assertEquals((Integer)4, U.field(cliCtx, "streamNodeParOps"));
-
- // Now let's check it's all there - SET STREAMING 1 repeated call must also have caused flush.
- for (int i = 1; i <= 100; i++)
- assertEquals(nameForId(i), nameForIdInCache(i));
- }
- }
-
- /**
- *
- */
- @SuppressWarnings("ThrowableNotThrown")
- public void testNonStreamedBatch() {
- GridTestUtils.assertThrows(null, new Callable<Object>() {
- @Override public Object call() throws Exception {
- try (Connection conn = createOrdinaryConnection()) {
- try (Statement s = conn.createStatement()) {
- for (int i = 1; i <= 10; i++)
- s.addBatch(String.format("insert into Person(\"id\", \"name\")values (%d, '%s')", i,
- nameForId(i)));
-
- execute(conn, "SET STREAMING 1");
-
- s.addBatch(String.format("insert into Person(\"id\", \"name\")values (%d, '%s')", 11,
- nameForId(11)));
- }
- }
-
- return null;
- }
- }, SQLException.class, "Statement has non-empty batch (call executeBatch() or clearBatch() before " +
- "enabling streaming).");
- }
-
- /**
- *
- */
- @SuppressWarnings("ThrowableNotThrown")
- public void testStreamingStatementInTheMiddleOfNonPreparedBatch() {
- GridTestUtils.assertThrows(null, new Callable<Object>() {
- @Override public Object call() throws Exception {
- try (Connection conn = createOrdinaryConnection()) {
- try (Statement s = conn.createStatement()) {
- s.addBatch(String.format("insert into Person(\"id\", \"name\")values (%d, '%s')", 1,
- nameForId(1)));
-
- s.addBatch("SET STREAMING 1 FLUSH_FREQUENCY 10000");
- }
- }
-
- return null;
- }
- }, SQLException.class, "Streaming control commands must be executed explicitly");
- }
-
- /**
- *
- */
- @SuppressWarnings("ThrowableNotThrown")
- public void testBatchingSetStreamingStatement() {
- GridTestUtils.assertThrows(null, new Callable<Object>() {
- @Override public Object call() throws Exception {
- try (Connection conn = createOrdinaryConnection()) {
- try (PreparedStatement s = conn.prepareStatement("SET STREAMING 1 FLUSH_FREQUENCY 10000")) {
- s.addBatch();
- }
- }
-
- return null;
- }
- }, SQLException.class, "Streaming control commands must be executed explicitly");
- }
-
- /**
- * Check that there's nothing in cache.
- */
- private void assertCacheEmpty() {
- assertEquals(0, cache().size(CachePeekMode.ALL));
- }
-
- /**
- * @param conn Connection.
- * @param sql Statement.
- * @throws SQLException if failed.
- */
- private static void execute(Connection conn, String sql) throws SQLException {
- try (Statement s = conn.createStatement()) {
- s.execute(sql);
- }
- }
-
- /**
- * @return Active SQL client context.
- */
- private SqlClientContext sqlClientContext() {
- assertNotNull(IndexingWithContext.cliCtx);
-
- return IndexingWithContext.cliCtx;
- }
-
- /**
- * Check that streaming state on target node is as expected.
- * @param on Expected streaming state.
- */
- private void assertStreamingState(boolean on) {
- SqlClientContext cliCtx = sqlClientContext();
-
- assertEquals(on, cliCtx.isStream());
- }
-
- /** {@inheritDoc} */
- @Override protected void assertStatementForbidden(String sql) {
- batchSize = 1;
-
- super.assertStatementForbidden(sql);
- }
-
- /**
- *
- */
- private static final class IndexingWithContext extends IgniteH2Indexing {
- /** Client context. */
- static SqlClientContext cliCtx;
-
- /** {@inheritDoc} */
- @Override public List<Long> streamBatchedUpdateQuery(String schemaName, String qry, List<Object[]> params,
- SqlClientContext cliCtx) throws IgniteCheckedException {
- IndexingWithContext.cliCtx = cliCtx;
-
- return super.streamBatchedUpdateQuery(schemaName, qry, params, cliCtx);
- }
-
- /** {@inheritDoc} */
- @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry,
- @Nullable SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts,
- GridQueryCancel cancel) {
- IndexingWithContext.cliCtx = cliCtx;
-
- return super.querySqlFields(schemaName, qry, cliCtx, keepBinary, failOnMultipleStmts, cancel);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/01f60542/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
index 3478124..634579b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
@@ -39,13 +39,15 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
import org.apache.ignite.internal.processors.odbc.SqlStateCode;
-import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteRequest;
-import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteResult;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcOrderedBatchExecuteRequest;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcOrderedBatchExecuteResult;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteRequest;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest;
@@ -55,6 +57,7 @@ import org.apache.ignite.internal.processors.odbc.jdbc.JdbcStatementType;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.sql.command.SqlCommand;
import org.apache.ignite.internal.sql.command.SqlSetStreamingCommand;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteProductVersion;
@@ -91,7 +94,7 @@ public class JdbcThinConnection implements Connection {
private boolean readOnly;
/** Streaming flag. */
- private volatile boolean stream;
+ private volatile StreamState streamState;
/** Current transaction holdability. */
private int holdability;
@@ -108,15 +111,6 @@ public class JdbcThinConnection implements Connection {
/** Connection properties. */
private ConnectionProperties connProps;
- /** Batch size for streaming. */
- private int streamBatchSize;
-
- /** Batch for streaming. */
- private List<JdbcQuery> streamBatch;
-
- /** Last added query to recognize batches. */
- private String lastStreamQry;
-
/** Connected. */
private boolean connected;
@@ -172,7 +166,7 @@ public class JdbcThinConnection implements Connection {
* @return Whether this connection is streamed or not.
*/
boolean isStream() {
- return stream;
+ return streamState != null;
}
/**
@@ -182,24 +176,28 @@ public class JdbcThinConnection implements Connection {
*/
void executeNative(String sql, SqlCommand cmd) throws SQLException {
if (cmd instanceof SqlSetStreamingCommand) {
- // If streaming is already on, we have to disable it first.
- if (stream) {
- // We have to send request regardless of actual batch size.
- executeBatch(true);
+ SqlSetStreamingCommand cmd0 = (SqlSetStreamingCommand)cmd;
+
+ // If streaming is already on, we have to close it first.
+ if (streamState != null) {
+ streamState.close();
- stream = false;
+ streamState = null;
}
boolean newVal = ((SqlSetStreamingCommand)cmd).isTurnOn();
// Actual ON, if needed.
if (newVal) {
+ if (!cmd0.isOrdered() && !cliIo.igniteVersion().greaterThanEqual(2, 5, 0)) {
+ throw new SQLException("Streaming without order doesn't supported by server [remoteNodeVer="
+ + cliIo.igniteVersion() + ']', SqlStateCode.INTERNAL_ERROR);
+ }
+
sendRequest(new JdbcQueryExecuteRequest(JdbcStatementType.ANY_STATEMENT_TYPE,
schema, 1, 1, sql, null));
- streamBatchSize = ((SqlSetStreamingCommand)cmd).batchSize();
-
- stream = true;
+ streamState = new StreamState((SqlSetStreamingCommand)cmd);
}
}
else
@@ -214,39 +212,9 @@ public class JdbcThinConnection implements Connection {
* @throws SQLException On error.
*/
void addBatch(String sql, List<Object> args) throws SQLException {
- boolean newQry = (args == null || !F.eq(lastStreamQry, sql));
-
- // Providing null as SQL here allows for recognizing subbatches on server and handling them more efficiently.
- JdbcQuery q = new JdbcQuery(newQry ? sql : null, args != null ? args.toArray() : null);
-
- if (streamBatch == null)
- streamBatch = new ArrayList<>(streamBatchSize);
-
- streamBatch.add(q);
-
- // Null args means "addBatch(String)" was called on non-prepared Statement,
- // we don't want to remember its query string.
- lastStreamQry = (args != null ? sql : null);
-
- if (streamBatch.size() == streamBatchSize)
- executeBatch(false);
- }
-
- /**
- * @param lastBatch Whether open data streamers must be flushed and closed after this batch.
- * @throws SQLException if failed.
- */
- private void executeBatch(boolean lastBatch) throws SQLException {
- JdbcBatchExecuteResult res = sendRequest(new JdbcBatchExecuteRequest(schema, streamBatch, lastBatch));
-
- streamBatch = null;
+ assert isStream();
- lastStreamQry = null;
-
- if (res.errorCode() != ClientListenerResponse.STATUS_SUCCESS) {
- throw new BatchUpdateException(res.errorMessage(), IgniteQueryErrorCode.codeToSqlState(res.errorCode()),
- res.errorCode(), res.updateCounts());
- }
+ streamState.addBatch(sql, args);
}
/** {@inheritDoc} */
@@ -399,13 +367,10 @@ public class JdbcThinConnection implements Connection {
if (isClosed())
return;
- if (!F.isEmpty(streamBatch)) {
- try {
- executeBatch(true);
- }
- catch (SQLException e) {
- LOG.log(Level.WARNING, "Exception during batch send on streamed connection close", e);
- }
+ if (streamState != null) {
+ streamState.close();
+
+ streamState = null;
}
closed = true;
@@ -798,6 +763,28 @@ public class JdbcThinConnection implements Connection {
}
/**
+ * Send request for execution via {@link #cliIo}. The response is awaited on a separate thread
+ * (see {@link StreamState#asyncRespReaderThread}).
+ * @param req Request.
+ * @throws SQLException On any error.
+ */
+ private void sendRequestNotWaitResponse(JdbcOrderedBatchExecuteRequest req) throws SQLException {
+ ensureConnected();
+
+ try {
+ cliIo.sendBatchRequestNoWaitResponse(req);
+ }
+ catch (SQLException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ onDisconnect();
+
+ throw new SQLException("Failed to communicate with Ignite cluster.", SqlStateCode.CONNECTION_FAILURE, e);
+ }
+ }
+
+ /**
* @return Connection URL.
*/
public String url() {
@@ -815,9 +802,11 @@ public class JdbcThinConnection implements Connection {
connected = false;
- streamBatch = null;
+ if (streamState != null) {
+ streamState.close0();
- lastStreamQry = null;
+ streamState = null;
+ }
synchronized (stmtsMux) {
for (JdbcThinStatement s : stmts)
@@ -846,4 +835,203 @@ public class JdbcThinConnection implements Connection {
return res;
}
+
+ /**
+ * Streamer state and asynchronous handling of batch responses for a streamed connection.
+ */
+ private class StreamState {
+ /** Maximum requests count that may be sent before any responses. */
+ private static final int MAX_REQUESTS_BEFORE_RESPONSE = 10;
+
+ /** Wait timeout. */
+ private static final long WAIT_TIMEOUT = 1;
+
+ /** Batch size for streaming. */
+ private int streamBatchSize;
+
+ /** Batch for streaming. */
+ private List<JdbcQuery> streamBatch;
+
+ /** Last added query to recognize batches. */
+ private String lastStreamQry;
+
+ /** Keep request order on execution. */
+ private long order;
+
+ /** Async response reader thread. */
+ private Thread asyncRespReaderThread;
+
+ /** Async response error. */
+ private volatile Exception err;
+
+ /** The order of the last batch request at the stream. */
+ private long lastRespOrder = -1;
+
+ /** Last response future. */
+ private final GridFutureAdapter<Void> lastRespFut = new GridFutureAdapter<>();
+
+ /** Semaphore limiting the number of requests that may be sent before a response is received. */
+ private Semaphore respSem = new Semaphore(MAX_REQUESTS_BEFORE_RESPONSE);
+
+ /**
+ * @param cmd Stream cmd.
+ */
+ StreamState(SqlSetStreamingCommand cmd) {
+ streamBatchSize = cmd.batchSize();
+
+ asyncRespReaderThread = new Thread(this::readResponses);
+
+ asyncRespReaderThread.start();
+ }
+
+ /**
+ * Add another query for batched execution.
+ * @param sql Query.
+ * @param args Arguments.
+ * @throws SQLException On error.
+ */
+ void addBatch(String sql, List<Object> args) throws SQLException {
+ checkError();
+
+ boolean newQry = (args == null || !F.eq(lastStreamQry, sql));
+
+ // Providing null as SQL here allows for recognizing subbatches on server and handling them more efficiently.
+ JdbcQuery q = new JdbcQuery(newQry ? sql : null, args != null ? args.toArray() : null);
+
+ if (streamBatch == null)
+ streamBatch = new ArrayList<>(streamBatchSize);
+
+ streamBatch.add(q);
+
+ // Null args means "addBatch(String)" was called on non-prepared Statement,
+ // we don't want to remember its query string.
+ lastStreamQry = (args != null ? sql : null);
+
+ if (streamBatch.size() == streamBatchSize)
+ executeBatch(false);
+ }
+
+ /**
+ * @param lastBatch Whether open data streamers must be flushed and closed after this batch.
+ * @throws SQLException if failed.
+ */
+ private void executeBatch(boolean lastBatch) throws SQLException {
+ checkError();
+
+ if (lastBatch)
+ lastRespOrder = order;
+
+ try {
+ respSem.acquire();
+
+ sendRequestNotWaitResponse(
+ new JdbcOrderedBatchExecuteRequest(schema, streamBatch, lastBatch, order));
+
+ streamBatch = null;
+
+ lastStreamQry = null;
+
+ if (lastBatch) {
+ try {
+ lastRespFut.get();
+ }
+ catch (IgniteCheckedException e) {
+ // No-op.
+ // No exceptions are expected here.
+ }
+
+ checkError();
+ }
+ else
+ order++;
+ }
+ catch (InterruptedException e) {
+ throw new SQLException("Streaming operation was interrupted", SqlStateCode.INTERNAL_ERROR, e);
+ }
+ }
+
+ /**
+ * Rethrows on the user thread the exception that was saved by the {@link #asyncRespReaderThread} thread.
+ * @throws SQLException Saved exception.
+ */
+ void checkError() throws SQLException {
+ if (err != null) {
+ Exception err0 = err;
+
+ err = null;
+
+ if (err0 instanceof SQLException)
+ throw (SQLException)err0;
+ else {
+ onDisconnect();
+
+ throw new SQLException("Failed to communicate with Ignite cluster on JDBC streaming.",
+ SqlStateCode.CONNECTION_FAILURE, err0);
+ }
+ }
+ }
+
+ /**
+ * @throws SQLException On error.
+ */
+ void close() throws SQLException {
+ close0();
+
+ checkError();
+ }
+
+ /**
+ */
+ void close0() {
+ if (connected) {
+ try {
+ executeBatch(true);
+ }
+ catch (SQLException e) {
+ err = e;
+
+ LOG.log(Level.WARNING, "Exception during batch send on streamed connection close", e);
+ }
+ }
+
+ if (asyncRespReaderThread != null)
+ asyncRespReaderThread.interrupt();
+ }
+
+ /**
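+ * Reads server responses in a loop until the response to the last batch request arrives or reading fails.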
+ */
+ void readResponses () {
+ try {
+ while (true) {
+ JdbcResponse resp = cliIo.readResponse();
+
+ if (resp.response() instanceof JdbcOrderedBatchExecuteResult) {
+ JdbcOrderedBatchExecuteResult res = (JdbcOrderedBatchExecuteResult)resp.response();
+
+ respSem.release();
+
+ if (res.errorCode() != ClientListenerResponse.STATUS_SUCCESS) {
+ err = new BatchUpdateException(res.errorMessage(),
+ IgniteQueryErrorCode.codeToSqlState(res.errorCode()),
+ res.errorCode(), res.updateCounts());
+ }
+
+ // Receive the response for the last request.
+ if (res.order() == lastRespOrder) {
+ lastRespFut.onDone();
+
+ break;
+ }
+ }
+
+ if (resp.status() != ClientListenerResponse.STATUS_SUCCESS)
+ err = new SQLException(resp.error(), IgniteQueryErrorCode.codeToSqlState(resp.status()));
+ }
+ }
+ catch (Exception e) {
+ err = e;
+ }
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/01f60542/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
index 4631e5d..44c1984 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
import org.apache.ignite.internal.processors.odbc.SqlStateCode;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteRequest;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcOrderedBatchExecuteRequest;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryCloseRequest;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryFetchRequest;
@@ -419,6 +420,44 @@ public class JdbcThinTcpIo {
/**
* @param req Request.
+ * @throws IOException In case of IO error.
+ * @throws SQLException On error.
+ */
+ void sendBatchRequestNoWaitResponse(JdbcOrderedBatchExecuteRequest req) throws IOException, SQLException {
+ synchronized (mux) {
+ if (ownThread != null) {
+ throw new SQLException("Concurrent access to JDBC connection is not allowed"
+ + " [ownThread=" + ownThread.getName()
+ + ", curThread=" + Thread.currentThread().getName(), SqlStateCode.CONNECTION_FAILURE);
+ }
+
+ ownThread = Thread.currentThread();
+ }
+
+ try {
+ if (!igniteVer.greaterThanEqual(2, 5, 0)) {
+ throw new SQLException("Streaming without response doesn't supported by server [driverProtocolVer="
+ + CURRENT_VER + ", remoteNodeVer=" + igniteVer + ']', SqlStateCode.INTERNAL_ERROR);
+ }
+
+ int cap = guessCapacity(req);
+
+ BinaryWriterExImpl writer = new BinaryWriterExImpl(null, new BinaryHeapOutputStream(cap),
+ null, null);
+
+ req.writeBinary(writer);
+
+ send(writer.array());
+ }
+ finally {
+ synchronized (mux) {
+ ownThread = null;
+ }
+ }
+ }
+
+ /**
+ * @param req Request.
* @return Server response.
* @throws IOException In case of IO error.
* @throws SQLException On concurrent access to JDBC connection.
@@ -444,13 +483,7 @@ public class JdbcThinTcpIo {
send(writer.array());
- BinaryReaderExImpl reader = new BinaryReaderExImpl(null, new BinaryHeapInputStream(read()), null, null, false);
-
- JdbcResponse res = new JdbcResponse();
-
- res.readBinary(reader);
-
- return res;
+ return readResponse();
}
finally {
synchronized (mux) {
@@ -460,6 +493,22 @@ public class JdbcThinTcpIo {
}
/**
+ * @return Server response.
+ * @throws IOException In case of IO error.
+ */
+ @SuppressWarnings("unchecked")
+ JdbcResponse readResponse() throws IOException {
+ BinaryReaderExImpl reader = new BinaryReaderExImpl(null, new BinaryHeapInputStream(read()), null, null, false);
+
+ JdbcResponse res = new JdbcResponse();
+
+ res.readBinary(reader);
+
+ return res;
+ }
+
+
+ /**
* Try to guess request capacity.
*
* @param req Request.
http://git-wip-us.apache.org/repos/asf/ignite/blob/01f60542/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
index 407c1a0..be55ab9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
@@ -159,16 +159,18 @@ public class ClientListenerNioListener extends GridNioServerListenerAdapter<byte
ClientListenerResponse resp = handler.handle(req);
- if (log.isDebugEnabled()) {
- long dur = (System.nanoTime() - startTime) / 1000;
+ if (resp != null) {
+ if (log.isDebugEnabled()) {
+ long dur = (System.nanoTime() - startTime) / 1000;
- log.debug("Client request processed [reqId=" + req.requestId() + ", dur(mcs)=" + dur +
- ", resp=" + resp.status() + ']');
- }
+ log.debug("Client request processed [reqId=" + req.requestId() + ", dur(mcs)=" + dur +
+ ", resp=" + resp.status() + ']');
+ }
- byte[] outMsg = parser.encode(resp);
+ byte[] outMsg = parser.encode(resp);
- ses.send(outMsg);
+ ses.send(outMsg);
+ }
}
catch (Exception e) {
U.error(log, "Failed to process client request [req=" + req + ']', e);
@@ -216,7 +218,7 @@ public class ClientListenerNioListener extends GridNioServerListenerAdapter<byte
ClientListenerConnectionContext connCtx = null;
try {
- connCtx = prepareContext(clientType);
+ connCtx = prepareContext(ses, clientType);
ensureClientPermissions(clientType);
@@ -270,17 +272,18 @@ public class ClientListenerNioListener extends GridNioServerListenerAdapter<byte
/**
* Prepare context.
*
+ * @param ses Session.
* @param clientType Client type.
* @return Context.
* @throws IgniteCheckedException If failed.
*/
- private ClientListenerConnectionContext prepareContext(byte clientType) throws IgniteCheckedException {
+ private ClientListenerConnectionContext prepareContext(GridNioSession ses, byte clientType) throws IgniteCheckedException {
switch (clientType) {
case ODBC_CLIENT:
return new OdbcConnectionContext(ctx, busyLock, maxCursors);
case JDBC_CLIENT:
- return new JdbcConnectionContext(ctx, busyLock, maxCursors);
+ return new JdbcConnectionContext(ctx, ses, busyLock, maxCursors);
case THIN_CLIENT:
return new ClientConnectionContext(ctx, maxCursors);
http://git-wip-us.apache.org/repos/asf/ignite/blob/01f60542/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java
index 73fd04f..bdc558c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java
@@ -53,8 +53,17 @@ public class JdbcBatchExecuteRequest extends JdbcRequest {
}
/**
+ * Constructor for child requests.
+ * @param type Request type.
+ */
+ protected JdbcBatchExecuteRequest(byte type) {
+ super(type);
+ }
+
+ /**
* @param schemaName Schema name.
* @param queries Queries.
+ * @param lastStreamBatch {@code true} in case the request is the last batch at the stream.
*/
public JdbcBatchExecuteRequest(String schemaName, List<JdbcQuery> queries, boolean lastStreamBatch) {
super(BATCH_EXEC);
@@ -67,6 +76,24 @@ public class JdbcBatchExecuteRequest extends JdbcRequest {
}
/**
+ * Constructor for child requests.
+ *
+ * @param type Request type.
+ * @param schemaName Schema name.
+ * @param queries Queries.
+ * @param lastStreamBatch {@code true} in case the request is the last batch at the stream.
+ */
+ protected JdbcBatchExecuteRequest(byte type, String schemaName, List<JdbcQuery> queries, boolean lastStreamBatch) {
+ super(type);
+
+ assert lastStreamBatch || !F.isEmpty(queries);
+
+ this.schemaName = schemaName;
+ this.queries = queries;
+ this.lastStreamBatch = lastStreamBatch;
+ }
+
+ /**
* @return Schema name.
*/
@Nullable public String schemaName() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/01f60542/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteResult.java
index 917e60a..3fc9dd7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteResult.java
@@ -36,18 +36,26 @@ public class JdbcBatchExecuteResult extends JdbcResult {
private String errMsg;
/**
- * Condtructor.
+ * Constructor.
*/
- public JdbcBatchExecuteResult() {
+ JdbcBatchExecuteResult() {
super(BATCH_EXEC);
}
/**
+ * Constructor for child results.
+ * @param type Result type.
+ */
+ JdbcBatchExecuteResult(byte type) {
+ super(type);
+ }
+
+ /**
* @param updateCnts Update counts for batch.
* @param errCode Error code.
* @param errMsg Error message.
*/
- public JdbcBatchExecuteResult(int [] updateCnts, int errCode, String errMsg) {
+ JdbcBatchExecuteResult(int [] updateCnts, int errCode, String errMsg) {
super(BATCH_EXEC);
this.updateCnts = updateCnts;
@@ -56,6 +64,18 @@ public class JdbcBatchExecuteResult extends JdbcResult {
}
/**
+ * @param type Result type.
+ * @param res Result.
+ */
+ JdbcBatchExecuteResult(byte type, JdbcBatchExecuteResult res) {
+ super(type);
+
+ this.updateCnts = res.updateCnts;
+ this.errCode = res.errCode;
+ this.errMsg = res.errMsg;
+ }
+
+ /**
* @return Update count for DML queries.
*/
public int[] updateCounts() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/01f60542/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
index ed37e0b..391d5f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
@@ -17,10 +17,10 @@
package org.apache.ignite.internal.processors.odbc.jdbc;
-import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.binary.BinaryReaderExImpl;
import org.apache.ignite.internal.processors.authentication.AuthorizationContext;
@@ -28,7 +28,9 @@ import org.apache.ignite.internal.processors.odbc.ClientListenerConnectionContex
import org.apache.ignite.internal.processors.odbc.ClientListenerMessageParser;
import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequestHandler;
+import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.typedef.F;
/**
@@ -59,9 +61,15 @@ public class JdbcConnectionContext implements ClientListenerConnectionContext {
/** Context. */
private final GridKernalContext ctx;
+ /** Session. */
+ private final GridNioSession ses;
+
/** Shutdown busy lock. */
private final GridSpinBusyLock busyLock;
+ /** Logger. */
+ private final IgniteLogger log;
+
/** Maximum allowed cursors. */
private final int maxCursors;
@@ -83,13 +91,17 @@ public class JdbcConnectionContext implements ClientListenerConnectionContext {
/**
* Constructor.
* @param ctx Kernal Context.
+ * @param ses Session.
* @param busyLock Shutdown busy lock.
* @param maxCursors Maximum allowed cursors.
*/
- public JdbcConnectionContext(GridKernalContext ctx, GridSpinBusyLock busyLock, int maxCursors) {
+ public JdbcConnectionContext(GridKernalContext ctx, GridNioSession ses, GridSpinBusyLock busyLock, int maxCursors) {
this.ctx = ctx;
+ this.ses = ses;
this.busyLock = busyLock;
this.maxCursors = maxCursors;
+
+ log = ctx.log(getClass());
}
/** {@inheritDoc} */
@@ -146,11 +158,23 @@ public class JdbcConnectionContext implements ClientListenerConnectionContext {
catch (Exception e) {
throw new IgniteCheckedException("Handshake error: " + e.getMessage(), e);
}
+ parser = new JdbcMessageParser(ctx);
- handler = new JdbcRequestHandler(ctx, busyLock, maxCursors, distributedJoins, enforceJoinOrder,
- collocated, replicatedOnly, autoCloseCursors, lazyExec, skipReducerOnUpdate, actx, ver);
+ JdbcResponseSender sender = new JdbcResponseSender() {
+ @Override public void send(ClientListenerResponse resp) {
+ if (resp != null) {
+ if (log.isDebugEnabled())
+ log.debug("Async response: [resp=" + resp.status() + ']');
- parser = new JdbcMessageParser(ctx);
+ byte[] outMsg = parser.encode(resp);
+
+ ses.send(outMsg);
+ }
+ }
+ };
+
+ handler = new JdbcRequestHandler(ctx, busyLock, sender, maxCursors, distributedJoins, enforceJoinOrder,
+ collocated, replicatedOnly, autoCloseCursors, lazyExec, skipReducerOnUpdate, actx, ver);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/01f60542/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcOrderedBatchExecuteRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcOrderedBatchExecuteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcOrderedBatchExecuteRequest.java
new file mode 100644
index 0000000..3e84731
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcOrderedBatchExecuteRequest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import java.util.List;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * JDBC batch execute request that additionally carries an order number; instances compare by that number.
+ */
+public class JdbcOrderedBatchExecuteRequest extends JdbcBatchExecuteRequest
+ implements Comparable<JdbcOrderedBatchExecuteRequest> {
+ /** Request order: serialized after the parent's fields and used by {@link #compareTo}. */
+ private long order;
+
+ /**
+ * Default constructor: creates an empty request of type {@code BATCH_EXEC_ORDERED}.
+ */
+ public JdbcOrderedBatchExecuteRequest() {
+ super(BATCH_EXEC_ORDERED);
+ }
+
+ /**
+ * @param schemaName Schema name.
+ * @param queries Queries.
+ * @param lastStreamBatch {@code true} in case the request is the last batch at the stream.
+ * @param order Request order.
+ */
+ public JdbcOrderedBatchExecuteRequest(String schemaName, List<JdbcQuery> queries,
+ boolean lastStreamBatch, long order) {
+ super(BATCH_EXEC_ORDERED, schemaName, queries, lastStreamBatch);
+
+ this.order = order;
+ }
+
+ /**
+ * @return Request order, as passed to the constructor or read by {@link #readBinary}.
+ */
+ public long order() {
+ return order;
+ }
+
+ /** {@inheritDoc} The order is written as a {@code long} after the parent's fields. */
+ @Override public void writeBinary(BinaryWriterExImpl writer) throws BinaryObjectException {
+ super.writeBinary(writer);
+
+ writer.writeLong(order);
+ }
+
+ /** {@inheritDoc} The order is read as a {@code long} after the parent's fields. */
+ @Override public void readBinary(BinaryReaderExImpl reader) throws BinaryObjectException {
+ super.readBinary(reader);
+
+ order = reader.readLong();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(JdbcOrderedBatchExecuteRequest.class, this);
+ }
+
+ /** {@inheritDoc} Requests are ordered by ascending {@link #order()}. */
+ @Override public int compareTo(@NotNull JdbcOrderedBatchExecuteRequest o) {
+ return Long.compare(order, o.order);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01f60542/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcOrderedBatchExecuteResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcOrderedBatchExecuteResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcOrderedBatchExecuteResult.java
new file mode 100644
index 0000000..84853d4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcOrderedBatchExecuteResult.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * JDBC batch execute result that additionally carries an order number.
+ */
+public class JdbcOrderedBatchExecuteResult extends JdbcBatchExecuteResult {
+ /** Order value carried with the result; serialized after the parent's fields. */
+ private long order;
+
+ /**
+ * Default constructor: creates an empty result of type {@code BATCH_EXEC_ORDERED}.
+ */
+ public JdbcOrderedBatchExecuteResult() {
+ super(BATCH_EXEC_ORDERED);
+ }
+
+ /**
+ * @param res Result whose update counts, error code and error message are copied.
+ * @param order Order to report with the result.
+ */
+ public JdbcOrderedBatchExecuteResult(JdbcBatchExecuteResult res, long order) {
+ super(BATCH_EXEC_ORDERED, res);
+
+ this.order = order;
+ }
+
+ /**
+ * @return Order, as passed to the constructor or read by {@link #readBinary}.
+ */
+ public long order() {
+ return order;
+ }
+
+ /** {@inheritDoc} The order is written as a {@code long} after the parent's fields. */
+ @Override public void writeBinary(BinaryWriterExImpl writer) throws BinaryObjectException {
+ super.writeBinary(writer);
+
+ writer.writeLong(order);
+ }
+
+
+ /** {@inheritDoc} The order is read as a {@code long} after the parent's fields. */
+ @Override public void readBinary(BinaryReaderExImpl reader) throws BinaryObjectException {
+ super.readBinary(reader);
+
+ order = reader.readLong();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(JdbcOrderedBatchExecuteResult.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01f60542/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java
index 22522ad..3d5b869 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java
@@ -63,6 +63,9 @@ public class JdbcRequest extends ClientListenerRequestNoId implements JdbcRawBin
/** Send a batch of a data from client to server. */
static final byte BULK_LOAD_BATCH = 13;
+ /** Ordered batch request. */
+ static final byte BATCH_EXEC_ORDERED = 14;
+
/** Request type. */
private byte type;
@@ -161,6 +164,11 @@ public class JdbcRequest extends ClientListenerRequestNoId implements JdbcRawBin
break;
+ case BATCH_EXEC_ORDERED:
+ req = new JdbcOrderedBatchExecuteRequest();
+
+ break;
+
default:
throw new IgniteException("Unknown SQL listener request ID: [request ID=" + reqType + ']');
}