You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/02/20 10:32:36 UTC
ignite git commit: IGNITE-4169: SQL: implemented streaming support
for INSERT operations. This closes #1350. This closes #1553.
Repository: ignite
Updated Branches:
refs/heads/ignite-1.9 1a08ef76b -> 0130b097b
IGNITE-4169: SQL: implemented streaming support for INSERT operations. This closes #1350. This closes #1553.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0130b097
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0130b097
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0130b097
Branch: refs/heads/ignite-1.9
Commit: 0130b097b0a24950f369cc5e009a9394c3fd4e1f
Parents: 1a08ef7
Author: Alexander Paschenko <al...@gmail.com>
Authored: Mon Feb 20 13:32:19 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Feb 20 13:32:19 2017 +0300
----------------------------------------------------------------------
.../jdbc2/JdbcAbstractDmlStatementSelfTest.java | 49 +----
.../jdbc2/JdbcInsertStatementSelfTest.java | 51 +++++
.../jdbc2/JdbcMergeStatementSelfTest.java | 51 +++++
.../internal/jdbc2/JdbcStreamingSelfTest.java | 189 ++++++++++++++++
.../jdbc2/JdbcUpdateStatementSelfTest.java | 50 +++++
.../jdbc/suite/IgniteJdbcDriverTestSuite.java | 1 +
.../org/apache/ignite/IgniteJdbcDriver.java | 30 +++
.../ignite/internal/jdbc2/JdbcConnection.java | 72 +++++-
.../internal/jdbc2/JdbcPreparedStatement.java | 34 ++-
.../ignite/internal/jdbc2/JdbcStatement.java | 20 +-
.../jdbc2/JdbcStreamedPreparedStatement.java | 59 +++++
.../processors/query/GridQueryIndexing.java | 35 +++
.../processors/query/GridQueryProcessor.java | 63 +++++-
.../query/h2/DmlStatementsProcessor.java | 219 +++++++++++++------
.../processors/query/h2/IgniteH2Indexing.java | 57 ++++-
15 files changed, 812 insertions(+), 168 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java
index 4a97aef..332bbba 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java
@@ -20,8 +20,6 @@ package org.apache.ignite.internal.jdbc2;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.Statement;
import java.util.Collections;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.QueryEntity;
@@ -54,7 +52,7 @@ public abstract class JdbcAbstractDmlStatementSelfTest extends GridCommonAbstrac
static final String BASE_URL_BIN = CFG_URL_PREFIX + "modules/clients/src/test/config/jdbc-bin-config.xml";
/** SQL SELECT query for verification. */
- private static final String SQL_SELECT = "select _key, id, firstName, lastName, age from Person";
+ static final String SQL_SELECT = "select _key, id, firstName, lastName, age from Person";
/** Connection. */
protected Connection conn;
@@ -149,51 +147,6 @@ public abstract class JdbcAbstractDmlStatementSelfTest extends GridCommonAbstrac
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
- try (Statement selStmt = conn.createStatement()) {
- assert selStmt.execute(SQL_SELECT);
-
- ResultSet rs = selStmt.getResultSet();
-
- assert rs != null;
-
- while (rs.next()) {
- int id = rs.getInt("id");
-
- switch (id) {
- case 1:
- assertEquals("p1", rs.getString("_key"));
- assertEquals("John", rs.getString("firstName"));
- assertEquals("White", rs.getString("lastName"));
- assertEquals(25, rs.getInt("age"));
- break;
-
- case 2:
- assertEquals("p2", rs.getString("_key"));
- assertEquals("Joe", rs.getString("firstName"));
- assertEquals("Black", rs.getString("lastName"));
- assertEquals(35, rs.getInt("age"));
- break;
-
- case 3:
- assertEquals("p3", rs.getString("_key"));
- assertEquals("Mike", rs.getString("firstName"));
- assertEquals("Green", rs.getString("lastName"));
- assertEquals(40, rs.getInt("age"));
- break;
-
- case 4:
- assertEquals("p4", rs.getString("_key"));
- assertEquals("Leah", rs.getString("firstName"));
- assertEquals("Grey", rs.getString("lastName"));
- assertEquals(22, rs.getInt("age"));
- break;
-
- default:
- assert false : "Invalid ID: " + id;
- }
- }
- }
-
grid(0).cache(null).clear();
assertEquals(0, grid(0).cache(null).size(CachePeekMode.ALL));
http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java
index 7fc92de..1bd6d34 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java
@@ -18,12 +18,14 @@
package org.apache.ignite.internal.jdbc2;
import java.sql.PreparedStatement;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.HashSet;
import java.util.concurrent.Callable;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.testframework.GridTestUtils;
/**
@@ -61,6 +63,55 @@ public class JdbcInsertStatementSelfTest extends JdbcAbstractDmlStatementSelfTes
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
+ try (Statement selStmt = conn.createStatement()) {
+ assertTrue(selStmt.execute(SQL_SELECT));
+
+ ResultSet rs = selStmt.getResultSet();
+
+ assert rs != null;
+
+ while (rs.next()) {
+ int id = rs.getInt("id");
+
+ switch (id) {
+ case 1:
+ assertEquals("p1", rs.getString("_key"));
+ assertEquals("John", rs.getString("firstName"));
+ assertEquals("White", rs.getString("lastName"));
+ assertEquals(25, rs.getInt("age"));
+ break;
+
+ case 2:
+ assertEquals("p2", rs.getString("_key"));
+ assertEquals("Joe", rs.getString("firstName"));
+ assertEquals("Black", rs.getString("lastName"));
+ assertEquals(35, rs.getInt("age"));
+ break;
+
+ case 3:
+ assertEquals("p3", rs.getString("_key"));
+ assertEquals("Mike", rs.getString("firstName"));
+ assertEquals("Green", rs.getString("lastName"));
+ assertEquals(40, rs.getInt("age"));
+ break;
+
+ case 4:
+ assertEquals("p4", rs.getString("_key"));
+ assertEquals("Leah", rs.getString("firstName"));
+ assertEquals("Grey", rs.getString("lastName"));
+ assertEquals(22, rs.getInt("age"));
+ break;
+
+ default:
+ assert false : "Invalid ID: " + id;
+ }
+ }
+ }
+
+ grid(0).cache(null).clear();
+
+ assertEquals(0, grid(0).cache(null).size(CachePeekMode.ALL));
+
super.afterTest();
if (stmt != null && !stmt.isClosed())
http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMergeStatementSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMergeStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMergeStatementSelfTest.java
index ecf6032..3c56c92 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMergeStatementSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMergeStatementSelfTest.java
@@ -18,8 +18,10 @@
package org.apache.ignite.internal.jdbc2;
import java.sql.PreparedStatement;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import org.apache.ignite.cache.CachePeekMode;
/**
* MERGE statement test.
@@ -56,6 +58,55 @@ public class JdbcMergeStatementSelfTest extends JdbcAbstractDmlStatementSelfTest
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
+ try (Statement selStmt = conn.createStatement()) {
+ assertTrue(selStmt.execute(SQL_SELECT));
+
+ ResultSet rs = selStmt.getResultSet();
+
+ assert rs != null;
+
+ while (rs.next()) {
+ int id = rs.getInt("id");
+
+ switch (id) {
+ case 1:
+ assertEquals("p1", rs.getString("_key"));
+ assertEquals("John", rs.getString("firstName"));
+ assertEquals("White", rs.getString("lastName"));
+ assertEquals(25, rs.getInt("age"));
+ break;
+
+ case 2:
+ assertEquals("p2", rs.getString("_key"));
+ assertEquals("Joe", rs.getString("firstName"));
+ assertEquals("Black", rs.getString("lastName"));
+ assertEquals(35, rs.getInt("age"));
+ break;
+
+ case 3:
+ assertEquals("p3", rs.getString("_key"));
+ assertEquals("Mike", rs.getString("firstName"));
+ assertEquals("Green", rs.getString("lastName"));
+ assertEquals(40, rs.getInt("age"));
+ break;
+
+ case 4:
+ assertEquals("p4", rs.getString("_key"));
+ assertEquals("Leah", rs.getString("firstName"));
+ assertEquals("Grey", rs.getString("lastName"));
+ assertEquals(22, rs.getInt("age"));
+ break;
+
+ default:
+ assert false : "Invalid ID: " + id;
+ }
+ }
+ }
+
+ grid(0).cache(null).clear();
+
+ assertEquals(0, grid(0).cache(null).size(CachePeekMode.ALL));
+
super.afterTest();
if (stmt != null && !stmt.isClosed())
http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java
new file mode 100644
index 0000000..5e206ee
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.util.Properties;
+import org.apache.ignite.IgniteJdbcDriver;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.ConnectorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.IgniteJdbcDriver.CFG_URL_PREFIX;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Data streaming test.
+ */
+public class JdbcStreamingSelfTest extends GridCommonAbstractTest {
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** JDBC URL. */
+ private static final String BASE_URL = CFG_URL_PREFIX + "modules/clients/src/test/config/jdbc-config.xml";
+
+ /** Connection. */
+ protected Connection conn;
+
+ /** */
+ protected transient IgniteLogger log;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ return getConfiguration0(gridName);
+ }
+
+ /**
+ * @param gridName Grid name.
+ * @return Grid configuration used for starting the grid.
+ * @throws Exception If failed.
+ */
+ private IgniteConfiguration getConfiguration0(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ CacheConfiguration<?,?> cache = defaultCacheConfiguration();
+
+ cache.setCacheMode(PARTITIONED);
+ cache.setBackups(1);
+ cache.setWriteSynchronizationMode(FULL_SYNC);
+ cache.setIndexedTypes(
+ Integer.class, Integer.class
+ );
+
+ cfg.setCacheConfiguration(cache);
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(disco);
+
+ cfg.setConnectorConfiguration(new ConnectorConfiguration());
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGridsMultiThreaded(3);
+
+ Class.forName("org.apache.ignite.IgniteJdbcDriver");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @param allowOverwrite Allow overwriting of existing keys.
+ * @return Connection to use for the test.
+ * @throws Exception if failed.
+ */
+ private Connection createConnection(boolean allowOverwrite) throws Exception {
+ Properties props = new Properties();
+
+ props.setProperty(IgniteJdbcDriver.PROP_STREAMING, "true");
+ props.setProperty(IgniteJdbcDriver.PROP_STREAMING_FLUSH_FREQ, "500");
+
+ if (allowOverwrite)
+ props.setProperty(IgniteJdbcDriver.PROP_STREAMING_ALLOW_OVERWRITE, "true");
+
+ return DriverManager.getConnection(BASE_URL, props);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ U.closeQuiet(conn);
+
+ ignite(0).cache(null).clear();
+
+ super.afterTest();
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testStreamedInsert() throws Exception {
+ conn = createConnection(false);
+
+ ignite(0).cache(null).put(5, 500);
+
+ PreparedStatement stmt = conn.prepareStatement("insert into Integer(_key, _val) values (?, ?)");
+
+ for (int i = 1; i <= 100000; i++) {
+ stmt.setInt(1, i);
+ stmt.setInt(2, i);
+
+ stmt.executeUpdate();
+ }
+
+ // Data is not there yet.
+ assertNull(grid(0).cache(null).get(100000));
+
+ // Let the stream flush.
+ U.sleep(1500);
+
+ // Now let's check it's all there.
+ assertEquals(1, grid(0).cache(null).get(1));
+ assertEquals(100000, grid(0).cache(null).get(100000));
+
+ // 5 should still point to 500.
+ assertEquals(500, grid(0).cache(null).get(5));
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testStreamedInsertWithOverwritesAllowed() throws Exception {
+ conn = createConnection(true);
+
+ ignite(0).cache(null).put(5, 500);
+
+ PreparedStatement stmt = conn.prepareStatement("insert into Integer(_key, _val) values (?, ?)");
+
+ for (int i = 1; i <= 100000; i++) {
+ stmt.setInt(1, i);
+ stmt.setInt(2, i);
+
+ stmt.executeUpdate();
+ }
+
+ // Data is not there yet.
+ assertNull(grid(0).cache(null).get(100000));
+
+ // Let the stream flush.
+ U.sleep(1500);
+
+ // Now let's check it's all there.
+ assertEquals(1, grid(0).cache(null).get(1));
+ assertEquals(100000, grid(0).cache(null).get(100000));
+
+ // 5 should now point to 5 as we've turned overwriting on.
+ assertEquals(5, grid(0).cache(null).get(5));
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcUpdateStatementSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcUpdateStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcUpdateStatementSelfTest.java
new file mode 100644
index 0000000..8ae0e90
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcUpdateStatementSelfTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import java.sql.SQLException;
+import java.util.Arrays;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ *
+ */
+public class JdbcUpdateStatementSelfTest extends JdbcAbstractUpdateStatementSelfTest {
+ /**
+ *
+ */
+ public void testExecute() throws SQLException {
+ conn.createStatement().execute("update Person set firstName = 'Jack' where " +
+ "cast(substring(_key, 2, 1) as int) % 2 = 0");
+
+ assertEquals(Arrays.asList(F.asList("John"), F.asList("Jack"), F.asList("Mike")),
+ jcache(0).query(new SqlFieldsQuery("select firstName from Person order by _key")).getAll());
+ }
+
+ /**
+ *
+ */
+ public void testExecuteUpdate() throws SQLException {
+ conn.createStatement().executeUpdate("update Person set firstName = 'Jack' where " +
+ "cast(substring(_key, 2, 1) as int) % 2 = 0");
+
+ assertEquals(Arrays.asList(F.asList("John"), F.asList("Jack"), F.asList("Mike")),
+ jcache(0).query(new SqlFieldsQuery("select firstName from Person order by _key")).getAll());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
index c41b754..7395fcb 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
@@ -66,6 +66,7 @@ public class IgniteJdbcDriverTestSuite extends TestSuite {
suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcInsertStatementSelfTest.class));
suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcBinaryMarshallerInsertStatementSelfTest.class));
suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcDeleteStatementSelfTest.class));
+ suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcStreamingSelfTest.class));
return suite;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java
index d432c1e4..9790b8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java
@@ -292,6 +292,21 @@ public class IgniteJdbcDriver implements Driver {
/** Distributed joins parameter name. */
private static final String PARAM_DISTRIBUTED_JOINS = "distributedJoins";
+ /** DML streaming parameter name. */
+ private static final String PARAM_STREAMING = "streaming";
+
+ /** DML streaming auto flush frequency. */
+ private static final String PARAM_STREAMING_FLUSH_FREQ = "streamingFlushFrequency";
+
+ /** DML streaming node buffer size. */
+ private static final String PARAM_STREAMING_PER_NODE_BUF_SIZE = "streamingPerNodeBufferSize";
+
+ /** DML streaming parallel operations per node. */
+ private static final String PARAM_STREAMING_PER_NODE_PAR_OPS = "streamingPerNodeParallelOperations";
+
+ /** Whether DML streaming will overwrite existing cache entries. */
+ private static final String PARAM_STREAMING_ALLOW_OVERWRITE = "streamingAllowOverwrite";
+
/** Hostname property name. */
public static final String PROP_HOST = PROP_PREFIX + "host";
@@ -313,6 +328,21 @@ public class IgniteJdbcDriver implements Driver {
/** Distributed joins property name. */
public static final String PROP_DISTRIBUTED_JOINS = PROP_PREFIX + PARAM_DISTRIBUTED_JOINS;
+ /** DML streaming property name. */
+ public static final String PROP_STREAMING = PROP_PREFIX + PARAM_STREAMING;
+
+ /** DML stream auto flush frequency property name. */
+ public static final String PROP_STREAMING_FLUSH_FREQ = PROP_PREFIX + PARAM_STREAMING_FLUSH_FREQ;
+
+ /** DML stream node buffer size property name. */
+ public static final String PROP_STREAMING_PER_NODE_BUF_SIZE = PROP_PREFIX + PARAM_STREAMING_PER_NODE_BUF_SIZE;
+
+ /** DML stream parallel operations per node property name. */
+ public static final String PROP_STREAMING_PER_NODE_PAR_OPS = PROP_PREFIX + PARAM_STREAMING_PER_NODE_PAR_OPS;
+
+ /** Whether DML streaming will overwrite existing cache entries. */
+ public static final String PROP_STREAMING_ALLOW_OVERWRITE = PROP_PREFIX + PARAM_STREAMING_ALLOW_OVERWRITE;
+
/** Cache name property name. */
public static final String PROP_CFG = PROP_PREFIX + "cfg";
http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
index 5c4a147..4244602 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
@@ -48,17 +48,21 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteJdbcDriver;
import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.compute.ComputeTaskTimeoutException;
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.resources.IgniteInstanceResource;
@@ -73,6 +77,11 @@ import static org.apache.ignite.IgniteJdbcDriver.PROP_COLLOCATED;
import static org.apache.ignite.IgniteJdbcDriver.PROP_DISTRIBUTED_JOINS;
import static org.apache.ignite.IgniteJdbcDriver.PROP_LOCAL;
import static org.apache.ignite.IgniteJdbcDriver.PROP_NODE_ID;
+import static org.apache.ignite.IgniteJdbcDriver.PROP_STREAMING;
+import static org.apache.ignite.IgniteJdbcDriver.PROP_STREAMING_ALLOW_OVERWRITE;
+import static org.apache.ignite.IgniteJdbcDriver.PROP_STREAMING_FLUSH_FREQ;
+import static org.apache.ignite.IgniteJdbcDriver.PROP_STREAMING_PER_NODE_BUF_SIZE;
+import static org.apache.ignite.IgniteJdbcDriver.PROP_STREAMING_PER_NODE_PAR_OPS;
/**
* JDBC connection implementation.
@@ -118,6 +127,21 @@ public class JdbcConnection implements Connection {
/** Distributed joins flag. */
private boolean distributedJoins;
+ /** Make this connection streaming oriented, and prepared statements - data streamer aware. */
+ private final boolean stream;
+
+ /** Auto flush frequency for streaming. */
+ private final long streamFlushTimeout;
+
+ /** Node buffer size for data streamer. */
+ private final int streamNodeBufSize;
+
+ /** Parallel ops count per node for data streamer. */
+ private final int streamNodeParOps;
+
+ /** Allow overwrites for duplicate keys on streamed {@code INSERT}s. */
+ private final boolean streamAllowOverwrite;
+
/** Statements. */
final Set<JdbcStatement> statements = new HashSet<>();
@@ -139,6 +163,14 @@ public class JdbcConnection implements Connection {
this.collocatedQry = Boolean.parseBoolean(props.getProperty(PROP_COLLOCATED));
this.distributedJoins = Boolean.parseBoolean(props.getProperty(PROP_DISTRIBUTED_JOINS));
+ stream = Boolean.parseBoolean(props.getProperty(PROP_STREAMING));
+ streamAllowOverwrite = Boolean.parseBoolean(props.getProperty(PROP_STREAMING_ALLOW_OVERWRITE));
+ streamFlushTimeout = Long.parseLong(props.getProperty(PROP_STREAMING_FLUSH_FREQ, "0"));
+ streamNodeBufSize = Integer.parseInt(props.getProperty(PROP_STREAMING_PER_NODE_BUF_SIZE,
+ String.valueOf(IgniteDataStreamer.DFLT_PER_NODE_BUFFER_SIZE)));
+ streamNodeParOps = Integer.parseInt(props.getProperty(PROP_STREAMING_PER_NODE_PAR_OPS,
+ String.valueOf(IgniteDataStreamer.DFLT_MAX_PARALLEL_OPS)));
+
String nodeIdProp = props.getProperty(PROP_NODE_ID);
if (nodeIdProp != null)
@@ -291,6 +323,14 @@ public class JdbcConnection implements Connection {
closed = true;
+ for (Iterator<JdbcStatement> it = statements.iterator(); it.hasNext();) {
+ JdbcStatement stmt = it.next();
+
+ stmt.closeInternal();
+
+ it.remove();
+ }
+
IgniteNodeFuture fut = NODES.get(cfg);
if (fut != null && fut.release()) {
@@ -299,14 +339,6 @@ public class JdbcConnection implements Connection {
if (ignite != null)
ignite.close();
}
-
- for (Iterator<JdbcStatement> it = statements.iterator(); it.hasNext();) {
- JdbcStatement stmt = it.next();
-
- stmt.closeInternal();
-
- it.remove();
- }
}
/** {@inheritDoc} */
@@ -487,7 +519,18 @@ public class JdbcConnection implements Connection {
if (resSetHoldability != HOLD_CURSORS_OVER_COMMIT)
throw new SQLFeatureNotSupportedException("Invalid holdability (transactions are not supported).");
- JdbcPreparedStatement stmt = new JdbcPreparedStatement(this, sql);
+ JdbcPreparedStatement stmt;
+
+ if (!stream)
+ stmt = new JdbcPreparedStatement(this, sql);
+ else {
+ PreparedStatement nativeStmt = prepareNativeStatement(sql);
+
+ IgniteDataStreamer<?, ?> streamer = ((IgniteEx) ignite).context().query().createStreamer(cacheName,
+ nativeStmt, streamFlushTimeout, streamNodeBufSize, streamNodeParOps, streamAllowOverwrite);
+
+ stmt = new JdbcStreamedPreparedStatement(this, sql, streamer, nativeStmt);
+ }
statements.add(stmt);
@@ -646,12 +689,17 @@ public class JdbcConnection implements Connection {
/** {@inheritDoc} */
@Override public void setSchema(String schema) throws SQLException {
- cacheName = schema;
+ assert ignite instanceof IgniteEx;
+
+ cacheName = ((IgniteEx)ignite).context().query().space(schema);
}
/** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
@Override public String getSchema() throws SQLException {
- return cacheName;
+ String sqlSchema = ignite.cache(cacheName).getConfiguration(CacheConfiguration.class).getSqlSchema();
+
+ return U.firstNotNull(sqlSchema, cacheName, "");
}
/** {@inheritDoc} */
@@ -749,7 +797,7 @@ public class JdbcConnection implements Connection {
*/
PreparedStatement prepareNativeStatement(String sql) throws SQLException {
return ((IgniteCacheProxy) ignite().cache(cacheName())).context()
- .kernalContext().query().prepareNativeStatement(cacheName(), sql);
+ .kernalContext().query().prepareNativeStatement(getSchema(), sql);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java
index 57badd2..54e58e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java
@@ -17,12 +17,28 @@
package org.apache.ignite.internal.jdbc2;
-import java.io.*;
-import java.math.*;
-import java.net.*;
-import java.sql.*;
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.Clob;
import java.sql.Date;
-import java.util.*;
+import java.sql.NClob;
+import java.sql.ParameterMetaData;
+import java.sql.PreparedStatement;
+import java.sql.Ref;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.RowId;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLXML;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Calendar;
/**
* JDBC prepared statement implementation.
@@ -31,10 +47,8 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
/** SQL query. */
private final String sql;
- /**
- * H2's parsed statement to retrieve metadata from.
- */
- private PreparedStatement nativeStatement;
+ /** H2's parsed statement to retrieve metadata from. */
+ PreparedStatement nativeStatement;
/**
* Creates new prepared statement.
@@ -55,8 +69,6 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
throw new SQLFeatureNotSupportedException("Adding new SQL command to batch not supported for prepared statement.");
}
-
-
/** {@inheritDoc} */
@Override public ResultSet executeQuery() throws SQLException {
ensureNotClosed();
http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
index d7e387f..44db375 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
@@ -140,7 +140,7 @@ public class JdbcStatement implements Statement {
updateCnt = -1;
- return doUpdate(sql, getArgs());
+ return Long.valueOf(doUpdate(sql, getArgs())).intValue();
}
/**
@@ -148,9 +148,9 @@ public class JdbcStatement implements Statement {
* @param sql SQL query.
* @param args Update arguments.
* @return Number of affected items.
- * @throws SQLException
+ * @throws SQLException If failed.
*/
- int doUpdate(String sql, Object[] args) throws SQLException {
+ long doUpdate(String sql, Object[] args) throws SQLException {
if (F.isEmpty(sql))
throw new SQLException("SQL query is empty");
@@ -172,11 +172,7 @@ public class JdbcStatement implements Statement {
JdbcQueryTaskV2.QueryResult qryRes =
loc ? qryTask.call() : ignite.compute(ignite.cluster().forNodeId(nodeId)).call(qryTask);
- Long res = updateCounterFromQueryResult(qryRes.getRows());
-
- updateCnt = res;
-
- return res.intValue();
+ return updateCnt = updateCounterFromQueryResult(qryRes.getRows());
}
catch (IgniteSQLException e) {
throw e.toJdbcException();
@@ -194,12 +190,12 @@ public class JdbcStatement implements Statement {
* @return update counter, if found
* @throws SQLException if getting an update counter from result proved to be impossible.
*/
- private static Long updateCounterFromQueryResult(List<List<?>> rows) throws SQLException {
+ private static long updateCounterFromQueryResult(List<List<?>> rows) throws SQLException {
if (F.isEmpty(rows))
- return 0L;
+ return -1;
if (rows.size() != 1)
- throw new SQLException("Expected number of rows of 1 for update operation");
+ throw new SQLException("Expected fetch size of 1 for update operation");
List<?> row = rows.get(0);
@@ -211,7 +207,7 @@ public class JdbcStatement implements Statement {
if (!(objRes instanceof Long))
throw new SQLException("Unexpected update result type");
- return (Long) objRes;
+ return (Long)objRes;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java
new file mode 100644
index 0000000..019923f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.internal.IgniteEx;
+
+/**
+ * Prepared statement associated with a data streamer.
+ */
+class JdbcStreamedPreparedStatement extends JdbcPreparedStatement {
+ /** */
+ private final IgniteDataStreamer<?, ?> streamer;
+
+ /**
+ * Creates new prepared statement.
+ *
+ * @param conn Connection.
+ * @param sql SQL query.
+ * @param streamer Data streamer to use with this statement. Will be closed on statement close.
+ */
+ JdbcStreamedPreparedStatement(JdbcConnection conn, String sql, IgniteDataStreamer<?, ?> streamer,
+ PreparedStatement nativeStmt) {
+ super(conn, sql);
+
+ this.streamer = streamer;
+
+ nativeStatement = nativeStmt;
+ }
+
+ /** {@inheritDoc} */
+ @Override void closeInternal() throws SQLException {
+ streamer.close(false);
+
+ super.closeInternal();
+ }
+
+ /** {@inheritDoc} */
+ @Override long doUpdate(String sql, Object[] args) throws SQLException {
+ return ((IgniteEx)conn.ignite()).context().query().streamUpdateQuery(conn.cacheName(), streamer, sql, args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index ca04724..2abb3a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.List;
import javax.cache.Cache;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
@@ -99,6 +100,19 @@ public interface GridQueryIndexing {
GridQueryCancel cancel) throws IgniteCheckedException;
/**
+ * Perform a MERGE statement using data streamer as receiver.
+ *
+ * @param spaceName Space name.
+ * @param qry Query.
+ * @param params Query parameters.
+ * @param streamer Data streamer to feed data to.
+ * @return Query result.
+ * @throws IgniteCheckedException If failed.
+ */
+ public long streamUpdateQuery(@Nullable final String spaceName, final String qry,
+ @Nullable final Object[] params, IgniteDataStreamer<?, ?> streamer) throws IgniteCheckedException;
+
+ /**
* Executes regular query.
*
* @param spaceName Space name.
@@ -241,6 +255,14 @@ public interface GridQueryIndexing {
public PreparedStatement prepareNativeStatement(String schema, String sql) throws SQLException;
/**
+ * Gets space name from database schema.
+ *
+ * @param schemaName Schema name. Could not be null. Could be empty.
+ * @return Space name. Could be null.
+ */
+ public String space(String schemaName);
+
+ /**
* Collect queries that already running more than specified duration.
*
* @param duration Duration to check.
@@ -259,4 +281,17 @@ public interface GridQueryIndexing {
* Cancels all executing queries.
*/
public void cancelAllQueries();
+
+ /**
+ * @param spaceName Space name.
+ * @param nativeStmt Native statement.
+ * @param autoFlushFreq Automatic data flushing frequency, disabled if {@code 0}.
+ * @param nodeBufSize Per node buffer size - see {@link IgniteDataStreamer#perNodeBufferSize(int)}
+ * @param nodeParOps Per node parallel ops count - see {@link IgniteDataStreamer#perNodeParallelOperations(int)}
+ * @param allowOverwrite Overwrite existing cache values on key duplication.
+ * @return {@link IgniteDataStreamer} tailored to specific needs of given native statement based on its metadata;
+ * {@code null} if given statement is a query.
+ */
+ public IgniteDataStreamer<?,?> createStreamer(String spaceName, PreparedStatement nativeStmt, long autoFlushFreq,
+ int nodeBufSize, int nodeParOps, boolean allowOverwrite);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index ee9224b..c6d8270 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -17,12 +17,11 @@
package org.apache.ignite.internal.processors.query;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.concurrent.TimeUnit;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.math.BigDecimal;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
@@ -40,9 +39,11 @@ import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import javax.cache.Cache;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.binary.BinaryField;
import org.apache.ignite.binary.BinaryObject;
@@ -818,6 +819,36 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * @param spaceName Cache name.
+ * @param streamer Data streamer.
+ * @param qry Query.
+ * @return Iterator.
+ */
+ public long streamUpdateQuery(@Nullable final String spaceName,
+ final IgniteDataStreamer<?, ?> streamer, final String qry, final Object[] args) {
+ assert streamer != null;
+
+ if (!busyLock.enterBusy())
+ throw new IllegalStateException("Failed to execute query (grid is stopping).");
+
+ try {
+ GridCacheContext cctx = ctx.cache().cache(spaceName).context();
+
+ return executeQuery(GridCacheQueryType.SQL_FIELDS, qry, cctx, new IgniteOutClosureX<Long>() {
+ @Override public Long applyx() throws IgniteCheckedException {
+ return idx.streamUpdateQuery(spaceName, qry, args, streamer);
+ }
+ }, true);
+ }
+ catch (IgniteCheckedException e) {
+ throw new CacheException(e);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
* @param cctx Cache context.
* @param qry Query.
* @return Cursor.
@@ -964,7 +995,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
- *
* @param schema Schema.
* @param sql Query.
* @return {@link PreparedStatement} from underlying engine to supply metadata to Prepared - most likely H2.
@@ -976,6 +1006,31 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * @param schema Schema name.
+ * @return space (cache) name from schema name.
+ */
+ public String space(String schema) throws SQLException {
+ checkxEnabled();
+
+ return idx.space(schema);
+ }
+
+ /**
+ * @param spaceName Space name.
+ * @param nativeStmt Native statement.
+ * @param autoFlushFreq Automatic data flushing frequency, disabled if {@code 0}.
+ * @param nodeBufSize Per node buffer size - see {@link IgniteDataStreamer#perNodeBufferSize(int)}
+ * @param nodeParOps Per node parallel ops count - see {@link IgniteDataStreamer#perNodeParallelOperations(int)}
+ * @param allowOverwrite Overwrite existing cache values on key duplication.
+ * @see IgniteDataStreamer#allowOverwrite
+ * @return {@link IgniteDataStreamer} tailored to specific needs of given native statement based on its metadata.
+ */
+ public IgniteDataStreamer<?, ?> createStreamer(String spaceName, PreparedStatement nativeStmt, long autoFlushFreq,
+ int nodeBufSize, int nodeParOps, boolean allowOverwrite) {
+ return idx.createStreamer(spaceName, nativeStmt, autoFlushFreq, nodeBufSize, nodeParOps, allowOverwrite);
+ }
+
+ /**
* @param timeout Timeout.
* @param timeUnit Time unit.
* @return Converted time.
http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index 4030758..78c5bbc 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -21,7 +21,6 @@ import java.lang.reflect.Array;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@@ -40,6 +39,7 @@ import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.binary.BinaryArrayIdentityResolver;
import org.apache.ignite.binary.BinaryObject;
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.h2.dml.FastUpdateArguments;
import org.apache.ignite.internal.processors.query.h2.dml.KeyValueSupplier;
+import org.apache.ignite.internal.processors.query.h2.dml.UpdateMode;
import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
@@ -126,7 +127,7 @@ public class DmlStatementsProcessor {
* @return Update result (modified items count and failed keys).
* @throws IgniteCheckedException if failed.
*/
- private long updateSqlFields(String spaceName, PreparedStatement stmt, SqlFieldsQuery fieldsQry,
+ private UpdateResult updateSqlFields(String spaceName, PreparedStatement stmt, SqlFieldsQuery fieldsQry,
boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel) throws IgniteCheckedException {
Object[] errKeys = null;
@@ -156,23 +157,27 @@ public class DmlStatementsProcessor {
UpdateResult r;
try {
- r = executeUpdateStatement(cctx, stmt, fieldsQry, loc, filters,
- cancel, errKeys);
+ r = executeUpdateStatement(cctx, stmt, fieldsQry, loc, filters, cancel, errKeys);
}
finally {
cctx.operationContextPerCall(opCtx);
}
- if (F.isEmpty(r.errKeys))
- return r.cnt + items;
- else {
- items += r.cnt;
- errKeys = r.errKeys;
- }
+ items += r.cnt;
+ errKeys = r.errKeys;
+
+ if (F.isEmpty(errKeys))
+ break;
}
- throw new IgniteSQLException("Failed to update or delete some keys: " + Arrays.deepToString(errKeys),
- IgniteQueryErrorCode.CONCURRENT_UPDATE);
+ if (F.isEmpty(errKeys)) {
+ if (items == 1L)
+ return UpdateResult.ONE;
+ else if (items == 0L)
+ return UpdateResult.ZERO;
+ }
+
+ return new UpdateResult(items, errKeys);
}
/**
@@ -186,9 +191,14 @@ public class DmlStatementsProcessor {
@SuppressWarnings("unchecked")
QueryCursorImpl<List<?>> updateSqlFieldsTwoStep(String spaceName, PreparedStatement stmt,
SqlFieldsQuery fieldsQry, GridQueryCancel cancel) throws IgniteCheckedException {
- long res = updateSqlFields(spaceName, stmt, fieldsQry, false, null, cancel);
+ UpdateResult res = updateSqlFields(spaceName, stmt, fieldsQry, false, null, cancel);
+
+ QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList
+ (Collections.singletonList(res.cnt)), null, false);
- return cursorForUpdateResult(res);
+ resCur.fieldsMeta(UPDATE_RESULT_META);
+
+ return resCur;
}
/**
@@ -203,10 +213,95 @@ public class DmlStatementsProcessor {
@SuppressWarnings("unchecked")
GridQueryFieldsResult updateLocalSqlFields(String spaceName, PreparedStatement stmt,
SqlFieldsQuery fieldsQry, IndexingQueryFilter filters, GridQueryCancel cancel) throws IgniteCheckedException {
- long res = updateSqlFields(spaceName, stmt, fieldsQry, true, filters, cancel);
+ UpdateResult res = updateSqlFields(spaceName, stmt, fieldsQry, true, filters, cancel);
return new GridQueryFieldsResultAdapter(UPDATE_RESULT_META,
- new IgniteSingletonIterator(Collections.singletonList(res)));
+ new IgniteSingletonIterator(Collections.singletonList(res.cnt)));
+ }
+
+ /**
+ * Perform given statement against given data streamer. Only rows based INSERT and MERGE are supported
+ * as well as key bound UPDATE and DELETE (ones with filter {@code WHERE _key = ?}).
+ *
+ * @param streamer Streamer to feed data to.
+ * @param stmt Statement.
+ * @param args Statement arguments.
+ * @return Number of rows in given statement for INSERT and MERGE, {@code 1} otherwise.
+ * @throws IgniteCheckedException if failed.
+ */
+ @SuppressWarnings({"unchecked", "ConstantConditions"})
+ long streamUpdateQuery(IgniteDataStreamer streamer, PreparedStatement stmt, Object[] args)
+ throws IgniteCheckedException {
+ args = U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY);
+
+ Prepared p = GridSqlQueryParser.prepared((JdbcPreparedStatement)stmt);
+
+ assert p != null;
+
+ UpdatePlan plan = UpdatePlanBuilder.planForStatement(p, null);
+
+ if (!F.eq(streamer.cacheName(), plan.tbl.rowDescriptor().context().namex()))
+ throw new IgniteSQLException("Cross cache streaming is not supported, please specify cache explicitly" +
+ " in connection options", IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+
+ if (plan.mode == UpdateMode.INSERT && plan.rowsNum > 0) {
+ assert plan.isLocSubqry;
+
+ final GridCacheContext cctx = plan.tbl.rowDescriptor().context();
+
+ QueryCursorImpl<List<?>> cur;
+
+ final ArrayList<List<?>> data = new ArrayList<>(plan.rowsNum);
+
+ final GridQueryFieldsResult res = indexing.queryLocalSqlFields(cctx.name(), plan.selectQry,
+ F.asList(args), null, false, 0, null);
+
+ QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new Iterable<List<?>>() {
+ @Override public Iterator<List<?>> iterator() {
+ try {
+ return new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepBinary());
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ }, null);
+
+ data.addAll(stepCur.getAll());
+
+ cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
+ @Override public Iterator<List<?>> iterator() {
+ return data.iterator();
+ }
+ }, null);
+
+ GridH2RowDescriptor desc = plan.tbl.rowDescriptor();
+
+ if (plan.rowsNum == 1) {
+ IgniteBiTuple t = rowToKeyValue(cctx, cur.iterator().next().toArray(), plan.colNames, plan.colTypes,
+ plan.keySupplier, plan.valSupplier, plan.keyColIdx, plan.valColIdx, desc);
+
+ streamer.addData(t.getKey(), t.getValue());
+
+ return 1;
+ }
+
+ Map<Object, Object> rows = new LinkedHashMap<>(plan.rowsNum);
+
+ for (List<?> row : cur) {
+ final IgniteBiTuple t = rowToKeyValue(cctx, row.toArray(), plan.colNames, plan.colTypes,
+ plan.keySupplier, plan.valSupplier, plan.keyColIdx, plan.valColIdx, desc);
+
+ rows.put(t.getKey(), t.getValue());
+ }
+
+ streamer.addData(rows);
+
+ return rows.size();
+ }
+ else
+ throw new IgniteSQLException("Only tuple based INSERT statements are supported in streaming mode",
+ IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
}
/**
@@ -214,31 +309,21 @@ public class DmlStatementsProcessor {
* @param cctx Cache context.
* @param prepStmt Prepared statement for DML query.
* @param filters Space name and key filter.
- * @param failedKeys Keys to restrict UPDATE and DELETE operations with. Null or empty array means no restriction.
- * @return Pair [number of successfully processed items; keys that have failed to be processed]
+ * @param failedKeys Keys to restrict UPDATE and DELETE operations with. Null or empty array means no restriction. @return Pair [number of successfully processed items; keys that have failed to be processed]
* @throws IgniteCheckedException if failed.
*/
- @SuppressWarnings("ConstantConditions")
+ @SuppressWarnings({"ConstantConditions", "unchecked"})
private UpdateResult executeUpdateStatement(final GridCacheContext cctx, PreparedStatement prepStmt,
- SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel, Object[] failedKeys)
- throws IgniteCheckedException {
+ SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel,
+ Object[] failedKeys) throws IgniteCheckedException {
Integer errKeysPos = null;
- Object[] params = fieldsQry.getArgs();
-
- if (!F.isEmpty(failedKeys)) {
- int paramsCnt = F.isEmpty(params) ? 0 : params.length;
- params = Arrays.copyOf(U.firstNotNull(params, X.EMPTY_OBJECT_ARRAY), paramsCnt + 1);
- params[paramsCnt] = failedKeys;
- errKeysPos = paramsCnt; // Last position
- }
-
UpdatePlan plan = getPlanForStatement(cctx.name(), prepStmt, errKeysPos);
if (plan.fastUpdateArgs != null) {
assert F.isEmpty(failedKeys) && errKeysPos == null;
- return new UpdateResult(doSingleUpdate(plan, params), X.EMPTY_OBJECT_ARRAY);
+ return doFastUpdate(plan, fieldsQry.getArgs());
}
assert !F.isEmpty(plan.selectQry);
@@ -249,7 +334,7 @@ public class DmlStatementsProcessor {
// subquery and not some dummy stuff like "select 1, 2, 3;"
if (!loc && !plan.isLocSubqry) {
SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQry, fieldsQry.isCollocated())
- .setArgs(params)
+ .setArgs(fieldsQry.getArgs())
.setDistributedJoins(fieldsQry.isDistributedJoins())
.setEnforceJoinOrder(fieldsQry.isEnforceJoinOrder())
.setLocal(fieldsQry.isLocal())
@@ -259,8 +344,8 @@ public class DmlStatementsProcessor {
cur = (QueryCursorImpl<List<?>>) indexing.queryTwoStep(cctx, newFieldsQry, cancel);
}
else {
- final GridQueryFieldsResult res = indexing.queryLocalSqlFields(cctx.name(), plan.selectQry, F.asList(params),
- filters, fieldsQry.isEnforceJoinOrder(), fieldsQry.getTimeout(), cancel);
+ final GridQueryFieldsResult res = indexing.queryLocalSqlFields(cctx.name(), plan.selectQry,
+ F.asList(fieldsQry.getArgs()), filters, fieldsQry.isEnforceJoinOrder(), fieldsQry.getTimeout(), cancel);
cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
@Override public Iterator<List<?>> iterator() {
@@ -272,8 +357,6 @@ public class DmlStatementsProcessor {
}
}
}, cancel);
-
- cur.fieldsMeta(res.metaData());
}
int pageSize = loc ? 0 : fieldsQry.getPageSize();
@@ -337,38 +420,41 @@ public class DmlStatementsProcessor {
/**
* Perform single cache operation based on given args.
- * @param params Query parameters.
+ * @param args Query parameters.
* @return 1 if an item was affected, 0 otherwise.
* @throws IgniteCheckedException if failed.
*/
@SuppressWarnings({"unchecked", "ConstantConditions"})
- private static long doSingleUpdate(UpdatePlan plan, Object[] params) throws IgniteCheckedException {
+ private static UpdateResult doFastUpdate(UpdatePlan plan, Object[] args) throws IgniteCheckedException {
GridCacheContext cctx = plan.tbl.rowDescriptor().context();
FastUpdateArguments singleUpdate = plan.fastUpdateArgs;
assert singleUpdate != null;
- int res;
+ boolean valBounded = (singleUpdate.val != FastUpdateArguments.NULL_ARGUMENT);
- Object key = singleUpdate.key.apply(params);
- Object val = singleUpdate.val.apply(params);
- Object newVal = singleUpdate.newVal.apply(params);
+ if (singleUpdate.newVal != FastUpdateArguments.NULL_ARGUMENT) { // Single item UPDATE
+ Object key = singleUpdate.key.apply(args);
+ Object newVal = singleUpdate.newVal.apply(args);
- if (newVal != null) { // Single item UPDATE
- if (val == null) // No _val bound in source query
- res = cctx.cache().replace(key, newVal) ? 1 : 0;
+ if (valBounded) {
+ Object val = singleUpdate.val.apply(args);
+
+ return (cctx.cache().replace(key, val, newVal) ? UpdateResult.ONE : UpdateResult.ZERO);
+ }
else
- res = cctx.cache().replace(key, val, newVal) ? 1 : 0;
+ return (cctx.cache().replace(key, newVal) ? UpdateResult.ONE : UpdateResult.ZERO);
}
else { // Single item DELETE
- if (val == null) // No _val bound in source query
- res = cctx.cache().remove(key) ? 1 : 0;
+ Object key = singleUpdate.key.apply(args);
+ Object val = singleUpdate.val.apply(args);
+
+ if (singleUpdate.val == FastUpdateArguments.NULL_ARGUMENT) // No _val bound in source query
+ return cctx.cache().remove(key) ? UpdateResult.ONE : UpdateResult.ZERO;
else
- res = cctx.cache().remove(key, val) ? 1 : 0;
+ return cctx.cache().remove(key, val) ? UpdateResult.ONE : UpdateResult.ZERO;
}
-
- return res;
}
/**
@@ -379,7 +465,7 @@ public class DmlStatementsProcessor {
* @return Results of DELETE (number of items affected AND keys that failed to be updated).
*/
@SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
- private UpdateResult doDelete(GridCacheContext cctx, QueryCursorImpl<List<?>> cursor, int pageSize)
+ private UpdateResult doDelete(GridCacheContext cctx, Iterable<List<?>> cursor, int pageSize)
throws IgniteCheckedException {
// With DELETE, we have only two columns - key and value.
long res = 0;
@@ -449,7 +535,7 @@ public class DmlStatementsProcessor {
* had been modified concurrently (arguments for a re-run)].
*/
@SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
- private UpdateResult doUpdate(UpdatePlan plan, QueryCursorImpl<List<?>> cursor, int pageSize)
+ private UpdateResult doUpdate(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize)
throws IgniteCheckedException {
GridH2RowDescriptor desc = plan.tbl.rowDescriptor();
@@ -575,7 +661,6 @@ public class DmlStatementsProcessor {
throw new IgniteSQLException(resEx);
}
-
return new UpdateResult(res, failedKeys.toArray());
}
@@ -689,7 +774,7 @@ public class DmlStatementsProcessor {
* @throws IgniteCheckedException if failed.
*/
@SuppressWarnings("unchecked")
- private long doMerge(UpdatePlan plan, QueryCursorImpl<List<?>> cursor, int pageSize) throws IgniteCheckedException {
+ private long doMerge(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize) throws IgniteCheckedException {
GridH2RowDescriptor desc = plan.tbl.rowDescriptor();
GridCacheContext cctx = desc.context();
@@ -735,7 +820,7 @@ public class DmlStatementsProcessor {
* @throws IgniteCheckedException if failed, particularly in case of duplicate keys.
*/
@SuppressWarnings({"unchecked", "ConstantConditions"})
- private long doInsert(UpdatePlan plan, QueryCursorImpl<List<?>> cursor, int pageSize) throws IgniteCheckedException {
+ private long doInsert(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize) throws IgniteCheckedException {
GridH2RowDescriptor desc = plan.tbl.rowDescriptor();
GridCacheContext cctx = desc.context();
@@ -999,24 +1084,14 @@ public class DmlStatementsProcessor {
}
}
- /**
- * Wrap result of DML operation (number of items affected) to Iterable suitable to be wrapped by cursor.
- *
- * @param itemsCnt Update result to wrap.
- * @return Resulting Iterable.
- */
- @SuppressWarnings("unchecked")
- private static QueryCursorImpl<List<?>> cursorForUpdateResult(long itemsCnt) {
- QueryCursorImpl<List<?>> res =
- new QueryCursorImpl(Collections.singletonList(Collections.singletonList(itemsCnt)), null, false);
-
- res.fieldsMeta(UPDATE_RESULT_META);
-
- return res;
- }
-
/** Update result - modifications count and keys to re-run query with, if needed. */
private final static class UpdateResult {
+ /** Result to return for operations that affected 1 item - mostly to be used for fast updates and deletes. */
+ final static UpdateResult ONE = new UpdateResult(1, X.EMPTY_OBJECT_ARRAY);
+
+ /** Result to return for operations that affected 0 items - mostly to be used for fast updates and deletes. */
+ final static UpdateResult ZERO = new UpdateResult(0, X.EMPTY_OBJECT_ARRAY);
+
/** Number of processed items. */
final long cnt;
http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index e4b0c1f..8088f80 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -56,6 +56,7 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.cache.Cache;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheMemoryMode;
@@ -82,7 +83,6 @@ import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
-import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
@@ -91,6 +91,7 @@ import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
import org.apache.ignite.internal.processors.query.GridQueryIndexing;
import org.apache.ignite.internal.processors.query.GridQueryProperty;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOffheap;
@@ -136,6 +137,7 @@ import org.h2.api.ErrorCode;
import org.h2.api.JavaObjectSerializer;
import org.h2.command.CommandInterface;
import org.h2.command.Prepared;
+import org.h2.command.dml.Insert;
import org.h2.engine.Session;
import org.h2.engine.SysProperties;
import org.h2.index.Index;
@@ -440,7 +442,32 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** {@inheritDoc} */
@Override public PreparedStatement prepareNativeStatement(String schema, String sql) throws SQLException {
- return prepareStatement(connectionForSpace(schema), sql, false);
+ return prepareStatement(connectionForSpace(space(schema)), sql, true);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public IgniteDataStreamer<?, ?> createStreamer(String spaceName, PreparedStatement nativeStmt,
+ long autoFlushFreq, int nodeBufSize, int nodeParOps, boolean allowOverwrite) {
+ Prepared prep = GridSqlQueryParser.prepared((JdbcPreparedStatement) nativeStmt);
+
+ if (!(prep instanceof Insert))
+ throw new IgniteSQLException("Only INSERT operations are supported in streaming mode",
+ IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+
+ IgniteDataStreamer streamer = ctx.grid().dataStreamer(spaceName);
+
+ streamer.autoFlushFrequency(autoFlushFreq);
+
+ streamer.allowOverwrite(allowOverwrite);
+
+ if (nodeBufSize > 0)
+ streamer.perNodeBufferSize(nodeBufSize);
+
+ if (nodeParOps > 0)
+ streamer.perNodeParallelOperations(nodeParOps);
+
+ return streamer;
}
/**
@@ -873,6 +900,23 @@ public class IgniteH2Indexing implements GridQueryIndexing {
};
}
+ /** {@inheritDoc} */
+ @Override public long streamUpdateQuery(@Nullable String spaceName, String qry,
+ @Nullable Object[] params, IgniteDataStreamer<?, ?> streamer) throws IgniteCheckedException {
+ final Connection conn = connectionForSpace(spaceName);
+
+ final PreparedStatement stmt;
+
+ try {
+ stmt = prepareStatement(conn, qry, true);
+ }
+ catch (SQLException e) {
+ throw new IgniteSQLException(e);
+ }
+
+ return dmlProc.streamUpdateQuery(streamer, stmt, params);
+ }
+
/**
* @param rsMeta Metadata.
* @return List of fields metadata.
@@ -1680,13 +1724,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
}
- /**
- * Gets space name from database schema.
- *
- * @param schemaName Schema name. Could not be null. Could be empty.
- * @return Space name. Could be null.
- */
- public String space(String schemaName) {
+ /** {@inheritDoc} */
+ @Override public String space(String schemaName) {
assert schemaName != null;
Schema schema = schemas.get(schemaName);