Posted to commits@ignite.apache.org by vo...@apache.org on 2018/05/11 15:00:25 UTC

[2/3] ignite git commit: IGNITE-7999: JDBC Thin Driver: added unordered streaming mode. This closes #3789.

IGNITE-7999: JDBC Thin Driver: added unordered streaming mode. This closes #3789.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d05c28ca
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d05c28ca
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d05c28ca

Branch: refs/heads/ignite-2.5
Commit: d05c28ca9850d116daedd8303e035345f15f5b82
Parents: 46dac58
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Fri May 11 17:55:22 2018 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri May 11 17:59:58 2018 +0300

----------------------------------------------------------------------
 .../internal/jdbc2/JdbcStreamingSelfTest.java   |   3 -
 .../jdbc/suite/IgniteJdbcDriverTestSuite.java   |   6 +-
 ...JdbcThinConnectionMultipleAddressesTest.java |   7 +-
 .../thin/JdbcThinStreamingAbstractSelfTest.java | 505 +++++++++++++++++++
 .../JdbcThinStreamingNotOrderedSelfTest.java    |  38 ++
 .../thin/JdbcThinStreamingOrderedSelfTest.java  |  39 ++
 .../jdbc/thin/JdbcThinStreamingSelfTest.java    | 486 ------------------
 .../internal/jdbc/thin/JdbcThinConnection.java  | 312 +++++++++---
 .../internal/jdbc/thin/JdbcThinTcpIo.java       |  63 ++-
 .../odbc/ClientListenerNioListener.java         |  23 +-
 .../odbc/jdbc/JdbcBatchExecuteRequest.java      |  27 +
 .../odbc/jdbc/JdbcBatchExecuteResult.java       |  26 +-
 .../odbc/jdbc/JdbcConnectionContext.java        |  34 +-
 .../jdbc/JdbcOrderedBatchExecuteRequest.java    |  85 ++++
 .../jdbc/JdbcOrderedBatchExecuteResult.java     |  75 +++
 .../processors/odbc/jdbc/JdbcRequest.java       |   8 +
 .../odbc/jdbc/JdbcRequestHandler.java           | 119 ++++-
 .../odbc/jdbc/JdbcResponseSender.java           |  31 ++
 .../processors/odbc/jdbc/JdbcResult.java        |  13 +
 .../processors/query/SqlClientContext.java      | 147 ++++--
 .../apache/ignite/internal/sql/SqlKeyword.java  |   3 +
 .../sql/command/SqlSetStreamingCommand.java     |  19 +
 .../sql/SqlParserSetStreamingSelfTest.java      |  39 +-
 .../processors/query/h2/IgniteH2Indexing.java   |   2 +-
 .../config/ignite-localhost-config.xml          |   2 -
 .../benchmark-jdbc-thin-streaming.properties    | 132 +++++
 .../ignite/yardstick/upload/StreamerParams.java |   7 +-
 .../upload/UploadBenchmarkArguments.java        |  19 +-
 .../yardstick/upload/model/QueryFactory.java    |   2 +
 29 files changed, 1635 insertions(+), 637 deletions(-)
----------------------------------------------------------------------
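
For context, here is a minimal usage sketch of the streaming modes this commit touches. It is
illustrative only: the connection URL, table, and data are assumptions, while the SET STREAMING
syntax (including the ORDERED keyword) mirrors the statements used in the tests added below
(JdbcThinStreamingNotOrderedSelfTest and JdbcThinStreamingOrderedSelfTest).

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.Statement;

    public class ThinStreamingSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical local node address for the thin driver.
            try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1")) {
                try (Statement s = conn.createStatement()) {
                    // Turn streaming on. Without ORDERED, batches may be applied in any order
                    // (the unordered mode added here); append " ORDERED" to preserve request
                    // order, as JdbcThinStreamingOrderedSelfTest does.
                    s.execute("SET STREAMING 1 BATCH_SIZE 17 ALLOW_OVERWRITE 0 " +
                        "PER_NODE_BUFFER_SIZE 1000 FLUSH_FREQUENCY 1000");
                }

                // Hypothetical table; inserts are buffered client-side and sent in batches.
                try (PreparedStatement ps =
                    conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?)")) {
                    for (int i = 1; i <= 100; i++) {
                        ps.setInt(1, i);
                        ps.setString(2, "name-" + i);

                        ps.executeUpdate();
                    }
                }

                try (Statement s = conn.createStatement()) {
                    // Turning streaming off flushes and closes the open streamers.
                    s.execute("SET STREAMING 0");
                }
            }
        }
    }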


http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java
index 10adedc..e302529 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java
@@ -56,9 +56,6 @@ public class JdbcStreamingSelfTest extends GridCommonAbstractTest {
     private static final String STREAMING_URL = CFG_URL_PREFIX +
         "cache=person@modules/clients/src/test/config/jdbc-config.xml";
 
-    /** */
-    protected transient IgniteLogger log;
-
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         return getConfiguration0(gridName);

http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
index a88ebe8..a18cb45 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
@@ -70,7 +70,8 @@ import org.apache.ignite.jdbc.thin.JdbcThinResultSetSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinSchemaCaseTest;
 import org.apache.ignite.jdbc.thin.JdbcThinSelectAfterAlterTable;
 import org.apache.ignite.jdbc.thin.JdbcThinStatementSelfTest;
-import org.apache.ignite.jdbc.thin.JdbcThinStreamingSelfTest;
+import org.apache.ignite.jdbc.thin.JdbcThinStreamingNotOrderedSelfTest;
+import org.apache.ignite.jdbc.thin.JdbcThinStreamingOrderedSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinTcpIoTest;
 import org.apache.ignite.jdbc.thin.JdbcThinUpdateStatementSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinUpdateStatementSkipReducerOnUpdateSelfTest;
@@ -128,7 +129,8 @@ public class IgniteJdbcDriverTestSuite extends TestSuite {
 
         suite.addTest(new TestSuite(JdbcBlobTest.class));
         suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcStreamingSelfTest.class));
-        suite.addTest(new TestSuite(JdbcThinStreamingSelfTest.class));
+        suite.addTest(new TestSuite(JdbcThinStreamingNotOrderedSelfTest.class));
+        suite.addTest(new TestSuite(JdbcThinStreamingOrderedSelfTest.class));
 
         // DDL tests.
         suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcDynamicIndexAtomicPartitionedNearSelfTest.class));

http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionMultipleAddressesTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionMultipleAddressesTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionMultipleAddressesTest.java
index 2c2aba9..e1fb295 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionMultipleAddressesTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionMultipleAddressesTest.java
@@ -372,9 +372,7 @@ public class JdbcThinConnectionMultipleAddressesTest extends JdbcThinAbstractSel
 
                     return null;
                 }
-            }, SQLException.class, "Failed to communicate with Ignite cluster");
-
-            assertTrue(id[0] > 0);
+            }, SQLException.class, "Failed to communicate with Ignite cluster on JDBC streaming");
 
             int minId = id[0];
 
@@ -382,6 +380,9 @@ public class JdbcThinConnectionMultipleAddressesTest extends JdbcThinAbstractSel
 
             final Statement stmt1 = conn.createStatement();
 
+            stmt1.execute("SET STREAMING 1 BATCH_SIZE 10 ALLOW_OVERWRITE 0 " +
+                " PER_NODE_BUFFER_SIZE 1000 FLUSH_FREQUENCY 1000");
+
             for (int i = 0; i < 10; ++i, id[0]++)
                 stmt1.execute("INSERT INTO TEST(id, val) values (" + id[0] + ", " + id[0] + ")");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
new file mode 100644
index 0000000..7004635
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.Callable;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.jdbc2.JdbcStreamingSelfTest;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.processors.query.SqlClientContext;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Tests for streaming via thin driver.
+ */
+public abstract class JdbcThinStreamingAbstractSelfTest extends JdbcStreamingSelfTest {
+    /** */
+    protected int batchSize = 17;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        GridQueryProcessor.idxCls = IndexingWithContext.class;
+
+        super.beforeTestsStarted();
+
+        batchSize = 17;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        // Init IndexingWithContext.cliCtx
+        try (Connection c = createOrdinaryConnection()) {
+            execute(c, "SELECT 1");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        try (Connection c = createOrdinaryConnection()) {
+            execute(c, "DROP TABLE PUBLIC.T IF EXISTS");
+        }
+
+        IndexingWithContext.cliCtx = null;
+
+        super.afterTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Connection createOrdinaryConnection() throws SQLException {
+        return JdbcThinAbstractSelfTest.connect(grid(0), null);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testStreamedBatchedInsert() throws Exception {
+        for (int i = 10; i <= 100; i += 10)
+            put(i, nameForId(i * 100));
+
+        try (Connection conn = createStreamedConnection(false)) {
+            assertStreamingState(true);
+
+            try (PreparedStatement stmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?), " +
+                "(?, ?)")) {
+                for (int i = 1; i <= 100; i+= 2) {
+                    stmt.setInt(1, i);
+                    stmt.setString(2, nameForId(i));
+                    stmt.setInt(3, i + 1);
+                    stmt.setString(4, nameForId(i + 1));
+
+                    stmt.addBatch();
+                }
+
+                stmt.executeBatch();
+            }
+        }
+
+        U.sleep(500);
+
+        // Now let's check it's all there.
+        for (int i = 1; i <= 100; i++) {
+            if (i % 10 != 0)
+                assertEquals(nameForId(i), nameForIdInCache(i));
+            else // All ids that divide evenly by 10 should point to numbers 100 times greater - see above
+                assertEquals(nameForId(i * 100), nameForIdInCache(i));
+        }
+    }
+
+    /**
+     * @throws SQLException if failed.
+     */
+    public void testSimultaneousStreaming() throws Exception {
+        try (Connection anotherConn = createOrdinaryConnection()) {
+            execute(anotherConn, "CREATE TABLE PUBLIC.T(x int primary key, y int) WITH " +
+                "\"cache_name=T,wrap_value=false\"");
+        }
+
+        // Timeout to let connection close be handled on server side.
+        U.sleep(500);
+
+        try (Connection conn = createStreamedConnection(false, 10000)) {
+            assertStreamingState(true);
+
+            PreparedStatement firstStmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?)");
+
+            PreparedStatement secondStmt = conn.prepareStatement("insert into PUBLIC.T(x, y) values (?, ?)");
+
+            try {
+                for (int i = 1; i <= 10; i++) {
+                    firstStmt.setInt(1, i);
+                    firstStmt.setString(2, nameForId(i));
+
+                    firstStmt.executeUpdate();
+                }
+
+                for (int i = 51; i <= 67; i++) {
+                    secondStmt.setInt(1, i);
+                    secondStmt.setInt(2, i);
+
+                    secondStmt.executeUpdate();
+                }
+
+                for (int i = 11; i <= 50; i++) {
+                    firstStmt.setInt(1, i);
+                    firstStmt.setString(2, nameForId(i));
+
+                    firstStmt.executeUpdate();
+                }
+
+                for (int i = 68; i <= 100; i++) {
+                    secondStmt.setInt(1, i);
+                    secondStmt.setInt(2, i);
+
+                    secondStmt.executeUpdate();
+                }
+
+                assertCacheEmpty();
+
+                SqlClientContext cliCtx = sqlClientContext();
+
+                final HashMap<String, IgniteDataStreamer<?, ?>> streamers = U.field(cliCtx, "streamers");
+
+                // Wait when node process requests (because client send batch requests async).
+                GridTestUtils.waitForCondition(() -> streamers.size() == 2, 1000);
+
+                assertEquals(2, streamers.size());
+
+                assertEqualsCollections(new HashSet<>(Arrays.asList("person", "T")), streamers.keySet());
+            }
+            finally {
+                U.closeQuiet(firstStmt);
+
+                U.closeQuiet(secondStmt);
+            }
+        }
+
+        // Let's wait a little so that all data arrives at its destination - we can't intercept streamers' flush
+        // on connection close in any way.
+        U.sleep(1000);
+
+        // Now let's check it's all there.
+        for (int i = 1; i <= 50; i++)
+            assertEquals(nameForId(i), nameForIdInCache(i));
+
+        for (int i = 51; i <= 100; i++)
+            assertEquals(i, grid(0).cache("T").get(i));
+    }
+
+    /**
+     *
+     */
+    public void testStreamingWithMixedStatementTypes() throws Exception {
+        String prepStmtStr = "insert into Person(\"id\", \"name\") values (?, ?)";
+
+        String stmtStr = "insert into Person(\"id\", \"name\") values (%d, '%s')";
+
+        try (Connection conn = createStreamedConnection(false, 10000)) {
+            assertStreamingState(true);
+
+            PreparedStatement firstStmt = conn.prepareStatement(prepStmtStr);
+
+            Statement secondStmt = conn.createStatement();
+
+            try {
+                for (int i = 1; i <= 100; i++) {
+                    boolean usePrep = Math.random() > 0.5;
+
+                    boolean useBatch = Math.random() > 0.5;
+
+                    if (usePrep) {
+                        firstStmt.setInt(1, i);
+                        firstStmt.setString(2, nameForId(i));
+
+                        if (useBatch)
+                            firstStmt.addBatch();
+                        else
+                            firstStmt.execute();
+                    }
+                    else {
+                        String sql = String.format(stmtStr, i, nameForId(i));
+
+                        if (useBatch)
+                            secondStmt.addBatch(sql);
+                        else
+                            secondStmt.execute(sql);
+                    }
+                }
+            }
+            finally {
+                U.closeQuiet(firstStmt);
+
+                U.closeQuiet(secondStmt);
+            }
+        }
+
+        // Let's wait a little so that all data arrives at its destination - we can't intercept streamers' flush
+        // on connection close in any way.
+        U.sleep(1000);
+
+        // Now let's check it's all there.
+        for (int i = 1; i <= 100; i++)
+            assertEquals(nameForId(i), nameForIdInCache(i));
+    }
+
+    /**
+     * @throws SQLException if failed.
+     */
+    public void testStreamingOffToOn() throws Exception {
+        try (Connection conn = createOrdinaryConnection()) {
+            assertStreamingState(false);
+
+            execute(conn, "SET STREAMING 1");
+
+            assertStreamingState(true);
+        }
+    }
+
+    /**
+     * @throws SQLException if failed.
+     */
+    public void testStreamingOffToOff() throws Exception {
+        try (Connection conn = createOrdinaryConnection()) {
+            assertStreamingState(false);
+
+            execute(conn, "SET STREAMING 0");
+
+            assertStreamingState(false);
+        }
+    }
+
+    /**
+     * @throws SQLException if failed.
+     */
+    public void testStreamingOnToOff() throws Exception {
+        try (Connection conn = createStreamedConnection(false)) {
+            assertStreamingState(true);
+
+            execute(conn, "SET STREAMING off");
+
+            assertStreamingState(false);
+        }
+    }
+
+    /**
+     * @throws SQLException if failed.
+     */
+    public void testFlush() throws Exception {
+        try (Connection conn = createStreamedConnection(false, 10000)) {
+            assertStreamingState(true);
+
+            try (PreparedStatement stmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?)")) {
+                for (int i = 1; i <= 100; i++) {
+                    stmt.setInt(1, i);
+                    stmt.setString(2, nameForId(i));
+
+                    stmt.executeUpdate();
+                }
+            }
+
+            assertCacheEmpty();
+
+            execute(conn, "set streaming 0");
+
+            assertStreamingState(false);
+
+            U.sleep(500);
+
+            // Now let's check it's all there.
+            for (int i = 1; i <= 100; i++)
+                assertEquals(nameForId(i), nameForIdInCache(i));
+        }
+    }
+
+    /**
+     * @throws SQLException if failed.
+     */
+    public void testStreamingReEnabled() throws Exception {
+        try (Connection conn = createStreamedConnection(false, 10000)) {
+            assertStreamingState(true);
+
+            try (PreparedStatement stmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?)")) {
+                for (int i = 1; i <= 100; i++) {
+                    stmt.setInt(1, i);
+                    stmt.setString(2, nameForId(i));
+
+                    stmt.executeUpdate();
+                }
+            }
+
+            assertCacheEmpty();
+
+            execute(conn, "set streaming 1 batch_size 111 allow_overwrite 0 per_node_buffer_size 512 " +
+                "per_node_parallel_operations 4 flush_frequency 5000");
+
+            U.sleep(500);
+
+            assertEquals((Integer)111, U.field((Object)U.field(conn, "streamState"), "streamBatchSize"));
+
+            SqlClientContext cliCtx = sqlClientContext();
+
+            assertTrue(cliCtx.isStream());
+
+            assertFalse(U.field(cliCtx, "streamAllowOverwrite"));
+
+            assertEquals((Integer)512, U.field(cliCtx, "streamNodeBufSize"));
+
+            assertEquals((Long)5000L, U.field(cliCtx, "streamFlushTimeout"));
+
+            assertEquals((Integer)4, U.field(cliCtx, "streamNodeParOps"));
+
+            // Now let's check it's all there - SET STREAMING 1 repeated call must also have caused flush.
+            for (int i = 1; i <= 100; i++)
+                assertEquals(nameForId(i), nameForIdInCache(i));
+        }
+    }
+
+    /**
+     *
+     */
+    @SuppressWarnings("ThrowableNotThrown")
+    public void testNonStreamedBatch() {
+        GridTestUtils.assertThrows(null, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try (Connection conn = createOrdinaryConnection()) {
+                    try (Statement s = conn.createStatement()) {
+                        for (int i = 1; i <= 10; i++)
+                            s.addBatch(String.format("insert into Person(\"id\", \"name\")values (%d, '%s')", i,
+                                nameForId(i)));
+
+                        execute(conn, "SET STREAMING 1");
+
+                        s.addBatch(String.format("insert into Person(\"id\", \"name\")values (%d, '%s')", 11,
+                            nameForId(11)));
+                    }
+                }
+
+                return null;
+            }
+        }, SQLException.class, "Statement has non-empty batch (call executeBatch() or clearBatch() before " +
+            "enabling streaming).");
+    }
+
+    /**
+     *
+     */
+    @SuppressWarnings("ThrowableNotThrown")
+    public void testStreamingStatementInTheMiddleOfNonPreparedBatch() {
+        GridTestUtils.assertThrows(null, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try (Connection conn = createOrdinaryConnection()) {
+                    try (Statement s = conn.createStatement()) {
+                        s.addBatch(String.format("insert into Person(\"id\", \"name\")values (%d, '%s')", 1,
+                            nameForId(1)));
+
+                        s.addBatch("SET STREAMING 1 FLUSH_FREQUENCY 10000");
+                    }
+                }
+
+                return null;
+            }
+        }, SQLException.class, "Streaming control commands must be executed explicitly");
+    }
+
+    /**
+     *
+     */
+    @SuppressWarnings("ThrowableNotThrown")
+    public void testBatchingSetStreamingStatement() {
+        GridTestUtils.assertThrows(null, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try (Connection conn = createOrdinaryConnection()) {
+                    try (PreparedStatement s = conn.prepareStatement("SET STREAMING 1 FLUSH_FREQUENCY 10000")) {
+                        s.addBatch();
+                    }
+                }
+
+                return null;
+            }
+        }, SQLException.class, "Streaming control commands must be executed explicitly");
+    }
+
+    /**
+     * Check that there's nothing in cache.
+     */
+    protected void assertCacheEmpty() {
+        assertEquals(0, cache().size(CachePeekMode.ALL));
+    }
+
+    /**
+     * @param conn Connection.
+     * @param sql Statement.
+     * @throws SQLException if failed.
+     */
+    protected static void execute(Connection conn, String sql) throws SQLException {
+        try (Statement s = conn.createStatement()) {
+            s.execute(sql);
+        }
+    }
+
+    /**
+     * @return Active SQL client context.
+     */
+    private SqlClientContext sqlClientContext() {
+        assertNotNull(IndexingWithContext.cliCtx);
+
+        return IndexingWithContext.cliCtx;
+    }
+
+    /**
+     * Check that streaming state on target node is as expected.
+     *
+     * @param on Expected streaming state.
+     */
+    protected void assertStreamingState(boolean on) throws Exception {
+        SqlClientContext cliCtx = sqlClientContext();
+
+        GridTestUtils.waitForCondition(() -> cliCtx.isStream() == on, 1000);
+
+        assertEquals(on, cliCtx.isStream());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void assertStatementForbidden(String sql) {
+        batchSize = 1;
+
+        super.assertStatementForbidden(sql);
+    }
+
+    /**
+     *
+     */
+    static final class IndexingWithContext extends IgniteH2Indexing {
+        /** Client context. */
+        static SqlClientContext cliCtx;
+
+        /** {@inheritDoc} */
+        @Override public List<Long> streamBatchedUpdateQuery(String schemaName, String qry, List<Object[]> params,
+            SqlClientContext cliCtx) throws IgniteCheckedException {
+            IndexingWithContext.cliCtx = cliCtx;
+
+            return super.streamBatchedUpdateQuery(schemaName, qry, params, cliCtx);
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry,
+            @Nullable SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts,
+            GridQueryCancel cancel) {
+            IndexingWithContext.cliCtx = cliCtx;
+
+            return super.querySqlFields(schemaName, qry, cliCtx, keepBinary, failOnMultipleStmts, cancel);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingNotOrderedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingNotOrderedSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingNotOrderedSelfTest.java
new file mode 100644
index 0000000..b91258f
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingNotOrderedSelfTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import java.sql.Connection;
+
+/**
+ * Tests for not ordered streaming via thin driver.
+ */
+public class JdbcThinStreamingNotOrderedSelfTest extends JdbcThinStreamingAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected Connection createStreamedConnection(boolean allowOverwrite, long flushFreq) throws Exception {
+        Connection c = JdbcThinAbstractSelfTest.connect(grid(0), null);
+
+        execute(c, "SET STREAMING 1 BATCH_SIZE " + batchSize
+            + " ALLOW_OVERWRITE " + (allowOverwrite ? 1 : 0)
+            + " PER_NODE_BUFFER_SIZE 1000 "
+            + " FLUSH_FREQUENCY " + flushFreq + ";"
+        );
+
+        return c;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingOrderedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingOrderedSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingOrderedSelfTest.java
new file mode 100644
index 0000000..b615f8c
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingOrderedSelfTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import java.sql.Connection;
+
+/**
+ * Tests for ordered streaming via thin driver.
+ */
+public class JdbcThinStreamingOrderedSelfTest extends JdbcThinStreamingAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected Connection createStreamedConnection(boolean allowOverwrite, long flushFreq) throws Exception {
+        Connection c = JdbcThinAbstractSelfTest.connect(grid(0), null);
+
+        execute(c, "SET STREAMING 1 BATCH_SIZE " + batchSize
+            + " ALLOW_OVERWRITE " + (allowOverwrite ? 1 : 0)
+            + " PER_NODE_BUFFER_SIZE 1000 "
+            + " FLUSH_FREQUENCY " + flushFreq
+            + " ORDERED;"
+        );
+
+        return c;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingSelfTest.java
deleted file mode 100644
index 3c36f54..0000000
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingSelfTest.java
+++ /dev/null
@@ -1,486 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.jdbc.thin;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.Callable;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.cache.CachePeekMode;
-import org.apache.ignite.cache.query.FieldsQueryCursor;
-import org.apache.ignite.cache.query.SqlFieldsQuery;
-import org.apache.ignite.internal.jdbc2.JdbcStreamingSelfTest;
-import org.apache.ignite.internal.processors.query.GridQueryCancel;
-import org.apache.ignite.internal.processors.query.GridQueryProcessor;
-import org.apache.ignite.internal.processors.query.SqlClientContext;
-import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Tests for streaming via thin driver.
- */
-public class JdbcThinStreamingSelfTest extends JdbcStreamingSelfTest {
-    /** */
-    private int batchSize = 17;
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        GridQueryProcessor.idxCls = IndexingWithContext.class;
-
-        super.beforeTestsStarted();
-
-        batchSize = 17;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        try (Connection c = createOrdinaryConnection()) {
-            execute(c, "DROP TABLE PUBLIC.T IF EXISTS");
-        }
-
-        IndexingWithContext.cliCtx = null;
-
-        super.afterTest();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected Connection createStreamedConnection(boolean allowOverwrite, long flushFreq) throws Exception {
-        Connection c = JdbcThinAbstractSelfTest.connect(grid(0), null       );
-
-        execute(c, "SET STREAMING 1 BATCH_SIZE " + batchSize + " ALLOW_OVERWRITE " + (allowOverwrite ? 1 : 0) +
-            " PER_NODE_BUFFER_SIZE 1000 FLUSH_FREQUENCY " + flushFreq);
-
-        return c;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected Connection createOrdinaryConnection() throws SQLException {
-        return JdbcThinAbstractSelfTest.connect(grid(0), null);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testStreamedBatchedInsert() throws Exception {
-        for (int i = 10; i <= 100; i += 10)
-            put(i, nameForId(i * 100));
-
-        try (Connection conn = createStreamedConnection(false)) {
-            assertStreamingState(true);
-
-            try (PreparedStatement stmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?), " +
-                "(?, ?)")) {
-                for (int i = 1; i <= 100; i+= 2) {
-                    stmt.setInt(1, i);
-                    stmt.setString(2, nameForId(i));
-                    stmt.setInt(3, i + 1);
-                    stmt.setString(4, nameForId(i + 1));
-
-                    stmt.addBatch();
-                }
-
-                stmt.executeBatch();
-            }
-        }
-
-        U.sleep(500);
-
-        // Now let's check it's all there.
-        for (int i = 1; i <= 100; i++) {
-            if (i % 10 != 0)
-                assertEquals(nameForId(i), nameForIdInCache(i));
-            else // All that divides by 10 evenly should point to numbers 100 times greater - see above
-                assertEquals(nameForId(i * 100), nameForIdInCache(i));
-        }
-    }
-
-    /**
-     * @throws SQLException if failed.
-     */
-    public void testSimultaneousStreaming() throws Exception {
-        try (Connection anotherConn = createOrdinaryConnection()) {
-            execute(anotherConn, "CREATE TABLE PUBLIC.T(x int primary key, y int) WITH " +
-                "\"cache_name=T,wrap_value=false\"");
-        }
-
-        // Timeout to let connection close be handled on server side.
-        U.sleep(500);
-
-        try (Connection conn = createStreamedConnection(false, 10000)) {
-            assertStreamingState(true);
-
-            PreparedStatement firstStmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?)");
-
-            PreparedStatement secondStmt = conn.prepareStatement("insert into PUBLIC.T(x, y) values (?, ?)");
-
-            try {
-                for (int i = 1; i <= 10; i++) {
-                    firstStmt.setInt(1, i);
-                    firstStmt.setString(2, nameForId(i));
-
-                    firstStmt.executeUpdate();
-                }
-
-                for (int i = 51; i <= 67; i++) {
-                    secondStmt.setInt(1, i);
-                    secondStmt.setInt(2, i);
-
-                    secondStmt.executeUpdate();
-                }
-
-                for (int i = 11; i <= 50; i++) {
-                    firstStmt.setInt(1, i);
-                    firstStmt.setString(2, nameForId(i));
-
-                    firstStmt.executeUpdate();
-                }
-
-                for (int i = 68; i <= 100; i++) {
-                    secondStmt.setInt(1, i);
-                    secondStmt.setInt(2, i);
-
-                    secondStmt.executeUpdate();
-                }
-
-                assertCacheEmpty();
-
-                SqlClientContext cliCtx = sqlClientContext();
-
-                HashMap<String, IgniteDataStreamer<?, ?>> streamers = U.field(cliCtx, "streamers");
-
-                assertEquals(2, streamers.size());
-
-                assertEqualsCollections(new HashSet<>(Arrays.asList("person", "T")), streamers.keySet());
-            }
-            finally {
-                U.closeQuiet(firstStmt);
-
-                U.closeQuiet(secondStmt);
-            }
-        }
-
-        // Let's wait a little so that all data arrives to destination - we can't intercept streamers' flush
-        // on connection close in any way.
-        U.sleep(1000);
-
-        // Now let's check it's all there.
-        for (int i = 1; i <= 50; i++)
-            assertEquals(nameForId(i), nameForIdInCache(i));
-
-        for (int i = 51; i <= 100; i++)
-            assertEquals(i, grid(0).cache("T").get(i));
-    }
-
-    /**
-     *
-     */
-    public void testStreamingWithMixedStatementTypes() throws Exception {
-        String prepStmtStr = "insert into Person(\"id\", \"name\") values (?, ?)";
-
-        String stmtStr = "insert into Person(\"id\", \"name\") values (%d, '%s')";
-
-        try (Connection conn = createStreamedConnection(false, 10000)) {
-            assertStreamingState(true);
-
-            PreparedStatement firstStmt = conn.prepareStatement(prepStmtStr);
-
-            Statement secondStmt = conn.createStatement();
-
-            try {
-                for (int i = 1; i <= 100; i++) {
-                    boolean usePrep = Math.random() > 0.5;
-
-                    boolean useBatch = Math.random() > 0.5;
-
-                    if (usePrep) {
-                        firstStmt.setInt(1, i);
-                        firstStmt.setString(2, nameForId(i));
-
-                        if (useBatch)
-                            firstStmt.addBatch();
-                        else
-                            firstStmt.execute();
-                    }
-                    else {
-                        String sql = String.format(stmtStr, i, nameForId(i));
-
-                        if (useBatch)
-                            secondStmt.addBatch(sql);
-                        else
-                            secondStmt.execute(sql);
-                    }
-                }
-            }
-            finally {
-                U.closeQuiet(firstStmt);
-
-                U.closeQuiet(secondStmt);
-            }
-        }
-
-        // Let's wait a little so that all data arrives to destination - we can't intercept streamers' flush
-        // on connection close in any way.
-        U.sleep(1000);
-
-        // Now let's check it's all there.
-        for (int i = 1; i <= 100; i++)
-            assertEquals(nameForId(i), nameForIdInCache(i));
-    }
-
-    /**
-     * @throws SQLException if failed.
-     */
-    public void testStreamingOffToOn() throws SQLException {
-        try (Connection conn = createOrdinaryConnection()) {
-            assertStreamingState(false);
-
-            execute(conn, "SET STREAMING 1");
-
-            assertStreamingState(true);
-        }
-    }
-
-    /**
-     * @throws SQLException if failed.
-     */
-    public void testStreamingOnToOff() throws Exception {
-        try (Connection conn = createStreamedConnection(false)) {
-            assertStreamingState(true);
-
-            execute(conn, "SET STREAMING off");
-
-            assertStreamingState(false);
-        }
-    }
-
-    /**
-     * @throws SQLException if failed.
-     */
-    public void testFlush() throws Exception {
-        try (Connection conn = createStreamedConnection(false, 10000)) {
-            assertStreamingState(true);
-
-            try (PreparedStatement stmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?)")) {
-                for (int i = 1; i <= 100; i++) {
-                    stmt.setInt(1, i);
-                    stmt.setString(2, nameForId(i));
-
-                    stmt.executeUpdate();
-                }
-            }
-
-            assertCacheEmpty();
-
-            execute(conn, "set streaming 0");
-
-            assertStreamingState(false);
-
-            U.sleep(500);
-
-            // Now let's check it's all there.
-            for (int i = 1; i <= 100; i++)
-                assertEquals(nameForId(i), nameForIdInCache(i));
-        }
-    }
-
-    /**
-     * @throws SQLException if failed.
-     */
-    public void testStreamingReEnabled() throws Exception {
-        try (Connection conn = createStreamedConnection(false, 10000)) {
-            assertStreamingState(true);
-
-            try (PreparedStatement stmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?)")) {
-                for (int i = 1; i <= 100; i++) {
-                    stmt.setInt(1, i);
-                    stmt.setString(2, nameForId(i));
-
-                    stmt.executeUpdate();
-                }
-            }
-
-            assertCacheEmpty();
-
-            execute(conn, "set streaming 1 batch_size 111 allow_overwrite 0 per_node_buffer_size 512 " +
-                "per_node_parallel_operations 4 flush_frequency 5000");
-
-            U.sleep(500);
-
-            assertEquals((Integer)111, U.field(conn, "streamBatchSize"));
-
-            SqlClientContext cliCtx = sqlClientContext();
-
-            assertTrue(cliCtx.isStream());
-
-            assertFalse(U.field(cliCtx, "streamAllowOverwrite"));
-
-            assertEquals((Integer)512, U.field(cliCtx, "streamNodeBufSize"));
-
-            assertEquals((Long)5000L, U.field(cliCtx, "streamFlushTimeout"));
-
-            assertEquals((Integer)4, U.field(cliCtx, "streamNodeParOps"));
-
-            // Now let's check it's all there - SET STREAMING 1 repeated call must also have caused flush.
-            for (int i = 1; i <= 100; i++)
-                assertEquals(nameForId(i), nameForIdInCache(i));
-        }
-    }
-
-    /**
-     *
-     */
-    @SuppressWarnings("ThrowableNotThrown")
-    public void testNonStreamedBatch() {
-        GridTestUtils.assertThrows(null, new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                try (Connection conn = createOrdinaryConnection()) {
-                    try (Statement s = conn.createStatement()) {
-                        for (int i = 1; i <= 10; i++)
-                            s.addBatch(String.format("insert into Person(\"id\", \"name\")values (%d, '%s')", i,
-                                nameForId(i)));
-
-                        execute(conn, "SET STREAMING 1");
-
-                        s.addBatch(String.format("insert into Person(\"id\", \"name\")values (%d, '%s')", 11,
-                            nameForId(11)));
-                    }
-                }
-
-                return null;
-            }
-        }, SQLException.class, "Statement has non-empty batch (call executeBatch() or clearBatch() before " +
-            "enabling streaming).");
-    }
-
-    /**
-     *
-     */
-    @SuppressWarnings("ThrowableNotThrown")
-    public void testStreamingStatementInTheMiddleOfNonPreparedBatch() {
-        GridTestUtils.assertThrows(null, new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                try (Connection conn = createOrdinaryConnection()) {
-                    try (Statement s = conn.createStatement()) {
-                        s.addBatch(String.format("insert into Person(\"id\", \"name\")values (%d, '%s')", 1,
-                            nameForId(1)));
-
-                        s.addBatch("SET STREAMING 1 FLUSH_FREQUENCY 10000");
-                    }
-                }
-
-                return null;
-            }
-        }, SQLException.class, "Streaming control commands must be executed explicitly");
-    }
-
-    /**
-     *
-     */
-    @SuppressWarnings("ThrowableNotThrown")
-    public void testBatchingSetStreamingStatement() {
-        GridTestUtils.assertThrows(null, new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                try (Connection conn = createOrdinaryConnection()) {
-                    try (PreparedStatement s = conn.prepareStatement("SET STREAMING 1 FLUSH_FREQUENCY 10000")) {
-                        s.addBatch();
-                    }
-                }
-
-                return null;
-            }
-        }, SQLException.class, "Streaming control commands must be executed explicitly");
-    }
-
-    /**
-     * Check that there's nothing in cache.
-     */
-    private void assertCacheEmpty() {
-        assertEquals(0, cache().size(CachePeekMode.ALL));
-    }
-
-    /**
-     * @param conn Connection.
-     * @param sql Statement.
-     * @throws SQLException if failed.
-     */
-    private static void execute(Connection conn, String sql) throws SQLException {
-        try (Statement s = conn.createStatement()) {
-            s.execute(sql);
-        }
-    }
-
-    /**
-     * @return Active SQL client context.
-     */
-    private SqlClientContext sqlClientContext() {
-        assertNotNull(IndexingWithContext.cliCtx);
-
-        return IndexingWithContext.cliCtx;
-    }
-
-    /**
-     * Check that streaming state on target node is as expected.
-     * @param on Expected streaming state.
-     */
-    private void assertStreamingState(boolean on) {
-        SqlClientContext cliCtx = sqlClientContext();
-
-        assertEquals(on, cliCtx.isStream());
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void assertStatementForbidden(String sql) {
-        batchSize = 1;
-
-        super.assertStatementForbidden(sql);
-    }
-
-    /**
-     *
-     */
-    private static final class IndexingWithContext extends IgniteH2Indexing {
-        /** Client context. */
-        static SqlClientContext cliCtx;
-
-        /** {@inheritDoc} */
-        @Override public List<Long> streamBatchedUpdateQuery(String schemaName, String qry, List<Object[]> params,
-            SqlClientContext cliCtx) throws IgniteCheckedException {
-            IndexingWithContext.cliCtx = cliCtx;
-
-            return super.streamBatchedUpdateQuery(schemaName, qry, params, cliCtx);
-        }
-
-        /** {@inheritDoc} */
-        @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry,
-            @Nullable SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts,
-            GridQueryCancel cancel) {
-            IndexingWithContext.cliCtx = cliCtx;
-
-            return super.querySqlFields(schemaName, qry, cliCtx, keepBinary, failOnMultipleStmts, cancel);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
index 3478124..634579b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
@@ -39,13 +39,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
 import org.apache.ignite.internal.processors.odbc.SqlStateCode;
-import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteRequest;
-import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteResult;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcOrderedBatchExecuteRequest;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcOrderedBatchExecuteResult;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteRequest;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest;
@@ -55,6 +57,7 @@ import org.apache.ignite.internal.processors.odbc.jdbc.JdbcStatementType;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.sql.command.SqlCommand;
 import org.apache.ignite.internal.sql.command.SqlSetStreamingCommand;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgniteProductVersion;
 
@@ -91,7 +94,7 @@ public class JdbcThinConnection implements Connection {
     private boolean readOnly;
 
     /** Streaming flag. */
-    private volatile boolean stream;
+    private volatile StreamState streamState;
 
     /** Current transaction holdability. */
     private int holdability;
@@ -108,15 +111,6 @@ public class JdbcThinConnection implements Connection {
     /** Connection properties. */
     private ConnectionProperties connProps;
 
-    /** Batch size for streaming. */
-    private int streamBatchSize;
-
-    /** Batch for streaming. */
-    private List<JdbcQuery> streamBatch;
-
-    /** Last added query to recognize batches. */
-    private String lastStreamQry;
-
     /** Connected. */
     private boolean connected;
 
@@ -172,7 +166,7 @@ public class JdbcThinConnection implements Connection {
      * @return Whether this connection is streamed or not.
      */
     boolean isStream() {
-        return stream;
+        return streamState != null;
     }
 
     /**
@@ -182,24 +176,28 @@ public class JdbcThinConnection implements Connection {
      */
     void executeNative(String sql, SqlCommand cmd) throws SQLException {
         if (cmd instanceof SqlSetStreamingCommand) {
-            // If streaming is already on, we have to disable it first.
-            if (stream) {
-                // We have to send request regardless of actual batch size.
-                executeBatch(true);
+            SqlSetStreamingCommand cmd0 = (SqlSetStreamingCommand)cmd;
+
+            // If streaming is already on, we have to close it first.
+            if (streamState != null) {
+                streamState.close();
 
-                stream = false;
+                streamState = null;
             }
 
             boolean newVal = ((SqlSetStreamingCommand)cmd).isTurnOn();
 
             // Actual ON, if needed.
             if (newVal) {
+                if (!cmd0.isOrdered() && !cliIo.igniteVersion().greaterThanEqual(2, 5, 0)) {
+                    throw new SQLException("Streaming without order is not supported by the server [remoteNodeVer="
+                        + cliIo.igniteVersion() + ']', SqlStateCode.INTERNAL_ERROR);
+                }
+
                 sendRequest(new JdbcQueryExecuteRequest(JdbcStatementType.ANY_STATEMENT_TYPE,
                     schema, 1, 1, sql, null));
 
-                streamBatchSize = ((SqlSetStreamingCommand)cmd).batchSize();
-
-                stream = true;
+                streamState = new StreamState((SqlSetStreamingCommand)cmd);
             }
         }
         else
@@ -214,39 +212,9 @@ public class JdbcThinConnection implements Connection {
      * @throws SQLException On error.
      */
     void addBatch(String sql, List<Object> args) throws SQLException {
-        boolean newQry = (args == null || !F.eq(lastStreamQry, sql));
-
-        // Providing null as SQL here allows for recognizing subbatches on server and handling them more efficiently.
-        JdbcQuery q  = new JdbcQuery(newQry ? sql : null, args != null ? args.toArray() : null);
-
-        if (streamBatch == null)
-            streamBatch = new ArrayList<>(streamBatchSize);
-
-        streamBatch.add(q);
-
-        // Null args means "addBatch(String)" was called on non-prepared Statement,
-        // we don't want to remember its query string.
-        lastStreamQry = (args != null ? sql : null);
-
-        if (streamBatch.size() == streamBatchSize)
-            executeBatch(false);
-    }
-
-    /**
-     * @param lastBatch Whether open data streamers must be flushed and closed after this batch.
-     * @throws SQLException if failed.
-     */
-    private void executeBatch(boolean lastBatch) throws SQLException {
-        JdbcBatchExecuteResult res = sendRequest(new JdbcBatchExecuteRequest(schema, streamBatch, lastBatch));
-
-        streamBatch = null;
+        assert isStream();
 
-        lastStreamQry = null;
-
-        if (res.errorCode() != ClientListenerResponse.STATUS_SUCCESS) {
-            throw new BatchUpdateException(res.errorMessage(), IgniteQueryErrorCode.codeToSqlState(res.errorCode()),
-                res.errorCode(), res.updateCounts());
-        }
+        streamState.addBatch(sql, args);
     }
 
     /** {@inheritDoc} */
@@ -399,13 +367,10 @@ public class JdbcThinConnection implements Connection {
         if (isClosed())
             return;
 
-        if (!F.isEmpty(streamBatch)) {
-            try {
-                executeBatch(true);
-            }
-            catch (SQLException e) {
-                LOG.log(Level.WARNING, "Exception during batch send on streamed connection close", e);
-            }
+        if (streamState != null) {
+            streamState.close();
+
+            streamState = null;
         }
 
         closed = true;
@@ -798,6 +763,28 @@ public class JdbcThinConnection implements Connection {
     }
 
     /**
+     * Sends a request for execution via {@link #cliIo}. The response is awaited in a separate
+     * thread (see {@link StreamState#asyncRespReaderThread}).
+     * @param req Request.
+     * @throws SQLException On any error.
+     */
+    private void sendRequestNotWaitResponse(JdbcOrderedBatchExecuteRequest req) throws SQLException {
+        ensureConnected();
+
+        try {
+            cliIo.sendBatchRequestNoWaitResponse(req);
+        }
+        catch (SQLException e) {
+            throw e;
+        }
+        catch (Exception e) {
+            onDisconnect();
+
+            throw new SQLException("Failed to communicate with Ignite cluster.", SqlStateCode.CONNECTION_FAILURE, e);
+        }
+    }
+
+    /**
      * @return Connection URL.
      */
     public String url() {
@@ -815,9 +802,11 @@ public class JdbcThinConnection implements Connection {
 
         connected = false;
 
-        streamBatch = null;
+        if (streamState != null) {
+            streamState.close0();
 
-        lastStreamQry = null;
+            streamState = null;
+        }
 
         synchronized (stmtsMux) {
             for (JdbcThinStatement s : stmts)
@@ -846,4 +835,203 @@ public class JdbcThinConnection implements Connection {
 
         return res;
     }
+
+    /**
+     * Streamer state: holds the current stream batch and handles asynchronous batch responses.
+     */
+    private class StreamState {
+        /** Maximum requests count that may be sent before any responses. */
+        private static final int MAX_REQUESTS_BEFORE_RESPONSE = 10;
+
+        /** Wait timeout. */
+        private static final long WAIT_TIMEOUT = 1;
+
+        /** Batch size for streaming. */
+        private int streamBatchSize;
+
+        /** Batch for streaming. */
+        private List<JdbcQuery> streamBatch;
+
+        /** Last added query to recognize batches. */
+        private String lastStreamQry;
+
+        /** Order of the current batch request; used to keep request order on execution. */
+        private long order;
+
+        /** Async response reader thread. */
+        private Thread asyncRespReaderThread;
+
+        /** Async response error. */
+        private volatile Exception err;
+
+        /** The order of the last batch request in the stream. */
+        private long lastRespOrder = -1;
+
+        /** Last response future. */
+        private final GridFutureAdapter<Void> lastRespFut = new GridFutureAdapter<>();
+
+        /** Response semaphore: limits the number of in-flight requests to {@link #MAX_REQUESTS_BEFORE_RESPONSE}. */
+        private Semaphore respSem = new Semaphore(MAX_REQUESTS_BEFORE_RESPONSE);
+
+        /**
+         * @param cmd Streaming command.
+         */
+        StreamState(SqlSetStreamingCommand cmd) {
+            streamBatchSize = cmd.batchSize();
+
+            asyncRespReaderThread = new Thread(this::readResponses);
+
+            asyncRespReaderThread.start();
+        }
+
+        /**
+         * Adds another query to the streaming batch.
+         * @param sql Query.
+         * @param args Arguments.
+         * @throws SQLException On error.
+         */
+        void addBatch(String sql, List<Object> args) throws SQLException {
+            checkError();
+
+            boolean newQry = (args == null || !F.eq(lastStreamQry, sql));
+
+            // Providing null as SQL allows the server to recognize sub-batches and handle them more efficiently.
+            JdbcQuery q = new JdbcQuery(newQry ? sql : null, args != null ? args.toArray() : null);
+
+            if (streamBatch == null)
+                streamBatch = new ArrayList<>(streamBatchSize);
+
+            streamBatch.add(q);
+
+            // Null args means "addBatch(String)" was called on non-prepared Statement,
+            // we don't want to remember its query string.
+            lastStreamQry = (args != null ? sql : null);
+
+            if (streamBatch.size() == streamBatchSize)
+                executeBatch(false);
+        }
+
+        /**
+         * @param lastBatch Whether open data streamers must be flushed and closed after this batch.
+         * @throws SQLException if failed.
+         */
+        private void executeBatch(boolean lastBatch) throws SQLException {
+            checkError();
+
+            if (lastBatch)
+                lastRespOrder = order;
+
+            try {
+                respSem.acquire();
+
+                sendRequestNotWaitResponse(
+                    new JdbcOrderedBatchExecuteRequest(schema, streamBatch, lastBatch, order));
+
+                streamBatch = null;
+
+                lastStreamQry = null;
+
+                if (lastBatch) {
+                    try {
+                        lastRespFut.get();
+                    }
+                    catch (IgniteCheckedException e) {
+                        // No-op.
+                        // No exceptions are expected here.
+                    }
+
+                    checkError();
+                }
+                else
+                    order++;
+            }
+            catch (InterruptedException e) {
+                throw new SQLException("Streaming operation was interrupted", SqlStateCode.INTERNAL_ERROR, e);
+            }
+        }
+
+        /**
+         * Rethrows on the user thread an exception that was thrown on the {@link #asyncRespReaderThread} thread.
+         * @throws SQLException Saved exception.
+         */
+        void checkError() throws SQLException {
+            if (err != null) {
+                Exception err0 = err;
+
+                err = null;
+
+                if (err0 instanceof SQLException)
+                    throw (SQLException)err0;
+                else {
+                    onDisconnect();
+
+                    throw new SQLException("Failed to communicate with Ignite cluster on JDBC streaming.",
+                        SqlStateCode.CONNECTION_FAILURE, err0);
+                }
+            }
+        }
+
+        /**
+         * @throws SQLException On error.
+         */
+        void close() throws SQLException {
+            close0();
+
+            checkError();
+        }
+
+        /**
+         * Closes the stream state: flushes the last batch if still connected and stops the reader thread.
+         */
+        void close0() {
+            if (connected) {
+                try {
+                    executeBatch(true);
+                }
+                catch (SQLException e) {
+                    err = e;
+
+                    LOG.log(Level.WARNING, "Exception during batch send on streamed connection close", e);
+                }
+            }
+
+            if (asyncRespReaderThread != null)
+                asyncRespReaderThread.interrupt();
+        }
+
+        /**
+         * Reads server responses until the last batch is acknowledged or an error occurs.
+         */
+        void readResponses() {
+            try {
+                while (true) {
+                    JdbcResponse resp = cliIo.readResponse();
+
+                    if (resp.response() instanceof JdbcOrderedBatchExecuteResult) {
+                        JdbcOrderedBatchExecuteResult res = (JdbcOrderedBatchExecuteResult)resp.response();
+
+                        respSem.release();
+
+                        if (res.errorCode() != ClientListenerResponse.STATUS_SUCCESS) {
+                            err = new BatchUpdateException(res.errorMessage(),
+                                IgniteQueryErrorCode.codeToSqlState(res.errorCode()),
+                                res.errorCode(), res.updateCounts());
+                        }
+
+                        // Receive the response for the last request.
+                        if (res.order() == lastRespOrder) {
+                            lastRespFut.onDone();
+
+                            break;
+                        }
+                    }
+
+                    if (resp.status() != ClientListenerResponse.STATUS_SUCCESS)
+                        err = new SQLException(resp.error(), IgniteQueryErrorCode.codeToSqlState(resp.status()));
+                }
+            }
+            catch (Exception e) {
+                err = e;
+            }
+        }
+    }
 }
\ No newline at end of file

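For context, StreamState pipelines batches: a user thread may have up to MAX_REQUESTS_BEFORE_RESPONSE
requests in flight before it blocks, while the dedicated reader thread drains acknowledgements and
releases permits. Below is a standalone sketch of that flow-control pattern, with a BlockingQueue
standing in for the TCP socket; it is illustrative only and not part of the patch.

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.Semaphore;

    /** Sketch: bounded pipelining of fire-and-forget requests. */
    public class BoundedPipelineSketch {
        /** Mirrors MAX_REQUESTS_BEFORE_RESPONSE. */
        private static final int MAX_INFLIGHT = 10;

        /** One permit per unacknowledged request. */
        private final Semaphore permits = new Semaphore(MAX_INFLIGHT);

        /** Stands in for the socket: orders written here come back as "responses". */
        private final BlockingQueue<Long> wire = new LinkedBlockingQueue<>();

        /** Error observed by the reader thread, surfaced to the sender (mirrors StreamState#err). */
        private volatile Exception err;

        /** Sender side: blocks only when MAX_INFLIGHT requests are unacknowledged. */
        void send(long order) throws Exception {
            Exception e = err;

            if (e != null)
                throw e;

            permits.acquire();

            wire.put(order); // Fire-and-forget: no response is read on this thread.
        }

        /** Reader side: runs on its own thread until the last order is acknowledged. */
        void readResponses(long lastOrder) {
            try {
                while (true) {
                    long order = wire.take(); // A "response" arrives.

                    permits.release(); // Free a slot for the sender.

                    if (order == lastOrder)
                        break; // Last batch acknowledged.
                }
            }
            catch (Exception e) {
                err = e;
            }
        }
    }
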
http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
index 4631e5d..44c1984 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
 import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
 import org.apache.ignite.internal.processors.odbc.SqlStateCode;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteRequest;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcOrderedBatchExecuteRequest;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryCloseRequest;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryFetchRequest;
@@ -419,6 +420,44 @@ public class JdbcThinTcpIo {
 
     /**
      * @param req Request.
+     * @throws IOException In case of IO error.
+     * @throws SQLException On error.
+     */
+    void sendBatchRequestNoWaitResponse(JdbcOrderedBatchExecuteRequest req) throws IOException, SQLException {
+        synchronized (mux) {
+            if (ownThread != null) {
+                throw new SQLException("Concurrent access to JDBC connection is not allowed"
+                    + " [ownThread=" + ownThread.getName()
+                    + ", curThread=" + Thread.currentThread().getName() + ']', SqlStateCode.CONNECTION_FAILURE);
+            }
+
+            ownThread = Thread.currentThread();
+        }
+
+        try {
+            if (!igniteVer.greaterThanEqual(2, 5, 0)) {
+                throw new SQLException("Streaming without responses is not supported by the server [driverProtocolVer="
+                    + CURRENT_VER + ", remoteNodeVer=" + igniteVer + ']', SqlStateCode.INTERNAL_ERROR);
+            }
+
+            int cap = guessCapacity(req);
+
+            BinaryWriterExImpl writer = new BinaryWriterExImpl(null, new BinaryHeapOutputStream(cap),
+                null, null);
+
+            req.writeBinary(writer);
+
+            send(writer.array());
+        }
+        finally {
+            synchronized (mux) {
+                ownThread = null;
+            }
+        }
+    }
+
+    /**
+     * @param req Request.
      * @return Server response.
      * @throws IOException In case of IO error.
      * @throws SQLException On concurrent access to JDBC connection.
@@ -444,13 +483,7 @@ public class JdbcThinTcpIo {
 
             send(writer.array());
 
-            BinaryReaderExImpl reader = new BinaryReaderExImpl(null, new BinaryHeapInputStream(read()), null, null, false);
-
-            JdbcResponse res = new JdbcResponse();
-
-            res.readBinary(reader);
-
-            return res;
+            return readResponse();
         }
         finally {
             synchronized (mux) {
@@ -460,6 +493,22 @@ public class JdbcThinTcpIo {
     }
 
     /**
+     * @return Server response.
+     * @throws IOException In case of IO error.
+     */
+    @SuppressWarnings("unchecked")
+    JdbcResponse readResponse() throws IOException {
+        BinaryReaderExImpl reader = new BinaryReaderExImpl(null, new BinaryHeapInputStream(read()), null, null, false);
+
+        JdbcResponse res = new JdbcResponse();
+
+        res.readBinary(reader);
+
+        return res;
+    }
+
+    /**
      * Try to guess request capacity.
      *
      * @param req Request.

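Splitting the old blocking sendRequest() into a bare send plus a public readResponse() is what lets
the streaming reader thread drain responses while user threads keep writing. A minimal model of the
resulting API surface follows; the types are stand-ins, and framing, handshake and thread-ownership
checks are elided.

    import java.io.IOException;

    /** Sketch of the split I/O API. */
    final class TcpIoSketch {
        /** Streaming path: serialize and write, never read on the calling thread. */
        void sendNoWaitResponse(byte[] req) throws IOException {
            write(req);
        }

        /** Classic path: write, then block until the reply is read. */
        byte[] sendRequest(byte[] req) throws IOException {
            write(req);

            return readResponse();
        }

        /** Extracted so that a dedicated reader thread can call it on its own. */
        byte[] readResponse() throws IOException {
            return read();
        }

        private void write(byte[] bytes) throws IOException { /* Socket write elided. */ }

        private byte[] read() throws IOException { return new byte[0]; /* Socket read elided. */ }
    }
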
http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
index 407c1a0..be55ab9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
@@ -159,16 +159,18 @@ public class ClientListenerNioListener extends GridNioServerListenerAdapter<byte
 
             ClientListenerResponse resp = handler.handle(req);
 
-            if (log.isDebugEnabled()) {
-                long dur = (System.nanoTime() - startTime) / 1000;
+            if (resp != null) {
+                if (log.isDebugEnabled()) {
+                    long dur = (System.nanoTime() - startTime) / 1000;
 
-                log.debug("Client request processed [reqId=" + req.requestId() + ", dur(mcs)=" + dur  +
-                    ", resp=" + resp.status() + ']');
-            }
+                    log.debug("Client request processed [reqId=" + req.requestId() + ", dur(mcs)=" + dur +
+                        ", resp=" + resp.status() + ']');
+                }
 
-            byte[] outMsg = parser.encode(resp);
+                byte[] outMsg = parser.encode(resp);
 
-            ses.send(outMsg);
+                ses.send(outMsg);
+            }
         }
         catch (Exception e) {
             U.error(log, "Failed to process client request [req=" + req + ']', e);
@@ -216,7 +218,7 @@ public class ClientListenerNioListener extends GridNioServerListenerAdapter<byte
         ClientListenerConnectionContext connCtx = null;
 
         try {
-            connCtx = prepareContext(clientType);
+            connCtx = prepareContext(ses, clientType);
 
             ensureClientPermissions(clientType);
 
@@ -270,17 +272,18 @@ public class ClientListenerNioListener extends GridNioServerListenerAdapter<byte
     /**
      * Prepare context.
      *
+     * @param ses Session.
      * @param clientType Client type.
      * @return Context.
      * @throws IgniteCheckedException If failed.
      */
-    private ClientListenerConnectionContext prepareContext(byte clientType) throws IgniteCheckedException {
+    private ClientListenerConnectionContext prepareContext(GridNioSession ses, byte clientType) throws IgniteCheckedException {
         switch (clientType) {
             case ODBC_CLIENT:
                 return new OdbcConnectionContext(ctx, busyLock, maxCursors);
 
             case JDBC_CLIENT:
-                return new JdbcConnectionContext(ctx, busyLock, maxCursors);
+                return new JdbcConnectionContext(ctx, ses, busyLock, maxCursors);
 
             case THIN_CLIENT:
                 return new ClientConnectionContext(ctx, maxCursors);

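The listener change above introduces a convention: a handler may return null to signal that the
response will be produced and pushed asynchronously, which is how the ordered-batch path replies from
a worker thread. A minimal model of that contract, using hypothetical names rather than the real
handler API:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    /** Receives an asynchronously produced response (models JdbcResponseSender). */
    interface ResponseSender {
        void send(byte[] encodedResp);
    }

    /** Sketch: a handler that answers asynchronously returns null to the listener. */
    final class AsyncHandlerSketch {
        /** A single worker preserves per-connection response order. */
        private final ExecutorService worker = Executors.newSingleThreadExecutor();

        /** Returns null: the listener must not send anything for this request. */
        byte[] handle(byte[] req, ResponseSender sender) {
            worker.submit(() -> sender.send(process(req)));

            return null;
        }

        private byte[] process(byte[] req) {
            return req; // Placeholder for actual batch execution.
        }
    }
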
http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java
index 73fd04f..bdc558c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java
@@ -53,8 +53,17 @@ public class JdbcBatchExecuteRequest extends JdbcRequest {
     }
 
     /**
+     * Constructor for child requests.
+     * @param type Request type.
+     */
+    protected JdbcBatchExecuteRequest(byte type) {
+        super(type);
+    }
+
+    /**
      * @param schemaName Schema name.
      * @param queries Queries.
+     * @param lastStreamBatch {@code true} in case the request is the last batch in the stream.
      */
     public JdbcBatchExecuteRequest(String schemaName, List<JdbcQuery> queries, boolean lastStreamBatch) {
         super(BATCH_EXEC);
@@ -67,6 +76,24 @@ public class JdbcBatchExecuteRequest extends JdbcRequest {
     }
 
     /**
+     * Constructor for child requests.
+     *
+     * @param type Request type.
+     * @param schemaName Schema name.
+     * @param queries Queries.
+     * @param lastStreamBatch {@code true} in case the request is the last batch in the stream.
+     */
+    protected JdbcBatchExecuteRequest(byte type, String schemaName, List<JdbcQuery> queries, boolean lastStreamBatch) {
+        super(type);
+
+        assert lastStreamBatch || !F.isEmpty(queries);
+
+        this.schemaName = schemaName;
+        this.queries = queries;
+        this.lastStreamBatch = lastStreamBatch;
+    }
+
+    /**
      * @return Schema name.
      */
     @Nullable public String schemaName() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteResult.java
index 917e60a..3fc9dd7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteResult.java
@@ -36,18 +36,26 @@ public class JdbcBatchExecuteResult extends JdbcResult {
     private String errMsg;
 
     /**
-     * Condtructor.
+     * Constructor.
      */
-    public JdbcBatchExecuteResult() {
+    JdbcBatchExecuteResult() {
         super(BATCH_EXEC);
     }
 
     /**
+     * Constructor for child results.
+     * @param type Result type.
+     */
+    JdbcBatchExecuteResult(byte type) {
+        super(type);
+    }
+
+    /**
      * @param updateCnts Update counts for batch.
      * @param errCode Error code.
      * @param errMsg Error message.
      */
-    public JdbcBatchExecuteResult(int [] updateCnts, int errCode, String errMsg) {
+    JdbcBatchExecuteResult(int[] updateCnts, int errCode, String errMsg) {
         super(BATCH_EXEC);
 
         this.updateCnts = updateCnts;
@@ -56,6 +64,18 @@ public class JdbcBatchExecuteResult extends JdbcResult {
     }
 
     /**
+     * @param type Result type.
+     * @param res Result.
+     */
+    JdbcBatchExecuteResult(byte type, JdbcBatchExecuteResult res) {
+        super(type);
+
+        this.updateCnts = res.updateCnts;
+        this.errCode = res.errCode;
+        this.errMsg = res.errMsg;
+    }
+
+    /**
      * @return Update count for DML queries.
      */
     public int[] updateCounts() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
index 2fe3b9c..272c2f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
@@ -17,10 +17,10 @@
 
 package org.apache.ignite.internal.processors.odbc.jdbc;
 
-import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.binary.BinaryReaderExImpl;
 import org.apache.ignite.internal.processors.authentication.AuthorizationContext;
@@ -28,7 +28,9 @@ import org.apache.ignite.internal.processors.odbc.ClientListenerConnectionContex
 import org.apache.ignite.internal.processors.odbc.ClientListenerMessageParser;
 import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
 import org.apache.ignite.internal.processors.odbc.ClientListenerRequestHandler;
+import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.nio.GridNioSession;
 import org.apache.ignite.internal.util.typedef.F;
 
 /**
@@ -59,9 +61,15 @@ public class JdbcConnectionContext implements ClientListenerConnectionContext {
     /** Context. */
     private final GridKernalContext ctx;
 
+    /** Session. */
+    private final GridNioSession ses;
+
     /** Shutdown busy lock. */
     private final GridSpinBusyLock busyLock;
 
+    /** Logger. */
+    private final IgniteLogger log;
+
     /** Maximum allowed cursors. */
     private final int maxCursors;
 
@@ -83,13 +91,17 @@ public class JdbcConnectionContext implements ClientListenerConnectionContext {
     /**
      * Constructor.
      * @param ctx Kernal Context.
+     * @param ses Session.
      * @param busyLock Shutdown busy lock.
      * @param maxCursors Maximum allowed cursors.
      */
-    public JdbcConnectionContext(GridKernalContext ctx, GridSpinBusyLock busyLock, int maxCursors) {
+    public JdbcConnectionContext(GridKernalContext ctx, GridNioSession ses, GridSpinBusyLock busyLock, int maxCursors) {
         this.ctx = ctx;
+        this.ses = ses;
         this.busyLock = busyLock;
         this.maxCursors = maxCursors;
+
+        log = ctx.log(getClass());
     }
 
     /** {@inheritDoc} */
@@ -146,11 +158,23 @@ public class JdbcConnectionContext implements ClientListenerConnectionContext {
         catch (Exception e) {
             throw new IgniteCheckedException("Handshake error: " + e.getMessage(), e);
         }
+        parser = new JdbcMessageParser(ctx);
 
-        handler = new JdbcRequestHandler(ctx, busyLock, maxCursors, distributedJoins, enforceJoinOrder,
-            collocated, replicatedOnly, autoCloseCursors, lazyExec, skipReducerOnUpdate, actx, ver);
+        JdbcResponseSender sender = new JdbcResponseSender() {
+            @Override public void send(ClientListenerResponse resp) {
+                if (resp != null) {
+                    if (log.isDebugEnabled())
+                        log.debug("Async response sent [resp=" + resp.status() + ']');
 
-        parser = new JdbcMessageParser(ctx);
+                    byte[] outMsg = parser.encode(resp);
+
+                    ses.send(outMsg);
+                }
+            }
+        };
+
+        handler = new JdbcRequestHandler(ctx, busyLock, sender, maxCursors, distributedJoins, enforceJoinOrder,
+            collocated, replicatedOnly, autoCloseCursors, lazyExec, skipReducerOnUpdate, actx, ver);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcOrderedBatchExecuteRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcOrderedBatchExecuteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcOrderedBatchExecuteRequest.java
new file mode 100644
index 0000000..3e84731
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcOrderedBatchExecuteRequest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import java.util.List;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * JDBC batch execute ordered request.
+ */
+public class JdbcOrderedBatchExecuteRequest extends JdbcBatchExecuteRequest
+    implements Comparable<JdbcOrderedBatchExecuteRequest> {
+    /** Order. */
+    private long order;
+
+    /**
+     * Default constructor.
+     */
+    public JdbcOrderedBatchExecuteRequest() {
+        super(BATCH_EXEC_ORDERED);
+    }
+
+    /**
+     * @param schemaName Schema name.
+     * @param queries Queries.
+     * @param lastStreamBatch {@code true} in case the request is the last batch in the stream.
+     * @param order Request order.
+     */
+    public JdbcOrderedBatchExecuteRequest(String schemaName, List<JdbcQuery> queries,
+        boolean lastStreamBatch, long order) {
+        super(BATCH_EXEC_ORDERED, schemaName, queries, lastStreamBatch);
+
+        this.order = order;
+    }
+
+    /**
+     * @return Request order.
+     */
+    public long order() {
+        return order;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBinary(BinaryWriterExImpl writer) throws BinaryObjectException {
+        super.writeBinary(writer);
+
+        writer.writeLong(order);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readBinary(BinaryReaderExImpl reader) throws BinaryObjectException {
+        super.readBinary(reader);
+
+        order = reader.readLong();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(JdbcOrderedBatchExecuteRequest.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int compareTo(@NotNull JdbcOrderedBatchExecuteRequest o) {
+        return Long.compare(order, o.order);
+    }
+}

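The Comparable implementation hints at how the server can restore order when ordered requests are
handled concurrently: buffer out-of-order arrivals and release them once they become contiguous. The
sketch below shows that reordering step; it is an assumption about the server-side logic, which lives
in JdbcRequestHandler and is not shown in this excerpt.

    import java.util.PriorityQueue;
    import java.util.Queue;

    /** Sketch: release ordered requests for execution only when contiguous. */
    final class OrderedReleaseSketch {
        /** Stand-in for JdbcOrderedBatchExecuteRequest. */
        static final class Req implements Comparable<Req> {
            final long order;

            Req(long order) {
                this.order = order;
            }

            @Override public int compareTo(Req o) {
                return Long.compare(order, o.order);
            }
        }

        /** Out-of-order arrivals wait here, sorted by order. */
        private final Queue<Req> pending = new PriorityQueue<>();

        /** Next order expected for execution. */
        private long nextOrder;

        synchronized void onRequest(Req req) {
            pending.add(req);

            // Execute every request that is now contiguous with the last executed one.
            while (!pending.isEmpty() && pending.peek().order == nextOrder) {
                execute(pending.poll());

                nextOrder++;
            }
        }

        private void execute(Req req) { /* Batch execution elided. */ }
    }
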
http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcOrderedBatchExecuteResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcOrderedBatchExecuteResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcOrderedBatchExecuteResult.java
new file mode 100644
index 0000000..84853d4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcOrderedBatchExecuteResult.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * JDBC batch execute ordered result.
+ */
+public class JdbcOrderedBatchExecuteResult extends JdbcBatchExecuteResult {
+    /** Order. */
+    private long order;
+
+    /**
+     * Constructor.
+     */
+    public JdbcOrderedBatchExecuteResult() {
+        super(BATCH_EXEC_ORDERED);
+    }
+
+    /**
+     * @param res Result.
+     * @param order Order.
+     */
+    public JdbcOrderedBatchExecuteResult(JdbcBatchExecuteResult res, long order) {
+        super(BATCH_EXEC_ORDERED, res);
+
+        this.order = order;
+    }
+
+    /**
+     * @return Order.
+     */
+    public long order() {
+        return order;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBinary(BinaryWriterExImpl writer) throws BinaryObjectException {
+        super.writeBinary(writer);
+
+        writer.writeLong(order);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readBinary(BinaryReaderExImpl reader) throws BinaryObjectException {
+        super.readBinary(reader);
+
+        order = reader.readLong();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(JdbcOrderedBatchExecuteResult.class, this);
+    }
+}

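Together with the copy constructor added to JdbcBatchExecuteResult above, the expected server-side
flow is to execute the batch as before and then wrap the plain result, echoing the request order back
to the client. A hedged two-line sketch, where executeBatchOrdered is a hypothetical helper:

    // Presumed wrapping on the server side: reuse the plain result and attach the request order.
    JdbcBatchExecuteResult plain = executeBatchOrdered(req); // Hypothetical helper.
    JdbcOrderedBatchExecuteResult ordered = new JdbcOrderedBatchExecuteResult(plain, req.order());
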
http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java
index 22522ad..3d5b869 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java
@@ -63,6 +63,9 @@ public class JdbcRequest extends ClientListenerRequestNoId implements JdbcRawBin
     /** Send a batch of a data from client to server. */
     static final byte BULK_LOAD_BATCH = 13;
 
+    /** Ordered batch request. */
+    static final byte BATCH_EXEC_ORDERED = 14;
+
     /** Request type. */
     private byte type;
 
@@ -161,6 +164,11 @@ public class JdbcRequest extends ClientListenerRequestNoId implements JdbcRawBin
 
                 break;
 
+            case BATCH_EXEC_ORDERED:
+                req = new JdbcOrderedBatchExecuteRequest();
+
+                break;
+
             default:
                 throw new IgniteException("Unknown SQL listener request ID: [request ID=" + reqType + ']');
         }