You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/13 10:23:09 UTC
[1/7] ignite git commit: IGNITE-6127 Fixed bytes encoding.
Repository: ignite
Updated Branches:
refs/heads/ignite-3478 f29d4bc50 -> 4c06131bd
IGNITE-6127 Fixed bytes encoding.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0f3f7d20
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0f3f7d20
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0f3f7d20
Branch: refs/heads/ignite-3478
Commit: 0f3f7d20048a13b561ddcac4537f15c2ce9bc8ed
Parents: b8b7c50
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Thu Oct 12 22:48:35 2017 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Thu Oct 12 22:48:35 2017 +0700
----------------------------------------------------------------------
.../apache/ignite/console/agent/handlers/AbstractListener.java | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0f3f7d20/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/AbstractListener.java
----------------------------------------------------------------------
diff --git a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/AbstractListener.java b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/AbstractListener.java
index 2eec89b..fc4daef 100644
--- a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/AbstractListener.java
+++ b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/AbstractListener.java
@@ -20,6 +20,7 @@ package org.apache.ignite.console.agent.handlers;
import io.socket.client.Ack;
import io.socket.emitter.Emitter;
import java.io.ByteArrayOutputStream;
+import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
@@ -39,6 +40,9 @@ import static org.apache.ignite.console.agent.AgentUtils.toJSON;
* Base class for web socket handlers.
*/
abstract class AbstractListener implements Emitter.Listener {
+ /** UTF8 charset. */
+ private static final Charset UTF8 = Charset.forName("UTF-8");
+
/** */
private ExecutorService pool;
@@ -81,7 +85,7 @@ abstract class AbstractListener implements Emitter.Listener {
Base64OutputStream b64os = new Base64OutputStream(baos);
GZIPOutputStream gzip = new GZIPOutputStream(b64os);
- gzip.write(restRes.getData().getBytes());
+ gzip.write(restRes.getData().getBytes(UTF8));
gzip.close();
[6/7] ignite git commit: IGNITE-6024: SQL: Implemented
"skipReducerOnUpdate" flag. This closes #2488.
Posted by sb...@apache.org.
IGNITE-6024: SQL: Implemented "skipReducerOnUpdate" flag. This closes #2488.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ae02a1d3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ae02a1d3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ae02a1d3
Branch: refs/heads/ignite-3478
Commit: ae02a1d3c673f080d6744ff1d3384f9d48a34dea
Parents: 5ec744c
Author: Sergey Kalashnikov <sk...@gridgain.com>
Authored: Fri Oct 13 12:29:53 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Oct 13 12:29:53 2017 +0300
----------------------------------------------------------------------
.../internal/jdbc2/JdbcConnectionSelfTest.java | 13 +-
.../jdbc/suite/IgniteJdbcDriverTestSuite.java | 11 +
.../JdbcThinAbstractDmlStatementSelfTest.java | 14 +-
.../thin/JdbcThinComplexDmlDdlSelfTest.java | 10 +-
...omplexDmlDdlSkipReducerOnUpdateSelfTest.java | 33 +
.../jdbc/thin/JdbcThinConnectionSelfTest.java | 18 +-
.../thin/JdbcThinInsertStatementSelfTest.java | 1 -
...ertStatementSkipReducerOnUpdateSelfTest.java | 33 +
...rgeStatementSkipReducerOnUpdateSelfTest.java | 33 +
...ateStatementSkipReducerOnUpdateSelfTest.java | 33 +
.../ignite/codegen/MessageCodeGenerator.java | 2 +
.../org/apache/ignite/IgniteJdbcDriver.java | 9 +-
.../org/apache/ignite/IgniteJdbcThinDriver.java | 3 +-
.../ignite/cache/query/SqlFieldsQuery.java | 7 +
.../internal/jdbc/thin/JdbcThinConnection.java | 4 +-
.../internal/jdbc/thin/JdbcThinTcpIo.java | 15 +-
.../internal/jdbc/thin/JdbcThinUtils.java | 6 +
.../internal/jdbc2/JdbcBatchUpdateTask.java | 3 +-
.../ignite/internal/jdbc2/JdbcConnection.java | 14 +-
.../jdbc2/JdbcQueryMultipleStatementsTask.java | 3 +-
.../ignite/internal/jdbc2/JdbcQueryTask.java | 10 +-
.../ignite/internal/jdbc2/JdbcQueryTaskV3.java | 19 +-
.../ignite/internal/jdbc2/JdbcResultSet.java | 2 +-
.../internal/jdbc2/JdbcSqlFieldsQuery.java | 105 ---
.../ignite/internal/jdbc2/JdbcStatement.java | 4 +-
.../cache/query/GridCacheSqlQuery.java | 24 +
.../cache/query/SqlFieldsQueryEx.java | 158 ++++
.../odbc/jdbc/JdbcConnectionContext.java | 7 +-
.../odbc/jdbc/JdbcRequestHandler.java | 19 +-
.../odbc/odbc/OdbcConnectionContext.java | 13 +-
.../odbc/odbc/OdbcRequestHandler.java | 14 +-
.../resources/META-INF/classnames.properties | 4 +-
.../query/h2/DmlStatementsProcessor.java | 160 ++--
.../processors/query/h2/H2DmlPlanKey.java | 21 +-
.../processors/query/h2/IgniteH2Indexing.java | 116 ++-
.../processors/query/h2/UpdateResult.java | 63 ++
.../processors/query/h2/dml/UpdatePlan.java | 64 +-
.../query/h2/dml/UpdatePlanBuilder.java | 117 ++-
.../query/h2/sql/GridSqlQuerySplitter.java | 33 +
.../query/h2/twostep/DistributedUpdateRun.java | 133 ++++
.../query/h2/twostep/GridMapQueryExecutor.java | 136 ++++
.../h2/twostep/GridReduceQueryExecutor.java | 294 ++++++-
.../query/h2/twostep/MapNodeResults.java | 33 +
.../query/h2/twostep/msg/GridH2DmlRequest.java | 516 ++++++++++++
.../query/h2/twostep/msg/GridH2DmlResponse.java | 250 ++++++
.../twostep/msg/GridH2ValueMessageFactory.java | 6 +
...teSqlSkipReducerOnUpdateDmlFlagSelfTest.java | 783 +++++++++++++++++++
...IgniteSqlSkipReducerOnUpdateDmlSelfTest.java | 755 ++++++++++++++++++
.../IgniteCacheQuerySelfTestSuite.java | 4 +
.../cpp/odbc-test/src/configuration_test.cpp | 25 +-
.../cpp/odbc-test/src/queries_test.cpp | 8 +
.../include/ignite/odbc/config/configuration.h | 26 +
.../cpp/odbc/include/ignite/odbc/message.h | 6 +-
.../odbc/include/ignite/odbc/protocol_version.h | 1 +
.../odbc/system/ui/dsn_configuration_window.h | 4 +
.../src/system/ui/dsn_configuration_window.cpp | 20 +
.../cpp/odbc/src/config/configuration.cpp | 50 +-
modules/platforms/cpp/odbc/src/connection.cpp | 5 +-
modules/platforms/cpp/odbc/src/dsn_config.cpp | 4 +
modules/platforms/cpp/odbc/src/message.cpp | 12 +-
.../platforms/cpp/odbc/src/protocol_version.cpp | 6 +-
61 files changed, 3999 insertions(+), 296 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcConnectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcConnectionSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcConnectionSelfTest.java
index aeb7c76..35d0fba 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcConnectionSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcConnectionSelfTest.java
@@ -31,7 +31,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteJdbcDriver.CFG_URL_PREFIX;
@@ -315,6 +314,7 @@ public class JdbcConnectionSelfTest extends GridCommonAbstractTest {
assertFalse(((JdbcConnection)conn).isDistributedJoins());
assertFalse(((JdbcConnection)conn).isCollocatedQuery());
assertFalse(((JdbcConnection)conn).isLazy());
+ assertFalse(((JdbcConnection)conn).skipReducerOnUpdate());
}
try (final Connection conn = DriverManager.getConnection(CFG_URL_PREFIX + "distributedJoins=true@"
@@ -323,6 +323,7 @@ public class JdbcConnectionSelfTest extends GridCommonAbstractTest {
assertTrue(((JdbcConnection)conn).isDistributedJoins());
assertFalse(((JdbcConnection)conn).isCollocatedQuery());
assertFalse(((JdbcConnection)conn).isLazy());
+ assertFalse(((JdbcConnection)conn).skipReducerOnUpdate());
}
try (final Connection conn = DriverManager.getConnection(CFG_URL_PREFIX + "collocated=true@"
@@ -331,6 +332,7 @@ public class JdbcConnectionSelfTest extends GridCommonAbstractTest {
assertFalse(((JdbcConnection)conn).isDistributedJoins());
assertTrue(((JdbcConnection)conn).isCollocatedQuery());
assertFalse(((JdbcConnection)conn).isLazy());
+ assertFalse(((JdbcConnection)conn).skipReducerOnUpdate());
}
try (final Connection conn = DriverManager.getConnection(CFG_URL_PREFIX + "lazy=true@" + configURL())) {
@@ -338,6 +340,15 @@ public class JdbcConnectionSelfTest extends GridCommonAbstractTest {
assertFalse(((JdbcConnection)conn).isDistributedJoins());
assertFalse(((JdbcConnection)conn).isCollocatedQuery());
assertTrue(((JdbcConnection)conn).isLazy());
+ assertFalse(((JdbcConnection)conn).skipReducerOnUpdate());
+ }
+ try (final Connection conn = DriverManager.getConnection(CFG_URL_PREFIX + "skipReducerOnUpdate=true@"
+ + configURL())) {
+ assertFalse(((JdbcConnection)conn).isEnforceJoinOrder());
+ assertFalse(((JdbcConnection)conn).isDistributedJoins());
+ assertFalse(((JdbcConnection)conn).isCollocatedQuery());
+ assertFalse(((JdbcConnection)conn).isLazy());
+ assertTrue(((JdbcConnection)conn).skipReducerOnUpdate());
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
index 1ae2427..bec388a 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
@@ -58,6 +58,10 @@ import org.apache.ignite.jdbc.thin.JdbcThinSchemaCaseTest;
import org.apache.ignite.jdbc.thin.JdbcThinSelectAfterAlterTable;
import org.apache.ignite.jdbc.thin.JdbcThinStatementSelfTest;
import org.apache.ignite.jdbc.thin.JdbcThinUpdateStatementSelfTest;
+import org.apache.ignite.jdbc.thin.JdbcThinComplexDmlDdlSkipReducerOnUpdateSelfTest;
+import org.apache.ignite.jdbc.thin.JdbcThinInsertStatementSkipReducerOnUpdateSelfTest;
+import org.apache.ignite.jdbc.thin.JdbcThinMergeStatementSkipReducerOnUpdateSelfTest;
+import org.apache.ignite.jdbc.thin.JdbcThinUpdateStatementSkipReducerOnUpdateSelfTest;
/**
* JDBC driver test suite.
@@ -152,6 +156,13 @@ public class IgniteJdbcDriverTestSuite extends TestSuite {
suite.addTest(new TestSuite(JdbcThinSelectAfterAlterTable.class));
+ // Update on server
+ suite.addTest(new TestSuite(JdbcThinInsertStatementSkipReducerOnUpdateSelfTest.class));
+ suite.addTest(new TestSuite(JdbcThinUpdateStatementSkipReducerOnUpdateSelfTest.class));
+ suite.addTest(new TestSuite(JdbcThinMergeStatementSkipReducerOnUpdateSelfTest.class));
+ suite.addTest(new TestSuite(JdbcThinComplexDmlDdlSkipReducerOnUpdateSelfTest.class));
+
+
return suite;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAbstractDmlStatementSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAbstractDmlStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAbstractDmlStatementSelfTest.java
index afe5e2e..69435da 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAbstractDmlStatementSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAbstractDmlStatementSelfTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.jdbc.thin;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
+import java.sql.SQLException;
import java.util.Collections;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
@@ -42,9 +43,6 @@ public abstract class JdbcThinAbstractDmlStatementSelfTest extends JdbcThinAbstr
/** IP finder. */
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
- /** URL. */
- private static final String URL = "jdbc:ignite:thin://127.0.0.1/";
-
/** SQL SELECT query for verification. */
static final String SQL_SELECT = "select _key, id, firstName, lastName, age from Person";
@@ -67,7 +65,7 @@ public abstract class JdbcThinAbstractDmlStatementSelfTest extends JdbcThinAbstr
@Override protected void beforeTest() throws Exception {
ignite(0).getOrCreateCache(cacheConfig());
- conn = DriverManager.getConnection(URL);
+ conn = createConnection();
conn.setSchema('"' + DEFAULT_CACHE_NAME + '"');
}
@@ -81,6 +79,14 @@ public abstract class JdbcThinAbstractDmlStatementSelfTest extends JdbcThinAbstr
assertTrue(conn.isClosed());
}
+ /**
+ * @return JDBC connection.
+ * @throws SQLException On error.
+ */
+ protected Connection createConnection() throws SQLException {
+ return DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1/");
+ }
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
return getConfiguration0(igniteInstanceName);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinComplexDmlDdlSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinComplexDmlDdlSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinComplexDmlDdlSelfTest.java
index 0760107..d4e03bc 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinComplexDmlDdlSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinComplexDmlDdlSelfTest.java
@@ -93,6 +93,14 @@ public class JdbcThinComplexDmlDdlSelfTest extends GridCommonAbstractTest {
return cfg;
}
+ /**
+ * @return JDBC connection.
+ * @throws SQLException On error.
+ */
+ protected Connection createConnection() throws SQLException {
+ return DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1");
+ }
+
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
@@ -109,7 +117,7 @@ public class JdbcThinComplexDmlDdlSelfTest extends GridCommonAbstractTest {
@Override protected void beforeTest() throws Exception {
super.beforeTest();
- conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1");
+ conn = createConnection();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinComplexDmlDdlSkipReducerOnUpdateSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinComplexDmlDdlSkipReducerOnUpdateSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinComplexDmlDdlSkipReducerOnUpdateSelfTest.java
new file mode 100644
index 0000000..7ae6479
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinComplexDmlDdlSkipReducerOnUpdateSelfTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import org.apache.ignite.jdbc.thin.JdbcThinComplexDmlDdlSelfTest;
+
+/**
+ * Base class for complex SQL tests based on JDBC driver.
+ */
+public class JdbcThinComplexDmlDdlSkipReducerOnUpdateSelfTest extends JdbcThinComplexDmlDdlSelfTest {
+ /** {@inheritDoc} */
+ @Override protected Connection createConnection() throws SQLException {
+ return DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?skipReducerOnUpdate=true");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
index fbbec0d..7f67136 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
@@ -187,6 +187,7 @@ public class JdbcThinConnectionSelfTest extends JdbcThinAbstractSelfTest {
assertFalse(io(conn).collocated());
assertFalse(io(conn).replicatedOnly());
assertFalse(io(conn).lazy());
+ assertFalse(io(conn).skipReducerOnUpdate());
}
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?distributedJoins=true")) {
@@ -195,6 +196,7 @@ public class JdbcThinConnectionSelfTest extends JdbcThinAbstractSelfTest {
assertFalse(io(conn).collocated());
assertFalse(io(conn).replicatedOnly());
assertFalse(io(conn).lazy());
+ assertFalse(io(conn).skipReducerOnUpdate());
}
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?enforceJoinOrder=true")) {
@@ -203,6 +205,7 @@ public class JdbcThinConnectionSelfTest extends JdbcThinAbstractSelfTest {
assertFalse(io(conn).collocated());
assertFalse(io(conn).replicatedOnly());
assertFalse(io(conn).lazy());
+ assertFalse(io(conn).skipReducerOnUpdate());
}
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?collocated=true")) {
@@ -211,6 +214,7 @@ public class JdbcThinConnectionSelfTest extends JdbcThinAbstractSelfTest {
assertTrue(io(conn).collocated());
assertFalse(io(conn).replicatedOnly());
assertFalse(io(conn).lazy());
+ assertFalse(io(conn).skipReducerOnUpdate());
}
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?replicatedOnly=true")) {
@@ -219,6 +223,7 @@ public class JdbcThinConnectionSelfTest extends JdbcThinAbstractSelfTest {
assertFalse(io(conn).collocated());
assertTrue(io(conn).replicatedOnly());
assertFalse(io(conn).lazy());
+ assertFalse(io(conn).skipReducerOnUpdate());
}
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?lazy=true")) {
@@ -227,15 +232,26 @@ public class JdbcThinConnectionSelfTest extends JdbcThinAbstractSelfTest {
assertFalse(io(conn).collocated());
assertFalse(io(conn).replicatedOnly());
assertTrue(io(conn).lazy());
+ assertFalse(io(conn).skipReducerOnUpdate());
+ }
+
+ try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?skipReducerOnUpdate=true")) {
+ assertFalse(io(conn).distributedJoins());
+ assertFalse(io(conn).enforceJoinOrder());
+ assertFalse(io(conn).collocated());
+ assertFalse(io(conn).replicatedOnly());
+ assertFalse(io(conn).lazy());
+ assertTrue(io(conn).skipReducerOnUpdate());
}
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?distributedJoins=true&" +
- "enforceJoinOrder=true&collocated=true&replicatedOnly=true&lazy=true")) {
+ "enforceJoinOrder=true&collocated=true&replicatedOnly=true&lazy=true&skipReducerOnUpdate=true")) {
assertTrue(io(conn).distributedJoins());
assertTrue(io(conn).enforceJoinOrder());
assertTrue(io(conn).collocated());
assertTrue(io(conn).replicatedOnly());
assertTrue(io(conn).lazy());
+ assertTrue(io(conn).skipReducerOnUpdate());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinInsertStatementSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinInsertStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinInsertStatementSelfTest.java
index 8ab5760..bf55da0 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinInsertStatementSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinInsertStatementSelfTest.java
@@ -24,7 +24,6 @@ import java.sql.Statement;
import java.util.Arrays;
import java.util.HashSet;
import java.util.concurrent.Callable;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.testframework.GridTestUtils;
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinInsertStatementSkipReducerOnUpdateSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinInsertStatementSkipReducerOnUpdateSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinInsertStatementSkipReducerOnUpdateSelfTest.java
new file mode 100644
index 0000000..d99639f
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinInsertStatementSkipReducerOnUpdateSelfTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import org.apache.ignite.jdbc.thin.JdbcThinInsertStatementSelfTest;
+
+/**
+ * Statement test.
+ */
+public class JdbcThinInsertStatementSkipReducerOnUpdateSelfTest extends JdbcThinInsertStatementSelfTest {
+ /** {@inheritDoc} */
+ @Override protected Connection createConnection() throws SQLException {
+ return DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?skipReducerOnUpdate=true");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinMergeStatementSkipReducerOnUpdateSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinMergeStatementSkipReducerOnUpdateSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinMergeStatementSkipReducerOnUpdateSelfTest.java
new file mode 100644
index 0000000..0832fb7
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinMergeStatementSkipReducerOnUpdateSelfTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import org.apache.ignite.jdbc.thin.JdbcThinMergeStatementSelfTest;
+
+/**
+ * MERGE statement test.
+ */
+public class JdbcThinMergeStatementSkipReducerOnUpdateSelfTest extends JdbcThinMergeStatementSelfTest {
+ /** {@inheritDoc} */
+ @Override protected Connection createConnection() throws SQLException {
+ return DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?skipReducerOnUpdate=true");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinUpdateStatementSkipReducerOnUpdateSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinUpdateStatementSkipReducerOnUpdateSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinUpdateStatementSkipReducerOnUpdateSelfTest.java
new file mode 100644
index 0000000..475a77f
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinUpdateStatementSkipReducerOnUpdateSelfTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import org.apache.ignite.jdbc.thin.JdbcThinUpdateStatementSelfTest;
+
+/**
+ *
+ */
+public class JdbcThinUpdateStatementSkipReducerOnUpdateSelfTest extends JdbcThinUpdateStatementSelfTest {
+ /** {@inheritDoc} */
+ @Override protected Connection createConnection() throws SQLException {
+ return DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?skipReducerOnUpdate=true");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 99ec08a..3ea0c81 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -235,6 +235,8 @@ public class MessageCodeGenerator {
// gen.generateAndWrite(GridH2RowMessage.class);
// gen.generateAndWrite(GridCacheVersion.class);
// gen.generateAndWrite(GridCacheVersionEx.class);
+// gen.generateAndWrite(GridH2DmlRequest.class);
+// gen.generateAndWrite(GridH2DmlResponse.class);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java
index b03e387..ea9b7f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java
@@ -334,6 +334,9 @@ public class IgniteJdbcDriver implements Driver {
/** Allow queries with multiple statements. */
private static final String PARAM_MULTIPLE_STMTS = "multipleStatementsAllowed";
+ /** Skip reducer on update property name. */
+ private static final String PARAM_SKIP_REDUCER_ON_UPDATE = "skipReducerOnUpdate";
+
/** Hostname property name. */
public static final String PROP_HOST = PROP_PREFIX + "host";
@@ -382,6 +385,9 @@ public class IgniteJdbcDriver implements Driver {
/** Allow query with multiple statements. */
public static final String PROP_MULTIPLE_STMTS = PROP_PREFIX + PARAM_MULTIPLE_STMTS;
+ /** Skip reducer on update update property name. */
+ public static final String PROP_SKIP_REDUCER_ON_UPDATE = PROP_PREFIX + PARAM_SKIP_REDUCER_ON_UPDATE;
+
/** Cache name property name. */
public static final String PROP_CFG = PROP_PREFIX + "cfg";
@@ -454,7 +460,8 @@ public class IgniteJdbcDriver implements Driver {
new JdbcDriverPropertyInfo("Enforce Join Order", info.getProperty(JdbcThinUtils.PROP_ENFORCE_JOIN_ORDER), ""),
new JdbcDriverPropertyInfo("Lazy query execution", info.getProperty(JdbcThinUtils.PROP_LAZY), ""),
new JdbcDriverPropertyInfo("Transactions Allowed", info.getProperty(PROP_TX_ALLOWED), ""),
- new JdbcDriverPropertyInfo("Queries with multiple statements allowed", info.getProperty(PROP_MULTIPLE_STMTS), "")
+ new JdbcDriverPropertyInfo("Queries with multiple statements allowed", info.getProperty(PROP_MULTIPLE_STMTS), ""),
+ new JdbcDriverPropertyInfo("Skip reducer on update", info.getProperty(PROP_SKIP_REDUCER_ON_UPDATE), "")
);
if (info.getProperty(PROP_CFG) != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDriver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDriver.java b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDriver.java
index 8085ed4..a313f92 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDriver.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDriver.java
@@ -186,7 +186,8 @@ public class IgniteJdbcThinDriver implements Driver {
new JdbcDriverPropertyInfo("Enforce Join Order", info.getProperty(JdbcThinUtils.PROP_ENFORCE_JOIN_ORDER), ""),
new JdbcDriverPropertyInfo("Collocated", info.getProperty(JdbcThinUtils.PROP_COLLOCATED), ""),
new JdbcDriverPropertyInfo("Replicated only", info.getProperty(JdbcThinUtils.PROP_REPLICATED_ONLY), ""),
- new JdbcDriverPropertyInfo("Lazy query execution flag", info.getProperty(JdbcThinUtils.PROP_LAZY),"")
+ new JdbcDriverPropertyInfo("Lazy query execution flag", info.getProperty(JdbcThinUtils.PROP_LAZY),""),
+ new JdbcDriverPropertyInfo("Skip reducer on update", info.getProperty(JdbcThinUtils.PROP_SKIP_REDUCER_ON_UPDATE),"")
);
return props.toArray(new DriverPropertyInfo[0]);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
index 2d128d1..4e12b8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
@@ -369,6 +369,13 @@ public class SqlFieldsQuery extends Query<List<?>> {
return this;
}
+ /**
+ * @return Copy of this query.
+ */
+ public SqlFieldsQuery copy() {
+ return new SqlFieldsQuery(this);
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SqlFieldsQuery.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
index 5afed4e..57b25e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
@@ -62,6 +62,7 @@ import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.PROP_REPLICATED
import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.PROP_SOCK_RCV_BUF;
import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.PROP_SOCK_SND_BUF;
import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.PROP_TCP_NO_DELAY;
+import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.PROP_SKIP_REDUCER_ON_UPDATE;
/**
* JDBC connection implementation.
@@ -136,10 +137,11 @@ public class JdbcThinConnection implements Connection {
int sockRcvBuf = extractIntNonNegative(props, PROP_SOCK_RCV_BUF, 0);
boolean tcpNoDelay = extractBoolean(props, PROP_TCP_NO_DELAY, true);
+ boolean skipReducerOnUpdate = extractBoolean(props, PROP_SKIP_REDUCER_ON_UPDATE, false);
try {
cliIo = new JdbcThinTcpIo(host, port, distributedJoins, enforceJoinOrder, collocated, replicatedOnly,
- autoCloseServerCursor, lazyExec, sockSndBuf, sockRcvBuf, tcpNoDelay);
+ autoCloseServerCursor, lazyExec, sockSndBuf, sockRcvBuf, tcpNoDelay, skipReducerOnUpdate);
cliIo.start();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
index 9e12fbf..0670fb1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
@@ -100,6 +100,9 @@ public class JdbcThinTcpIo {
/** Flag to automatically close server cursor. */
private final boolean autoCloseServerCursor;
+ /** Executes update queries on server nodes. */
+ private final boolean skipReducerOnUpdate;
+
/** Socket send buffer. */
private final int sockSndBuf;
@@ -138,10 +141,11 @@ public class JdbcThinTcpIo {
* @param sockSndBuf Socket send buffer.
* @param sockRcvBuf Socket receive buffer.
* @param tcpNoDelay TCP no delay flag.
+ * @param skipReducerOnUpdate Executes update queries on ignite server nodes.
*/
JdbcThinTcpIo(String host, int port, boolean distributedJoins, boolean enforceJoinOrder, boolean collocated,
boolean replicatedOnly, boolean autoCloseServerCursor, boolean lazy, int sockSndBuf, int sockRcvBuf,
- boolean tcpNoDelay) {
+ boolean tcpNoDelay, boolean skipReducerOnUpdate) {
this.host = host;
this.port = port;
this.distributedJoins = distributedJoins;
@@ -153,6 +157,7 @@ public class JdbcThinTcpIo {
this.sockSndBuf = sockSndBuf;
this.sockRcvBuf = sockRcvBuf;
this.tcpNoDelay = tcpNoDelay;
+ this.skipReducerOnUpdate = skipReducerOnUpdate;
}
/**
@@ -211,6 +216,7 @@ public class JdbcThinTcpIo {
writer.writeBoolean(replicatedOnly);
writer.writeBoolean(autoCloseServerCursor);
writer.writeBoolean(lazy);
+ writer.writeBoolean(skipReducerOnUpdate);
send(writer.array());
@@ -491,4 +497,11 @@ public class JdbcThinTcpIo {
public boolean lazy() {
return lazy;
}
+
+ /**
+ * @return Server side update flag.
+ */
+ public boolean skipReducerOnUpdate() {
+ return skipReducerOnUpdate;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java
index 52b3abc..c9bf61c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java
@@ -81,6 +81,9 @@ public class JdbcThinUtils {
/** Parameter: Automatically close server cursor. */
public static final String PARAM_AUTO_CLOSE_SERVER_CURSOR = "autoCloseServerCursor";
+ /** Parameter: execute update query in distributed mode on ignite server nodes. */
+ public static final String PARAM_SKIP_REDUCER_ON_UPDATE = "skipReducerOnUpdate";
+
/** Distributed joins property name. */
public static final String PROP_DISTRIBUTED_JOINS = PROP_PREFIX + PARAM_DISTRIBUTED_JOINS;
@@ -108,6 +111,9 @@ public class JdbcThinUtils {
/** Automatically close server cursor. */
public static final String PROP_AUTO_CLOSE_SERVER_CURSORS = PROP_PREFIX + PARAM_AUTO_CLOSE_SERVER_CURSOR;
+ /** Executes update queries on ignite server nodes in distributed mode. */
+ public static final String PROP_SKIP_REDUCER_ON_UPDATE = PROP_PREFIX + PARAM_SKIP_REDUCER_ON_UPDATE;
+
/** Default port. */
public static final int DFLT_PORT = ClientConnectorConfiguration.DFLT_PORT;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBatchUpdateTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBatchUpdateTask.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBatchUpdateTask.java
index e4916f7..774f922 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBatchUpdateTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBatchUpdateTask.java
@@ -29,6 +29,7 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.resources.IgniteInstanceResource;
@@ -162,7 +163,7 @@ class JdbcBatchUpdateTask implements IgniteCallable<int[]> {
* @throws SQLException If failed.
*/
private Integer doSingleUpdate(IgniteCache<?, ?> cache, String sqlText, List<Object> args) throws SQLException {
- SqlFieldsQuery qry = new JdbcSqlFieldsQuery(sqlText, false);
+ SqlFieldsQuery qry = new SqlFieldsQueryEx(sqlText, false);
qry.setPageSize(fetchSize);
qry.setLocal(locQry);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
index ccc09ec..29cb6a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
@@ -82,12 +82,13 @@ import static org.apache.ignite.IgniteJdbcDriver.PROP_LAZY;
import static org.apache.ignite.IgniteJdbcDriver.PROP_LOCAL;
import static org.apache.ignite.IgniteJdbcDriver.PROP_MULTIPLE_STMTS;
import static org.apache.ignite.IgniteJdbcDriver.PROP_NODE_ID;
-import static org.apache.ignite.IgniteJdbcDriver.PROP_TX_ALLOWED;
import static org.apache.ignite.IgniteJdbcDriver.PROP_STREAMING;
import static org.apache.ignite.IgniteJdbcDriver.PROP_STREAMING_ALLOW_OVERWRITE;
import static org.apache.ignite.IgniteJdbcDriver.PROP_STREAMING_FLUSH_FREQ;
import static org.apache.ignite.IgniteJdbcDriver.PROP_STREAMING_PER_NODE_BUF_SIZE;
import static org.apache.ignite.IgniteJdbcDriver.PROP_STREAMING_PER_NODE_PAR_OPS;
+import static org.apache.ignite.IgniteJdbcDriver.PROP_TX_ALLOWED;
+import static org.apache.ignite.IgniteJdbcDriver.PROP_SKIP_REDUCER_ON_UPDATE;
import static org.apache.ignite.internal.jdbc2.JdbcUtils.convertToSqlException;
import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.createJdbcSqlException;
@@ -168,6 +169,9 @@ public class JdbcConnection implements Connection {
/** Allow queries with multiple statements. */
private final boolean multipleStmts;
+ /** Skip reducer on update flag. */
+ private final boolean skipReducerOnUpdate;
+
/** Statements. */
final Set<JdbcStatement> statements = new HashSet<>();
@@ -209,6 +213,7 @@ public class JdbcConnection implements Connection {
streamNodeParOps = Integer.parseInt(props.getProperty(PROP_STREAMING_PER_NODE_PAR_OPS, "0"));
multipleStmts = Boolean.parseBoolean(props.getProperty(PROP_MULTIPLE_STMTS));
+ skipReducerOnUpdate = Boolean.parseBoolean(props.getProperty(PROP_SKIP_REDUCER_ON_UPDATE));
String nodeIdProp = props.getProperty(PROP_NODE_ID);
@@ -854,6 +859,13 @@ public class JdbcConnection implements Connection {
}
/**
+ * @return {@code true} if update on server is enabled, {@code false} otherwise.
+ */
+ boolean skipReducerOnUpdate() {
+ return skipReducerOnUpdate;
+ }
+
+ /**
* @return Local query flag.
*/
boolean isLocalQuery() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryMultipleStatementsTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryMultipleStatementsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryMultipleStatementsTask.java
index bf7c24e..f907525 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryMultipleStatementsTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryMultipleStatementsTask.java
@@ -27,6 +27,7 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.resources.IgniteInstanceResource;
@@ -109,7 +110,7 @@ class JdbcQueryMultipleStatementsTask implements IgniteCallable<List<JdbcStateme
/** {@inheritDoc} */
@Override public List<JdbcStatementResultInfo> call() throws Exception {
- SqlFieldsQuery qry = (isQry != null ? new JdbcSqlFieldsQuery(sql, isQry) : new SqlFieldsQuery(sql))
+ SqlFieldsQuery qry = (isQry != null ? new SqlFieldsQueryEx(sql, isQry) : new SqlFieldsQuery(sql))
.setArgs(args);
qry.setPageSize(fetchSize);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java
index ecbfb71..aa9f009 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java
@@ -36,6 +36,7 @@ import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.util.typedef.CAX;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -156,7 +157,7 @@ class JdbcQueryTask implements IgniteCallable<JdbcQueryTaskResult> {
throw new SQLException("Cache not found [cacheName=" + cacheName + ']');
}
- SqlFieldsQuery qry = (isQry != null ? new JdbcSqlFieldsQuery(sql, isQry) : new SqlFieldsQuery(sql))
+ SqlFieldsQuery qry = (isQry != null ? new SqlFieldsQueryEx(sql, isQry) : new SqlFieldsQuery(sql))
.setArgs(args);
qry.setPageSize(fetchSize);
@@ -241,6 +242,13 @@ class JdbcQueryTask implements IgniteCallable<JdbcQueryTaskResult> {
}
/**
+ * @return Flag to update enable server side updates.
+ */
+ protected boolean skipReducerOnUpdate() {
+ return false;
+ }
+
+ /**
* Schedules removal of stored cursor in case of remote query execution.
*
* @param uuid Cursor UUID.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTaskV3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTaskV3.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTaskV3.java
index cb2d452..f002d87 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTaskV3.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTaskV3.java
@@ -30,6 +30,9 @@ class JdbcQueryTaskV3 extends JdbcQueryTaskV2 {
/** Update metadata on demand flag. */
private final boolean updateMeta;
+ /** Update metadata on demand flag. */
+ private final boolean skipReducerOnUpdate;
+
/**
* @param ignite Ignite.
* @param cacheName Cache name.
@@ -46,14 +49,16 @@ class JdbcQueryTaskV3 extends JdbcQueryTaskV2 {
* @param enforceJoinOrder Enforce joins order flag.
* @param lazy Lazy query execution flag.
* @param updateMeta Update metadata on demand.
+ * @param skipReducerOnUpdate Flkag to enable server side updates.
*/
public JdbcQueryTaskV3(Ignite ignite, String cacheName, String schemaName, String sql, Boolean isQry, boolean loc,
Object[] args, int fetchSize, UUID uuid, boolean locQry, boolean collocatedQry, boolean distributedJoins,
- boolean enforceJoinOrder, boolean lazy, boolean updateMeta) {
+ boolean enforceJoinOrder, boolean lazy, boolean updateMeta, boolean skipReducerOnUpdate) {
super(ignite, cacheName, schemaName, sql, isQry, loc, args, fetchSize, uuid, locQry,
collocatedQry, distributedJoins, enforceJoinOrder, lazy);
this.updateMeta = updateMeta;
+ this.skipReducerOnUpdate = skipReducerOnUpdate;
}
/** {@inheritDoc} */
@@ -61,6 +66,11 @@ class JdbcQueryTaskV3 extends JdbcQueryTaskV2 {
return updateMeta;
}
+ /** {@inheritDoc} */
+ @Override protected boolean skipReducerOnUpdate() {
+ return skipReducerOnUpdate;
+ }
+
/**
* @param ignite Ignite.
* @param cacheName Cache name.
@@ -77,16 +87,17 @@ class JdbcQueryTaskV3 extends JdbcQueryTaskV2 {
* @param enforceJoinOrder Enforce joins order flag.
* @param lazy Lazy query execution flag.
* @param updateMeta Update metadata on demand.
+ * @param skipReducerOnUpdate Update on server flag.
* @return Appropriate task JdbcQueryTask or JdbcQueryTaskV2.
*/
public static JdbcQueryTask createTask(Ignite ignite, String cacheName, String schemaName, String sql,
Boolean isQry, boolean loc, Object[] args, int fetchSize, UUID uuid, boolean locQry,
boolean collocatedQry, boolean distributedJoins,
- boolean enforceJoinOrder, boolean lazy, boolean updateMeta) {
+ boolean enforceJoinOrder, boolean lazy, boolean updateMeta, boolean skipReducerOnUpdate) {
- if (updateMeta)
+ if (updateMeta || skipReducerOnUpdate)
return new JdbcQueryTaskV3(ignite, cacheName, schemaName, sql, isQry, loc, args, fetchSize,
- uuid, locQry, collocatedQry, distributedJoins, enforceJoinOrder, lazy, true);
+ uuid, locQry, collocatedQry, distributedJoins, enforceJoinOrder, lazy, updateMeta, skipReducerOnUpdate);
else
return JdbcQueryTaskV2.createTask(ignite, cacheName, schemaName, sql, isQry, loc, args, fetchSize,
uuid, locQry, collocatedQry, distributedJoins, enforceJoinOrder, lazy);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
index 69d4252..e2ff5d8 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
@@ -205,7 +205,7 @@ public class JdbcResultSet implements ResultSet {
// Connections from new clients send queries with new tasks, so we have to continue in the same manner
JdbcQueryTask qryTask = JdbcQueryTaskV3.createTask(loc ? ignite : null, conn.cacheName(), conn.schemaName(),
null,true, loc, null, fetchSize, uuid, conn.isLocalQuery(), conn.isCollocatedQuery(),
- conn.isDistributedJoins(), conn.isEnforceJoinOrder(), conn.isLazy(), updateMetadata);
+ conn.isDistributedJoins(), conn.isEnforceJoinOrder(), conn.isLazy(), updateMetadata, false);
try {
JdbcQueryTaskResult res =
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcSqlFieldsQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcSqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcSqlFieldsQuery.java
deleted file mode 100644
index d8b9a26..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcSqlFieldsQuery.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.jdbc2;
-
-import java.util.concurrent.TimeUnit;
-import org.apache.ignite.cache.query.SqlFieldsQuery;
-
-/**
- * {@link SqlFieldsQuery} with JDBC flavor - it has additional flag indicating whether JDBC driver expects
- * this query to return a result set or an update counter. This class is not intended for use outside JDBC driver.
- */
-public final class JdbcSqlFieldsQuery extends SqlFieldsQuery {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Flag set by JDBC driver to enforce checks for correct operation type. */
- private final boolean isQry;
-
- /**
- * @param sql SQL query.
- * @param isQry Flag indicating whether this object denotes a query or an update operation.
- */
- public JdbcSqlFieldsQuery(String sql, boolean isQry) {
- super(sql);
- this.isQry = isQry;
- }
-
- /**
- * @return Flag indicating whether this object denotes a query or an update operation..
- */
- public boolean isQuery() {
- return isQry;
- }
-
- /** {@inheritDoc} */
- @Override public JdbcSqlFieldsQuery setSql(String sql) {
- super.setSql(sql);
-
- return this;
- }
-
- /** {@inheritDoc} */
- @Override public JdbcSqlFieldsQuery setArgs(Object... args) {
- super.setArgs(args);
-
- return this;
- }
-
- /** {@inheritDoc} */
- @Override public JdbcSqlFieldsQuery setTimeout(int timeout, TimeUnit timeUnit) {
- super.setTimeout(timeout, timeUnit);
-
- return this;
- }
-
- /** {@inheritDoc} */
- @Override public JdbcSqlFieldsQuery setCollocated(boolean collocated) {
- super.setCollocated(collocated);
-
- return this;
- }
-
- /** {@inheritDoc} */
- @Override public JdbcSqlFieldsQuery setEnforceJoinOrder(boolean enforceJoinOrder) {
- super.setEnforceJoinOrder(enforceJoinOrder);
-
- return this;
- }
-
- /** {@inheritDoc} */
- @Override public JdbcSqlFieldsQuery setDistributedJoins(boolean distributedJoins) {
- super.setDistributedJoins(distributedJoins);
-
- return this;
- }
-
- /** {@inheritDoc} */
- @Override public JdbcSqlFieldsQuery setPageSize(int pageSize) {
- super.setPageSize(pageSize);
-
- return this;
- }
-
- /** {@inheritDoc} */
- @Override public JdbcSqlFieldsQuery setLocal(boolean loc) {
- super.setLocal(loc);
-
- return this;
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
index acac123..2498456 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
@@ -161,9 +161,9 @@ public class JdbcStatement implements Statement {
else
isQuery = true;
- JdbcQueryTask qryTask = JdbcQueryTaskV2.createTask(loc ? ignite : null, conn.cacheName(), conn.schemaName(),
+ JdbcQueryTask qryTask = JdbcQueryTaskV3.createTask(loc ? ignite : null, conn.cacheName(), conn.schemaName(),
sql, isQuery, loc, getArgs(), fetchSize, uuid, conn.isLocalQuery(), conn.isCollocatedQuery(),
- conn.isDistributedJoins(), conn.isEnforceJoinOrder(), conn.isLazy());
+ conn.isDistributedJoins(), conn.isEnforceJoinOrder(), conn.isLazy(), false, conn.skipReducerOnUpdate());
try {
JdbcQueryTaskResult qryRes =
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
index d3746f3..f38c5b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
@@ -74,6 +74,11 @@ public class GridCacheSqlQuery implements Message {
@GridDirectTransient
private transient Object[] derivedPartitions;
+ /** Flag indicating that query contains sub-queries. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private transient boolean hasSubQries;
+
/**
* For {@link Message}.
*/
@@ -259,6 +264,7 @@ public class GridCacheSqlQuery implements Message {
cp.sort = sort;
cp.partitioned = partitioned;
cp.derivedPartitions = derivedPartitions;
+ cp.hasSubQries = hasSubQries;
return cp;
}
@@ -347,4 +353,22 @@ public class GridCacheSqlQuery implements Message {
return this;
}
+
+ /**
+ * @return {@code true} if query contains sub-queries.
+ */
+ public boolean hasSubQueries() {
+ return hasSubQries;
+ }
+
+ /**
+ * @param hasSubQries Flag indicating that query contains sub-queries.
+ *
+ * @return {@code this}.
+ */
+ public GridCacheSqlQuery hasSubQueries(boolean hasSubQries) {
+ this.hasSubQries = hasSubQries;
+
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java
new file mode 100644
index 0000000..c5f786e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+
+/**
+ * {@link SqlFieldsQuery} with experimental and internal features.
+ */
+public final class SqlFieldsQueryEx extends SqlFieldsQuery {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Flag to enforce checks for correct operation type. */
+ private final Boolean isQry;
+
+ /** Whether server side DML should be enabled. */
+ private boolean skipReducerOnUpdate;
+
+ /**
+ * @param sql SQL query.
+ * @param isQry Flag indicating whether this object denotes a query or an update operation.
+ */
+ public SqlFieldsQueryEx(String sql, Boolean isQry) {
+ super(sql);
+ this.isQry = isQry;
+ }
+
+ /**
+ * @param qry SQL query.
+ */
+ private SqlFieldsQueryEx(SqlFieldsQueryEx qry) {
+ super(qry);
+
+ this.isQry = qry.isQry;
+ this.skipReducerOnUpdate = qry.skipReducerOnUpdate;
+ }
+
+ /**
+ * @return Flag indicating whether this object denotes a query or an update operation.
+ */
+ public Boolean isQuery() {
+ return isQry;
+ }
+
+ /** {@inheritDoc} */
+ @Override public SqlFieldsQueryEx setSql(String sql) {
+ super.setSql(sql);
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public SqlFieldsQueryEx setArgs(Object... args) {
+ super.setArgs(args);
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public SqlFieldsQueryEx setTimeout(int timeout, TimeUnit timeUnit) {
+ super.setTimeout(timeout, timeUnit);
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public SqlFieldsQueryEx setCollocated(boolean collocated) {
+ super.setCollocated(collocated);
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public SqlFieldsQueryEx setEnforceJoinOrder(boolean enforceJoinOrder) {
+ super.setEnforceJoinOrder(enforceJoinOrder);
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public SqlFieldsQueryEx setDistributedJoins(boolean distributedJoins) {
+ super.setDistributedJoins(distributedJoins);
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public SqlFieldsQueryEx setPageSize(int pageSize) {
+ super.setPageSize(pageSize);
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public SqlFieldsQueryEx setLocal(boolean loc) {
+ super.setLocal(loc);
+
+ return this;
+ }
+
+ /**
+ * Sets server side update flag.
+ * <p>
+ * By default, when processing DML command, Ignite first fetches all affected intermediate rows for analysis to the
+ * node which initiated the query and only then forms batches of updated values to be sent to remote nodes.
+ * For simple DML commands (that however affect great deal of rows) such approach may be an overkill in terms of
+ * network delays and memory usage on initiating node. Use this flag as hint for Ignite to do all intermediate rows
+ * analysis and updates in place on corresponding remote data nodes.
+ * <p>
+ * There are limitations to what DML command can be optimized this way. The command containing LIMIT, OFFSET,
+ * DISTINCT, ORDER BY, GROUP BY, sub-query or UNION will be processed the usual way despite this flag setting.
+ * <p>
+ * Defaults to {@code false}, meaning that intermediate results will be fetched to initiating node first.
+ * Only affects DML commands. Ignored when {@link #isLocal()} is {@code true}.
+ * Note that when set to {@code true}, the query may fail in the case of even single node failure.
+ *
+ * @param skipReducerOnUpdate Server side update flag.
+ * @return {@code this} For chaining.
+ */
+ public SqlFieldsQuery setSkipReducerOnUpdate(boolean skipReducerOnUpdate) {
+ this.skipReducerOnUpdate = skipReducerOnUpdate;
+
+ return this;
+ }
+
+ /**
+ * Gets server side update flag.
+ * <p>
+ * See {@link #setSkipReducerOnUpdate(boolean)} for more information.
+ *
+ * @return Server side update flag.
+ */
+ public boolean isSkipReducerOnUpdate() {
+ return skipReducerOnUpdate;
+ }
+
+ /** {@inheritDoc} */
+ @Override public SqlFieldsQuery copy() {
+ return new SqlFieldsQueryEx(this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
index a6a7aa5..7b40466 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
@@ -104,8 +104,13 @@ public class JdbcConnectionContext implements ClientListenerConnectionContext {
if (ver.compareTo(VER_2_1_5) >= 0)
lazyExec = reader.readBoolean();
+ boolean skipReducerOnUpdate = false;
+
+ if (ver.compareTo(VER_2_3_0) >= 0)
+ skipReducerOnUpdate = reader.readBoolean();
+
handler = new JdbcRequestHandler(ctx, busyLock, maxCursors, distributedJoins, enforceJoinOrder,
- collocated, replicatedOnly, autoCloseCursors, lazyExec, ver);
+ collocated, replicatedOnly, autoCloseCursors, lazyExec, skipReducerOnUpdate, ver);
parser = new JdbcMessageParser(ctx);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
index 166402f..e3b6f5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
@@ -35,7 +35,7 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteVersionUtils;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
-import org.apache.ignite.internal.jdbc2.JdbcSqlFieldsQuery;
+import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
@@ -103,6 +103,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
/** Lazy query execution flag. */
private final boolean lazy;
+ /** Skip reducer on update flag. */
+ private final boolean skipReducerOnUpdate;
+
/** Automatic close of cursors. */
private final boolean autoCloseCursors;
@@ -121,11 +124,13 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
* @param replicatedOnly Replicated only flag.
* @param autoCloseCursors Flag to automatically close server cursors.
* @param lazy Lazy query execution flag.
+ * @param skipReducerOnUpdate Skip reducer on update flag.
* @param protocolVer Protocol version.
*/
public JdbcRequestHandler(GridKernalContext ctx, GridSpinBusyLock busyLock, int maxCursors,
boolean distributedJoins, boolean enforceJoinOrder, boolean collocated, boolean replicatedOnly,
- boolean autoCloseCursors, boolean lazy, ClientListenerProtocolVersion protocolVer) {
+ boolean autoCloseCursors, boolean lazy, boolean skipReducerOnUpdate,
+ ClientListenerProtocolVersion protocolVer) {
this.ctx = ctx;
this.busyLock = busyLock;
this.maxCursors = maxCursors;
@@ -135,6 +140,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
this.replicatedOnly = replicatedOnly;
this.autoCloseCursors = autoCloseCursors;
this.lazy = lazy;
+ this.skipReducerOnUpdate = skipReducerOnUpdate;
this.protocolVer = protocolVer;
log = ctx.log(getClass());
@@ -263,14 +269,17 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
break;
case SELECT_STATEMENT_TYPE:
- qry = new JdbcSqlFieldsQuery(sql, true);
+ qry = new SqlFieldsQueryEx(sql, true);
break;
default:
assert req.expectedStatementType() == JdbcStatementType.UPDATE_STMT_TYPE;
- qry = new JdbcSqlFieldsQuery(sql, false);
+ qry = new SqlFieldsQueryEx(sql, false);
+
+ if (skipReducerOnUpdate)
+ ((SqlFieldsQueryEx)qry).setSkipReducerOnUpdate(true);
}
qry.setArgs(req.arguments());
@@ -476,7 +485,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
if (q.sql() != null)
sql = q.sql();
- SqlFieldsQuery qry = new JdbcSqlFieldsQuery(sql, false);
+ SqlFieldsQuery qry = new SqlFieldsQueryEx(sql, false);
qry.setArgs(q.args());
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java
index a4af478..88a2e0f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java
@@ -37,8 +37,11 @@ public class OdbcConnectionContext implements ClientListenerConnectionContext {
/** Version 2.1.5: added "lazy" flag. */
public static final ClientListenerProtocolVersion VER_2_1_5 = ClientListenerProtocolVersion.create(2, 1, 5);
+ /** Version 2.3.0: added "skipReducerOnUpdate" flag. */
+ public static final ClientListenerProtocolVersion VER_2_3_0 = ClientListenerProtocolVersion.create(2, 3, 0);
+
/** Current version. */
- private static final ClientListenerProtocolVersion CURRENT_VER = VER_2_1_5;
+ private static final ClientListenerProtocolVersion CURRENT_VER = VER_2_3_0;
/** Supported versions. */
private static final Set<ClientListenerProtocolVersion> SUPPORTED_VERS = new HashSet<>();
@@ -60,6 +63,7 @@ public class OdbcConnectionContext implements ClientListenerConnectionContext {
static {
SUPPORTED_VERS.add(CURRENT_VER);
+ SUPPORTED_VERS.add(VER_2_1_5);
SUPPORTED_VERS.add(VER_2_1_0);
}
@@ -98,8 +102,13 @@ public class OdbcConnectionContext implements ClientListenerConnectionContext {
if (ver.compareTo(VER_2_1_5) >= 0)
lazy = reader.readBoolean();
+ boolean skipReducerOnUpdate = false;
+
+ if (ver.compareTo(VER_2_3_0) >= 0)
+ skipReducerOnUpdate = reader.readBoolean();
+
handler = new OdbcRequestHandler(ctx, busyLock, maxCursors, distributedJoins,
- enforceJoinOrder, replicatedOnly, collocated, lazy);
+ enforceJoinOrder, replicatedOnly, collocated, lazy, skipReducerOnUpdate);
parser = new OdbcMessageParser(ctx, ver);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
index 07b41f3..32375fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
@@ -34,6 +34,7 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
@@ -43,7 +44,6 @@ import org.apache.ignite.internal.processors.odbc.odbc.escape.OdbcEscapeUtils;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.GridQueryIndexing;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
-import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -94,6 +94,9 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler {
/** Lazy flag. */
private final boolean lazy;
+ /** Update on server flag. */
+ private final boolean skipReducerOnUpdate;
+
/**
* Constructor.
* @param ctx Context.
@@ -104,10 +107,11 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler {
* @param replicatedOnly Replicated only flag.
* @param collocated Collocated flag.
* @param lazy Lazy flag.
+ * @param skipReducerOnUpdate Skip reducer on update flag.
*/
public OdbcRequestHandler(GridKernalContext ctx, GridSpinBusyLock busyLock, int maxCursors,
boolean distributedJoins, boolean enforceJoinOrder, boolean replicatedOnly,
- boolean collocated, boolean lazy) {
+ boolean collocated, boolean lazy, boolean skipReducerOnUpdate) {
this.ctx = ctx;
this.busyLock = busyLock;
this.maxCursors = maxCursors;
@@ -116,6 +120,7 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler {
this.replicatedOnly = replicatedOnly;
this.collocated = collocated;
this.lazy = lazy;
+ this.skipReducerOnUpdate = skipReducerOnUpdate;
log = ctx.log(getClass());
}
@@ -196,8 +201,8 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler {
* @param args Arguments.
* @return Query instance.
*/
- private SqlFieldsQuery makeQuery(String schema, String sql, Object[] args) {
- SqlFieldsQuery qry = new SqlFieldsQuery(sql);
+ private SqlFieldsQueryEx makeQuery(String schema, String sql, Object[] args) {
+ SqlFieldsQueryEx qry = new SqlFieldsQueryEx(sql, null);
qry.setArgs(args);
@@ -207,6 +212,7 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler {
qry.setCollocated(collocated);
qry.setLazy(lazy);
qry.setSchema(schema);
+ qry.setSkipReducerOnUpdate(skipReducerOnUpdate);
return qry;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 2703e6d..2f795df 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -310,7 +310,7 @@ org.apache.ignite.internal.jdbc2.JdbcDatabaseMetadata$UpdateMetadataTask
org.apache.ignite.internal.jdbc2.JdbcQueryTask
org.apache.ignite.internal.jdbc2.JdbcQueryTask$1
org.apache.ignite.internal.jdbc2.JdbcQueryTask$QueryResult
-org.apache.ignite.internal.jdbc2.JdbcSqlFieldsQuery
+org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx
org.apache.ignite.internal.managers.GridManagerAdapter$1$1
org.apache.ignite.internal.managers.checkpoint.GridCheckpointManager$CheckpointSet
org.apache.ignite.internal.managers.checkpoint.GridCheckpointRequest
@@ -2094,4 +2094,4 @@ org.apache.ignite.transactions.TransactionRollbackException
org.apache.ignite.transactions.TransactionState
org.apache.ignite.transactions.TransactionTimeoutException
org.apache.ignite.util.AttributeNodeFilter
-org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIO
\ No newline at end of file
+org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIO
[5/7] ignite git commit: IGNITE-6024: SQL: Implemented
"skipReducerOnUpdate" flag. This closes #2488.
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index 98117b2..9e55442 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.h2;
import java.lang.reflect.Array;
+import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Time;
@@ -150,7 +151,8 @@ public class DmlStatementsProcessor {
* Execute DML statement, possibly with few re-attempts in case of concurrent data modifications.
*
* @param schemaName Schema.
- * @param prepared Prepared JDBC statement.
+ * @param conn Connection.
+ * @param prepared Prepared statement.
* @param fieldsQry Original query.
* @param loc Query locality flag.
* @param filters Cache name and key filter.
@@ -158,13 +160,14 @@ public class DmlStatementsProcessor {
* @return Update result (modified items count and failed keys).
* @throws IgniteCheckedException if failed.
*/
- private UpdateResult updateSqlFields(String schemaName, Prepared prepared, SqlFieldsQuery fieldsQry,
- boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel) throws IgniteCheckedException {
+ private UpdateResult updateSqlFields(String schemaName, Connection conn, Prepared prepared,
+ SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel)
+ throws IgniteCheckedException {
Object[] errKeys = null;
long items = 0;
- UpdatePlan plan = getPlanForStatement(schemaName, prepared, null);
+ UpdatePlan plan = getPlanForStatement(schemaName, conn, prepared, fieldsQry, loc, null);
GridCacheContext<?, ?> cctx = plan.tbl.rowDescriptor().context();
@@ -188,14 +191,14 @@ public class DmlStatementsProcessor {
UpdateResult r;
try {
- r = executeUpdateStatement(schemaName, cctx, prepared, fieldsQry, loc, filters, cancel, errKeys);
+ r = executeUpdateStatement(schemaName, cctx, conn, prepared, fieldsQry, loc, filters, cancel, errKeys);
}
finally {
cctx.operationContextPerCall(opCtx);
}
- items += r.cnt;
- errKeys = r.errKeys;
+ items += r.counter();
+ errKeys = r.errorKeys();
if (F.isEmpty(errKeys))
break;
@@ -213,19 +216,22 @@ public class DmlStatementsProcessor {
/**
* @param schemaName Schema.
- * @param p Prepared.
+ * @param c Connection.
+ * @param p Prepared statement.
* @param fieldsQry Initial query
* @param cancel Query cancel.
* @return Update result wrapped into {@link GridQueryFieldsResult}
* @throws IgniteCheckedException if failed.
*/
@SuppressWarnings("unchecked")
- QueryCursorImpl<List<?>> updateSqlFieldsDistributed(String schemaName, Prepared p,
+ QueryCursorImpl<List<?>> updateSqlFieldsDistributed(String schemaName, Connection c, Prepared p,
SqlFieldsQuery fieldsQry, GridQueryCancel cancel) throws IgniteCheckedException {
- UpdateResult res = updateSqlFields(schemaName, p, fieldsQry, false, null, cancel);
+ UpdateResult res = updateSqlFields(schemaName, c, p, fieldsQry, false, null, cancel);
+
+ checkUpdateResult(res);
QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList
- (Collections.singletonList(res.cnt)), cancel, false);
+ (Collections.singletonList(res.counter())), cancel, false);
resCur.fieldsMeta(UPDATE_RESULT_META);
@@ -236,6 +242,7 @@ public class DmlStatementsProcessor {
* Execute DML statement on local cache.
*
* @param schemaName Schema.
+ * @param conn Connection.
* @param stmt Prepared statement.
* @param fieldsQry Fields query.
* @param filters Cache name and key filter.
@@ -244,14 +251,14 @@ public class DmlStatementsProcessor {
* @throws IgniteCheckedException if failed.
*/
@SuppressWarnings("unchecked")
- GridQueryFieldsResult updateSqlFieldsLocal(String schemaName, PreparedStatement stmt,
+ GridQueryFieldsResult updateSqlFieldsLocal(String schemaName, Connection conn, PreparedStatement stmt,
SqlFieldsQuery fieldsQry, IndexingQueryFilter filters, GridQueryCancel cancel)
throws IgniteCheckedException {
- UpdateResult res = updateSqlFields(schemaName, GridSqlQueryParser.prepared(stmt), fieldsQry, true,
+ UpdateResult res = updateSqlFields(schemaName, conn, GridSqlQueryParser.prepared(stmt), fieldsQry, true,
filters, cancel);
return new GridQueryFieldsResultAdapter(UPDATE_RESULT_META,
- new IgniteSingletonIterator(Collections.singletonList(res.cnt)));
+ new IgniteSingletonIterator(Collections.singletonList(res.counter())));
}
/**
@@ -272,7 +279,7 @@ public class DmlStatementsProcessor {
assert p != null;
- UpdatePlan plan = UpdatePlanBuilder.planForStatement(p, null);
+ UpdatePlan plan = UpdatePlanBuilder.planForStatement(p, true, idx, null, null, null);
if (!F.eq(streamer.cacheName(), plan.tbl.rowDescriptor().context().name()))
throw new IgniteSQLException("Cross cache streaming is not supported, please specify cache explicitly" +
@@ -340,6 +347,7 @@ public class DmlStatementsProcessor {
*
* @param schemaName Schema name.
* @param cctx Cache context.
+ * @param c Connection.
* @param prepared Prepared statement for DML query.
* @param fieldsQry Fields query.
* @param loc Local query flag.
@@ -350,14 +358,14 @@ public class DmlStatementsProcessor {
* @throws IgniteCheckedException if failed.
*/
@SuppressWarnings({"ConstantConditions", "unchecked"})
- private UpdateResult executeUpdateStatement(String schemaName, final GridCacheContext cctx,
+ private UpdateResult executeUpdateStatement(String schemaName, final GridCacheContext cctx, Connection c,
Prepared prepared, SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters,
GridQueryCancel cancel, Object[] failedKeys) throws IgniteCheckedException {
int mainCacheId = CU.cacheId(cctx.name());
Integer errKeysPos = null;
- UpdatePlan plan = getPlanForStatement(schemaName, prepared, errKeysPos);
+ UpdatePlan plan = getPlanForStatement(schemaName, c, prepared, fieldsQry, loc, errKeysPos);
if (plan.fastUpdateArgs != null) {
assert F.isEmpty(failedKeys) && errKeysPos == null;
@@ -365,6 +373,14 @@ public class DmlStatementsProcessor {
return doFastUpdate(plan, fieldsQry.getArgs());
}
+ if (plan.distributed != null) {
+ UpdateResult result = doDistributedUpdate(schemaName, fieldsQry, plan, cancel);
+
+ // null is returned in case not all nodes support distributed DML.
+ if (result != null)
+ return result;
+ }
+
assert !F.isEmpty(plan.selectQry);
QueryCursorImpl<List<?>> cur;
@@ -401,18 +417,31 @@ public class DmlStatementsProcessor {
int pageSize = loc ? 0 : fieldsQry.getPageSize();
+ return processDmlSelectResult(cctx, plan, cur, pageSize);
+ }
+
+ /**
+ * @param cctx Cache context.
+ * @param plan Update plan.
+ * @param cursor Cursor over select results.
+ * @param pageSize Page size.
+ * @return Pair [number of successfully processed items; keys that have failed to be processed]
+ * @throws IgniteCheckedException if failed.
+ */
+ private UpdateResult processDmlSelectResult(GridCacheContext cctx, UpdatePlan plan, Iterable<List<?>> cursor,
+ int pageSize) throws IgniteCheckedException {
switch (plan.mode) {
case MERGE:
- return new UpdateResult(doMerge(plan, cur, pageSize), X.EMPTY_OBJECT_ARRAY);
+ return new UpdateResult(doMerge(plan, cursor, pageSize), X.EMPTY_OBJECT_ARRAY);
case INSERT:
- return new UpdateResult(doInsert(plan, cur, pageSize), X.EMPTY_OBJECT_ARRAY);
+ return new UpdateResult(doInsert(plan, cursor, pageSize), X.EMPTY_OBJECT_ARRAY);
case UPDATE:
- return doUpdate(plan, cur, pageSize);
+ return doUpdate(plan, cursor, pageSize);
case DELETE:
- return doDelete(cctx, cur, pageSize);
+ return doDelete(cctx, cursor, pageSize);
default:
throw new IgniteSQLException("Unexpected DML operation [mode=" + plan.mode + ']',
@@ -425,20 +454,23 @@ public class DmlStatementsProcessor {
* if available.
*
* @param schema Schema.
- * @param p Prepared JDBC statement.
+ * @param conn Connection.
+ * @param p Prepared statement.
+ * @param fieldsQry Original fields query.
+ * @param loc Local query flag.
* @return Update plan.
*/
@SuppressWarnings({"unchecked", "ConstantConditions"})
- private UpdatePlan getPlanForStatement(String schema, Prepared p, @Nullable Integer errKeysPos)
- throws IgniteCheckedException {
- H2DmlPlanKey planKey = new H2DmlPlanKey(schema, p.getSQL());
+ private UpdatePlan getPlanForStatement(String schema, Connection conn, Prepared p, SqlFieldsQuery fieldsQry,
+ boolean loc, @Nullable Integer errKeysPos) throws IgniteCheckedException {
+ H2DmlPlanKey planKey = new H2DmlPlanKey(schema, p.getSQL(), loc, fieldsQry);
UpdatePlan res = (errKeysPos == null ? planCache.get(planKey) : null);
if (res != null)
return res;
- res = UpdatePlanBuilder.planForStatement(p, errKeysPos);
+ res = UpdatePlanBuilder.planForStatement(p, loc, idx, conn, fieldsQry, errKeysPos);
// Don't cache re-runs
if (errKeysPos == null)
@@ -449,6 +481,7 @@ public class DmlStatementsProcessor {
/**
* Perform single cache operation based on given args.
+ * @param plan Update plan.
* @param args Query parameters.
* @return 1 if an item was affected, 0 otherwise.
* @throws IgniteCheckedException if failed.
@@ -487,6 +520,25 @@ public class DmlStatementsProcessor {
}
/**
+ * @param schemaName Schema name.
+ * @param fieldsQry Initial query.
+ * @param plan Update plan.
+ * @param cancel Cancel state.
+ * @return Update result.
+ * @throws IgniteCheckedException if failed.
+ */
+ private UpdateResult doDistributedUpdate(String schemaName, SqlFieldsQuery fieldsQry, UpdatePlan plan,
+ GridQueryCancel cancel) throws IgniteCheckedException {
+ assert plan.distributed != null;
+
+ if (cancel == null)
+ cancel = new GridQueryCancel();
+
+ return idx.runDistributedUpdate(schemaName, fieldsQry, plan.distributed.getCacheIds(),
+ plan.distributed.isReplicatedOnly(), cancel);
+ }
+
+ /**
* Perform DELETE operation on top of results of SELECT.
* @param cctx Cache context.
* @param cursor SELECT results.
@@ -573,7 +625,7 @@ public class DmlStatementsProcessor {
GridQueryProperty prop = plan.tbl.rowDescriptor().type().property(plan.colNames[i]);
- assert prop != null;
+ assert prop != null : "Unknown property: " + plan.colNames[i];
newColVals.put(plan.colNames[i], convert(row.get(i + 2), desc, prop.type(), plan.colTypes[i]));
}
@@ -981,6 +1033,31 @@ public class DmlStatementsProcessor {
return new IgniteBiTuple<>(key, val);
}
+ /**
+ *
+ * @param schemaName Schema name.
+ * @param stmt Prepared statement.
+ * @param fldsQry Query.
+ * @param filter Filter.
+ * @param cancel Cancel state.
+ * @param local Locality flag.
+ * @return Update result.
+ * @throws IgniteCheckedException if failed.
+ */
+ UpdateResult mapDistributedUpdate(String schemaName, PreparedStatement stmt, SqlFieldsQuery fldsQry,
+ IndexingQueryFilter filter, GridQueryCancel cancel, boolean local) throws IgniteCheckedException {
+ Connection c;
+
+ try {
+ c = stmt.getConnection();
+ }
+ catch (SQLException e) {
+ throw new IgniteCheckedException(e);
+ }
+
+ return updateSqlFields(schemaName, c, GridSqlQueryParser.prepared(stmt), fldsQry, local, filter, cancel);
+ }
+
/** */
private final static class InsertEntryProcessor implements EntryProcessor<Object, Object, Boolean> {
/** Value to set. */
@@ -1079,26 +1156,19 @@ public class DmlStatementsProcessor {
return stmt instanceof Merge || stmt instanceof Insert || stmt instanceof Update || stmt instanceof Delete;
}
- /** Update result - modifications count and keys to re-run query with, if needed. */
- private final static class UpdateResult {
- /** Result to return for operations that affected 1 item - mostly to be used for fast updates and deletes. */
- final static UpdateResult ONE = new UpdateResult(1, X.EMPTY_OBJECT_ARRAY);
-
- /** Result to return for operations that affected 0 items - mostly to be used for fast updates and deletes. */
- final static UpdateResult ZERO = new UpdateResult(0, X.EMPTY_OBJECT_ARRAY);
-
- /** Number of processed items. */
- final long cnt;
+ /**
+ * Check update result for erroneous keys and throws concurrent update exception if necessary.
+ *
+ * @param r Update result.
+ */
+ static void checkUpdateResult(UpdateResult r) {
+ if (!F.isEmpty(r.errorKeys())) {
+ String msg = "Failed to update some keys because they had been modified concurrently " +
+ "[keys=" + r.errorKeys() + ']';
- /** Keys that failed to be updated or deleted due to concurrent modification of values. */
- @NotNull
- final Object[] errKeys;
+ SQLException conEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE);
- /** */
- @SuppressWarnings("ConstantConditions")
- private UpdateResult(long cnt, Object[] errKeys) {
- this.cnt = cnt;
- this.errKeys = U.firstNotNull(errKeys, X.EMPTY_OBJECT_ARRAY);
+ throw new IgniteSQLException(conEx);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlPlanKey.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlPlanKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlPlanKey.java
index 3a43ea1..455b5e5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlPlanKey.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlPlanKey.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.processors.query.h2;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -30,20 +32,33 @@ public class H2DmlPlanKey {
/** SQL. */
private final String sql;
+ /** Flags. */
+ private final byte flags;
+
/**
* Constructor.
*
* @param schemaName Schema name.
* @param sql SQL.
*/
- public H2DmlPlanKey(String schemaName, String sql) {
+ public H2DmlPlanKey(String schemaName, String sql, boolean loc, SqlFieldsQuery fieldsQry) {
this.schemaName = schemaName;
this.sql = sql;
+
+ if (loc || !UpdatePlanBuilder.isSkipReducerOnUpdateQuery(fieldsQry))
+ this.flags = 0; // flags only relevant for server side updates.
+ else {
+ this.flags = (byte)(1 +
+ (fieldsQry.isDistributedJoins() ? 2 : 0) +
+ (fieldsQry.isEnforceJoinOrder() ? 4 : 0) +
+ (fieldsQry.isCollocated() ? 8 : 0));
+ }
}
/** {@inheritDoc} */
@Override public int hashCode() {
- return 31 * (schemaName != null ? schemaName.hashCode() : 0) + (sql != null ? sql.hashCode() : 0);
+ return 31 * (31 * (schemaName != null ? schemaName.hashCode() : 0) + (sql != null ? sql.hashCode() : 0)) +
+ flags;
}
/** {@inheritDoc} */
@@ -56,7 +71,7 @@ public class H2DmlPlanKey {
H2DmlPlanKey other = (H2DmlPlanKey)o;
- return F.eq(sql, other.sql) && F.eq(schemaName, other.schemaName);
+ return F.eq(sql, other.sql) && F.eq(schemaName, other.schemaName) && flags == other.flags;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 22ed592..fddd2e8 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -59,7 +59,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.jdbc2.JdbcSqlFieldsQuery;
+import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
import org.apache.ignite.internal.processors.cache.CacheObject;
@@ -834,7 +834,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
fldsQry.setEnforceJoinOrder(enforceJoinOrder);
fldsQry.setTimeout(timeout, TimeUnit.MILLISECONDS);
- return dmlProc.updateSqlFieldsLocal(schemaName, stmt, fldsQry, filter, cancel);
+ return dmlProc.updateSqlFieldsLocal(schemaName, conn, stmt, fldsQry, filter, cancel);
}
else if (DdlStatementsProcessor.isDdlStatement(p))
throw new IgniteSQLException("DDL statements are supported for the whole cluster only",
@@ -1215,6 +1215,27 @@ public class IgniteH2Indexing implements GridQueryIndexing {
};
}
+ /**
+ * Run DML on remote nodes.
+ *
+ * @param schemaName Schema name.
+ * @param fieldsQry Initial update query.
+ * @param cacheIds Cache identifiers.
+ * @param isReplicatedOnly Whether query uses only replicated caches.
+ * @param cancel Cancel state.
+ * @return Update result.
+ */
+ UpdateResult runDistributedUpdate(
+ String schemaName,
+ SqlFieldsQuery fieldsQry,
+ List<Integer> cacheIds,
+ boolean isReplicatedOnly,
+ GridQueryCancel cancel) {
+ return rdcQryExec.update(schemaName, cacheIds, fieldsQry.getSql(), fieldsQry.getArgs(),
+ fieldsQry.isEnforceJoinOrder(), fieldsQry.getPageSize(), fieldsQry.getTimeout(),
+ fieldsQry.getPartitions(), isReplicatedOnly, cancel);
+ }
+
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public <K, V> QueryCursor<Cache.Entry<K, V>> queryDistributedSql(String schemaName, String cacheName,
@@ -1429,8 +1450,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (twoStepQry == null) {
if (DmlStatementsProcessor.isDmlStatement(prepared)) {
try {
- res.add(dmlProc.updateSqlFieldsDistributed(schemaName, prepared,
- new SqlFieldsQuery(qry).setSql(sqlQry).setArgs(args), cancel));
+ res.add(dmlProc.updateSqlFieldsDistributed(schemaName, c, prepared,
+ qry.copy().setSql(sqlQry).setArgs(args), cancel));
continue;
}
@@ -1452,33 +1473,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
}
- LinkedHashSet<Integer> caches0 = new LinkedHashSet<>();
-
assert twoStepQry != null;
- int tblCnt = twoStepQry.tablesCount();
-
- if (mainCacheId != null)
- caches0.add(mainCacheId);
-
- if (tblCnt > 0) {
- for (QueryTable tblKey : twoStepQry.tables()) {
- GridH2Table tbl = dataTable(tblKey);
-
- int cacheId = CU.cacheId(tbl.cacheName());
-
- caches0.add(cacheId);
- }
- }
+ List<Integer> cacheIds = collectCacheIds(mainCacheId, twoStepQry);
- if (caches0.isEmpty())
+ if (F.isEmpty(cacheIds))
twoStepQry.local(true);
else {
- //Prohibit usage indices with different numbers of segments in same query.
- List<Integer> cacheIds = new ArrayList<>(caches0);
-
- checkCacheIndexSegmentation(cacheIds);
-
twoStepQry.cacheIds(cacheIds);
twoStepQry.local(qry.isLocal());
}
@@ -1517,7 +1518,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @param isQry {@code true} for select queries, otherwise (DML/DDL queries) {@code false}.
*/
private void checkQueryType(SqlFieldsQuery qry, boolean isQry) {
- if (qry instanceof JdbcSqlFieldsQuery && ((JdbcSqlFieldsQuery)qry).isQuery() != isQry)
+ if (qry instanceof SqlFieldsQueryEx && ((SqlFieldsQueryEx)qry).isQuery() != null &&
+ ((SqlFieldsQueryEx)qry).isQuery() != isQry)
throw new IgniteSQLException("Given statement type does not match that declared by JDBC driver",
IgniteQueryErrorCode.STMT_TYPE_MISMATCH);
}
@@ -1568,6 +1570,29 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
+ * Run DML request from other node.
+ *
+ * @param schemaName Schema name.
+ * @param fldsQry Query.
+ * @param filter Filter.
+ * @param cancel Cancel state.
+ * @param local Locality flag.
+ * @return Update result.
+ * @throws IgniteCheckedException if failed.
+ */
+ public UpdateResult mapDistributedUpdate(String schemaName, SqlFieldsQuery fldsQry, IndexingQueryFilter filter,
+ GridQueryCancel cancel, boolean local) throws IgniteCheckedException {
+ Connection conn = connectionForSchema(schemaName);
+
+ H2Utils.setupConnection(conn, false, fldsQry.isEnforceJoinOrder());
+
+ PreparedStatement stmt = preparedStatementWithParams(conn, fldsQry.getSql(),
+ Arrays.asList(fldsQry.getArgs()), true);
+
+ return dmlProc.mapDistributedUpdate(schemaName, stmt, fldsQry, filter, cancel, local);
+ }
+
+ /**
* @throws IllegalStateException if segmented indices used with non-segmented indices.
*/
private void checkCacheIndexSegmentation(List<Integer> cacheIds) {
@@ -2524,6 +2549,43 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
+ * Collect cache identifiers from two-step query.
+ *
+ * @param mainCacheId Id of main cache.
+ * @param twoStepQry Two-step query.
+ * @return Result.
+ */
+ public List<Integer> collectCacheIds(@Nullable Integer mainCacheId, GridCacheTwoStepQuery twoStepQry) {
+ LinkedHashSet<Integer> caches0 = new LinkedHashSet<>();
+
+ int tblCnt = twoStepQry.tablesCount();
+
+ if (mainCacheId != null)
+ caches0.add(mainCacheId);
+
+ if (tblCnt > 0) {
+ for (QueryTable tblKey : twoStepQry.tables()) {
+ GridH2Table tbl = dataTable(tblKey);
+
+ int cacheId = CU.cacheId(tbl.cacheName());
+
+ caches0.add(cacheId);
+ }
+ }
+
+ if (caches0.isEmpty())
+ return null;
+ else {
+ //Prohibit usage indices with different numbers of segments in same query.
+ List<Integer> cacheIds = new ArrayList<>(caches0);
+
+ checkCacheIndexSegmentation(cacheIds);
+
+ return cacheIds;
+ }
+ }
+
+ /**
* Closeable iterator.
*/
private interface ClIter<X> extends AutoCloseable, Iterator<X> {
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java
new file mode 100644
index 0000000..de0e63f
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Update result - modifications count and keys to re-run query with, if needed.
+ */
+public final class UpdateResult {
+ /** Result to return for operations that affected 1 item - mostly to be used for fast updates and deletes. */
+ final static UpdateResult ONE = new UpdateResult(1, X.EMPTY_OBJECT_ARRAY);
+
+ /** Result to return for operations that affected 0 items - mostly to be used for fast updates and deletes. */
+ final static UpdateResult ZERO = new UpdateResult(0, X.EMPTY_OBJECT_ARRAY);
+
+ /** Number of processed items. */
+ private final long cnt;
+
+ /** Keys that failed to be updated or deleted due to concurrent modification of values. */
+ private final Object[] errKeys;
+
+ /**
+ * Constructor.
+ *
+ * @param cnt Updated rows count.
+ * @param errKeys Array of erroneous keys.
+ */
+ public @SuppressWarnings("ConstantConditions") UpdateResult(long cnt, Object[] errKeys) {
+ this.cnt = cnt;
+ this.errKeys = U.firstNotNull(errKeys, X.EMPTY_OBJECT_ARRAY);
+ }
+
+ /**
+ * @return Update counter.
+ */
+ public long counter() {
+ return cnt;
+ }
+
+ /**
+ * @return Error keys.
+ */
+ public Object[] errorKeys() {
+ return errKeys;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
index b81ac60..a99d811 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.h2.dml;
+import java.util.List;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.util.typedef.F;
@@ -64,10 +65,13 @@ public final class UpdatePlan {
/** Arguments for fast UPDATE or DELETE. */
public final FastUpdateArguments fastUpdateArgs;
+ /** Additional info for distributed update. */
+ public final DistributedPlanInfo distributed;
+
/** */
private UpdatePlan(UpdateMode mode, GridH2Table tbl, String[] colNames, int[] colTypes, KeyValueSupplier keySupplier,
KeyValueSupplier valSupplier, int keyColIdx, int valColIdx, String selectQry, boolean isLocSubqry,
- int rowsNum, FastUpdateArguments fastUpdateArgs) {
+ int rowsNum, FastUpdateArguments fastUpdateArgs, DistributedPlanInfo distributed) {
this.colNames = colNames;
this.colTypes = colTypes;
this.rowsNum = rowsNum;
@@ -83,46 +87,84 @@ public final class UpdatePlan {
this.selectQry = selectQry;
this.isLocSubqry = isLocSubqry;
this.fastUpdateArgs = fastUpdateArgs;
+ this.distributed = distributed;
}
/** */
public static UpdatePlan forMerge(GridH2Table tbl, String[] colNames, int[] colTypes, KeyValueSupplier keySupplier,
KeyValueSupplier valSupplier, int keyColIdx, int valColIdx, String selectQry, boolean isLocSubqry,
- int rowsNum) {
+ int rowsNum, DistributedPlanInfo distributed) {
assert !F.isEmpty(colNames);
return new UpdatePlan(UpdateMode.MERGE, tbl, colNames, colTypes, keySupplier, valSupplier, keyColIdx, valColIdx,
- selectQry, isLocSubqry, rowsNum, null);
+ selectQry, isLocSubqry, rowsNum, null, distributed);
}
/** */
public static UpdatePlan forInsert(GridH2Table tbl, String[] colNames, int[] colTypes, KeyValueSupplier keySupplier,
- KeyValueSupplier valSupplier, int keyColIdx, int valColIdx, String selectQry, boolean isLocSubqry, int rowsNum) {
+ KeyValueSupplier valSupplier, int keyColIdx, int valColIdx, String selectQry, boolean isLocSubqry,
+ int rowsNum, DistributedPlanInfo distributed) {
assert !F.isEmpty(colNames);
- return new UpdatePlan(UpdateMode.INSERT, tbl, colNames, colTypes, keySupplier, valSupplier, keyColIdx, valColIdx,
- selectQry, isLocSubqry, rowsNum, null);
+ return new UpdatePlan(UpdateMode.INSERT, tbl, colNames, colTypes, keySupplier, valSupplier, keyColIdx,
+ valColIdx, selectQry, isLocSubqry, rowsNum, null, distributed);
}
/** */
public static UpdatePlan forUpdate(GridH2Table tbl, String[] colNames, int[] colTypes, KeyValueSupplier valSupplier,
- int valColIdx, String selectQry) {
+ int valColIdx, String selectQry, DistributedPlanInfo distributed) {
assert !F.isEmpty(colNames);
return new UpdatePlan(UpdateMode.UPDATE, tbl, colNames, colTypes, null, valSupplier, -1, valColIdx, selectQry,
- false, 0, null);
+ false, 0, null, distributed);
}
/** */
- public static UpdatePlan forDelete(GridH2Table tbl, String selectQry) {
- return new UpdatePlan(UpdateMode.DELETE, tbl, null, null, null, null, -1, -1, selectQry, false, 0, null);
+ public static UpdatePlan forDelete(GridH2Table tbl, String selectQry, DistributedPlanInfo distributed) {
+ return new UpdatePlan(UpdateMode.DELETE, tbl, null, null, null, null, -1, -1, selectQry, false, 0, null,
+ distributed);
}
/** */
public static UpdatePlan forFastUpdate(UpdateMode mode, GridH2Table tbl, FastUpdateArguments fastUpdateArgs) {
assert mode == UpdateMode.UPDATE || mode == UpdateMode.DELETE;
- return new UpdatePlan(mode, tbl, null, null, null, null, -1, -1, null, false, 0, fastUpdateArgs);
+ return new UpdatePlan(mode, tbl, null, null, null, null, -1, -1, null, false, 0, fastUpdateArgs, null);
}
+ /**
+ * Additional information about distributed update plan.
+ */
+ public final static class DistributedPlanInfo {
+ /** Whether update involves only replicated caches. */
+ private final boolean replicatedOnly;
+
+ /** Identifiers of caches involved in update (used for cluster nodes mapping). */
+ private final List<Integer> cacheIds;
+
+ /**
+ * Constructor.
+ *
+ * @param replicatedOnly Whether all caches are replicated.
+ * @param cacheIds List of cache identifiers.
+ */
+ DistributedPlanInfo(boolean replicatedOnly, List<Integer> cacheIds) {
+ this.replicatedOnly = replicatedOnly;
+ this.cacheIds = cacheIds;
+ }
+
+ /**
+ * @return {@code true} in case all involved caches are replicated.
+ */
+ public boolean isReplicatedOnly() {
+ return replicatedOnly;
+ }
+
+ /**
+ * @return cache identifiers.
+ */
+ public List<Integer> getCacheIds() {
+ return cacheIds;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
index 804f7d8..c845266 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
@@ -18,19 +18,26 @@
package org.apache.ignite.internal.processors.query.h2.dml;
import java.lang.reflect.Constructor;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import org.apache.ignite.internal.processors.query.GridQueryProperty;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.h2.DmlStatementsProcessor;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.processors.query.h2.sql.DmlAstUtils;
@@ -41,12 +48,15 @@ import org.apache.ignite.internal.processors.query.h2.sql.GridSqlInsert;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlMerge;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuery;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlSelect;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlTable;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlUnion;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlUpdate;
import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.h2.command.Prepared;
@@ -71,29 +81,39 @@ public final class UpdatePlanBuilder {
* if available.
*
* @param prepared H2's {@link Prepared}.
+ * @param loc Local query flag.
+ * @param idx Indexing.
+ * @param conn Connection.
+ * @param fieldsQuery Original query.
* @return Update plan.
*/
- public static UpdatePlan planForStatement(Prepared prepared,
- @Nullable Integer errKeysPos) throws IgniteCheckedException {
+ public static UpdatePlan planForStatement(Prepared prepared, boolean loc, IgniteH2Indexing idx,
+ @Nullable Connection conn, @Nullable SqlFieldsQuery fieldsQuery, @Nullable Integer errKeysPos)
+ throws IgniteCheckedException {
assert !prepared.isQuery();
GridSqlStatement stmt = new GridSqlQueryParser(false).parse(prepared);
if (stmt instanceof GridSqlMerge || stmt instanceof GridSqlInsert)
- return planForInsert(stmt);
+ return planForInsert(stmt, loc, idx, conn, fieldsQuery);
else
- return planForUpdate(stmt, errKeysPos);
+ return planForUpdate(stmt, loc, idx, conn, fieldsQuery, errKeysPos);
}
/**
* Prepare update plan for INSERT or MERGE.
*
* @param stmt INSERT or MERGE statement.
+ * @param loc Local query flag.
+ * @param idx Indexing.
+ * @param conn Connection.
+ * @param fieldsQuery Original query.
* @return Update plan.
* @throws IgniteCheckedException if failed.
*/
@SuppressWarnings("ConstantConditions")
- private static UpdatePlan planForInsert(GridSqlStatement stmt) throws IgniteCheckedException {
+ private static UpdatePlan planForInsert(GridSqlStatement stmt, boolean loc, IgniteH2Indexing idx,
+ @Nullable Connection conn, @Nullable SqlFieldsQuery fieldsQuery) throws IgniteCheckedException {
GridSqlQuery sel;
GridSqlElement target;
@@ -191,23 +211,33 @@ public final class UpdatePlanBuilder {
KeyValueSupplier keySupplier = createSupplier(cctx, desc.type(), keyColIdx, hasKeyProps, true, false);
KeyValueSupplier valSupplier = createSupplier(cctx, desc.type(), valColIdx, hasValProps, false, false);
+ String selectSql = sel.getSQL();
+
+ UpdatePlan.DistributedPlanInfo distributed = (rowsNum == 0 && !F.isEmpty(selectSql)) ?
+ checkPlanCanBeDistributed(idx, conn, fieldsQuery, loc, selectSql, tbl.dataTable().cacheName()) : null;
+
if (stmt instanceof GridSqlMerge)
return UpdatePlan.forMerge(tbl.dataTable(), colNames, colTypes, keySupplier, valSupplier, keyColIdx,
- valColIdx, sel.getSQL(), !isTwoStepSubqry, rowsNum);
+ valColIdx, selectSql, !isTwoStepSubqry, rowsNum, distributed);
else
return UpdatePlan.forInsert(tbl.dataTable(), colNames, colTypes, keySupplier, valSupplier, keyColIdx,
- valColIdx, sel.getSQL(), !isTwoStepSubqry, rowsNum);
+ valColIdx, selectSql, !isTwoStepSubqry, rowsNum, distributed);
}
/**
* Prepare update plan for UPDATE or DELETE.
*
* @param stmt UPDATE or DELETE statement.
+ * @param loc Local query flag.
+ * @param idx Indexing.
+ * @param conn Connection.
+ * @param fieldsQuery Original query.
* @param errKeysPos index to inject param for re-run keys at. Null if it's not a re-run plan.
* @return Update plan.
* @throws IgniteCheckedException if failed.
*/
- private static UpdatePlan planForUpdate(GridSqlStatement stmt, @Nullable Integer errKeysPos)
+ private static UpdatePlan planForUpdate(GridSqlStatement stmt, boolean loc, IgniteH2Indexing idx,
+ @Nullable Connection conn, @Nullable SqlFieldsQuery fieldsQuery, @Nullable Integer errKeysPos)
throws IgniteCheckedException {
GridSqlElement target;
@@ -286,12 +316,23 @@ public final class UpdatePlanBuilder {
sel = DmlAstUtils.selectForUpdate((GridSqlUpdate) stmt, errKeysPos);
- return UpdatePlan.forUpdate(gridTbl, colNames, colTypes, newValSupplier, valColIdx, sel.getSQL());
+ String selectSql = sel.getSQL();
+
+ UpdatePlan.DistributedPlanInfo distributed = F.isEmpty(selectSql) ? null :
+ checkPlanCanBeDistributed(idx, conn, fieldsQuery, loc, selectSql, tbl.dataTable().cacheName());
+
+ return UpdatePlan.forUpdate(gridTbl, colNames, colTypes, newValSupplier, valColIdx, selectSql,
+ distributed);
}
else {
sel = DmlAstUtils.selectForDelete((GridSqlDelete) stmt, errKeysPos);
- return UpdatePlan.forDelete(gridTbl, sel.getSQL());
+ String selectSql = sel.getSQL();
+
+ UpdatePlan.DistributedPlanInfo distributed = F.isEmpty(selectSql) ? null :
+ checkPlanCanBeDistributed(idx, conn, fieldsQuery, loc, selectSql, tbl.dataTable().cacheName());
+
+ return UpdatePlan.forDelete(gridTbl, selectSql, distributed);
}
}
}
@@ -494,6 +535,62 @@ public final class UpdatePlanBuilder {
}
/**
+ * Checks whether the given update plan can be distributed and returns additional info.
+ *
+ * @param idx Indexing.
+ * @param conn Connection.
+ * @param fieldsQry Initial update query.
+ * @param loc Local query flag.
+ * @param selectQry Derived select query.
+ * @param cacheName Cache name.
+ * @return distributed update plan info, or {@code null} if cannot be distributed.
+ * @throws IgniteCheckedException if failed.
+ */
+ private static UpdatePlan.DistributedPlanInfo checkPlanCanBeDistributed(IgniteH2Indexing idx,
+ Connection conn, SqlFieldsQuery fieldsQry, boolean loc, String selectQry, String cacheName)
+ throws IgniteCheckedException {
+
+ if (loc || !isSkipReducerOnUpdateQuery(fieldsQry))
+ return null;
+
+ assert conn != null;
+
+ try {
+ // Get a new prepared statement for derived select query.
+ try (PreparedStatement stmt = conn.prepareStatement(selectQry)) {
+ idx.bindParameters(stmt, F.asList(fieldsQry.getArgs()));
+
+ GridCacheTwoStepQuery qry = GridSqlQuerySplitter.split(conn,
+ GridSqlQueryParser.prepared(stmt),
+ fieldsQry.getArgs(),
+ fieldsQry.isCollocated(),
+ fieldsQry.isDistributedJoins(),
+ fieldsQry.isEnforceJoinOrder(), idx);
+
+ boolean distributed = qry.skipMergeTable() && qry.mapQueries().size() == 1 &&
+ !qry.mapQueries().get(0).hasSubQueries();
+
+ return distributed ? new UpdatePlan.DistributedPlanInfo(qry.isReplicatedOnly(),
+ idx.collectCacheIds(CU.cacheId(cacheName), qry)): null;
+ }
+ }
+ catch (SQLException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+ /**
+ * Checks whether query flags are compatible with server side update.
+ *
+ * @param qry Query.
+ * @return {@code true} if update can be distributed.
+ */
+ public static boolean isSkipReducerOnUpdateQuery(SqlFieldsQuery qry) {
+ return qry != null && !qry.isLocal() &&
+ qry instanceof SqlFieldsQueryEx && ((SqlFieldsQueryEx)qry).isSkipReducerOnUpdate();
+ }
+
+ /**
* Simple supplier that just takes specified element of a given row.
*/
private final static class PlainValueSupplier implements KeyValueSupplier {
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 7f28203..c96b486 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -1509,6 +1509,19 @@ public class GridSqlQuerySplitter {
rdcQry.distinct(true);
}
+ // -- SUB-QUERIES
+ boolean hasSubQueries = hasSubQueries(mapQry.where()) || hasSubQueries(mapQry.from());
+
+ if (!hasSubQueries) {
+ for (int i = 0; i < mapQry.columns(false).size(); i++) {
+ if (hasSubQueries(mapQry.column(i))) {
+ hasSubQueries = true;
+
+ break;
+ }
+ }
+ }
+
// Replace the given select with generated reduce query in the parent.
prnt.child(childIdx, rdcQry);
@@ -1519,6 +1532,7 @@ public class GridSqlQuerySplitter {
map.columns(collectColumns(mapExps));
map.sortColumns(mapQry.sort());
map.partitioned(hasPartitionedTables(mapQry));
+ map.hasSubQueries(hasSubQueries);
if (map.isPartitioned())
map.derivedPartitions(derivePartitionsFromQuery(mapQry, ctx));
@@ -1543,6 +1557,25 @@ public class GridSqlQuerySplitter {
}
/**
+ * @param ast Map query AST.
+ * @return {@code true} If the given AST has sub-queries.
+ */
+ private boolean hasSubQueries(GridSqlAst ast) {
+ if (ast == null)
+ return false;
+
+ if (ast instanceof GridSqlSubquery)
+ return true;
+
+ for (int childIdx = 0; childIdx < ast.size(); childIdx++) {
+ if (hasSubQueries(ast.child(childIdx)))
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
* @param sqlQry Query.
* @param qryAst Select AST.
* @param params All parameters.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java
new file mode 100644
index 0000000..a783b8a
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.UUID;
+import javax.cache.CacheException;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
+import org.apache.ignite.internal.processors.query.h2.UpdateResult;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlResponse;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Context for DML operation on reducer node.
+ */
+class DistributedUpdateRun {
+ /** Expected number of responses. */
+ private final int nodeCount;
+
+ /** Registers nodes that have responded. */
+ private final HashSet<UUID> rspNodes;
+
+ /** Accumulates total number of updated rows. */
+ private long updCntr = 0L;
+
+ /** Accumulates error keys. */
+ private HashSet<Object> errorKeys;
+
+ /** Query info. */
+ private final GridRunningQueryInfo qry;
+
+ /** Result future. */
+ private final GridFutureAdapter<UpdateResult> fut = new GridFutureAdapter<>();
+
+ /**
+ * Constructor.
+ *
+ * @param nodeCount Number of nodes to await results from.
+ * @param qry Query info.
+ */
+ DistributedUpdateRun(int nodeCount, GridRunningQueryInfo qry) {
+ this.nodeCount = nodeCount;
+ this.qry = qry;
+
+ rspNodes = new HashSet<>(nodeCount);
+ }
+
+ /**
+ * @return Query info.
+ */
+ GridRunningQueryInfo queryInfo() {
+ return qry;
+ }
+
+ /**
+ * @return Result future.
+ */
+ GridFutureAdapter<UpdateResult> future() {
+ return fut;
+ }
+
+ /**
+ * Handle disconnection.
+ * @param e Pre-formatted error.
+ */
+ void handleDisconnect(CacheException e) {
+ fut.onDone(new IgniteCheckedException("Update failed because client node have disconnected.", e));
+ }
+
+ /**
+ * Handle leave of a node.
+ *
+ * @param nodeId Node id.
+ */
+ void handleNodeLeft(UUID nodeId) {
+ fut.onDone(new IgniteCheckedException("Update failed because map node left topology [nodeId=" + nodeId + "]"));
+ }
+
+ /**
+ * Handle response from remote node.
+ *
+ * @param id Node id.
+ * @param msg Response message.
+ */
+ void handleResponse(UUID id, GridH2DmlResponse msg) {
+ synchronized (this) {
+ if (!rspNodes.add(id))
+ return; // ignore duplicated messages
+
+ String err = msg.error();
+
+ if (err != null) {
+ fut.onDone(new IgniteCheckedException("Update failed. " + (F.isEmpty(err) ? "" : err) + "[reqId=" +
+ msg.requestId() + ", node=" + id + "]."));
+
+ return;
+ }
+
+ if (!F.isEmpty(msg.errorKeys())) {
+ List<Object> errList = Arrays.asList(msg.errorKeys());
+
+ if (errorKeys == null)
+ errorKeys = new HashSet<>(errList);
+ else
+ errorKeys.addAll(errList);
+ }
+
+ updCntr += msg.updateCounter();
+
+ if (rspNodes.size() == nodeCount)
+ fut.onDone(new UpdateResult(updCntr, errorKeys == null ? null : errorKeys.toArray()));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 0cc4172..77b928f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -21,6 +21,7 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.util.AbstractCollection;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@@ -30,12 +31,14 @@ import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.CacheQueryExecutedEvent;
import org.apache.ignite.events.DiscoveryEvent;
@@ -54,8 +57,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservabl
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.UpdateResult;
import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
@@ -63,6 +68,8 @@ import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQuery
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlRequest;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.typedef.CI1;
@@ -71,6 +78,7 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.thread.IgniteThread;
+import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.h2.jdbc.JdbcResultSet;
import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
@@ -208,6 +216,8 @@ public class GridMapQueryExecutor {
onNextPageRequest(node, (GridQueryNextPageRequest)msg);
else if (msg instanceof GridQueryCancelRequest)
onCancel(node, (GridQueryCancelRequest)msg);
+ else if (msg instanceof GridH2DmlRequest)
+ onDmlRequest(node, (GridH2DmlRequest)msg);
else
processed = false;
@@ -735,6 +745,102 @@ public class GridMapQueryExecutor {
/**
* @param node Node.
+ * @param req DML request.
+ */
+ private void onDmlRequest(final ClusterNode node, final GridH2DmlRequest req) throws IgniteCheckedException {
+ int[] parts = req.queryPartitions();
+
+ List<Integer> cacheIds = req.caches();
+
+ long reqId = req.requestId();
+
+ AffinityTopologyVersion topVer = req.topologyVersion();
+
+ List<GridReservable> reserved = new ArrayList<>();
+
+ if (!reservePartitions(cacheIds, topVer, parts, reserved)) {
+ U.error(log, "Failed to reserve partitions for DML request. [localNodeId=" + ctx.localNodeId() +
+ ", nodeId=" + node.id() + ", reqId=" + req.requestId() + ", cacheIds=" + cacheIds +
+ ", topVer=" + topVer + ", parts=" + Arrays.toString(parts) + ']');
+
+ sendUpdateResponse(node, reqId, null, "Failed to reserve partitions for DML request. " +
+ "Explanation (Retry your request when re-balancing is over).");
+
+ return;
+ }
+
+ MapNodeResults nodeResults = resultsForNode(node.id());
+
+ try {
+ IndexingQueryFilter filter = h2.backupFilter(topVer, parts);
+
+ GridQueryCancel cancel = nodeResults.putUpdate(reqId);
+
+ SqlFieldsQuery fldsQry = new SqlFieldsQuery(req.query());
+
+ if (req.parameters() != null)
+ fldsQry.setArgs(req.parameters());
+
+ fldsQry.setEnforceJoinOrder(req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER));
+ fldsQry.setTimeout(req.timeout(), TimeUnit.MILLISECONDS);
+ fldsQry.setPageSize(req.pageSize());
+ fldsQry.setLocal(true);
+
+ boolean local = true;
+
+ final boolean replicated = req.isFlagSet(GridH2QueryRequest.FLAG_REPLICATED);
+
+ if (!replicated && !F.isEmpty(cacheIds) &&
+ findFirstPartitioned(cacheIds).config().getQueryParallelism() > 1) {
+ fldsQry.setDistributedJoins(true);
+
+ local = false;
+ }
+
+ UpdateResult updRes = h2.mapDistributedUpdate(req.schemaName(), fldsQry, filter, cancel, local);
+
+ GridCacheContext<?, ?> mainCctx =
+ !F.isEmpty(cacheIds) ? ctx.cache().context().cacheContext(cacheIds.get(0)) : null;
+
+ boolean evt = local && mainCctx != null && ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED);
+
+ if (evt) {
+ ctx.event().record(new CacheQueryExecutedEvent<>(
+ node,
+ "SQL query executed.",
+ EVT_CACHE_QUERY_EXECUTED,
+ CacheQueryType.SQL.name(),
+ mainCctx.name(),
+ null,
+ req.query(),
+ null,
+ null,
+ req.parameters(),
+ node.id(),
+ null));
+ }
+
+ sendUpdateResponse(node, reqId, updRes, null);
+ }
+ catch (Exception e) {
+ U.error(log, "Error processing dml request. [localNodeId=" + ctx.localNodeId() +
+ ", nodeId=" + node.id() + ", req=" + req + ']', e);
+
+ sendUpdateResponse(node, reqId, null, e.getMessage());
+ }
+ finally {
+ if (!F.isEmpty(reserved)) {
+ // Release reserved partitions.
+ for (int i = 0; i < reserved.size(); i++)
+ reserved.get(i).release();
+ }
+
+ nodeResults.removeUpdate(reqId);
+ }
+ }
+
+ /**
+ * @param node Node.
* @param qryReqId Query request ID.
* @param err Error.
*/
@@ -758,6 +864,36 @@ public class GridMapQueryExecutor {
}
/**
+ * Sends update response for DML request.
+ *
+ * @param node Node.
+ * @param reqId Request id.
+ * @param updResult Update result.
+ * @param error Error message.
+ */
+ @SuppressWarnings("deprecation")
+ private void sendUpdateResponse(ClusterNode node, long reqId, UpdateResult updResult, String error) {
+ try {
+ GridH2DmlResponse rsp = new GridH2DmlResponse(reqId, updResult == null ? 0 : updResult.counter(),
+ updResult == null ? null : updResult.errorKeys(), error);
+
+ if (log.isDebugEnabled())
+ log.debug("Sending: [localNodeId=" + ctx.localNodeId() + ", node=" + node.id() + ", msg=" + rsp + "]");
+
+ if (node.isLocal())
+ h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), rsp);
+ else {
+ rsp.marshall(ctx.config().getMarshaller());
+
+ ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, rsp, QUERY_POOL);
+ }
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to send message.", e);
+ }
+ }
+
+ /**
* @param node Node.
* @param req Request.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 8638794..f85cd94 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -33,6 +33,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -59,6 +60,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
@@ -67,6 +69,7 @@ import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.h2.H2FieldsIterator;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.UpdateResult;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlSortColumn;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlType;
@@ -74,6 +77,8 @@ import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQuery
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlRequest;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
import org.apache.ignite.internal.util.GridIntIterator;
import org.apache.ignite.internal.util.GridIntList;
@@ -130,6 +135,9 @@ public class GridReduceQueryExecutor {
/** */
private final ConcurrentMap<Long, ReduceQueryRun> runs = new ConcurrentHashMap8<>();
+ /** Contexts of running DML requests. */
+ private final ConcurrentMap<Long, DistributedUpdateRun> updRuns = new ConcurrentHashMap<>();
+
/** */
private volatile List<GridThreadLocalTable> fakeTbls = Collections.emptyList();
@@ -197,6 +205,10 @@ public class GridReduceQueryExecutor {
}
}
}
+
+ for (DistributedUpdateRun r : updRuns.values())
+ r.handleNodeLeft(nodeId);
+
}
}, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
}
@@ -229,6 +241,8 @@ public class GridReduceQueryExecutor {
onNextPage(node, (GridQueryNextPageResponse)msg);
else if (msg instanceof GridQueryFailResponse)
onFail(node, (GridQueryFailResponse)msg);
+ else if (msg instanceof GridH2DmlResponse)
+ onDmlResponse(node, (GridH2DmlResponse)msg);
else
processed = false;
@@ -575,25 +589,11 @@ public class GridReduceQueryExecutor {
if (qry.isLocal())
nodes = singletonList(ctx.discovery().localNode());
else {
- if (isPreloadingActive(cacheIds)) {
- if (isReplicatedOnly)
- nodes = replicatedUnstableDataNodes(cacheIds);
- else {
- partsMap = partitionedUnstableDataNodes(cacheIds);
-
- if (partsMap != null) {
- qryMap = narrowForQuery(partsMap, parts);
-
- nodes = qryMap == null ? null : qryMap.keySet();
- }
- }
- }
- else {
- qryMap = stableDataNodes(isReplicatedOnly, topVer, cacheIds, parts);
+ NodesForPartitionsResult nodesParts = nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly);
- if (qryMap != null)
- nodes = qryMap.keySet();
- }
+ nodes = nodesParts.nodes();
+ partsMap = nodesParts.partitionsMap();
+ qryMap = nodesParts.queryPartitionsMap();
if (nodes == null)
continue; // Retry.
@@ -845,6 +845,153 @@ public class GridReduceQueryExecutor {
}
/**
+ *
+ * @param schemaName Schema name.
+ * @param cacheIds Cache ids.
+ * @param selectQry Select query.
+ * @param params SQL parameters.
+ * @param enforceJoinOrder Enforce join order of tables.
+ * @param pageSize Page size.
+ * @param timeoutMillis Timeout.
+ * @param parts Partitions.
+ * @param isReplicatedOnly Whether query uses only replicated caches.
+ * @param cancel Cancel state.
+ * @return Update result, or {@code null} when some map node doesn't support distributed DML.
+ */
+ public UpdateResult update(
+ String schemaName,
+ List<Integer> cacheIds,
+ String selectQry,
+ Object[] params,
+ boolean enforceJoinOrder,
+ int pageSize,
+ int timeoutMillis,
+ final int[] parts,
+ boolean isReplicatedOnly,
+ GridQueryCancel cancel
+ ) {
+ AffinityTopologyVersion topVer = h2.readyTopologyVersion();
+
+ NodesForPartitionsResult nodesParts = nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly);
+
+ final long reqId = qryIdGen.incrementAndGet();
+
+ final GridRunningQueryInfo qryInfo = new GridRunningQueryInfo(reqId, selectQry, GridCacheQueryType.SQL_FIELDS,
+ schemaName, U.currentTimeMillis(), cancel, false);
+
+ Collection<ClusterNode> nodes = nodesParts.nodes();
+
+ if (nodes == null)
+ throw new CacheException("Failed to determine nodes participating in the update. " +
+ "Explanation (Retry update once topology recovers).");
+
+ if (isReplicatedOnly) {
+ ClusterNode locNode = ctx.discovery().localNode();
+
+ if (nodes.contains(locNode))
+ nodes = singletonList(locNode);
+ else
+ nodes = singletonList(F.rand(nodes));
+ }
+
+ for (ClusterNode n : nodes) {
+ if (!n.version().greaterThanEqual(2, 3, 0)) {
+ log.warning("Server-side DML optimization is skipped because map node does not support it. " +
+ "Falling back to normal DML. [node=" + n.id() + ", v=" + n.version() + "].");
+
+ return null;
+ }
+ }
+
+ final DistributedUpdateRun r = new DistributedUpdateRun(nodes.size(), qryInfo);
+
+ int flags = enforceJoinOrder ? GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER : 0;
+
+ if (isReplicatedOnly)
+ flags |= GridH2QueryRequest.FLAG_REPLICATED;
+
+ GridH2DmlRequest req = new GridH2DmlRequest()
+ .requestId(reqId)
+ .topologyVersion(topVer)
+ .caches(cacheIds)
+ .schemaName(schemaName)
+ .query(selectQry)
+ .pageSize(pageSize)
+ .parameters(params)
+ .timeout(timeoutMillis)
+ .flags(flags);
+
+ updRuns.put(reqId, r);
+
+ boolean release = false;
+
+ try {
+ Map<ClusterNode, IntArray> partsMap = (nodesParts.queryPartitionsMap() != null) ?
+ nodesParts.queryPartitionsMap() : nodesParts.partitionsMap();
+
+ ExplicitPartitionsSpecializer partsSpec = (parts == null) ? null :
+ new ExplicitPartitionsSpecializer(partsMap);
+
+ final Collection<ClusterNode> finalNodes = nodes;
+
+ cancel.set(new Runnable() {
+ @Override public void run() {
+ r.future().onCancelled();
+
+ send(finalNodes, new GridQueryCancelRequest(reqId), null, false);
+ }
+ });
+
+ // send() logs the debug message
+ if (send(nodes, req, partsSpec, false))
+ return r.future().get();
+
+ throw new CacheException("Failed to send update request to participating nodes.");
+ }
+ catch (IgniteCheckedException | RuntimeException e) {
+ release = true;
+
+ U.error(log, "Error during update [localNodeId=" + ctx.localNodeId() + "]", e);
+
+ throw new CacheException("Failed to run update. " + e.getMessage(), e);
+ }
+ finally {
+ if (release)
+ send(nodes, new GridQueryCancelRequest(reqId), null, false);
+
+ if (!updRuns.remove(reqId, r))
+ U.warn(log, "Update run was already removed: " + reqId);
+ }
+ }
+
+ /**
+ * Process response for DML request.
+ *
+ * @param node Node.
+ * @param msg Message.
+ */
+ private void onDmlResponse(final ClusterNode node, GridH2DmlResponse msg) {
+ try {
+ long reqId = msg.requestId();
+
+ DistributedUpdateRun r = updRuns.get(reqId);
+
+ if (r == null) {
+ U.warn(log, "Unexpected dml response (will ignore). [localNodeId=" + ctx.localNodeId() + ", nodeId=" +
+ node.id() + ", msg=" + msg.toString() + ']');
+
+ return;
+ }
+
+ r.handleResponse(node.id(), msg);
+ }
+ catch (Exception e) {
+ U.error(log, "Error in dml response processing. [localNodeId=" + ctx.localNodeId() + ", nodeId=" +
+ node.id() + ", msg=" + msg.toString() + ']', e);
+ }
+ }
+
+ /**
* @param cacheIds Cache IDs.
* @return The first partitioned cache context.
*/
@@ -1309,6 +1456,44 @@ public class GridReduceQueryExecutor {
}
/**
+ * Evaluates nodes and nodes to partitions map given a list of cache ids, topology version and partitions.
+ *
+ * @param cacheIds Cache ids.
+ * @param topVer Topology version.
+ * @param parts Partitions array.
+ * @param isReplicatedOnly Allow only replicated caches.
+ * @return Result.
+ */
+ private NodesForPartitionsResult nodesForPartitions(List<Integer> cacheIds, AffinityTopologyVersion topVer,
+ int[] parts, boolean isReplicatedOnly) {
+ Collection<ClusterNode> nodes = null;
+ Map<ClusterNode, IntArray> partsMap = null;
+ Map<ClusterNode, IntArray> qryMap = null;
+
+ if (isPreloadingActive(cacheIds)) {
+ if (isReplicatedOnly)
+ nodes = replicatedUnstableDataNodes(cacheIds);
+ else {
+ partsMap = partitionedUnstableDataNodes(cacheIds);
+
+ if (partsMap != null) {
+ qryMap = narrowForQuery(partsMap, parts);
+
+ nodes = qryMap == null ? null : qryMap.keySet();
+ }
+ }
+ }
+ else {
+ qryMap = stableDataNodes(isReplicatedOnly, topVer, cacheIds, parts);
+
+ if (qryMap != null)
+ nodes = qryMap.keySet();
+ }
+
+ return new NodesForPartitionsResult(nodes, partsMap, qryMap);
+ }
+
+ /**
* @param conn Connection.
* @param qry Query.
* @param explain Explain.
@@ -1403,6 +1588,9 @@ public class GridReduceQueryExecutor {
for (Map.Entry<Long, ReduceQueryRun> e : runs.entrySet())
e.getValue().disconnected(err);
+
+ for (DistributedUpdateRun r: updRuns.values())
+ r.handleDisconnect(err);
}
/**
@@ -1421,6 +1609,11 @@ public class GridReduceQueryExecutor {
res.add(run.queryInfo());
}
+ for (DistributedUpdateRun upd: updRuns.values()) {
+ if (upd.queryInfo().longQuery(curTime, duration))
+ res.add(upd.queryInfo());
+ }
+
return res;
}
@@ -1435,6 +1628,12 @@ public class GridReduceQueryExecutor {
if (run != null)
run.queryInfo().cancel();
+ else {
+ DistributedUpdateRun upd = updRuns.get(qryId);
+
+ if (upd != null)
+ upd.queryInfo().cancel();
+ }
}
}
@@ -1478,11 +1677,64 @@ public class GridReduceQueryExecutor {
/** {@inheritDoc} */
@Override public Message apply(ClusterNode node, Message msg) {
- GridH2QueryRequest rq = new GridH2QueryRequest((GridH2QueryRequest)msg);
+ if (msg instanceof GridH2QueryRequest) {
+ GridH2QueryRequest rq = new GridH2QueryRequest((GridH2QueryRequest)msg);
+
+ rq.queryPartitions(toArray(partsMap.get(node)));
+
+ return rq;
+ } else if (msg instanceof GridH2DmlRequest) {
+ GridH2DmlRequest rq = new GridH2DmlRequest((GridH2DmlRequest)msg);
+
+ rq.queryPartitions(toArray(partsMap.get(node)));
+
+ return rq;
+ }
+
+ return msg;
+ }
+ }
+
+ /**
+ * Result of nodes to partitions mapping for a query or update.
+ */
+ static class NodesForPartitionsResult {
+ /** */
+ final Collection<ClusterNode> nodes;
- rq.queryPartitions(toArray(partsMap.get(node)));
+ /** */
+ final Map<ClusterNode, IntArray> partsMap;
- return rq;
+ /** */
+ final Map<ClusterNode, IntArray> qryMap;
+
+ /** */
+ NodesForPartitionsResult(Collection<ClusterNode> nodes, Map<ClusterNode, IntArray> partsMap,
+ Map<ClusterNode, IntArray> qryMap) {
+ this.nodes = nodes;
+ this.partsMap = partsMap;
+ this.qryMap = qryMap;
+ }
+
+ /**
+ * @return Collection of nodes a message shall be sent to.
+ */
+ Collection<ClusterNode> nodes() {
+ return nodes;
+ }
+
+ /**
+ * @return Maps a node to partition array.
+ */
+ Map<ClusterNode, IntArray> partitionsMap() {
+ return partsMap;
+ }
+
+ /**
+ * @return Maps a node to partition array.
+ */
+ Map<ClusterNode, IntArray> queryPartitionsMap() {
+ return qryMap;
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
index 2d20c8d..c0637ea 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.h2.twostep;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
import org.jsr166.ConcurrentHashMap8;
@@ -32,6 +33,9 @@ class MapNodeResults {
/** */
private final ConcurrentMap<MapRequestKey, MapQueryResults> res = new ConcurrentHashMap8<>();
+ /** Cancel state for update requests. */
+ private final ConcurrentMap<Long, GridQueryCancel> updCancels = new ConcurrentHashMap8<>();
+
/** */
private final GridBoundedConcurrentLinkedHashMap<Long, Boolean> qryHist =
new GridBoundedConcurrentLinkedHashMap<>(1024, 1024, 0.75f, 64, PER_SEGMENT_Q);
@@ -88,6 +92,12 @@ class MapNodeResults {
removed.cancel(true);
}
}
+
+ // Cancel update request
+ GridQueryCancel updCancel = updCancels.remove(reqId);
+
+ if (updCancel != null)
+ updCancel.cancel();
}
/**
@@ -111,11 +121,34 @@ class MapNodeResults {
}
/**
+ * @param reqId Request id.
+ * @return Cancel state.
+ */
+ public GridQueryCancel putUpdate(long reqId) {
+ GridQueryCancel cancel = new GridQueryCancel();
+
+ updCancels.put(reqId, cancel);
+
+ return cancel;
+ }
+
+ /**
+ * @param reqId Request id.
+ */
+ public void removeUpdate(long reqId) {
+ updCancels.remove(reqId);
+ }
+
+ /**
* Cancel all node queries.
*/
public void cancelAll() {
for (MapQueryResults ress : res.values())
ress.cancel(true);
+
+ // Cancel update requests
+ for (GridQueryCancel upd: updCancels.values())
+ upd.cancel();
}
}
[3/7] ignite git commit: IGNITE-6024: SQL: Implemented
"skipReducerOnUpdate" flag. This closes #2488.
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc/os/win/src/system/ui/dsn_configuration_window.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/os/win/src/system/ui/dsn_configuration_window.cpp b/modules/platforms/cpp/odbc/os/win/src/system/ui/dsn_configuration_window.cpp
index 9b13481..d5aa0db 100644
--- a/modules/platforms/cpp/odbc/os/win/src/system/ui/dsn_configuration_window.cpp
+++ b/modules/platforms/cpp/odbc/os/win/src/system/ui/dsn_configuration_window.cpp
@@ -180,6 +180,12 @@ namespace ignite
lazyCheckBox->SetEnabled(version >= ProtocolVersion::VERSION_2_1_5);
+ skipReducerOnUpdateCheckBox = CreateCheckBox(editPosX + checkBoxSize + interval, rowPos,
+ checkBoxSize, rowSize, "Skip reducer on update", ChildId::SKIP_REDUCER_ON_UPDATE_CHECK_BOX,
+ config.IsSkipReducerOnUpdate());
+
+ skipReducerOnUpdateCheckBox->SetEnabled(version >= ProtocolVersion::VERSION_2_3_0);
+
rowPos += interval * 2 + rowSize;
connectionSettingsGroupBox = CreateGroupBox(margin, sectionBegin, width - 2 * margin,
@@ -264,6 +270,13 @@ namespace ignite
break;
}
+ case ChildId::SKIP_REDUCER_ON_UPDATE_CHECK_BOX:
+ {
+ skipReducerOnUpdateCheckBox->SetChecked(!skipReducerOnUpdateCheckBox->IsChecked());
+
+ break;
+ }
+
case ChildId::PROTOCOL_VERSION_COMBO_BOX:
{
std::string versionStr;
@@ -271,6 +284,7 @@ namespace ignite
ProtocolVersion version = ProtocolVersion::FromString(versionStr);
lazyCheckBox->SetEnabled(version >= ProtocolVersion::VERSION_2_1_5);
+ skipReducerOnUpdateCheckBox->SetEnabled(version >= ProtocolVersion::VERSION_2_3_0);
break;
}
@@ -309,6 +323,7 @@ namespace ignite
bool replicatedOnly;
bool collocated;
bool lazy;
+ bool skipReducerOnUpdate;
nameEdit->GetText(dsn);
addressEdit->GetText(address);
@@ -330,6 +345,9 @@ namespace ignite
collocated = collocatedCheckBox->IsEnabled() && collocatedCheckBox->IsChecked();
lazy = lazyCheckBox->IsEnabled() && lazyCheckBox->IsChecked();
+ skipReducerOnUpdate =
+ skipReducerOnUpdateCheckBox->IsEnabled() && skipReducerOnUpdateCheckBox->IsChecked();
+
LOG_MSG("Retriving arguments:");
LOG_MSG("DSN: " << dsn);
LOG_MSG("Address: " << address);
@@ -341,6 +359,7 @@ namespace ignite
LOG_MSG("Replicated only: " << (replicatedOnly ? "true" : "false"));
LOG_MSG("Collocated: " << (collocated ? "true" : "false"));
LOG_MSG("Lazy: " << (lazy ? "true" : "false"));
+ LOG_MSG("Skip reducer on update: " << (skipReducerOnUpdate ? "true" : "false"));
if (dsn.empty())
throw IgniteError(IgniteError::IGNITE_ERR_GENERIC, "DSN name can not be empty.");
@@ -355,6 +374,7 @@ namespace ignite
cfg.SetReplicatedOnly(replicatedOnly);
cfg.SetCollocated(collocated);
cfg.SetLazy(lazy);
+ cfg.SetSkipReducerOnUpdate(skipReducerOnUpdate);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc/src/config/configuration.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/config/configuration.cpp b/modules/platforms/cpp/odbc/src/config/configuration.cpp
index 95ed964..be5a781 100644
--- a/modules/platforms/cpp/odbc/src/config/configuration.cpp
+++ b/modules/platforms/cpp/odbc/src/config/configuration.cpp
@@ -32,34 +32,36 @@ namespace ignite
{
namespace config
{
- const std::string Configuration::Key::dsn = "dsn";
- const std::string Configuration::Key::driver = "driver";
- const std::string Configuration::Key::schema = "schema";
- const std::string Configuration::Key::address = "address";
- const std::string Configuration::Key::server = "server";
- const std::string Configuration::Key::port = "port";
- const std::string Configuration::Key::distributedJoins = "distributed_joins";
- const std::string Configuration::Key::enforceJoinOrder = "enforce_join_order";
- const std::string Configuration::Key::protocolVersion = "protocol_version";
- const std::string Configuration::Key::pageSize = "page_size";
- const std::string Configuration::Key::replicatedOnly = "replicated_only";
- const std::string Configuration::Key::collocated = "collocated";
- const std::string Configuration::Key::lazy = "lazy";
-
- const std::string Configuration::DefaultValue::dsn = "Apache Ignite DSN";
- const std::string Configuration::DefaultValue::driver = "Apache Ignite";
- const std::string Configuration::DefaultValue::schema = "PUBLIC";
- const std::string Configuration::DefaultValue::address = "";
- const std::string Configuration::DefaultValue::server = "";
+ const std::string Configuration::Key::dsn = "dsn";
+ const std::string Configuration::Key::driver = "driver";
+ const std::string Configuration::Key::schema = "schema";
+ const std::string Configuration::Key::address = "address";
+ const std::string Configuration::Key::server = "server";
+ const std::string Configuration::Key::port = "port";
+ const std::string Configuration::Key::distributedJoins = "distributed_joins";
+ const std::string Configuration::Key::enforceJoinOrder = "enforce_join_order";
+ const std::string Configuration::Key::protocolVersion = "protocol_version";
+ const std::string Configuration::Key::pageSize = "page_size";
+ const std::string Configuration::Key::replicatedOnly = "replicated_only";
+ const std::string Configuration::Key::collocated = "collocated";
+ const std::string Configuration::Key::lazy = "lazy";
+ const std::string Configuration::Key::skipReducerOnUpdate = "skip_reducer_on_update";
+
+ const std::string Configuration::DefaultValue::dsn = "Apache Ignite DSN";
+ const std::string Configuration::DefaultValue::driver = "Apache Ignite";
+ const std::string Configuration::DefaultValue::schema = "PUBLIC";
+ const std::string Configuration::DefaultValue::address = "";
+ const std::string Configuration::DefaultValue::server = "";
const uint16_t Configuration::DefaultValue::port = 10800;
const int32_t Configuration::DefaultValue::pageSize = 1024;
- const bool Configuration::DefaultValue::distributedJoins = false;
- const bool Configuration::DefaultValue::enforceJoinOrder = false;
- const bool Configuration::DefaultValue::replicatedOnly = false;
- const bool Configuration::DefaultValue::collocated = false;
- const bool Configuration::DefaultValue::lazy = false;
+ const bool Configuration::DefaultValue::distributedJoins = false;
+ const bool Configuration::DefaultValue::enforceJoinOrder = false;
+ const bool Configuration::DefaultValue::replicatedOnly = false;
+ const bool Configuration::DefaultValue::collocated = false;
+ const bool Configuration::DefaultValue::lazy = false;
+ const bool Configuration::DefaultValue::skipReducerOnUpdate = false;
const ProtocolVersion& Configuration::DefaultValue::protocolVersion = ProtocolVersion::GetCurrent();
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc/src/connection.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/connection.cpp b/modules/platforms/cpp/odbc/src/connection.cpp
index 161e1c4..8f4bf14 100644
--- a/modules/platforms/cpp/odbc/src/connection.cpp
+++ b/modules/platforms/cpp/odbc/src/connection.cpp
@@ -417,6 +417,7 @@ namespace ignite
bool replicatedOnly = false;
bool collocated = false;
bool lazy = false;
+ bool skipReducerOnUpdate = false;
ProtocolVersion protocolVersion;
try
@@ -427,6 +428,7 @@ namespace ignite
replicatedOnly = config.IsReplicatedOnly();
collocated = config.IsCollocated();
lazy = config.IsLazy();
+ skipReducerOnUpdate = config.IsSkipReducerOnUpdate();
}
catch (const IgniteError& err)
{
@@ -443,7 +445,8 @@ namespace ignite
return SqlResult::AI_ERROR;
}
- HandshakeRequest req(protocolVersion, distributedJoins, enforceJoinOrder, replicatedOnly, collocated, lazy);
+ HandshakeRequest req(protocolVersion, distributedJoins, enforceJoinOrder, replicatedOnly, collocated, lazy,
+ skipReducerOnUpdate);
HandshakeResponse rsp;
try
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc/src/dsn_config.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/dsn_config.cpp b/modules/platforms/cpp/odbc/src/dsn_config.cpp
index c91cd8c..536f679 100644
--- a/modules/platforms/cpp/odbc/src/dsn_config.cpp
+++ b/modules/platforms/cpp/odbc/src/dsn_config.cpp
@@ -108,6 +108,9 @@ namespace ignite
bool lazy = ReadDsnBool(dsn, Configuration::Key::lazy, config.IsLazy());
+ bool skipReducerOnUpdate =
+ ReadDsnBool(dsn, Configuration::Key::skipReducerOnUpdate, config.IsSkipReducerOnUpdate());
+
std::string version = ReadDsnString(dsn, Configuration::Key::protocolVersion,
config.GetProtocolVersion().ToString().c_str());
@@ -125,6 +128,7 @@ namespace ignite
config.SetReplicatedOnly(replicatedOnly);
config.SetCollocated(collocated);
config.SetLazy(lazy);
+ config.SetSkipReducerOnUpdate(skipReducerOnUpdate);
config.SetProtocolVersion(version);
config.SetPageSize(pageSize);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc/src/message.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/message.cpp b/modules/platforms/cpp/odbc/src/message.cpp
index 3601591..4767c74 100644
--- a/modules/platforms/cpp/odbc/src/message.cpp
+++ b/modules/platforms/cpp/odbc/src/message.cpp
@@ -23,13 +23,14 @@ namespace ignite
namespace odbc
{
HandshakeRequest::HandshakeRequest(const ProtocolVersion& version, bool distributedJoins,
- bool enforceJoinOrder, bool replicatedOnly, bool collocated, bool lazy):
+ bool enforceJoinOrder, bool replicatedOnly, bool collocated, bool lazy, bool skipReducerOnUpdate):
version(version),
distributedJoins(distributedJoins),
enforceJoinOrder(enforceJoinOrder),
replicatedOnly(replicatedOnly),
collocated(collocated),
- lazy(lazy)
+ lazy(lazy),
+ skipReducerOnUpdate(skipReducerOnUpdate)
{
// No-op.
}
@@ -53,7 +54,12 @@ namespace ignite
writer.WriteBool(enforceJoinOrder);
writer.WriteBool(replicatedOnly);
writer.WriteBool(collocated);
- writer.WriteBool(lazy);
+
+ if (version >= ProtocolVersion::VERSION_2_1_5)
+ writer.WriteBool(lazy);
+
+ if (version >= ProtocolVersion::VERSION_2_3_0)
+ writer.WriteBool(skipReducerOnUpdate);
}
QueryExecuteRequest::QueryExecuteRequest(const std::string& schema, const std::string& sql,
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc/src/protocol_version.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/protocol_version.cpp b/modules/platforms/cpp/odbc/src/protocol_version.cpp
index b668fb8..b0b9121 100644
--- a/modules/platforms/cpp/odbc/src/protocol_version.cpp
+++ b/modules/platforms/cpp/odbc/src/protocol_version.cpp
@@ -28,10 +28,12 @@ namespace ignite
{
const ProtocolVersion ProtocolVersion::VERSION_2_1_0(2, 1, 0);
const ProtocolVersion ProtocolVersion::VERSION_2_1_5(2, 1, 5);
+ const ProtocolVersion ProtocolVersion::VERSION_2_3_0(2, 3, 0);
ProtocolVersion::VersionSet::value_type supportedArray[] = {
ProtocolVersion::VERSION_2_1_0,
- ProtocolVersion::VERSION_2_1_5
+ ProtocolVersion::VERSION_2_1_5,
+ ProtocolVersion::VERSION_2_3_0,
};
const ProtocolVersion::VersionSet ProtocolVersion::supported(supportedArray,
@@ -60,7 +62,7 @@ namespace ignite
const ProtocolVersion& ProtocolVersion::GetCurrent()
{
- return VERSION_2_1_5;
+ return VERSION_2_3_0;
}
void ThrowParseError()
[7/7] ignite git commit: Merge remote-tracking branch
'remotes/origin/master' into ignite-3478
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-3478
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4c06131b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4c06131b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4c06131b
Branch: refs/heads/ignite-3478
Commit: 4c06131bda84bb92a770befe872807c3680b2046
Parents: f29d4bc ae02a1d
Author: sboikov <sb...@gridgain.com>
Authored: Fri Oct 13 13:21:47 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Oct 13 13:21:47 2017 +0300
----------------------------------------------------------------------
.../internal/jdbc2/JdbcConnectionSelfTest.java | 13 +-
.../jdbc/suite/IgniteJdbcDriverTestSuite.java | 11 +
.../JdbcThinAbstractDmlStatementSelfTest.java | 14 +-
.../thin/JdbcThinComplexDmlDdlSelfTest.java | 10 +-
...omplexDmlDdlSkipReducerOnUpdateSelfTest.java | 33 +
.../jdbc/thin/JdbcThinConnectionSelfTest.java | 18 +-
.../thin/JdbcThinInsertStatementSelfTest.java | 1 -
...ertStatementSkipReducerOnUpdateSelfTest.java | 33 +
...rgeStatementSkipReducerOnUpdateSelfTest.java | 33 +
...ateStatementSkipReducerOnUpdateSelfTest.java | 33 +
.../ignite/codegen/MessageCodeGenerator.java | 2 +
.../org/apache/ignite/IgniteJdbcDriver.java | 9 +-
.../org/apache/ignite/IgniteJdbcThinDriver.java | 3 +-
.../ignite/cache/query/SqlFieldsQuery.java | 7 +
.../internal/jdbc/thin/JdbcThinConnection.java | 4 +-
.../internal/jdbc/thin/JdbcThinTcpIo.java | 15 +-
.../internal/jdbc/thin/JdbcThinUtils.java | 6 +
.../internal/jdbc2/JdbcBatchUpdateTask.java | 3 +-
.../ignite/internal/jdbc2/JdbcConnection.java | 14 +-
.../jdbc2/JdbcQueryMultipleStatementsTask.java | 3 +-
.../ignite/internal/jdbc2/JdbcQueryTask.java | 10 +-
.../ignite/internal/jdbc2/JdbcQueryTaskV3.java | 19 +-
.../ignite/internal/jdbc2/JdbcResultSet.java | 2 +-
.../internal/jdbc2/JdbcSqlFieldsQuery.java | 105 ---
.../ignite/internal/jdbc2/JdbcStatement.java | 4 +-
.../cache/query/GridCacheSqlQuery.java | 24 +
.../cache/query/SqlFieldsQueryEx.java | 158 ++++
.../odbc/jdbc/JdbcConnectionContext.java | 7 +-
.../odbc/jdbc/JdbcRequestHandler.java | 19 +-
.../odbc/odbc/OdbcConnectionContext.java | 13 +-
.../odbc/odbc/OdbcRequestHandler.java | 14 +-
.../client/cache/ClientCacheRequest.java | 8 +-
.../resources/META-INF/classnames.properties | 4 +-
.../query/h2/DmlStatementsProcessor.java | 160 ++--
.../processors/query/h2/H2DmlPlanKey.java | 21 +-
.../processors/query/h2/IgniteH2Indexing.java | 116 ++-
.../processors/query/h2/UpdateResult.java | 63 ++
.../processors/query/h2/dml/UpdatePlan.java | 64 +-
.../query/h2/dml/UpdatePlanBuilder.java | 117 ++-
.../query/h2/sql/GridSqlQuerySplitter.java | 33 +
.../query/h2/twostep/DistributedUpdateRun.java | 133 ++++
.../query/h2/twostep/GridMapQueryExecutor.java | 136 ++++
.../h2/twostep/GridReduceQueryExecutor.java | 294 ++++++-
.../query/h2/twostep/MapNodeResults.java | 33 +
.../query/h2/twostep/msg/GridH2DmlRequest.java | 516 ++++++++++++
.../query/h2/twostep/msg/GridH2DmlResponse.java | 250 ++++++
.../twostep/msg/GridH2ValueMessageFactory.java | 6 +
...teSqlSkipReducerOnUpdateDmlFlagSelfTest.java | 783 +++++++++++++++++++
...IgniteSqlSkipReducerOnUpdateDmlSelfTest.java | 755 ++++++++++++++++++
.../IgniteCacheQuerySelfTestSuite.java | 4 +
.../cpp/odbc-test/src/configuration_test.cpp | 25 +-
.../cpp/odbc-test/src/queries_test.cpp | 8 +
.../include/ignite/odbc/config/configuration.h | 26 +
.../cpp/odbc/include/ignite/odbc/message.h | 6 +-
.../odbc/include/ignite/odbc/protocol_version.h | 1 +
.../odbc/system/ui/dsn_configuration_window.h | 4 +
.../src/system/ui/dsn_configuration_window.cpp | 20 +
.../cpp/odbc/src/config/configuration.cpp | 50 +-
modules/platforms/cpp/odbc/src/connection.cpp | 5 +-
modules/platforms/cpp/odbc/src/dsn_config.cpp | 4 +
modules/platforms/cpp/odbc/src/message.cpp | 12 +-
.../platforms/cpp/odbc/src/protocol_version.cpp | 6 +-
.../Examples/Example.cs | 6 +-
.../Examples/ExamplesTest.cs | 42 +-
.../Apache.Ignite.Examples.csproj | 2 +
.../examples/Apache.Ignite.Examples/App.config | 4 +
.../ThinClient/ThinClientPutGetExample.cs | 93 +++
.../ThinClient/ThinClientQueryExample.cs | 147 ++++
.../agent/handlers/AbstractListener.java | 6 +-
69 files changed, 4287 insertions(+), 316 deletions(-)
----------------------------------------------------------------------
[2/7] ignite git commit: IGNITE-6371 .NET: Thin client example
Posted by sb...@apache.org.
IGNITE-6371 .NET: Thin client example
This closes #2830
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5ec744cf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5ec744cf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5ec744cf
Branch: refs/heads/ignite-3478
Commit: 5ec744cf7fb0db0658f91176bf98dfe5ccb05be2
Parents: 0f3f7d2
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Fri Oct 13 12:25:34 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Fri Oct 13 12:25:34 2017 +0300
----------------------------------------------------------------------
.../client/cache/ClientCacheRequest.java | 8 +-
.../Examples/Example.cs | 6 +-
.../Examples/ExamplesTest.cs | 42 ++++--
.../Apache.Ignite.Examples.csproj | 2 +
.../examples/Apache.Ignite.Examples/App.config | 4 +
.../ThinClient/ThinClientPutGetExample.cs | 93 ++++++++++++
.../ThinClient/ThinClientQueryExample.cs | 147 +++++++++++++++++++
7 files changed, 283 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ec744cf/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRequest.java
index 1aaa22c..b290a5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRequest.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.platform.client.cache;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.binary.BinaryRawReader;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientRequest;
import org.apache.ignite.internal.processors.platform.client.ClientStatus;
@@ -77,13 +77,13 @@ class ClientCacheRequest extends ClientRequest {
* @return Cache.
*/
protected IgniteCache rawCache(ClientConnectionContext ctx) {
- GridCacheContext<Object, Object> cacheCtx = ctx.kernalContext().cache().context().cacheContext(cacheId);
+ DynamicCacheDescriptor cacheDesc = ctx.kernalContext().cache().cacheDescriptor(cacheId);
- if (cacheCtx == null)
+ if (cacheDesc == null)
throw new IgniteClientException(ClientStatus.CACHE_DOES_NOT_EXIST, "Cache does not exist [cacheId= " +
cacheId + "]", null);
- String cacheName = cacheCtx.cache().name();
+ String cacheName = cacheDesc.cacheName();
return ctx.kernalContext().grid().cache(cacheName);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ec744cf/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Examples/Example.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Examples/Example.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Examples/Example.cs
index 9c3625f..4d1eeb6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Examples/Example.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Examples/Example.cs
@@ -40,7 +40,7 @@ namespace Apache.Ignite.Core.Tests.Examples
public bool NeedsTestDll { get; private set; }
/** Name */
- public string Name { get; private set; }
+ public Type ExampleType { get; private set; }
/// <summary>
/// Runs this example.
@@ -92,7 +92,7 @@ namespace Apache.Ignite.Core.Tests.Examples
ConfigPath = GetConfigPath(sourceCode),
NeedsTestDll = sourceCode.Contains("-assembly="),
_runAction = GetRunAction(type),
- Name = type.Name
+ ExampleType = type
};
}
}
@@ -119,7 +119,7 @@ namespace Apache.Ignite.Core.Tests.Examples
public override string ToString()
{
// This will be displayed in TeamCity and R# test runner
- return Name;
+ return ExampleType.Name;
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ec744cf/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Examples/ExamplesTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Examples/ExamplesTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Examples/ExamplesTest.cs
index edc95fa..48e471f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Examples/ExamplesTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Examples/ExamplesTest.cs
@@ -20,9 +20,15 @@ namespace Apache.Ignite.Core.Tests.Examples
extern alias ExamplesDll;
using System;
using System.Collections.Generic;
+ using System.Configuration;
using System.IO;
using System.Linq;
using Apache.Ignite.Core.Tests.Process;
+ using Apache.Ignite.Examples.Compute;
+ using Apache.Ignite.Examples.Datagrid;
+ using Apache.Ignite.Examples.Messaging;
+ using Apache.Ignite.Examples.Misc;
+ using Apache.Ignite.Examples.ThinClient;
using NUnit.Framework;
/// <summary>
@@ -35,21 +41,23 @@ namespace Apache.Ignite.Core.Tests.Examples
private static readonly Example[] AllExamples = Example.GetExamples().ToArray();
/** */
- private static readonly string[] LocalOnlyExamples =
+ private static readonly Type[] LocalOnlyExamples =
{
- "LifecycleExample", "ClientReconnectExample", "MultiTieredCacheExample"
+ typeof(LifecycleExample), typeof(ClientReconnectExample), typeof(MultiTieredCacheExample)
};
/** */
- private static readonly string[] RemoteOnlyExamples =
+ private static readonly Type[] RemoteOnlyExamples =
{
- "PeerAssemblyLoadingExample", "MessagingExample", "NearCacheExample"
+ typeof(PeerAssemblyLoadingExample), typeof(MessagingExample), typeof(NearCacheExample),
+ typeof(ThinClientPutGetExample), typeof(ThinClientQueryExample)
};
/** */
- private static readonly string[] NoDllExamples =
+ private static readonly Type[] NoDllExamples =
{
- "BinaryModeExample", "NearCacheExample", "PeerAssemblyLoadingExample"
+ typeof(BinaryModeExample), typeof(NearCacheExample), typeof(PeerAssemblyLoadingExample),
+ typeof(ThinClientPutGetExample)
};
/** Config file path. */
@@ -70,7 +78,7 @@ namespace Apache.Ignite.Core.Tests.Examples
{
StopRemoteNodes();
- if (LocalOnlyExamples.Contains(example.Name))
+ if (LocalOnlyExamples.Contains(example.ExampleType))
{
Assert.IsFalse(example.NeedsTestDll, "Local-only example should not mention test dll.");
Assert.IsNull(example.ConfigPath, "Local-only example should not mention app.config path.");
@@ -109,7 +117,7 @@ namespace Apache.Ignite.Core.Tests.Examples
Assert.IsTrue(PathUtil.ExamplesAppConfigPath.EndsWith(example.ConfigPath,
StringComparison.OrdinalIgnoreCase), "All examples should use the same app.config.");
- Assert.IsTrue(example.NeedsTestDll || NoDllExamples.Contains(example.Name),
+ Assert.IsTrue(example.NeedsTestDll || NoDllExamples.Contains(example.ExampleType),
"Examples that allow standalone nodes should mention test dll.");
StartRemoteNodes();
@@ -133,8 +141,18 @@ namespace Apache.Ignite.Core.Tests.Examples
// Stop it after topology check so we don't interfere with example.
Ignition.ClientMode = false;
- using (var ignite = Ignition.StartFromApplicationConfiguration(
- "igniteConfiguration", _configPath))
+ var fileMap = new ExeConfigurationFileMap { ExeConfigFilename = _configPath };
+ var config = ConfigurationManager.OpenMappedExeConfiguration(fileMap, ConfigurationUserLevel.None);
+ var section = (IgniteConfigurationSection) config.GetSection("igniteConfiguration");
+
+ // Disable client connector so that temporary node does not occupy the port.
+ var cfg = new IgniteConfiguration(section.IgniteConfiguration)
+ {
+ ClientConnectorConfigurationEnabled = false,
+ CacheConfiguration = null
+ };
+
+ using (var ignite = Ignition.Start(cfg))
{
var args = new List<string>
{
@@ -219,7 +237,7 @@ namespace Apache.Ignite.Core.Tests.Examples
// ReSharper disable once MemberCanBeMadeStatic.Global
public IEnumerable<Example> TestCasesLocal
{
- get { return AllExamples.Where(x => !RemoteOnlyExamples.Contains(x.Name)); }
+ get { return AllExamples.Where(x => !RemoteOnlyExamples.Contains(x.ExampleType)); }
}
/// <summary>
@@ -231,7 +249,7 @@ namespace Apache.Ignite.Core.Tests.Examples
{
get
{
- return AllExamples.Where(x => !LocalOnlyExamples.Contains(x.Name));
+ return AllExamples.Where(x => !LocalOnlyExamples.Contains(x.ExampleType));
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ec744cf/modules/platforms/dotnet/examples/Apache.Ignite.Examples/Apache.Ignite.Examples.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/examples/Apache.Ignite.Examples/Apache.Ignite.Examples.csproj b/modules/platforms/dotnet/examples/Apache.Ignite.Examples/Apache.Ignite.Examples.csproj
index 3206457..0aaa249 100644
--- a/modules/platforms/dotnet/examples/Apache.Ignite.Examples/Apache.Ignite.Examples.csproj
+++ b/modules/platforms/dotnet/examples/Apache.Ignite.Examples/Apache.Ignite.Examples.csproj
@@ -52,6 +52,8 @@
<Reference Include="System.Transactions" />
</ItemGroup>
<ItemGroup>
+ <Compile Include="ThinClient\ThinClientPutGetExample.cs" />
+ <Compile Include="ThinClient\ThinClientQueryExample.cs" />
<Compile Include="Compute\ClosureExample.cs" />
<Compile Include="Compute\TaskExample.cs" />
<Compile Include="Compute\PeerAssemblyLoadingExample.cs" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ec744cf/modules/platforms/dotnet/examples/Apache.Ignite.Examples/App.config
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/examples/Apache.Ignite.Examples/App.config b/modules/platforms/dotnet/examples/Apache.Ignite.Examples/App.config
index 8f78382..1249ed8 100644
--- a/modules/platforms/dotnet/examples/Apache.Ignite.Examples/App.config
+++ b/modules/platforms/dotnet/examples/Apache.Ignite.Examples/App.config
@@ -30,6 +30,10 @@
localhost="127.0.0.1" peerAssemblyLoadingMode="CurrentAppDomain">
<atomicConfiguration atomicSequenceReserveSize="10" />
+ <cacheConfiguration>
+ <cacheConfiguration name="default-cache" />
+ </cacheConfiguration>
+
<discoverySpi type="TcpDiscoverySpi">
<ipFinder type="TcpDiscoveryMulticastIpFinder">
<endpoints>
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ec744cf/modules/platforms/dotnet/examples/Apache.Ignite.Examples/ThinClient/ThinClientPutGetExample.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/examples/Apache.Ignite.Examples/ThinClient/ThinClientPutGetExample.cs b/modules/platforms/dotnet/examples/Apache.Ignite.Examples/ThinClient/ThinClientPutGetExample.cs
new file mode 100644
index 0000000..9158b09
--- /dev/null
+++ b/modules/platforms/dotnet/examples/Apache.Ignite.Examples/ThinClient/ThinClientPutGetExample.cs
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Examples.ThinClient
+{
+ using System;
+ using Apache.Ignite.Core;
+ using Apache.Ignite.Core.Client;
+ using Apache.Ignite.Core.Client.Cache;
+ using Apache.Ignite.ExamplesDll.Binary;
+
+ /// <summary>
+ /// Demonstrates Ignite.NET "thin" client cache operations.
+ /// <para />
+ /// 1) Set this class as startup object (Apache.Ignite.Examples project -> right-click -> Properties ->
+ /// Application -> Startup object);
+ /// 2) Start example (F5 or Ctrl+F5).
+ /// <para />
+ /// This example must be run with standalone Apache Ignite node:
+ /// 1) Run %IGNITE_HOME%/platforms/dotnet/bin/Apache.Ignite.exe:
+ /// Apache.Ignite.exe -configFileName=platforms\dotnet\examples\apache.ignite.examples\app.config
+ /// 2) Start example.
+ /// <para />
+ /// This example can also work via pure Java node started with ignite.bat/ignite.sh.
+ /// The only requirement is that the nodes have to create the cache named "default-cache" in advance.
+ /// </summary>
+ public static class ThinClientPutGetExample
+ {
+ /// <summary> Cache name. </summary>
+ private const string CacheName = "default-cache";
+
+ [STAThread]
+ public static void Main()
+ {
+ var cfg = new IgniteClientConfiguration
+ {
+ Host = "127.0.0.1"
+ };
+
+ using (IIgniteClient igniteClient = Ignition.StartClient(cfg))
+ {
+ Console.WriteLine();
+ Console.WriteLine(">>> Cache put-get client example started.");
+
+ ICacheClient<int, Organization> cache = igniteClient.GetCache<int, Organization>(CacheName);
+
+ PutGet(cache);
+ }
+
+ Console.WriteLine();
+ Console.WriteLine(">>> Example finished, press any key to exit ...");
+ Console.ReadKey();
+ }
+
+ /// <summary>
+ /// Execute individual Put and Get.
+ /// </summary>
+ /// <param name="cache">Cache instance.</param>
+ private static void PutGet(ICacheClient<int, Organization> cache)
+ {
+ // Create new Organization to store in cache.
+ Organization org = new Organization(
+ "Microsoft",
+ new Address("1096 Eddy Street, San Francisco, CA", 94109),
+ OrganizationType.Private,
+ DateTime.Now
+ );
+
+ // Put created data entry to cache.
+ cache.Put(1, org);
+
+ // Get recently created employee as a strongly-typed fully de-serialized instance.
+ Organization orgFromCache = cache.Get(1);
+
+ Console.WriteLine();
+ Console.WriteLine(">>> Retrieved organization instance from cache: " + orgFromCache);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ec744cf/modules/platforms/dotnet/examples/Apache.Ignite.Examples/ThinClient/ThinClientQueryExample.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/examples/Apache.Ignite.Examples/ThinClient/ThinClientQueryExample.cs b/modules/platforms/dotnet/examples/Apache.Ignite.Examples/ThinClient/ThinClientQueryExample.cs
new file mode 100644
index 0000000..49c87b7
--- /dev/null
+++ b/modules/platforms/dotnet/examples/Apache.Ignite.Examples/ThinClient/ThinClientQueryExample.cs
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Examples.ThinClient
+{
+ using System;
+ using Apache.Ignite.Core;
+ using Apache.Ignite.Core.Cache.Query;
+ using Apache.Ignite.Core.Client;
+ using Apache.Ignite.Core.Client.Cache;
+ using Apache.Ignite.ExamplesDll.Binary;
+ using Apache.Ignite.ExamplesDll.Datagrid;
+
+ /// <summary>
+ /// Demonstrates Ignite.NET "thin" client cache queries.
+ /// <para />
+ /// 1) Set this class as startup object (Apache.Ignite.Examples project -> right-click -> Properties ->
+ /// Application -> Startup object);
+ /// 2) Start example (F5 or Ctrl+F5).
+ /// <para />
+ /// This example must be run with standalone Apache Ignite node:
+ /// 1) Run %IGNITE_HOME%/platforms/dotnet/bin/Apache.Ignite.exe:
+ /// Apache.Ignite.exe -configFileName=platforms\dotnet\examples\apache.ignite.examples\app.config -assembly=[path_to_Apache.Ignite.ExamplesDll.dll]
+ /// 2) Start example.
+ /// </summary>
+ public class ThinClientQueryExample
+ {
+ /// <summary> Cache name. </summary>
+ private const string CacheName = "default-cache";
+
+ [STAThread]
+ public static void Main()
+ {
+ var cfg = new IgniteClientConfiguration
+ {
+ Host = "127.0.0.1"
+ };
+
+ using (IIgniteClient igniteClient = Ignition.StartClient(cfg))
+ {
+ Console.WriteLine();
+ Console.WriteLine(">>> Cache query client example started.");
+
+ ICacheClient<int, Employee> cache = igniteClient.GetCache<int, Employee>(CacheName);
+
+ // Populate cache with sample data entries.
+ PopulateCache(cache);
+
+ // Run scan query example.
+ ScanQueryExample(cache);
+ }
+
+ Console.WriteLine();
+ Console.WriteLine(">>> Example finished, press any key to exit ...");
+ Console.ReadKey();
+ }
+
+ /// <summary>
+ /// Queries organizations of specified type.
+ /// </summary>
+ /// <param name="cache">Cache.</param>
+ private static void ScanQueryExample(ICacheClient<int, Employee> cache)
+ {
+ const int zip = 94109;
+
+ var qry = cache.Query(new ScanQuery<int, Employee>(new ScanQueryFilter(zip)));
+
+ Console.WriteLine();
+ Console.WriteLine(">>> Private organizations (scan):");
+
+ foreach (var entry in qry)
+ {
+ Console.WriteLine(">>> " + entry.Value);
+ }
+ }
+
+ /// <summary>
+ /// Populate cache with data for this example.
+ /// </summary>
+ /// <param name="cache">Cache.</param>
+ private static void PopulateCache(ICacheClient<int, Employee> cache)
+ {
+ cache.Put(1, new Employee(
+ "James Wilson",
+ 12500,
+ new Address("1096 Eddy Street, San Francisco, CA", 94109),
+ new[] { "Human Resources", "Customer Service" },
+ 1));
+
+ cache.Put(2, new Employee(
+ "Daniel Adams",
+ 11000,
+ new Address("184 Fidler Drive, San Antonio, TX", 78130),
+ new[] { "Development", "QA" },
+ 1));
+
+ cache.Put(3, new Employee(
+ "Cristian Moss",
+ 12500,
+ new Address("667 Jerry Dove Drive, Florence, SC", 29501),
+ new[] { "Logistics" },
+ 1));
+
+ cache.Put(4, new Employee(
+ "Allison Mathis",
+ 25300,
+ new Address("2702 Freedom Lane, San Francisco, CA", 94109),
+ new[] { "Development" },
+ 2));
+
+ cache.Put(5, new Employee(
+ "Breana Robbin",
+ 6500,
+ new Address("3960 Sundown Lane, Austin, TX", 78130),
+ new[] { "Sales" },
+ 2));
+
+ cache.Put(6, new Employee(
+ "Philip Horsley",
+ 19800,
+ new Address("2803 Elsie Drive, Sioux Falls, SD", 57104),
+ new[] { "Sales" },
+ 2));
+
+ cache.Put(7, new Employee(
+ "Brian Peters",
+ 10600,
+ new Address("1407 Pearlman Avenue, Boston, MA", 12110),
+ new[] { "Development", "QA" },
+ 2));
+ }
+ }
+}
[4/7] ignite git commit: IGNITE-6024: SQL: Implemented
"skipReducerOnUpdate" flag. This closes #2488.
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlRequest.java
new file mode 100644
index 0000000..e40bc2d
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlRequest.java
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.h2.twostep.msg;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+import static org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS;
+
+/**
+ * Request for DML operation on remote node.
+ */
+public class GridH2DmlRequest implements Message, GridCacheQueryMarshallable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Request id. */
+ @GridToStringInclude
+ private long reqId;
+
+ /** Cache identifiers. */
+ @GridToStringInclude
+ @GridDirectCollection(Integer.class)
+ private List<Integer> caches;
+
+ /** Topology version. */
+ @GridToStringInclude
+ private AffinityTopologyVersion topVer;
+
+ /** Query partitions. */
+ @GridToStringInclude
+ private int[] qryParts;
+
+ /** Page size. */
+ private int pageSize;
+
+ /** Query. */
+ @GridToStringInclude
+ private String qry;
+
+ /** Flags. */
+ private byte flags;
+
+ /** Timeout. */
+ private int timeout;
+
+ /** Query parameters. */
+ @GridToStringInclude(sensitive = true)
+ @GridDirectTransient
+ private Object[] params;
+
+ /** Query parameters as bytes. */
+ private byte[] paramsBytes;
+
+ /** Schema name. */
+ @GridToStringInclude
+ private String schemaName;
+
+ /**
+ * Required by {@link Externalizable}
+ */
+ public GridH2DmlRequest() {
+ // No-op.
+ }
+
+ /**
+ * @param req Request.
+ */
+ public GridH2DmlRequest(GridH2DmlRequest req) {
+ reqId = req.reqId;
+ caches = req.caches;
+ topVer = req.topVer;
+ qryParts = req.qryParts;
+ pageSize = req.pageSize;
+ qry = req.qry;
+ flags = req.flags;
+ timeout = req.timeout;
+ params = req.params;
+ paramsBytes = req.paramsBytes;
+ schemaName = req.schemaName;
+ }
+
+ /**
+ * @return Parameters.
+ */
+ public Object[] parameters() {
+ return params;
+ }
+
+ /**
+ * @param params Parameters.
+ * @return {@code this}.
+ */
+ public GridH2DmlRequest parameters(Object[] params) {
+ if (params == null)
+ params = EMPTY_PARAMS;
+
+ this.params = params;
+
+ return this;
+ }
+
+ /**
+ * @param reqId Request ID.
+ * @return {@code this}.
+ */
+ public GridH2DmlRequest requestId(long reqId) {
+ this.reqId = reqId;
+
+ return this;
+ }
+
+ /**
+ * @return Request ID.
+ */
+ public long requestId() {
+ return reqId;
+ }
+
+ /**
+ * @param caches Caches.
+ * @return {@code this}.
+ */
+ public GridH2DmlRequest caches(List<Integer> caches) {
+ this.caches = caches;
+
+ return this;
+ }
+
+ /**
+ * @return Caches.
+ */
+ public List<Integer> caches() {
+ return caches;
+ }
+
+ /**
+ * @param topVer Topology version.
+ * @return {@code this}.
+ */
+ public GridH2DmlRequest topologyVersion(AffinityTopologyVersion topVer) {
+ this.topVer = topVer;
+
+ return this;
+ }
+
+ /**
+ * @return Topology version.
+ */
+ public AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /**
+ * @return Query partitions.
+ */
+ public int[] queryPartitions() {
+ return qryParts;
+ }
+
+ /**
+ * @param qryParts Query partitions.
+ * @return {@code this}.
+ */
+ public GridH2DmlRequest queryPartitions(int[] qryParts) {
+ this.qryParts = qryParts;
+
+ return this;
+ }
+
+ /**
+ * @param pageSize Page size.
+ * @return {@code this}.
+ */
+ public GridH2DmlRequest pageSize(int pageSize) {
+ this.pageSize = pageSize;
+
+ return this;
+ }
+
+ /**
+ * @return Page size.
+ */
+ public int pageSize() {
+ return pageSize;
+ }
+
+ /**
+ * @param qry SQL Query.
+ * @return {@code this}.
+ */
+ public GridH2DmlRequest query(String qry) {
+ this.qry = qry;
+
+ return this;
+ }
+
+ /**
+ * @return SQL Query.
+ */
+ public String query() {
+ return qry;
+ }
+
+ /**
+ * @param flags Flags.
+ * @return {@code this}.
+ */
+ public GridH2DmlRequest flags(int flags) {
+ assert flags >= 0 && flags <= 255: flags;
+
+ this.flags = (byte)flags;
+
+ return this;
+ }
+
+ /**
+ * @param flags Flags to check.
+ * @return {@code true} If all the requested flags are set to {@code true}.
+ */
+ public boolean isFlagSet(int flags) {
+ return (this.flags & flags) == flags;
+ }
+
+ /**
+ * @return Timeout.
+ */
+ public int timeout() {
+ return timeout;
+ }
+
+ /**
+ * @param timeout New timeout.
+ * @return {@code this}.
+ */
+ public GridH2DmlRequest timeout(int timeout) {
+ this.timeout = timeout;
+
+ return this;
+ }
+
+ /**
+ * @return Schema name.
+ */
+ public String schemaName() {
+ return schemaName;
+ }
+
+ /**
+ * @param schemaName Schema name.
+ * @return {@code this}.
+ */
+ public GridH2DmlRequest schemaName(String schemaName) {
+ this.schemaName = schemaName;
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void marshall(Marshaller m) {
+ if (paramsBytes != null)
+ return;
+
+ assert params != null;
+
+ try {
+ paramsBytes = U.marshal(m, params);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("IfMayBeConditional")
+ @Override public void unmarshall(Marshaller m, GridKernalContext ctx) {
+ if (params != null)
+ return;
+
+ assert paramsBytes != null;
+
+ try {
+ final ClassLoader ldr = U.resolveClassLoader(ctx.config());
+
+ if (m instanceof BinaryMarshaller)
+ // To avoid deserializing of enum types.
+ params = ((BinaryMarshaller)m).binaryMarshaller().unmarshal(paramsBytes, ldr);
+ else
+ params = U.unmarshal(m, paramsBytes, ldr);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeCollection("caches", caches, MessageCollectionItemType.INT))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeByte("flags", flags))
+ return false;
+
+ writer.incrementState();
+
+ case 2:
+ if (!writer.writeInt("pageSize", pageSize))
+ return false;
+
+ writer.incrementState();
+
+ case 3:
+ if (!writer.writeByteArray("paramsBytes", paramsBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeString("qry", qry))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeIntArray("qryParts", qryParts))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
+ if (!writer.writeLong("reqId", reqId))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
+ if (!writer.writeString("schemaName", schemaName))
+ return false;
+
+ writer.incrementState();
+
+ case 8:
+ if (!writer.writeInt("timeout", timeout))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ caches = reader.readCollection("caches", MessageCollectionItemType.INT);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ flags = reader.readByte("flags");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 2:
+ pageSize = reader.readInt("pageSize");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 3:
+ paramsBytes = reader.readByteArray("paramsBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ qry = reader.readString("qry");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ qryParts = reader.readIntArray("qryParts");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
+ reqId = reader.readLong("reqId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
+ schemaName = reader.readString("schemaName");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 8:
+ timeout = reader.readInt("timeout");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridH2DmlRequest.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return -55;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 10;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridH2DmlRequest.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlResponse.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlResponse.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlResponse.java
new file mode 100644
index 0000000..808ff9e
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlResponse.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep.msg;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Response to remote DML request.
+ */
+public class GridH2DmlResponse implements Message, GridCacheQueryMarshallable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Request id. */
+ @GridToStringInclude
+ private long reqId;
+
+ /** Number of updated rows. */
+ @GridToStringInclude
+ private long updCnt;
+
+ /** Error message. */
+ @GridToStringInclude
+ private String err;
+
+ /** Keys that failed. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private Object[] errKeys;
+
+ /** Keys that failed (after marshalling). */
+ private byte[] errKeysBytes;
+
+ /**
+ * Default constructor.
+ */
+ public GridH2DmlResponse() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param reqId Request id.
+ * @param updCnt Updated row number.
+ * @param errKeys Erroneous keys.
+ * @param error Error message.
+ */
+ public GridH2DmlResponse(long reqId, long updCnt, Object[] errKeys, String error) {
+ this.reqId = reqId;
+ this.updCnt = updCnt;
+ this.errKeys = errKeys;
+ this.err = error;
+ }
+
+ /**
+ * @return Request id.
+ */
+ public long requestId() {
+ return reqId;
+ }
+
+ /**
+ * @return Update counter.
+ */
+ public long updateCounter() {
+ return updCnt;
+ }
+
+ /**
+ * @return Error keys.
+ */
+ public Object[] errorKeys() {
+ return errKeys;
+ }
+
+ /**
+ * @return Error message.
+ */
+ public String error() {
+ return err;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void marshall(Marshaller m) {
+ if (errKeysBytes != null || errKeys == null)
+ return;
+
+ try {
+ errKeysBytes = U.marshal(m, errKeys);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("IfMayBeConditional")
+ @Override public void unmarshall(Marshaller m, GridKernalContext ctx) {
+ if (errKeys != null || errKeysBytes == null)
+ return;
+
+ try {
+ final ClassLoader ldr = U.resolveClassLoader(ctx.config());
+
+ if (m instanceof BinaryMarshaller)
+ // To avoid deserializing of enum types.
+ errKeys = ((BinaryMarshaller)m).binaryMarshaller().unmarshal(errKeysBytes, ldr);
+ else
+ errKeys = U.unmarshal(m, errKeysBytes, ldr);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridH2DmlResponse.class, this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeString("err", err))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeByteArray("errKeysBytes", errKeysBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 2:
+ if (!writer.writeLong("reqId", reqId))
+ return false;
+
+ writer.incrementState();
+
+ case 3:
+ if (!writer.writeLong("updCnt", updCnt))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ err = reader.readString("err");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ errKeysBytes = reader.readByteArray("errKeysBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 2:
+ reqId = reader.readLong("reqId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 3:
+ updCnt = reader.readLong("updCnt");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridH2DmlResponse.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return -56;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 4;
+ }
+
+ @Override public void onAckReceived() {
+ // No-op
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
index 18b1afb..3c13392 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
@@ -112,6 +112,12 @@ public class GridH2ValueMessageFactory implements MessageFactory {
case -54:
return new QueryTable();
+
+ case -55:
+ return new GridH2DmlRequest();
+
+ case -56:
+ return new GridH2DmlResponse();
}
return null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest.java
new file mode 100644
index 0000000..e5efc06
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest.java
@@ -0,0 +1,783 @@
+package org.apache.ignite.internal.processors.query;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.cache.Cache;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests for {@link SqlFieldsQueryEx#skipReducerOnUpdate} flag.
+ */
+public class IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest extends GridCommonAbstractTest {
+ /** IP finder. */
+ private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static int NODE_COUNT = 4;
+
+ /** */
+ private static String NODE_CLIENT = "client";
+
+ /** */
+ private static String CACHE_ACCOUNT = "acc";
+
+ /** */
+ private static String CACHE_REPORT = "rep";
+
+ /** */
+ private static String CACHE_STOCK = "stock";
+
+ /** */
+ private static String CACHE_TRADE = "trade";
+
+ /** */
+ private static String CACHE_LIST = "list";
+
+ /** */
+ private static IgniteEx client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration c = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(IP_FINDER);
+
+ c.setDiscoverySpi(disco);
+
+ List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+ ccfgs.add(buildCacheConfiguration(CACHE_ACCOUNT));
+ ccfgs.add(buildCacheConfiguration(CACHE_STOCK));
+ ccfgs.add(buildCacheConfiguration(CACHE_TRADE));
+ ccfgs.add(buildCacheConfiguration(CACHE_REPORT));
+ ccfgs.add(buildCacheConfiguration(CACHE_LIST));
+
+ c.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()]));
+
+ if (gridName.equals(NODE_CLIENT))
+ c.setClientMode(true);
+
+ return c;
+ }
+
+ /**
+ * Creates a cache configuration.
+ *
+ * @param name Name of the cache.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration buildCacheConfiguration(String name) {
+ if (name.equals(CACHE_ACCOUNT)) {
+ CacheConfiguration ccfg = new CacheConfiguration(CACHE_ACCOUNT);
+
+ ccfg.setCacheMode(CacheMode.PARTITIONED);
+
+ QueryEntity entity = new QueryEntity(Integer.class, Account.class);
+
+ ccfg.setQueryEntities(Collections.singletonList(entity));
+
+ return ccfg;
+ }
+ if (name.equals(CACHE_STOCK)) {
+ CacheConfiguration ccfg = new CacheConfiguration(CACHE_STOCK);
+
+ ccfg.setCacheMode(CacheMode.REPLICATED);
+
+ QueryEntity entity = new QueryEntity(Integer.class, Stock.class);
+
+ ccfg.setQueryEntities(Collections.singletonList(entity));
+
+ return ccfg;
+ }
+ if (name.equals(CACHE_TRADE)) {
+ CacheConfiguration ccfg = new CacheConfiguration(CACHE_TRADE);
+
+ ccfg.setCacheMode(CacheMode.PARTITIONED);
+
+ QueryEntity entity = new QueryEntity(Integer.class, Trade.class);
+
+ ccfg.setQueryEntities(Collections.singletonList(entity));
+
+ return ccfg;
+ }
+ if (name.equals(CACHE_REPORT)) {
+ CacheConfiguration ccfg = new CacheConfiguration(CACHE_REPORT);
+
+ ccfg.setCacheMode(CacheMode.PARTITIONED);
+
+ QueryEntity entity = new QueryEntity(Integer.class, Report.class);
+
+ ccfg.setQueryEntities(Collections.singletonList(entity));
+
+ return ccfg;
+ }
+ if (name.equals(CACHE_LIST)) {
+ CacheConfiguration ccfg = new CacheConfiguration(CACHE_LIST);
+
+ ccfg.setCacheMode(CacheMode.PARTITIONED);
+
+ QueryEntity entity = new QueryEntity(Integer.class, String.class);
+
+ ccfg.setQueryEntities(Collections.singletonList(entity));
+
+ return ccfg;
+ }
+
+ assert false;
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrids(NODE_COUNT);
+
+ client = (IgniteEx)startGrid(NODE_CLIENT);
+
+ awaitPartitionMapExchange();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ awaitPartitionMapExchange();
+
+ client.cache(CACHE_ACCOUNT).clear();
+ client.cache(CACHE_STOCK).clear();
+ client.cache(CACHE_TRADE).clear();
+ client.cache(CACHE_REPORT).clear();
+ client.cache(CACHE_LIST).clear();
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testUpdate() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, 100);
+
+ String text = "UPDATE \"acc\".Account SET depo = depo - ? WHERE depo > 0";
+
+ checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), accounts, new SqlFieldsQueryEx(text, false).setArgs(10));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testUpdateFastKey() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, 100);
+
+ String text = "UPDATE \"acc\".Account SET depo = depo - ? WHERE _key = ?";
+
+ checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), accounts,
+ new SqlFieldsQueryEx(text, false).setArgs(10, 1));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testUpdateLimit() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, 100);
+
+ String text = "UPDATE \"acc\".Account SET depo = depo - ? WHERE sn >= ? AND sn < ? LIMIT ?";
+
+ checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), accounts,
+ new SqlFieldsQueryEx(text, false).setArgs(10, 0, 10, 10));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testUpdateWhereSubquery() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, -100);
+
+ Map<Integer, Trade> trades = getTrades(100, 2);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+ String text = "UPDATE \"trade\".Trade t SET qty = ? " +
+ "WHERE accountId IN (SELECT p._key FROM \"acc\".Account p WHERE depo < ?)";
+
+ checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), trades,
+ new SqlFieldsQueryEx(text, false).setArgs(0, 0));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testUpdateSetSubquery() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, 1000);
+ Map<Integer, Trade> trades = getTrades(100, 2);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+ String text = "UPDATE \"trade\".Trade t SET qty = " +
+ "(SELECT a.depo/t.price FROM \"acc\".Account a WHERE t.accountId = a._key)";
+
+ checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), trades,
+ new SqlFieldsQueryEx(text, false));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testUpdateSetTableSubquery() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, 1000);
+ Map<Integer, Trade> trades = getTrades(100, 2);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+ String text = "UPDATE \"trade\".Trade t SET (qty) = " +
+ "(SELECT a.depo/t.price FROM \"acc\".Account a WHERE t.accountId = a._key)";
+
+ checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), trades,
+ new SqlFieldsQueryEx(text, false));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testInsertValues() throws Exception {
+ String text = "INSERT INTO \"acc\".Account (_key, name, sn, depo)" +
+ " VALUES (?, ?, ?, ?), (?, ?, ?, ?)";
+
+ checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), null,
+ new SqlFieldsQueryEx(text, false).setArgs(1, "John Marry", 11111, 100, 2, "Marry John", 11112, 200));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testInsertFromSelect() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, 1000);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+ String text = "INSERT INTO \"trade\".Trade (_key, accountId, stockId, qty, price) " +
+ "SELECT a._key, a._key, ?, a.depo/?, ? FROM \"acc\".Account a";
+
+ checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), null,
+ new SqlFieldsQueryEx(text, false).setArgs(1, 10, 10));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testInsertFromSelectOrderBy() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, 1000);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+ String text = "INSERT INTO \"trade\".Trade (_key, accountId, stockId, qty, price) " +
+ "SELECT a._key, a._key, ?, a.depo/?, ? FROM \"acc\".Account a " +
+ "ORDER BY a.sn DESC";
+
+ checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), null,
+ new SqlFieldsQueryEx(text, false).setArgs(1, 10, 10));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testInsertFromSelectUnion() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(20, 1, 1000);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+ String text = "INSERT INTO \"trade\".Trade (_key, accountId, stockId, qty, price) " +
+ "SELECT a._key, a._key, 0, a.depo, 1 FROM \"acc\".Account a " +
+ "UNION " +
+ "SELECT 101 + a2._key, a2._key, 1, a2.depo, 1 FROM \"acc\".Account a2";
+
+ checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), null,
+ new SqlFieldsQueryEx(text, false));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testInsertFromSelectGroupBy() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, 1000);
+ Map<Integer, Trade> trades = getTrades(100, 2);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+ client.cache(CACHE_TRADE).putAll(trades);
+
+ String text = "INSERT INTO \"rep\".Report (_key, accountId, spends, count) " +
+ "SELECT accountId, accountId, SUM(qty * price), COUNT(*) " +
+ "FROM \"trade\".Trade " +
+ "GROUP BY accountId";
+
+ checkUpdate(client.<Integer, Report>cache(CACHE_REPORT), null,
+ new SqlFieldsQueryEx(text, false));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testInsertFromSelectDistinct() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 2, 100);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+ String text = "INSERT INTO \"list\".String (_key, _val) " +
+ "SELECT DISTINCT sn, name FROM \"acc\".Account ";
+
+ checkUpdate(client.<Integer, String>cache(CACHE_LIST), null,
+ new SqlFieldsQueryEx(text, false));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testInsertFromSelectJoin() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, 100);
+ Map<Integer, Stock> stocks = getStocks(5);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+ client.cache(CACHE_STOCK).putAll(stocks);
+
+ String text = "INSERT INTO \"trade\".Trade(_key, accountId, stockId, qty, price) " +
+ "SELECT 5*a._key + s._key, a._key, s._key, ?, a.depo/? " +
+ "FROM \"acc\".Account a JOIN \"stock\".Stock s ON 1=1";
+
+ checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), null,
+ new SqlFieldsQueryEx(text, false).setArgs(10, 10));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testDelete() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, 100);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+ String text = "DELETE FROM \"acc\".Account WHERE sn > ?";
+
+ checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), accounts,
+ new SqlFieldsQueryEx(text, false).setArgs(10));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testDeleteTop() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, 100);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+ String text = "DELETE TOP ? FROM \"acc\".Account WHERE sn < ?";
+
+ checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), accounts,
+ new SqlFieldsQueryEx(text, false).setArgs(10, 10));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testDeleteWhereSubquery() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(20, 1, 100);
+ Map<Integer, Trade> trades = getTrades(10, 2);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+ client.cache(CACHE_TRADE).putAll(trades);
+
+ String text = "DELETE FROM \"acc\".Account " +
+ "WHERE _key IN (SELECT t.accountId FROM \"trade\".Trade t)";
+
+ checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), accounts,
+ new SqlFieldsQueryEx(text, false));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testMergeValues() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(1, 1, 100);
+
+ String text = "MERGE INTO \"acc\".Account (_key, name, sn, depo)" +
+ " VALUES (?, ?, ?, ?), (?, ?, ?, ?)";
+
+ checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), accounts,
+ new SqlFieldsQueryEx(text, false).setArgs(0, "John Marry", 11111, 100, 1, "Marry John", 11112, 200));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testMergeFromSelectJoin() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, 100);
+ Map<Integer, Stock> stocks = getStocks(5);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+ client.cache(CACHE_STOCK).putAll(stocks);
+
+ Map<Integer, Trade> trades = new HashMap<>();
+
+ trades.put(5, new Trade(1, 1, 1, 1));
+
+ String text = "MERGE INTO \"trade\".Trade(_key, accountId, stockId, qty, price) " +
+ "SELECT 5*a._key + s._key, a._key, s._key, ?, a.depo/? " +
+ "FROM \"acc\".Account a JOIN \"stock\".Stock s ON 1=1";
+
+ checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), trades,
+ new SqlFieldsQueryEx(text, false).setArgs(10, 10));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testMergeFromSelectOrderBy() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, 1000);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+ Map<Integer, Trade> trades = new HashMap<>();
+
+ trades.put(5, new Trade(1, 1, 1, 1));
+
+ String text = "MERGE INTO \"trade\".Trade (_key, accountId, stockId, qty, price) " +
+ "SELECT a._key, a._key, ?, a.depo/?, ? FROM \"acc\".Account a " +
+ "ORDER BY a.sn DESC";
+
+ checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), trades,
+ new SqlFieldsQueryEx(text, false).setArgs(1, 10, 10));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testMergeFromSelectGroupBy() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, 1000);
+ Map<Integer, Trade> trades = getTrades(100, 2);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+ client.cache(CACHE_TRADE).putAll(trades);
+
+ Map<Integer, Report> reports = new HashMap<>();
+
+ reports.put(5, new Report(5, 1, 1));
+
+ String text = "MERGE INTO \"rep\".Report (_key, accountId, spends, count) " +
+ "SELECT accountId, accountId, SUM(qty * price), COUNT(*) " +
+ "FROM \"trade\".Trade " +
+ "GROUP BY accountId";
+
+ checkUpdate(client.<Integer, Report>cache(CACHE_REPORT), reports,
+ new SqlFieldsQueryEx(text, false));
+ }
+
+ /**
+ * Constructs multiple Account objects.
+ *
+ * @param num Number of accounts.
+ * @param numCopy Number of copies.
+ * @param depo Deposit amount.
+ * @return Map of accounts.
+ */
+ private Map<Integer, Account> getAccounts(int num, int numCopy, int depo) {
+ Map<Integer, Account> res = new HashMap<>();
+
+ int count = 0;
+
+ for (int i = 0; i < num; ++i) {
+ String name = "John doe #" + i;
+
+ for (int j = 0; j < numCopy; ++j)
+ res.put(count++, new Account(name, i, depo));
+ }
+
+ return res;
+ }
+
+ /**
+ * Constructs multiple Stock objects.
+ *
+ * @param num Number of stocks.
+ * @return Map of Stock objects.
+ */
+ private Map<Integer, Stock> getStocks(int num) {
+ Map<Integer, Stock> res = new HashMap<>();
+
+ for (int i = 0; i < num; ++i)
+ res.put(i, new Stock("T" + i, "Stock #" + i));
+
+ return res;
+ }
+
+ /**
+ * Constructs multiple Trade objects.
+ *
+ * @param numAccounts Number of accounts.
+ * @param numStocks Number of stocks.
+ * @return Map of Trade objects.
+ */
+ private Map<Integer, Trade> getTrades(int numAccounts, int numStocks) {
+ Map<Integer, Trade> res = new HashMap<>();
+
+ int count = 0;
+
+ for (int i = 0; i < numAccounts; ++i) {
+ for (int j = 0; j < numStocks; ++j) {
+ res.put(count++, new Trade(i, j, 100, 100));
+ }
+ }
+
+ return res;
+ }
+
+ /**
+ * Executes provided sql update with skipReducerOnUpdate flag on and off and checks results are the same.
+ *
+ * @param cache Cache.
+ * @param initial Initial content of the cache.
+ * @param qry Query to execute.
+ * @param <K> Key type.
+ * @param <V> Value type.
+ */
+ private <K, V> void checkUpdate(IgniteCache<K, V> cache, Map<K, V> initial, SqlFieldsQueryEx qry) {
+ cache.clear();
+
+ if (!F.isEmpty(initial))
+ cache.putAll(initial);
+
+ List<List<?>> updRes = cache.query(qry.setSkipReducerOnUpdate(true)).getAll();
+
+ Map<K, V> result = new HashMap<>(cache.size());
+
+ for (Cache.Entry<K, V> e : cache)
+ result.put(e.getKey(), e.getValue());
+
+ cache.clear();
+
+ if (!F.isEmpty(initial))
+ cache.putAll(initial);
+
+ List<List<?>> updRes2 = cache.query(qry.setSkipReducerOnUpdate(false)).getAll();
+
+ assertTrue(((Number)updRes.get(0).get(0)).intValue() > 0);
+
+ assertEquals(((Number)updRes.get(0).get(0)).intValue(), ((Number)updRes2.get(0).get(0)).intValue());
+
+ assertEquals(result.size(), cache.size());
+
+ for (Cache.Entry<K, V> e : cache)
+ assertEquals(e.getValue(), result.get(e.getKey()));
+ }
+
+ /** */
+ public class Account {
+ /** */
+ @QuerySqlField
+ String name;
+
+ /** */
+ @QuerySqlField
+ int sn;
+
+ /** */
+ @QuerySqlField
+ int depo;
+
+ /**
+ * Constructor.
+ *
+ * @param name Name.
+ * @param sn ID.
+ * @param depo Deposit amount.
+ */
+ Account(String name, int sn, int depo) {
+ this.name = name;
+ this.sn = sn;
+ this.depo = depo;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return (name == null ? 0 : name.hashCode()) ^ sn ^ depo;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ if (obj == null)
+ return false;
+
+ if (!obj.getClass().equals(Account.class))
+ return false;
+
+ Account other = (Account)obj;
+
+ return F.eq(name, other.name) && sn == other.sn && depo == other.depo;
+ }
+ }
+
+ /** */
+ public class Stock {
+ /** */
+ @QuerySqlField
+ String ticker;
+
+ /** */
+ @QuerySqlField
+ String name;
+
+ /**
+ * Constructor.
+ *
+ * @param ticker Short name.
+ * @param name Name.
+ */
+ Stock(String ticker, String name) {
+ this.ticker = ticker;
+ this.name = name;
+ }
+ }
+
+ /** */
+ public class Trade {
+ /** */
+ @QuerySqlField
+ int accountId;
+
+ /** */
+ @QuerySqlField
+ int stockId;
+
+ /** */
+ @QuerySqlField
+ int qty;
+
+ /** */
+ @QuerySqlField
+ int price;
+
+ /**
+ * Constructor.
+ *
+ * @param accountId Account id.
+ * @param stockId Stock id.
+ * @param qty Quantity.
+ * @param price Price.
+ */
+ Trade(int accountId, int stockId, int qty, int price) {
+ this.accountId = accountId;
+ this.stockId = stockId;
+ this.qty = qty;
+ this.price = price;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return accountId ^ stockId ^ qty ^ price;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ if (obj == null)
+ return false;
+
+ if (!obj.getClass().equals(Trade.class))
+ return false;
+
+ Trade other = (Trade)obj;
+
+ return accountId == other.accountId && stockId == other.stockId &&
+ qty == other.qty && price == other.price;
+ }
+
+ }
+
+ /** */
+ public class Report {
+ /** */
+ @QuerySqlField
+ int accountId;
+
+ /** */
+ @QuerySqlField
+ int spends;
+
+ /** */
+ @QuerySqlField
+ int count;
+
+ /**
+ * Constructor.
+ *
+ * @param accountId Account id.
+ * @param spends Spends.
+ * @param count Count.
+ */
+ Report(int accountId, int spends, int count) {
+ this.accountId = accountId;
+ this.spends = spends;
+ this.count = count;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return accountId ^ spends ^ count;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ if (obj == null)
+ return false;
+
+ if (!obj.getClass().equals(Report.class))
+ return false;
+
+ Report other = (Report)obj;
+
+ return accountId == other.accountId && spends == other.spends &&
+ count == other.count;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlSelfTest.java
new file mode 100644
index 0000000..a2a6bf8
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlSelfTest.java
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheKeyConfiguration;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.CacheQueryExecutedEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor;
+import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jsr166.ThreadLocalRandom8;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
+
+/**
+ * Tests for distributed DML.
+ */
+@SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
+public class IgniteSqlSkipReducerOnUpdateDmlSelfTest extends GridCommonAbstractTest {
+ /** IP finder. */
+ private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static int NODE_COUNT = 4;
+
+ /** */
+ private static String NODE_CLIENT = "client";
+
+ /** */
+ private static String CACHE_ORG = "org";
+
+ /** */
+ private static String CACHE_PERSON = "person";
+
+ /** */
+ private static String CACHE_POSITION = "pos";
+
+ /** */
+ private static Ignite client;
+
+ /** */
+ private static CountDownLatch latch;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration c = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(IP_FINDER);
+
+ c.setDiscoverySpi(disco);
+
+ List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+ ccfgs.add(buildCacheConfiguration(CACHE_ORG));
+ ccfgs.add(buildCacheConfiguration(CACHE_PERSON));
+ ccfgs.add(buildCacheConfiguration(CACHE_POSITION));
+
+ c.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()]));
+
+ c.setLongQueryWarningTimeout(10000);
+
+ if (gridName.equals(NODE_CLIENT))
+ c.setClientMode(true);
+
+ return c;
+ }
+
+ /**
+ * Creates cache configuration.
+ *
+ * @param name Cache name.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration buildCacheConfiguration(String name) {
+ if (name.equals(CACHE_ORG)) {
+ CacheConfiguration ccfg = new CacheConfiguration(CACHE_ORG);
+
+ ccfg.setCacheMode(CacheMode.PARTITIONED);
+
+ QueryEntity entity = new QueryEntity(Integer.class, Organization.class);
+
+ ccfg.setQueryEntities(Collections.singletonList(entity));
+
+ ccfg.setSqlFunctionClasses(IgniteSqlSkipReducerOnUpdateDmlSelfTest.class);
+
+ return ccfg;
+ }
+ if (name.equals(CACHE_PERSON)) {
+ CacheConfiguration ccfg = new CacheConfiguration(CACHE_PERSON);
+
+ ccfg.setCacheMode(CacheMode.PARTITIONED);
+
+ QueryEntity entity = new QueryEntity(PersonKey.class, Person.class);
+
+ ccfg.setQueryEntities(Collections.singletonList(entity));
+
+ ccfg.setKeyConfiguration(new CacheKeyConfiguration(PersonKey.class));
+
+ ccfg.setSqlFunctionClasses(IgniteSqlSkipReducerOnUpdateDmlSelfTest.class);
+
+ return ccfg;
+ }
+ if (name.equals(CACHE_POSITION)) {
+ CacheConfiguration ccfg = new CacheConfiguration(CACHE_POSITION);
+
+ ccfg.setCacheMode(CacheMode.REPLICATED);
+
+ QueryEntity entity = new QueryEntity(Integer.class, Position.class);
+
+ ccfg.setQueryEntities(Collections.singletonList(entity));
+
+ ccfg.setSqlFunctionClasses(IgniteSqlSkipReducerOnUpdateDmlSelfTest.class);
+
+ return ccfg;
+ }
+
+ assert false;
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrids(NODE_COUNT);
+
+ client = startGrid(NODE_CLIENT);
+
+ awaitPartitionMapExchange();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ checkNoLeaks();
+
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ // Stop additional node that is started in one of the test.
+ stopGrid(NODE_COUNT + 1);
+
+ awaitPartitionMapExchange();
+
+ client.cache(CACHE_PERSON).clear();
+ client.cache(CACHE_ORG).clear();
+ client.cache(CACHE_POSITION).clear();
+ }
+
+ /**
+ *
+ * @throws Exception if failed.
+ */
+ public void testSimpleUpdateDistributedReplicated() throws Exception {
+ fillCaches();
+
+ IgniteCache<Integer, Position> cache = grid(NODE_CLIENT).cache(CACHE_POSITION);
+
+ Position p = cache.get(1);
+
+ List<List<?>> r = cache.query(new SqlFieldsQueryEx("UPDATE Position p SET name = CONCAT('A ', name)", false)
+ .setSkipReducerOnUpdate(true)).getAll();
+
+ assertEquals((long)cache.size(), r.get(0).get(0));
+
+ assertEquals(cache.get(1).name, "A " + p.name);
+ }
+
+ /**
+ *
+ * @throws Exception if failed.
+ */
+ public void testSimpleUpdateDistributedPartitioned() throws Exception {
+ fillCaches();
+
+ IgniteCache<PersonKey, Person> cache = grid(NODE_CLIENT).cache(CACHE_PERSON);
+
+ List<List<?>> r = cache.query(new SqlFieldsQueryEx(
+ "UPDATE Person SET position = CASEWHEN(position = 1, 1, position - 1)", false)
+ .setSkipReducerOnUpdate(true)).getAll();
+
+ assertEquals((long)cache.size(), r.get(0).get(0));
+ }
+
+ /**
+ *
+ * @throws Exception if failed.
+ */
+ public void testDistributedUpdateFailedKeys() throws Exception {
+ // UPDATE can produce failed keys due to concurrent modification
+ fillCaches();
+
+ final IgniteCache<Integer, Organization> cache = grid(NODE_CLIENT).cache(CACHE_ORG);
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() {
+ return cache.query(new SqlFieldsQueryEx("UPDATE Organization SET rate = Modify(_key, rate - 1)", false)
+ .setSkipReducerOnUpdate(true));
+ }
+ }, CacheException.class, "Failed to update some keys because they had been modified concurrently");
+ }
+
+ /**
+ *
+ * @throws Exception if failed.
+ */
+ public void testDistributedUpdateFail() throws Exception {
+ fillCaches();
+
+ final IgniteCache cache = grid(NODE_CLIENT).cache(CACHE_PERSON);
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() {
+ return cache.query(new SqlFieldsQueryEx("UPDATE Person SET name = Fail(name)", false)
+ .setSkipReducerOnUpdate(true));
+ }
+ }, CacheException.class, "Failed to execute SQL query");
+ }
+
+ /**
+ *
+ * @throws Exception if failed.
+ */
+ @SuppressWarnings("ConstantConditions")
+ public void testQueryParallelism() throws Exception {
+ String cacheName = CACHE_ORG + "x4";
+
+ CacheConfiguration cfg = buildCacheConfiguration(CACHE_ORG)
+ .setQueryParallelism(4)
+ .setName(cacheName);
+
+ IgniteCache<Integer, Organization> cache = grid(NODE_CLIENT).createCache(cfg);
+
+ for (int i = 0; i < 1024; i++)
+ cache.put(i, new Organization("Acme Inc #" + i, 0));
+
+ List<List<?>> r = cache.query(new SqlFieldsQueryEx("UPDATE \"" + cacheName +
+ "\".Organization o SET name = UPPER(name)", false).setSkipReducerOnUpdate(true)).getAll();
+
+ assertEquals((long)cache.size(), r.get(0).get(0));
+ }
+
+ /**
+ *
+ * @throws Exception if failed.
+ */
+ public void testEvents() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(NODE_COUNT);
+
+ final IgnitePredicate<Event> pred = new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ assert evt instanceof CacheQueryExecutedEvent;
+
+ CacheQueryExecutedEvent qe = (CacheQueryExecutedEvent)evt;
+
+ assertNotNull(qe.clause());
+
+ latch.countDown();
+
+ return true;
+ }
+ };
+
+ for (int idx = 0; idx < NODE_COUNT; idx++)
+ grid(idx).events().localListen(pred, EVT_CACHE_QUERY_EXECUTED);
+
+ IgniteCache<Integer, Organization> cache = grid(NODE_CLIENT).cache(CACHE_ORG);
+
+ for (int i = 0; i < 1024; i++)
+ cache.put(i, new Organization("Acme Inc #" + i, 0));
+
+ cache.query(new SqlFieldsQueryEx("UPDATE \"org\".Organization o SET name = UPPER(name)", false)
+ .setSkipReducerOnUpdate(true)).getAll();
+
+ assertTrue(latch.await(5000, MILLISECONDS));
+
+ for (int idx = 0; idx < NODE_COUNT; idx++)
+ grid(idx).events().stopLocalListen(pred);
+ }
+
+ /**
+ *
+ * @throws Exception if failed.
+ */
+ public void testSpecificPartitionsUpdate() throws Exception {
+ fillCaches();
+
+ Affinity aff = grid(NODE_CLIENT).affinity(CACHE_PERSON);
+
+ int numParts = aff.partitions();
+ int parts[] = new int[numParts / 2];
+
+ for (int idx = 0; idx < numParts / 2; idx++)
+ parts[idx] = idx * 2;
+
+ IgniteCache<PersonKey, Person> cache = grid(NODE_CLIENT).cache(CACHE_PERSON);
+
+ // UPDATE over even partitions
+ cache.query(new SqlFieldsQueryEx("UPDATE Person SET position = 0", false)
+ .setSkipReducerOnUpdate(true)
+ .setPartitions(parts));
+
+ List<List<?>> rows = cache.query(new SqlFieldsQuery("SELECT _key, position FROM Person")).getAll();
+
+ for (List<?> row : rows) {
+ PersonKey personKey = (PersonKey)row.get(0);
+ int pos = ((Number)row.get(1)).intValue();
+ int part = aff.partition(personKey);
+
+ assertTrue((part % 2 == 0) ^ (pos != 0));
+ }
+ }
+
+ /**
+ *
+ * @throws Exception if failed.
+ */
+ public void testCancel() throws Exception {
+ latch = new CountDownLatch(NODE_COUNT + 1);
+
+ fillCaches();
+
+ final IgniteCache<Integer, Organization> cache = grid(NODE_CLIENT).cache(CACHE_ORG);
+
+ final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() {
+ return cache.query(new SqlFieldsQueryEx("UPDATE Organization SET name = WAIT(name)", false)
+ .setSkipReducerOnUpdate(true));
+ }
+ });
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ Collection<GridRunningQueryInfo> qCol =
+ grid(NODE_CLIENT).context().query().runningQueries(0);
+
+ if (qCol.isEmpty())
+ return false;
+
+ for (GridRunningQueryInfo queryInfo : qCol)
+ queryInfo.cancel();
+
+ return true;
+ }
+ }, 5000);
+
+ latch.await(5000, MILLISECONDS);
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws IgniteCheckedException {
+ return fut.get();
+ }
+ }, IgniteCheckedException.class, "Future was cancelled");
+ }
+
+ /**
+ *
+ * @throws Exception if failed.
+ */
+ public void testNodeStopDuringUpdate() throws Exception {
+ startGrid(NODE_COUNT + 1);
+
+ awaitPartitionMapExchange();
+
+ fillCaches();
+
+ latch = new CountDownLatch(NODE_COUNT + 1 + 1);
+
+ final IgniteCache<Integer, Organization> cache = grid(NODE_CLIENT).cache(CACHE_ORG);
+
+ final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() {
+ return cache.query(new SqlFieldsQueryEx("UPDATE Organization SET name = WAIT(name)", false)
+ .setSkipReducerOnUpdate(true));
+ }
+ });
+
+ final CountDownLatch finalLatch = latch;
+
+ assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return finalLatch.getCount() == 1;
+ }
+ }, 5000));
+
+ latch.countDown();
+
+ stopGrid(NODE_COUNT + 1);
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws IgniteCheckedException {
+ return fut.get();
+ }
+ }, IgniteCheckedException.class, "Update failed because map node left topology");
+ }
+
+ /**
+ * Ensure there are no leaks in data structures associated with distributed dml execution.
+ */
+ private void checkNoLeaks() {
+ GridQueryProcessor qryProc = grid(NODE_CLIENT).context().query();
+
+ IgniteH2Indexing h2Idx = GridTestUtils.getFieldValue(qryProc, GridQueryProcessor.class, "idx");
+
+ GridReduceQueryExecutor rdcQryExec = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "rdcQryExec");
+
+ Map updRuns = GridTestUtils.getFieldValue(rdcQryExec, GridReduceQueryExecutor.class, "updRuns");
+
+ assertEquals(0, updRuns.size());
+
+ for (int idx = 0; idx < NODE_COUNT; idx++) {
+ qryProc = grid(idx).context().query();
+
+ h2Idx = GridTestUtils.getFieldValue(qryProc, GridQueryProcessor.class, "idx");
+
+ GridMapQueryExecutor mapQryExec = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec");
+
+ Map qryRess = GridTestUtils.getFieldValue(mapQryExec, GridMapQueryExecutor.class, "qryRess");
+
+ for (Object obj : qryRess.values()) {
+ Map updCancels = GridTestUtils.getFieldValue(obj, "updCancels");
+
+ assertEquals(0, updCancels.size());
+ }
+ }
+ }
+
+ /**
+ * Fills caches with initial data.
+ */
+ private void fillCaches() {
+ Ignite client = grid(NODE_CLIENT);
+
+ IgniteCache<Integer, Position> posCache = client.cache(CACHE_POSITION);
+
+ // Generate positions
+ Position[] positions = new Position[] {
+ new Position(1, "High Ranking Officer", 1),
+ new Position(2, "Administrative worker", 3),
+ new Position(3, "Worker", 7),
+ new Position(4, "Security", 2),
+ new Position(5, "Cleaner", 1)
+ };
+
+ for (Position pos: positions)
+ posCache.put(pos.id, pos);
+
+ // Generate organizations
+ String[] forms = new String[] {" Inc", " Co", " AG", " Industries"};
+ String[] orgNames = new String[] {"Acme", "Sierra", "Mesa", "Umbrella", "Robotics"};
+ String[] names = new String[] {"Mary", "John", "William", "Tom", "Basil", "Ann", "Peter"};
+
+ IgniteCache<PersonKey, Person> personCache = client.cache(CACHE_PERSON);
+
+ IgniteCache<Integer, Organization> orgCache = client.cache(CACHE_ORG);
+
+ int orgId = 0;
+ int personId = 0;
+
+ for (String orgName : produceCombination(orgNames, orgNames, forms)) {
+ Organization org = new Organization(orgName, 1 + orgId);
+
+ orgCache.put(++orgId, org);
+
+ // Generate persons
+
+ List<String> personNames = produceCombination(names, names, new String[]{"s"});
+
+ int positionId = 0;
+ int posCounter = 0;
+
+ for (String name : personNames) {
+ PersonKey pKey = new PersonKey(orgId, ++personId);
+
+ if (positions[positionId].rate < posCounter++) {
+ posCounter = 0;
+ positionId = (positionId + 1) % positions.length;
+ }
+
+ Person person = new Person(name, positions[positionId].id, org.rate * positions[positionId].rate);
+
+ personCache.put(pKey, person);
+ }
+ }
+ }
+
+ /**
+ * Produces all possible combinations.
+ *
+ * @param a First array.
+ * @param b Second array.
+ * @param ends Endings array.
+ * @return Result.
+ */
+ private List<String> produceCombination(String[] a, String[] b, String[] ends) {
+ List<String> res = new ArrayList<>();
+
+ for (String s1 : a) {
+ for (String s2 : b) {
+ if (!s1.equals(s2)) {
+ String end = ends[ThreadLocalRandom8.current().nextInt(ends.length)];
+
+ res.add(s1 + " " + s2 + end);
+ }
+ }
+ }
+
+ return res;
+ }
+
+ /** */
+ private static class Organization {
+ /** */
+ @QuerySqlField
+ String name;
+
+ /** */
+ @QuerySqlField
+ int rate;
+
+ /** */
+ @QuerySqlField
+ Date updated;
+
+ /**
+ * Constructor.
+ *
+ * @param name Organization name.
+ * @param rate Rate.
+ */
+ public Organization(String name, int rate) {
+ this.name = name;
+ this.rate = rate;
+ this.updated = new Date(System.currentTimeMillis());
+ }
+ }
+
+ /** */
+ public static class PersonKey {
+ /** */
+ @AffinityKeyMapped
+ @QuerySqlField
+ private Integer orgId;
+
+ /** */
+ @QuerySqlField
+ private Integer id;
+
+ /**
+ * Constructor.
+ *
+ * @param orgId Organization id.
+ * @param id Person id.
+ */
+ PersonKey(int orgId, int id) {
+ this.orgId = orgId;
+ this.id = id;
+ }
+ }
+
+ /** */
+ public static class Person {
+ /** */
+ @QuerySqlField
+ String name;
+
+ /** */
+ @QuerySqlField
+ int position;
+
+ /** */
+ @QuerySqlField
+ int amount;
+ /** */
+ @QuerySqlField
+ Date updated;
+
+ /**
+ * Constructor.
+ *
+ * @param name Name.
+ * @param position Position.
+ * @param amount Amount.
+ */
+ private Person(String name, int position, int amount) {
+ this.name = name;
+ this.position = position;
+ this.amount = amount;
+
+ this.updated = new Date(System.currentTimeMillis());
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return (name==null? 0: name.hashCode()) ^ position ^ amount ^ (updated == null ? 0 : updated.hashCode());
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ if (obj == null)
+ return false;
+
+ if (!obj.getClass().equals(Person.class))
+ return false;
+
+ Person other = (Person)obj;
+
+ return F.eq(name, other.name) && position == other.position &&
+ amount == other.amount && F.eq(updated, other.updated);
+ }
+ }
+
+ /** */
+ private static class Position {
+ /** */
+ @QuerySqlField
+ int id;
+
+ /** */
+ @QuerySqlField
+ String name;
+
+ /** */
+ @QuerySqlField
+ int rate;
+
+ /**
+ * Constructor.
+ *
+ * @param id Id.
+ * @param name Name.
+ * @param rate Rate.
+ */
+ public Position(int id, String name, int rate) {
+ this.id = id;
+ this.name = name;
+ this.rate = rate;
+ }
+ }
+
+ /**
+ * SQL function that always fails.
+ *
+ * @param param Arbitrary parameter.
+ * @return Result.
+ */
+ @QuerySqlFunction
+ public static String Fail(String param) {
+ throw new IgniteSQLException("Fail() called");
+ }
+
+ /**
+ * SQL function that waits for condition.
+ *
+ * @param param Arbitrary parameter.
+ * @return Result.
+ */
+ @QuerySqlFunction
+ public static String Wait(String param) {
+ try {
+ if (latch.getCount() > 0) {
+ latch.countDown();
+
+ latch.await(5000, MILLISECONDS);
+ }
+ else
+ Thread.sleep(100);
+ }
+ catch (InterruptedException ignore) {
+ // No-op
+ }
+ return param;
+ }
+
+ /**
+ * SQL function that makes a concurrent modification.
+ *
+ * @param id Id.
+ * @param rate Rate.
+ * @return Result.
+ */
+ @QuerySqlFunction
+ public static int Modify(final int id, final int rate) {
+ try {
+ GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() {
+ IgniteCache cache = client.cache(CACHE_ORG);
+
+ cache.put(id, new Organization("Acme Inc #" + id, rate + 1));
+
+ return null;
+ }
+ }).get();
+ }
+ catch (Exception e) {
+ // No-op
+ }
+
+ return rate - 1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index c49649b..83b4689 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -123,9 +123,11 @@ import org.apache.ignite.internal.processors.cache.query.IgniteCacheQueryCacheDe
import org.apache.ignite.internal.processors.cache.query.IndexingSpiQuerySelfTest;
import org.apache.ignite.internal.processors.cache.query.IndexingSpiQueryTxSelfTest;
import org.apache.ignite.internal.processors.client.ClientConnectorConfigurationValidationSelfTest;
+import org.apache.ignite.internal.processors.query.IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest;
import org.apache.ignite.internal.processors.query.IgniteSqlParameterizedQueryTest;
import org.apache.ignite.internal.processors.query.h2.IgniteSqlBigIntegerKeyTest;
import org.apache.ignite.internal.processors.query.IgniteQueryDedicatedPoolTest;
+import org.apache.ignite.internal.processors.query.IgniteSqlSkipReducerOnUpdateDmlSelfTest;
import org.apache.ignite.internal.processors.query.IgniteSqlEntryCacheModeAgnosticTest;
import org.apache.ignite.internal.processors.query.IgniteSqlKeyValueFieldsTest;
import org.apache.ignite.internal.processors.query.IgniteSqlNotNullConstraintTest;
@@ -243,6 +245,8 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheInsertSqlQuerySelfTest.class);
suite.addTestSuite(IgniteCacheUpdateSqlQuerySelfTest.class);
suite.addTestSuite(IgniteCacheDeleteSqlQuerySelfTest.class);
+ suite.addTestSuite(IgniteSqlSkipReducerOnUpdateDmlSelfTest.class);
+ suite.addTestSuite(IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest.class);
suite.addTestSuite(IgniteBinaryObjectQueryArgumentsTest.class);
suite.addTestSuite(IgniteBinaryObjectLocalQueryArgumentsTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc-test/src/configuration_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc-test/src/configuration_test.cpp b/modules/platforms/cpp/odbc-test/src/configuration_test.cpp
index 7da6757..3165c4d 100644
--- a/modules/platforms/cpp/odbc-test/src/configuration_test.cpp
+++ b/modules/platforms/cpp/odbc-test/src/configuration_test.cpp
@@ -43,6 +43,7 @@ namespace
const bool testReplicatedOnly = true;
const bool testCollocated = true;
const bool testLazy = true;
+ const bool testSkipReducerOnUpdate = true;
const std::string testAddress = testServerHost + ':' + ignite::common::LexicalCast<std::string>(testServerPort);
}
@@ -132,6 +133,7 @@ void CheckConnectionConfig(const Configuration& cfg)
BOOST_CHECK_EQUAL(cfg.IsReplicatedOnly(), testReplicatedOnly);
BOOST_CHECK_EQUAL(cfg.IsCollocated(), testCollocated);
BOOST_CHECK_EQUAL(cfg.IsLazy(), testLazy);
+ BOOST_CHECK_EQUAL(cfg.IsSkipReducerOnUpdate(), testSkipReducerOnUpdate);
std::stringstream constructor;
@@ -143,7 +145,8 @@ void CheckConnectionConfig(const Configuration& cfg)
<< "lazy=" << BoolToStr(testLazy) << ';'
<< "page_size=" << testPageSize << ';'
<< "replicated_only=" << BoolToStr(testReplicatedOnly) << ';'
- << "schema=" << testSchemaName << ';';
+ << "schema=" << testSchemaName << ';'
+ << "skip_reducer_on_update=" << BoolToStr(testReplicatedOnly) << ';';
const std::string& expectedStr = constructor.str();
@@ -164,6 +167,7 @@ void CheckDsnConfig(const Configuration& cfg)
BOOST_CHECK_EQUAL(cfg.IsReplicatedOnly(), false);
BOOST_CHECK_EQUAL(cfg.IsCollocated(), false);
BOOST_CHECK_EQUAL(cfg.IsLazy(), false);
+ BOOST_CHECK_EQUAL(cfg.IsSkipReducerOnUpdate(), false);
}
BOOST_AUTO_TEST_SUITE(ConfigurationTestSuite)
@@ -180,6 +184,8 @@ BOOST_AUTO_TEST_CASE(CheckTestValuesNotEquealDefault)
BOOST_CHECK_NE(testEnforceJoinOrder, Configuration::DefaultValue::enforceJoinOrder);
BOOST_CHECK_NE(testReplicatedOnly, Configuration::DefaultValue::replicatedOnly);
BOOST_CHECK_NE(testCollocated, Configuration::DefaultValue::collocated);
+ BOOST_CHECK_NE(testLazy, Configuration::DefaultValue::lazy);
+ BOOST_CHECK_NE(testSkipReducerOnUpdate, Configuration::DefaultValue::skipReducerOnUpdate);
}
BOOST_AUTO_TEST_CASE(TestConnectStringUppercase)
@@ -196,7 +202,8 @@ BOOST_AUTO_TEST_CASE(TestConnectStringUppercase)
<< "COLLOCATED=" << BoolToStr(testCollocated, false) << ';'
<< "REPLICATED_ONLY=" << BoolToStr(testReplicatedOnly, false) << ';'
<< "PAGE_SIZE=" << testPageSize << ';'
- << "SCHEMA=" << testSchemaName;
+ << "SCHEMA=" << testSchemaName << ';'
+ << "SKIP_REDUCER_ON_UPDATE=" << BoolToStr(testSkipReducerOnUpdate, false);
const std::string& connectStr = constructor.str();
@@ -219,7 +226,8 @@ BOOST_AUTO_TEST_CASE(TestConnectStringLowercase)
<< "enforce_join_order=" << BoolToStr(testEnforceJoinOrder) << ';'
<< "replicated_only=" << BoolToStr(testReplicatedOnly) << ';'
<< "collocated=" << BoolToStr(testCollocated) << ';'
- << "schema=" << testSchemaName;
+ << "schema=" << testSchemaName << ';'
+ << "skip_reducer_on_update=" << BoolToStr(testSkipReducerOnUpdate);
const std::string& connectStr = constructor.str();
@@ -242,7 +250,8 @@ BOOST_AUTO_TEST_CASE(TestConnectStringZeroTerminated)
<< "collocated=" << BoolToStr(testCollocated) << ';'
<< "distributed_joins=" << BoolToStr(testDistributedJoins) << ';'
<< "enforce_join_order=" << BoolToStr(testEnforceJoinOrder) << ';'
- << "schema=" << testSchemaName;
+ << "schema=" << testSchemaName << ';'
+ << "skip_reducer_on_update=" << BoolToStr(testSkipReducerOnUpdate);
const std::string& connectStr = constructor.str();
@@ -265,7 +274,8 @@ BOOST_AUTO_TEST_CASE(TestConnectStringMixed)
<< "Enforce_Join_Order=" << BoolToStr(testEnforceJoinOrder) << ';'
<< "Replicated_Only=" << BoolToStr(testReplicatedOnly, false) << ';'
<< "Collocated=" << BoolToStr(testCollocated) << ';'
- << "Schema=" << testSchemaName;
+ << "Schema=" << testSchemaName << ';'
+ << "Skip_Reducer_On_Update=" << BoolToStr(testSkipReducerOnUpdate);
const std::string& connectStr = constructor.str();
@@ -288,7 +298,8 @@ BOOST_AUTO_TEST_CASE(TestConnectStringWhitepaces)
<< "COLLOCATED =" << BoolToStr(testCollocated, false) << " ;"
<< " REPLICATED_ONLY= " << BoolToStr(testReplicatedOnly, false) << ';'
<< "ENFORCE_JOIN_ORDER= " << BoolToStr(testEnforceJoinOrder, false) << " ;"
- << "SCHEMA = \n\r" << testSchemaName;
+ << "SCHEMA = \n\r" << testSchemaName << ';'
+ << " skip_reducer_on_update=" << BoolToStr(testSkipReducerOnUpdate, false);
const std::string& connectStr = constructor.str();
@@ -358,6 +369,7 @@ BOOST_AUTO_TEST_CASE(TestConnectStringInvalidBoolKeys)
keys.insert("replicated_only");
keys.insert("collocated");
keys.insert("lazy");
+ keys.insert("skip_reducer_on_update");
for (Set::const_iterator it = keys.begin(); it != keys.end(); ++it)
{
@@ -385,6 +397,7 @@ BOOST_AUTO_TEST_CASE(TestConnectStringValidBoolKeys)
keys.insert("replicated_only");
keys.insert("collocated");
keys.insert("lazy");
+ keys.insert("skip_reducer_on_update");
for (Set::const_iterator it = keys.begin(); it != keys.end(); ++it)
{
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc-test/src/queries_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc-test/src/queries_test.cpp b/modules/platforms/cpp/odbc-test/src/queries_test.cpp
index 4c7e402..707669d 100644
--- a/modules/platforms/cpp/odbc-test/src/queries_test.cpp
+++ b/modules/platforms/cpp/odbc-test/src/queries_test.cpp
@@ -755,6 +755,14 @@ BOOST_AUTO_TEST_CASE(TestConnectionProtocolVersion_2_1_5)
InsertTestBatch(11, 20, 9);
}
+BOOST_AUTO_TEST_CASE(TestConnectionProtocolVersion_2_3_0)
+{
+ Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache;PROTOCOL_VERSION=2.3.0");
+
+ InsertTestStrings(10, false);
+ InsertTestBatch(11, 20, 9);
+}
+
BOOST_AUTO_TEST_CASE(TestTwoRowsInt8)
{
CheckTwoRowsInt<signed char>(SQL_C_STINYINT);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc/include/ignite/odbc/config/configuration.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/config/configuration.h b/modules/platforms/cpp/odbc/include/ignite/odbc/config/configuration.h
index 2b1ec52..419a65e 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/config/configuration.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/config/configuration.h
@@ -82,6 +82,9 @@ namespace ignite
/** Connection attribute keyword for lazy attribute. */
static const std::string lazy;
+
+ /** Connection attribute keyword for skipReducerOnUpdate attribute. */
+ static const std::string skipReducerOnUpdate;
};
/** Default values for configuration. */
@@ -125,6 +128,9 @@ namespace ignite
/** Default value for lazy attribute. */
static const bool lazy;
+
+ /** Default value for skipReducerOnUpdate attribute. */
+ static const bool skipReducerOnUpdate;
};
/**
@@ -384,6 +390,26 @@ namespace ignite
}
/**
+ * Check update on server flag.
+ *
+ * @return True if update on server.
+ */
+ bool IsSkipReducerOnUpdate() const
+ {
+ return GetBoolValue(Key::skipReducerOnUpdate, DefaultValue::skipReducerOnUpdate);
+ }
+
+ /**
+ * Set update on server.
+ *
+ * @param val Value to set.
+ */
+ void SetSkipReducerOnUpdate(bool val)
+ {
+ SetBoolValue(Key::skipReducerOnUpdate, val);
+ }
+
+ /**
* Get protocol version.
*
* @return Protocol version.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc/include/ignite/odbc/message.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/message.h b/modules/platforms/cpp/odbc/include/ignite/odbc/message.h
index 91a808c..dda0ba9 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/message.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/message.h
@@ -79,9 +79,10 @@ namespace ignite
* @param replicatedOnly Replicated only flag.
* @param collocated Collocated flag.
* @param lazy Lazy flag.
+ * @param skipReducerOnUpdate Skip reducer on update.
*/
HandshakeRequest(const ProtocolVersion& version, bool distributedJoins, bool enforceJoinOrder,
- bool replicatedOnly, bool collocated, bool lazy);
+ bool replicatedOnly, bool collocated, bool lazy, bool skipReducerOnUpdate);
/**
* Destructor.
@@ -112,6 +113,9 @@ namespace ignite
/** Lazy flag. */
bool lazy;
+
+ /** Skip reducer on update flag. */
+ bool skipReducerOnUpdate;
};
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc/include/ignite/odbc/protocol_version.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/protocol_version.h b/modules/platforms/cpp/odbc/include/ignite/odbc/protocol_version.h
index c36d5dd..e6088a7 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/protocol_version.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/protocol_version.h
@@ -34,6 +34,7 @@ namespace ignite
/** Current protocol version. */
static const ProtocolVersion VERSION_2_1_0;
static const ProtocolVersion VERSION_2_1_5;
+ static const ProtocolVersion VERSION_2_3_0;
typedef std::set<ProtocolVersion> VersionSet;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc/include/ignite/odbc/system/ui/dsn_configuration_window.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/system/ui/dsn_configuration_window.h b/modules/platforms/cpp/odbc/include/ignite/odbc/system/ui/dsn_configuration_window.h
index 2974b67..90286b9 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/system/ui/dsn_configuration_window.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/system/ui/dsn_configuration_window.h
@@ -55,6 +55,7 @@ namespace ignite
REPLICATED_ONLY_CHECK_BOX,
COLLOCATED_CHECK_BOX,
LAZY_CHECK_BOX,
+ SKIP_REDUCER_ON_UPDATE_CHECK_BOX,
PROTOCOL_VERSION_LABEL,
PROTOCOL_VERSION_COMBO_BOX,
OK_BUTTON,
@@ -149,6 +150,9 @@ namespace ignite
/** Lazy CheckBox. */
std::auto_ptr<Window> lazyCheckBox;
+ /** Update on server CheckBox. */
+ std::auto_ptr<Window> skipReducerOnUpdateCheckBox;
+
/** Protocol version edit field. */
std::auto_ptr<Window> protocolVersionLabel;