You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ko...@apache.org on 2024/02/12 14:56:58 UTC
(ignite-3) branch main updated: IGNITE-21341 Sql. Optimize way to execute insert (#3188)
This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a88346ba07 IGNITE-21341 Sql. Optimize way to execute insert (#3188)
a88346ba07 is described below
commit a88346ba07df29bd4cbb3835b434d2183d74559c
Author: korlov42 <ko...@gridgain.com>
AuthorDate: Mon Feb 12 16:56:52 2024 +0200
IGNITE-21341 Sql. Optimize way to execute insert (#3188)
---
.../internal/sql/engine/ItKeyValueGetTest.java | 14 +-
.../internal/sql/engine/ItKeyValuePutTest.java | 143 ++++++++++++++++
.../internal/sql/engine/ItSecondaryIndexTest.java | 2 +-
.../sql/engine/InternalSqlRowSingleLong.java | 72 ++++++++
.../internal/sql/engine/exec/ExecutablePlan.java | 47 ++++++
.../engine/exec/ExecutableTableRegistryImpl.java | 2 +-
.../sql/engine/exec/ExecutionServiceImpl.java | 111 +++----------
.../sql/engine/exec/LogicalRelImplementor.java | 18 +-
.../internal/sql/engine/exec/UpdatableTable.java | 17 ++
.../sql/engine/exec/UpdatableTableImpl.java | 33 +++-
.../sql/engine/exec/mapping/FragmentMapper.java | 14 +-
.../sql/engine/prepare/IgniteRelShuttle.java | 14 ++
.../sql/engine/prepare/KeyValueGetPlan.java | 137 ++++++++++++++--
.../sql/engine/prepare/KeyValueModifyPlan.java | 146 +++++++++++++++++
.../internal/sql/engine/prepare/PlannerHelper.java | 16 +-
.../internal/sql/engine/prepare/PlannerPhase.java | 15 ++
.../sql/engine/prepare/PrepareServiceImpl.java | 19 ++-
.../internal/sql/engine/rel/IgniteKeyValueGet.java | 113 +++++++++++++
.../sql/engine/rel/IgniteKeyValueModify.java | 125 ++++++++++++++
.../internal/sql/engine/rel/IgniteRelVisitor.java | 10 ++
.../engine/rule/TableModifyToKeyValuePutRule.java | 142 ++++++++++++++++
.../TableScanToKeyValueGetRule.java} | 157 +++++++-----------
.../sql/engine/exec/ExecutionServiceImplTest.java | 2 +-
.../internal/sql/engine/framework/TestNode.java | 3 -
.../sql/engine/planner/AbstractPlannerTest.java | 11 +-
.../sql/engine/planner/DmlPlannerTest.java | 10 +-
.../sql/engine/planner/DynamicParametersTest.java | 7 +
.../sql/engine/planner/ImplicitCastsTest.java | 3 +-
.../engine/planner/KeyValueModifyPlannerTest.java | 181 +++++++++++++++++++++
.../internal/sql/engine/util/StatementChecker.java | 18 +-
.../sql-engine/src/test/resources/mapping/dml.test | 2 +-
31 files changed, 1367 insertions(+), 237 deletions(-)
diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItKeyValueGetTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItKeyValueGetTest.java
index 9abf355103..70d27b1163 100644
--- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItKeyValueGetTest.java
+++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItKeyValueGetTest.java
@@ -55,7 +55,7 @@ public class ItKeyValueGetTest extends BaseSqlIntegrationTest {
int key = randomKey();
assertQuery("SELECT * FROM simple_key WHERE id = ?")
- .matches(containsSubPlan("IgnitePkLookup"))
+ .matches(containsSubPlan("IgniteKeyValueGet"))
.withParams(key)
.returns(key, key)
.check();
@@ -66,7 +66,7 @@ public class ItKeyValueGetTest extends BaseSqlIntegrationTest {
int key = randomKey();
assertQuery("SELECT * FROM complex_key_normal_order WHERE id1 = ? AND id2 = ?")
- .matches(containsSubPlan("IgnitePkLookup"))
+ .matches(containsSubPlan("IgniteKeyValueGet"))
.withParams(key, 2 * key)
.returns(key, 2 * key, key)
.check();
@@ -78,7 +78,7 @@ public class ItKeyValueGetTest extends BaseSqlIntegrationTest {
int key = randomKey();
assertQuery("SELECT * FROM complex_key_revers_order WHERE id1 = ? AND id2 = ?")
- .matches(containsSubPlan("IgnitePkLookup"))
+ .matches(containsSubPlan("IgniteKeyValueGet"))
.withParams(key, 2 * key)
.returns(key, 2 * key, key)
.check();
@@ -87,13 +87,13 @@ public class ItKeyValueGetTest extends BaseSqlIntegrationTest {
@Test
void lookupBySimpleKeyWithPostFiltration() {
assertQuery("SELECT * FROM simple_key WHERE id = ? AND val > 5")
- .matches(containsSubPlan("IgnitePkLookup"))
+ .matches(containsSubPlan("IgniteKeyValueGet"))
.withParams(1)
.returnNothing()
.check();
assertQuery("SELECT * FROM simple_key WHERE id = ? AND val > 5")
- .matches(containsSubPlan("IgnitePkLookup"))
+ .matches(containsSubPlan("IgniteKeyValueGet"))
.withParams(6)
.returns(6, 6)
.check();
@@ -104,13 +104,13 @@ public class ItKeyValueGetTest extends BaseSqlIntegrationTest {
int key = randomKey();
assertQuery("SELECT val FROM simple_key WHERE id = ?")
- .matches(containsSubPlan("IgnitePkLookup"))
+ .matches(containsSubPlan("IgniteKeyValueGet"))
.withParams(key)
.returns(key)
.check();
assertQuery("SELECT id, val * 10 FROM simple_key WHERE id = ?")
- .matches(containsSubPlan("IgnitePkLookup"))
+ .matches(containsSubPlan("IgniteKeyValueGet"))
.withParams(key)
.returns(key, key * 10)
.check();
diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItKeyValuePutTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItKeyValuePutTest.java
new file mode 100644
index 0000000000..c10bcf1a7b
--- /dev/null
+++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItKeyValuePutTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.sql.engine.util.QueryChecker.containsSubPlan;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
+
+import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests to verify e2e cases of optimized insert.
+ */
+public class ItKeyValuePutTest extends BaseSqlIntegrationTest {
+ private static final int TABLE_SIZE = 10;
+
+ @BeforeAll
+ @SuppressWarnings({"ConcatenationWithEmptyString", "resource"})
+ static void initSchema() {
+ try (Session session = CLUSTER.aliveNode().sql().createSession()) {
+ session.executeScript(""
+ + "CREATE TABLE simple_key (id INT PRIMARY KEY, val INT);"
+ + "CREATE TABLE complex_key (id1 INT, id2 INT, val INT, PRIMARY KEY(id1, id2));"
+ );
+ }
+ }
+
+ @AfterEach
+ void clearTables() {
+ sql("DELETE FROM simple_key");
+ sql("DELETE FROM complex_key");
+ }
+
+ @Test
+ void insertConstantSimpleKey() {
+ for (int i = 0; i < TABLE_SIZE; i++) {
+ assertQuery(format("INSERT INTO simple_key VALUES ({}, {})", i, i))
+ .matches(containsSubPlan("IgniteKeyValueModify"))
+ .returns(1L)
+ .check();
+ }
+
+ for (int i = 0; i < TABLE_SIZE; i++) {
+ assertQuery("SELECT * FROM simple_key WHERE id = ?")
+ .withParams(i)
+ .returns(i, i)
+ .check();
+ }
+ }
+
+ @Test
+ void insertDynamicParamsSimpleKey() {
+ for (int i = 0; i < TABLE_SIZE; i++) {
+ assertQuery("INSERT INTO simple_key VALUES (?, ?)")
+ .matches(containsSubPlan("IgniteKeyValueModify"))
+ .withParams(i, i)
+ .returns(1L)
+ .check();
+ }
+
+ for (int i = 0; i < TABLE_SIZE; i++) {
+ assertQuery("SELECT * FROM simple_key WHERE id = ?")
+ .withParams(i)
+ .returns(i, i)
+ .check();
+ }
+ }
+
+ @Test
+ void insertSimpleKeyWithCast() {
+ for (int i = 0; i < TABLE_SIZE; i++) {
+ assertQuery("INSERT INTO simple_key VALUES (?, ?)")
+ .matches(containsSubPlan("IgniteKeyValueModify"))
+ .withParams((byte) i, (byte) i)
+ .returns(1L)
+ .check();
+ }
+
+ for (int i = 0; i < TABLE_SIZE; i++) {
+ assertQuery("SELECT * FROM simple_key WHERE id = ?")
+ .withParams(i)
+ .returns(i, i)
+ .check();
+ }
+ }
+
+ @Test
+ void insertComplexKey() {
+ for (int i = 0; i < TABLE_SIZE; i++) {
+ assertQuery("INSERT INTO complex_key VALUES (?, ?, ?)")
+ .matches(containsSubPlan("IgniteKeyValueModify"))
+ .withParams(i, 2 * i, i)
+ .returns(1L)
+ .check();
+ }
+
+ for (int i = 0; i < TABLE_SIZE; i++) {
+ assertQuery("SELECT * FROM complex_key WHERE id1 = ? AND id2 = ?")
+ .withParams(i, 2 * i)
+ .returns(i, 2 * i, i)
+ .check();
+ }
+ }
+
+ @Test
+ @SuppressWarnings("ThrowableNotThrown")
+ void exceptionIsThrownOnKeyViolation() {
+ String insertStatement = "INSERT INTO simple_key VALUES (1, 1)";
+ sql(insertStatement);
+
+ assertThrows(
+ SqlException.class,
+ () -> sql(insertStatement),
+ "PK unique constraint is violated"
+ );
+
+ assertThrows(
+ SqlException.class,
+ () -> sql("INSERT INTO complex_key(id1, val) VALUES (1, 1)"),
+ "Column 'ID2' does not allow NULLs"
+ );
+ }
+}
diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java
index 8144190cef..b26d95720e 100644
--- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java
+++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java
@@ -202,7 +202,7 @@ public class ItSecondaryIndexTest extends BaseSqlIntegrationTest {
@Test
public void testKeyEqualsFilter() {
assertQuery("SELECT * FROM Developer WHERE id=2")
- .matches(containsSubPlan("IgnitePkLookup(table=[[PUBLIC, DEVELOPER]]"))
+ .matches(containsSubPlan("IgniteKeyValueGet(table=[[PUBLIC, DEVELOPER]]"))
.returns(2, "Beethoven", 2, "Vienna", 44)
.check();
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/InternalSqlRowSingleLong.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/InternalSqlRowSingleLong.java
new file mode 100644
index 0000000000..3d798daf16
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/InternalSqlRowSingleLong.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.schema.BinaryTuple;
+
+/**
+ * Implementation of {@code InternalSqlRow} allowing to represent a SQL row with a single long column.
+ */
+public class InternalSqlRowSingleLong implements InternalSqlRow {
+ private final long val;
+ private BinaryTuple row;
+
+ /**
+ * Constructor.
+ *
+ * @param val Value for single column row.
+ */
+ public InternalSqlRowSingleLong(long val) {
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Object get(int idx) {
+ assert idx == 0;
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int fieldCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public BinaryTuple asBinaryTuple() {
+ if (row == null) {
+ row = new BinaryTuple(1, new BinaryTupleBuilder(1, estimateSize(val)).appendLong(val).build());
+ }
+ return row;
+ }
+
+ private static int estimateSize(long value) {
+ if (value <= Byte.MAX_VALUE && value >= Byte.MIN_VALUE) {
+ return Byte.BYTES;
+ } else if (value <= Short.MAX_VALUE && value >= Short.MIN_VALUE) {
+ return Short.BYTES;
+ } else if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) {
+ return Integer.BYTES;
+ }
+
+ return Long.BYTES;
+ }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutablePlan.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutablePlan.java
new file mode 100644
index 0000000000..de68db0e9e
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutablePlan.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import org.apache.ignite.internal.sql.engine.InternalSqlRow;
+import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.util.AsyncCursor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Denotes a plan that can evaluates itself.
+ */
+@FunctionalInterface
+public interface ExecutablePlan {
+ /**
+ * Evaluates plan and returns cursor over result.
+ *
+ * @param ctx An execution context.
+ * @param tx A transaction to use to access the data.
+ * @param tableRegistry A registry to resolve executable table to evaluate the plan.
+ * @param firstPageReadyCallback A callback to notify when first page has been prefetched.
+ * @param <RowT> A type of the sql row.
+ * @return Cursor over result of the evaluation.
+ */
+ <RowT> AsyncCursor<InternalSqlRow> execute(
+ ExecutionContext<RowT> ctx,
+ InternalTransaction tx,
+ ExecutableTableRegistry tableRegistry,
+ @Nullable QueryPrefetchCallback firstPageReadyCallback
+ );
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
index 1492aa415d..8575412e39 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
@@ -93,7 +93,7 @@ public class ExecutableTableRegistryImpl implements ExecutableTableRegistry {
TableRowConverter rowConverter = converterFactory.create(null);
UpdatableTableImpl updatableTable = new UpdatableTableImpl(sqlTable.id(), tableDescriptor, internalTable.partitions(),
- replicaService, clock, rowConverter);
+ internalTable, replicaService, clock, rowConverter);
return new ExecutableTableImpl(scannableTable, updatableTable, sqlTable.partitionCalculator());
});
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index ccf603e995..4d68e0bf49 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -25,11 +25,11 @@ import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import com.github.benmanes.caffeine.cache.Caffeine;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
@@ -52,14 +52,8 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.function.Supplier;
import java.util.stream.Collectors;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.configuration.ConfigurationChangeException;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
@@ -76,7 +70,6 @@ import org.apache.ignite.internal.sql.engine.NodeLeftException;
import org.apache.ignite.internal.sql.engine.QueryCancelledException;
import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
-import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription;
@@ -85,7 +78,6 @@ import org.apache.ignite.internal.sql.engine.exec.mapping.MappingService;
import org.apache.ignite.internal.sql.engine.exec.rel.AbstractNode;
import org.apache.ignite.internal.sql.engine.exec.rel.AsyncRootNode;
import org.apache.ignite.internal.sql.engine.exec.rel.Outbox;
-import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
import org.apache.ignite.internal.sql.engine.message.ErrorMessage;
import org.apache.ignite.internal.sql.engine.message.MessageService;
import org.apache.ignite.internal.sql.engine.message.QueryCloseMessage;
@@ -97,7 +89,6 @@ import org.apache.ignite.internal.sql.engine.prepare.DdlPlan;
import org.apache.ignite.internal.sql.engine.prepare.ExplainPlan;
import org.apache.ignite.internal.sql.engine.prepare.Fragment;
import org.apache.ignite.internal.sql.engine.prepare.IgniteRelShuttle;
-import org.apache.ignite.internal.sql.engine.prepare.KeyValueGetPlan;
import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
@@ -138,6 +129,10 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
private static final List<InternalSqlRow> NOT_APPLIED_ANSWER = List.of(new InternalSqlRowSingleBoolean(false));
+ private static final FragmentDescription DUMMY_DESCRIPTION = new FragmentDescription(
+ 0, true, Long2ObjectMaps.emptyMap(), null, null
+ );
+
private final MessageService messageService;
private final TopologyService topSrvc;
@@ -304,10 +299,9 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
switch (queryType) {
case DML:
- return executeQuery(tx, ctx, (MultiStepPlan) plan);
case QUERY:
- if (plan instanceof KeyValueGetPlan) {
- return executeKeyValueGetOperation(tx, ctx, (KeyValueGetPlan) plan, ctx.prefetchCallback());
+ if (plan instanceof ExecutablePlan) {
+ return executeExecutablePlan(tx, ctx, (ExecutablePlan) plan, ctx.prefetchCallback());
}
assert plan instanceof MultiStepPlan : plan.getClass();
@@ -334,84 +328,24 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
return mgr.close(true);
}
- private AsyncCursor<InternalSqlRow> executeKeyValueGetOperation(
+ private AsyncCursor<InternalSqlRow> executeExecutablePlan(
InternalTransaction tx,
BaseQueryContext ctx,
- KeyValueGetPlan plan,
+ ExecutablePlan plan,
@Nullable QueryPrefetchCallback callback
) {
- IgniteTable sqlTable = plan.table();
- CompletableFuture<Iterator<InternalSqlRow>> result = tableRegistry.getTable(ctx.schemaVersion(), sqlTable.id())
- .thenCompose(execTable -> {
- ExecutionContext<RowT> ectx = new ExecutionContext<>(
- taskExecutor,
- ctx.queryId(),
- localNode,
- localNode.name(),
- null,
- handler,
- Commons.parametersMap(ctx.parameters()),
- TxAttributes.fromTx(tx)
- );
-
- ImmutableBitSet requiredColumns = plan.lookupNode().requiredColumns();
- RexNode filterExpr = plan.lookupNode().condition();
- List<RexNode> projectionExpr = plan.lookupNode().projects();
- List<RexNode> keyExpressions = plan.lookupNode().keyExpressions();
-
- RelDataType rowType = sqlTable.getRowType(Commons.typeFactory(), requiredColumns);
-
- Supplier<RowT> keySupplier = ectx.expressionFactory()
- .rowSource(keyExpressions);
- Predicate<RowT> filter = filterExpr == null ? null : ectx.expressionFactory()
- .predicate(filterExpr, rowType);
- Function<RowT, RowT> projection = projectionExpr == null ? null : ectx.expressionFactory()
- .project(projectionExpr, rowType);
-
- RowHandler<RowT> rowHandler = ectx.rowHandler();
- RowSchema rowSchema = TypeUtils.rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rowType));
- RowFactory<RowT> rowFactory = rowHandler.factory(rowSchema);
-
- RelDataType resultType = plan.lookupNode().getRowType();
- BiFunction<Integer, Object, Object> internalTypeConverter = TypeUtils.resultTypeConverter(ectx, resultType);
-
- ScannableTable scannableTable = execTable.scannableTable();
- Function<RowT, Iterator<InternalSqlRow>> postProcess = row -> {
- if (row == null) {
- return Collections.emptyIterator();
- }
-
- if (filter != null && !filter.test(row)) {
- return Collections.emptyIterator();
- }
-
- if (projection != null) {
- row = projection.apply(row);
- }
-
- return List.<InternalSqlRow>of(
- new InternalSqlRowImpl<>(row, rowHandler, internalTypeConverter)
- ).iterator();
- };
-
- CompletableFuture<RowT> lookupResult = scannableTable.primaryKeyLookup(
- ectx, tx, rowFactory, keySupplier.get(), requiredColumns.toBitSet()
- );
-
- if (projection == null && filter == null) {
- // no arbitrary computations, should be safe to proceed execution on
- // thread that completes the future
- return lookupResult.thenApply(postProcess);
- } else {
- return lookupResult.thenApplyAsync(postProcess, taskExecutor);
- }
- });
-
- if (callback != null) {
- result.whenCompleteAsync((res, err) -> callback.onPrefetchComplete(err), taskExecutor);
- }
+ ExecutionContext<RowT> ectx = new ExecutionContext<>(
+ taskExecutor,
+ ctx.queryId(),
+ localNode,
+ localNode.name(),
+ DUMMY_DESCRIPTION,
+ handler,
+ Commons.parametersMap(ctx.parameters()),
+ TxAttributes.fromTx(tx)
+ );
- return new AsyncWrapper<>(result, Runnable::run);
+ return plan.execute(ectx, tx, tableRegistry, callback);
}
private AsyncCursor<InternalSqlRow> executeDdl(DdlPlan plan, @Nullable QueryPrefetchCallback callback) {
@@ -1075,6 +1009,11 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
}
private void enlistPartitions(MappedFragment mappedFragment, InternalTransaction tx) {
+ // no need to traverse the tree if fragment has no tables
+ if (mappedFragment.fragment().tables().isEmpty()) {
+ return;
+ }
+
new IgniteRelShuttle() {
@Override
public IgniteRel visit(IgniteIndexScan rel) {
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index 520224b86e..2fcf7182b0 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -82,6 +82,8 @@ import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
import org.apache.ignite.internal.sql.engine.rel.IgniteFilter;
import org.apache.ignite.internal.sql.engine.rel.IgniteHashIndexSpool;
import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
+import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueGet;
+import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueModify;
import org.apache.ignite.internal.sql.engine.rel.IgniteLimit;
import org.apache.ignite.internal.sql.engine.rel.IgniteMergeJoin;
import org.apache.ignite.internal.sql.engine.rel.IgniteNestedLoopJoin;
@@ -207,7 +209,7 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
public Node<RowT> visit(IgniteTrimExchange rel) {
assert TraitUtils.distribution(rel).getType() == HASH_DISTRIBUTED;
- ColocationGroup targetGroup = ctx.target();
+ ColocationGroup targetGroup = ctx.group(rel.sourceId());
assert targetGroup != null;
@@ -877,7 +879,19 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
/** {@inheritDoc} */
@Override
public Node<RowT> visit(IgniteExchange rel) {
- throw new AssertionError();
+ throw new AssertionError(rel.getClass());
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Node<RowT> visit(IgniteKeyValueGet rel) {
+ throw new AssertionError(rel.getClass());
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Node<RowT> visit(IgniteKeyValueModify rel) {
+ throw new AssertionError(rel.getClass());
}
private Node<RowT> visit(RelNode rel) {
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTable.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTable.java
index 7e021c096a..f7e49b4a4e 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTable.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTable.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
import org.apache.ignite.internal.sql.engine.exec.rel.ModifyNode;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import org.apache.ignite.internal.tx.InternalTransaction;
/**
* The interface describe a table that could be updated by {@link ModifyNode}.
@@ -50,6 +51,22 @@ public interface UpdatableTable {
ColocationGroup colocationGroup
);
+ /**
+ * Insert given row into the table.
+ *
+ * <p>This method accepts instance of the transaction, thus MUST be issued on initiator node.
+ *
+ * @param tx A transaction within which the insert is issued.
+ * @param ectx An execution context. Used mainly to acquire {@link RowHandler}.
+ * @param row A row to insert.
+ * @param <RowT> A type of sql row.
+ * @return Future representing result of operation. Future will be completed successfully
+ * iif row has been inserted, will be completed exceptionally otherwise.
+ */
+ <RowT> CompletableFuture<Void> insert(
+ InternalTransaction tx, ExecutionContext<RowT> ectx, RowT row
+ );
+
/**
* Updates rows if they are exists, inserts the rows otherwise.
*
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
index 58c112d939..8a508b490e 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
@@ -42,11 +42,13 @@ import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.internal.table.distributed.storage.RowBatch;
+import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.sql.SqlException;
@@ -65,26 +67,26 @@ public final class UpdatableTableImpl implements UpdatableTable {
private final HybridClock clock;
+ private final InternalTable table;
+
private final ReplicaService replicaService;
private final PartitionExtractor partitionExtractor;
private final TableRowConverter rowConverter;
- /**
- * Constructor.
- *
- * @param desc Table descriptor.
- */
- public UpdatableTableImpl(
+ /** Constructor. */
+ UpdatableTableImpl(
int tableId,
TableDescriptor desc,
int partitions,
+ InternalTable table,
ReplicaService replicaService,
HybridClock clock,
TableRowConverter rowConverter
) {
this.tableId = tableId;
+ this.table = table;
this.desc = desc;
this.replicaService = replicaService;
this.clock = clock;
@@ -112,6 +114,7 @@ public final class UpdatableTableImpl implements UpdatableTable {
rowsByPartition.computeIfAbsent(partitionExtractor.fromRow(binaryRow), k -> new ArrayList<>()).add(binaryRow);
}
+ @SuppressWarnings("unchecked")
CompletableFuture<List<RowT>>[] futures = new CompletableFuture[rowsByPartition.size()];
int batchNum = 0;
@@ -172,6 +175,23 @@ public final class UpdatableTableImpl implements UpdatableTable {
return desc;
}
+ /** {@inheritDoc} */
+ @Override
+ public <RowT> CompletableFuture<Void> insert(InternalTransaction tx, ExecutionContext<RowT> ectx, RowT row) {
+ BinaryRowEx tableRow = rowConverter.toBinaryRow(ectx, row, false);
+
+ return table.insert(tableRow, tx)
+ .thenApply(success -> {
+ if (success) {
+ return null;
+ }
+
+ RowHandler<RowT> rowHandler = ectx.rowHandler();
+
+ throw conflictKeysException(List.of(rowHandler.toString(row)));
+ });
+ }
+
/** {@inheritDoc} */
@Override
public <RowT> CompletableFuture<?> insertAll(
@@ -253,6 +273,7 @@ public final class UpdatableTableImpl implements UpdatableTable {
keyRowsByPartition.computeIfAbsent(partitionExtractor.fromRow(binaryRow), k -> new ArrayList<>()).add(binaryRow);
}
+ @SuppressWarnings("unchecked")
CompletableFuture<List<RowT>>[] futures = new CompletableFuture[keyRowsByPartition.size()];
int batchNum = 0;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
index f42e56ef25..d9d6129a7b 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
@@ -37,6 +37,8 @@ import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
import org.apache.ignite.internal.sql.engine.rel.IgniteFilter;
import org.apache.ignite.internal.sql.engine.rel.IgniteHashIndexSpool;
import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
+import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueGet;
+import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueModify;
import org.apache.ignite.internal.sql.engine.rel.IgniteLimit;
import org.apache.ignite.internal.sql.engine.rel.IgniteMergeJoin;
import org.apache.ignite.internal.sql.engine.rel.IgniteNestedLoopJoin;
@@ -183,7 +185,17 @@ class FragmentMapper {
@Override
public Mapping visit(IgniteExchange rel) {
- throw new AssertionError("Unexpected call: " + rel);
+ throw new AssertionError(rel.getClass());
+ }
+
+ @Override
+ public Mapping visit(IgniteKeyValueGet rel) {
+ throw new AssertionError(rel.getClass());
+ }
+
+ @Override
+ public Mapping visit(IgniteKeyValueModify rel) {
+ throw new AssertionError(rel.getClass());
}
@Override
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgniteRelShuttle.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgniteRelShuttle.java
index 8ff0c846f6..6a980e9332 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgniteRelShuttle.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgniteRelShuttle.java
@@ -23,6 +23,8 @@ import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
import org.apache.ignite.internal.sql.engine.rel.IgniteFilter;
import org.apache.ignite.internal.sql.engine.rel.IgniteHashIndexSpool;
import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
+import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueGet;
+import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueModify;
import org.apache.ignite.internal.sql.engine.rel.IgniteLimit;
import org.apache.ignite.internal.sql.engine.rel.IgniteMergeJoin;
import org.apache.ignite.internal.sql.engine.rel.IgniteNestedLoopJoin;
@@ -103,6 +105,18 @@ public class IgniteRelShuttle implements IgniteRelVisitor<IgniteRel> {
return processNode(rel);
}
+ /** {@inheritDoc} */
+ @Override
+ public IgniteRel visit(IgniteKeyValueGet rel) {
+ return processNode(rel);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public IgniteRel visit(IgniteKeyValueModify rel) {
+ return processNode(rel);
+ }
+
/** {@inheritDoc} */
@Override
public IgniteRel visit(IgniteColocatedHashAggregate rel) {
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java
index 891cc93186..e61f437255 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java
@@ -17,31 +17,66 @@
package org.apache.ignite.internal.sql.engine.prepare;
-import java.util.concurrent.ExecutorService;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.sql.engine.InternalSqlRow;
+import org.apache.ignite.internal.sql.engine.InternalSqlRowImpl;
+import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
-import org.apache.ignite.internal.sql.engine.rel.IgnitePkLookup;
+import org.apache.ignite.internal.sql.engine.exec.ExecutablePlan;
+import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistry;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.sql.engine.exec.ScannableTable;
+import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
+import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueGet;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.util.Cloner;
import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.util.AsyncCursor;
+import org.apache.ignite.internal.util.AsyncWrapper;
import org.apache.ignite.sql.ResultSetMetadata;
+import org.jetbrains.annotations.Nullable;
/**
* Plan representing single lookup by a primary key.
- *
- * <p>Note: since {@link IgnitePkLookup} is not supposed to be a part of distributed
- * query plan, we need a new object to be able to handle it differently by {@link ExecutorService}.
*/
-public class KeyValueGetPlan implements ExplainablePlan {
+public class KeyValueGetPlan implements ExplainablePlan, ExecutablePlan {
+ private static final IgniteLogger LOG = Loggers.forClass(KeyValueGetPlan.class);
+
private final PlanId id;
- private final IgnitePkLookup lookupNode;
+ private final int schemaVersion;
+ private final IgniteKeyValueGet lookupNode;
private final ResultSetMetadata meta;
private final ParameterMetadata parameterMetadata;
- KeyValueGetPlan(PlanId id, IgnitePkLookup lookupNode, ResultSetMetadata meta, ParameterMetadata parameterMetadata) {
+ KeyValueGetPlan(
+ PlanId id,
+ int schemaVersion,
+ IgniteKeyValueGet lookupNode,
+ ResultSetMetadata meta,
+ ParameterMetadata parameterMetadata
+ ) {
this.id = id;
+ this.schemaVersion = schemaVersion;
this.lookupNode = lookupNode;
this.meta = meta;
this.parameterMetadata = parameterMetadata;
@@ -72,7 +107,7 @@ public class KeyValueGetPlan implements ExplainablePlan {
}
/** Returns a table in question. */
- public IgniteTable table() {
+ private IgniteTable table() {
IgniteTable table = lookupNode.getTable().unwrap(IgniteTable.class);
assert table != null : lookupNode.getTable();
@@ -87,7 +122,89 @@ public class KeyValueGetPlan implements ExplainablePlan {
return RelOptUtil.toString(clonedRoot, SqlExplainLevel.ALL_ATTRIBUTES);
}
- public IgnitePkLookup lookupNode() {
+ public IgniteKeyValueGet lookupNode() {
return lookupNode;
}
+
+ @Override
+ public <RowT> AsyncCursor<InternalSqlRow> execute(
+ ExecutionContext<RowT> ctx,
+ InternalTransaction tx,
+ ExecutableTableRegistry tableRegistry,
+ @Nullable QueryPrefetchCallback firstPageReadyCallback
+ ) {
+ IgniteTable sqlTable = table();
+
+ CompletableFuture<Iterator<InternalSqlRow>> result = tableRegistry.getTable(schemaVersion, sqlTable.id())
+ .thenCompose(execTable -> {
+
+ ImmutableBitSet requiredColumns = lookupNode.requiredColumns();
+ RexNode filterExpr = lookupNode.condition();
+ List<RexNode> projectionExpr = lookupNode.projects();
+ List<RexNode> keyExpressions = lookupNode.keyExpressions();
+
+ RelDataType rowType = sqlTable.getRowType(Commons.typeFactory(), requiredColumns);
+
+ Supplier<RowT> keySupplier = ctx.expressionFactory()
+ .rowSource(keyExpressions);
+ Predicate<RowT> filter = filterExpr == null ? null : ctx.expressionFactory()
+ .predicate(filterExpr, rowType);
+ Function<RowT, RowT> projection = projectionExpr == null ? null : ctx.expressionFactory()
+ .project(projectionExpr, rowType);
+
+ RowHandler<RowT> rowHandler = ctx.rowHandler();
+ RowSchema rowSchema = TypeUtils.rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rowType));
+ RowFactory<RowT> rowFactory = rowHandler.factory(rowSchema);
+
+ RelDataType resultType = lookupNode.getRowType();
+ BiFunction<Integer, Object, Object> internalTypeConverter = TypeUtils.resultTypeConverter(ctx, resultType);
+
+ ScannableTable scannableTable = execTable.scannableTable();
+ Function<RowT, Iterator<InternalSqlRow>> postProcess = row -> {
+ if (row == null) {
+ return Collections.emptyIterator();
+ }
+
+ if (filter != null && !filter.test(row)) {
+ return Collections.emptyIterator();
+ }
+
+ if (projection != null) {
+ row = projection.apply(row);
+ }
+
+ return List.<InternalSqlRow>of(
+ new InternalSqlRowImpl<>(row, rowHandler, internalTypeConverter)
+ ).iterator();
+ };
+
+ CompletableFuture<RowT> lookupResult = scannableTable.primaryKeyLookup(
+ ctx, tx, rowFactory, keySupplier.get(), requiredColumns.toBitSet()
+ );
+
+ if (projection == null && filter == null) {
+ // no arbitrary computations, should be safe to proceed execution on
+ // thread that completes the future
+ return lookupResult.thenApply(postProcess);
+ } else {
+ Executor executor = task -> ctx.execute(task::run, error -> {
+ // this executor is used to process future chain, so any unhandled exception
+ // should be wrapped with CompletionException and returned as a result, implying
+ // no error handler should be called.
+ // But just in case there is error in future processing pipeline let's log error
+ LOG.error("Unexpected error", error);
+ });
+
+ return lookupResult.thenApplyAsync(postProcess, executor);
+ }
+ });
+
+ if (firstPageReadyCallback != null) {
+ Executor executor = task -> ctx.execute(task::run, firstPageReadyCallback::onPrefetchComplete);
+
+ result.whenCompleteAsync((res, err) -> firstPageReadyCallback.onPrefetchComplete(err), executor);
+ }
+
+ return new AsyncWrapper<>(result, Runnable::run);
+ }
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueModifyPlan.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueModifyPlan.java
new file mode 100644
index 0000000000..0a6df52446
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueModifyPlan.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.prepare;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Supplier;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.ignite.internal.sql.engine.InternalSqlRow;
+import org.apache.ignite.internal.sql.engine.InternalSqlRowSingleLong;
+import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback;
+import org.apache.ignite.internal.sql.engine.SqlQueryType;
+import org.apache.ignite.internal.sql.engine.exec.ExecutablePlan;
+import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistry;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.UpdatableTable;
+import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueModify;
+import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.util.Cloner;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.util.AsyncCursor;
+import org.apache.ignite.internal.util.AsyncWrapper;
+import org.apache.ignite.sql.ResultSetMetadata;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Plan representing simple modify operation that can be executed by Key-Value API.
+ */
+public class KeyValueModifyPlan implements ExplainablePlan, ExecutablePlan {
+ private final PlanId id;
+ private final int schemaVersion;
+ private final IgniteKeyValueModify modifyNode;
+ private final ResultSetMetadata meta;
+ private final ParameterMetadata parameterMetadata;
+
+ KeyValueModifyPlan(
+ PlanId id,
+ int schemaVersion,
+ IgniteKeyValueModify modifyNode,
+ ResultSetMetadata meta,
+ ParameterMetadata parameterMetadata
+ ) {
+ this.id = id;
+ this.schemaVersion = schemaVersion;
+ this.modifyNode = modifyNode;
+ this.meta = meta;
+ this.parameterMetadata = parameterMetadata;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public PlanId id() {
+ return id;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public SqlQueryType type() {
+ return SqlQueryType.DML;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ResultSetMetadata metadata() {
+ return meta;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ParameterMetadata parameterMetadata() {
+ return parameterMetadata;
+ }
+
+ /** Returns a table in question. */
+ private IgniteTable table() {
+ IgniteTable table = modifyNode.getTable().unwrap(IgniteTable.class);
+
+ assert table != null : modifyNode.getTable();
+
+ return table;
+ }
+
+ @Override
+ public String explain() {
+ IgniteRel clonedRoot = Cloner.clone(modifyNode, Commons.cluster());
+
+ return RelOptUtil.toString(clonedRoot, SqlExplainLevel.ALL_ATTRIBUTES);
+ }
+
+ public IgniteKeyValueModify modifyNode() {
+ return modifyNode;
+ }
+
+ @Override
+ public <RowT> AsyncCursor<InternalSqlRow> execute(
+ ExecutionContext<RowT> ctx,
+ InternalTransaction tx,
+ ExecutableTableRegistry tableRegistry,
+ @Nullable QueryPrefetchCallback firstPageReadyCallback
+ ) {
+ IgniteTable sqlTable = table();
+
+ CompletableFuture<Iterator<InternalSqlRow>> result = tableRegistry.getTable(schemaVersion, sqlTable.id())
+ .thenCompose(execTable -> {
+ List<RexNode> expressions = modifyNode.expressions();
+
+ Supplier<RowT> rowSupplier = ctx.expressionFactory()
+ .rowSource(expressions);
+
+ UpdatableTable updatableTable = execTable.updatableTable();
+
+ return updatableTable.insert(
+ tx, ctx, rowSupplier.get()
+ ).thenApply(none -> List.<InternalSqlRow>of(new InternalSqlRowSingleLong(1L)).iterator());
+ });
+
+ if (firstPageReadyCallback != null) {
+ Executor executor = task -> ctx.execute(task::run, firstPageReadyCallback::onPrefetchComplete);
+
+ result.whenCompleteAsync((res, err) -> firstPageReadyCallback.onPrefetchComplete(err), executor);
+ }
+
+ return new AsyncWrapper<>(result, Runnable::run);
+ }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerHelper.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerHelper.java
index 1fb987d335..728e4b12f3 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerHelper.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerHelper.java
@@ -41,10 +41,8 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.sql.engine.hint.Hints;
import org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
-import org.apache.ignite.internal.sql.engine.rel.IgnitePkLookup;
import org.apache.ignite.internal.sql.engine.rel.IgniteProject;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
-import org.apache.ignite.internal.sql.engine.rel.logical.IgniteLogicalTableScan;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
/**
@@ -118,13 +116,15 @@ public final class PlannerHelper {
rel = planner.transform(PlannerPhase.HEP_PROJECT_PUSH_DOWN, rel.getTraitSet(), rel);
- // if after all operators being pushed down an entire tree was collapsed to a single node,
- // then, probably, we may replaced it with optimized lookup by a primary key
- if (rel instanceof IgniteLogicalTableScan) {
- IgniteRel igniteRel = IgnitePkLookup.convert((IgniteLogicalTableScan) rel);
+ {
+ // the sole purpose of this code block is to limit scope of `simpleOperation` variable.
+ // The result of `HEP_TO_SIMPLE_KEY_VALUE_OPERATION` phase MUST NOT be passed to next stage,
+ // thus if result meets our expectation, then return the result, otherwise discard it and
+ // proceed with regular flow
+ RelNode simpleOperation = planner.transform(PlannerPhase.HEP_TO_SIMPLE_KEY_VALUE_OPERATION, rel.getTraitSet(), rel);
- if (igniteRel != null) {
- return igniteRel;
+ if (simpleOperation instanceof IgniteRel) {
+ return (IgniteRel) simpleOperation;
}
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
index 862d05af4c..120a12014b 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
@@ -59,6 +59,8 @@ import org.apache.ignite.internal.sql.engine.rule.SortAggregateConverterRule;
import org.apache.ignite.internal.sql.engine.rule.SortConverterRule;
import org.apache.ignite.internal.sql.engine.rule.TableFunctionScanConverterRule;
import org.apache.ignite.internal.sql.engine.rule.TableModifyConverterRule;
+import org.apache.ignite.internal.sql.engine.rule.TableModifyToKeyValuePutRule;
+import org.apache.ignite.internal.sql.engine.rule.TableScanToKeyValueGetRule;
import org.apache.ignite.internal.sql.engine.rule.UnionConverterRule;
import org.apache.ignite.internal.sql.engine.rule.ValuesConverterRule;
import org.apache.ignite.internal.sql.engine.rule.logical.ExposeIndexRule;
@@ -83,6 +85,19 @@ public enum PlannerPhase {
}
},
+ HEP_TO_SIMPLE_KEY_VALUE_OPERATION(
+ "Heuristic phase to convert relational tree to simple Key-Value operation",
+ TableScanToKeyValueGetRule.INSTANCE,
+ TableModifyToKeyValuePutRule.PROJECT,
+ TableModifyToKeyValuePutRule.VALUES
+ ) {
+ /** {@inheritDoc} */
+ @Override
+ public Program getProgram(PlanningContext ctx) {
+ return hep(getRules(ctx));
+ }
+ },
+
HEP_FILTER_PUSH_DOWN(
"Heuristic phase to push down filters",
FilterScanMergeRule.TABLE_SCAN_SKIP_CORRELATED,
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
index 0909ad6ff0..b0374357e5 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
@@ -52,7 +52,8 @@ import org.apache.ignite.internal.sql.configuration.distributed.SqlDistributedCo
import org.apache.ignite.internal.sql.configuration.local.SqlLocalConfiguration;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
import org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter;
-import org.apache.ignite.internal.sql.engine.rel.IgnitePkLookup;
+import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueGet;
+import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueModify;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
@@ -340,8 +341,12 @@ public class PrepareServiceImpl implements PrepareService {
ResultSetMetadata resultSetMetadata = resultSetMetadata(validated.dataType(), validated.origins(), validated.aliases());
- if (clonedTree instanceof IgnitePkLookup) {
- return new KeyValueGetPlan(nextPlanId(), (IgnitePkLookup) clonedTree, resultSetMetadata, parameterMetadata);
+ if (clonedTree instanceof IgniteKeyValueGet) {
+ int schemaVersion = ctx.unwrap(BaseQueryContext.class).schemaVersion();
+
+ return new KeyValueGetPlan(
+ nextPlanId(), schemaVersion, (IgniteKeyValueGet) clonedTree, resultSetMetadata, parameterMetadata
+ );
}
return new MultiStepPlan(nextPlanId(), SqlQueryType.QUERY, clonedTree, resultSetMetadata, parameterMetadata);
@@ -399,6 +404,14 @@ public class PrepareServiceImpl implements PrepareService {
// before storing tree in plan cache
IgniteRel clonedTree = Cloner.clone(igniteRel, Commons.emptyCluster());
+ if (clonedTree instanceof IgniteKeyValueModify) {
+ int schemaVersion = ctx.unwrap(BaseQueryContext.class).schemaVersion();
+
+ return new KeyValueModifyPlan(
+ nextPlanId(), schemaVersion, (IgniteKeyValueModify) clonedTree, DML_METADATA, parameterMetadata
+ );
+ }
+
return new MultiStepPlan(nextPlanId(), SqlQueryType.DML, clonedTree, DML_METADATA, parameterMetadata);
}, planningPool));
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteKeyValueGet.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteKeyValueGet.java
new file mode 100644
index 0000000000..0f300fcd8a
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteKeyValueGet.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.rel;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.sql.engine.exec.TxAttributes;
+import org.apache.ignite.internal.sql.engine.exec.mapping.MappingService;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Relational operator that represents lookup by a primary key.
+ *
+ * <p>Note: at the moment, KV api requires an actual transaction which is object
+ * of type {@link InternalTransaction} while distributed execution has access to
+ * only certain {@link TxAttributes attributes} of a transaction. Given that node is
+ * not supposed to be a part of distributed query plan, the following parts were
+ * deliberately omitted:<ul>
+ * <li>this class doesn't implement {@link SourceAwareIgniteRel}, making it impossible
+ * to map properly by {@link MappingService}</li>
+ * <li>de-serialisation constructor is omitted (see {@link ProjectableFilterableTableScan#ProjectableFilterableTableScan(RelInput)}
+ * as example)</li>
+ * </ul>
+ */
+public class IgniteKeyValueGet extends ProjectableFilterableTableScan implements IgniteRel {
+ private final List<RexNode> keyExpressions;
+
+ /**
+ * Constructor.
+ *
+ * @param cluster A cluster this relation belongs to.
+ * @param traits A set of traits this node satisfies.
+ * @param table A source table.
+ * @param hints List of hints related to a relational node.
+ * @param keyExpressions List of expressions representing primary key.
+ * @param proj Optional projection to transform the row.
+ * @param cond Optional condition to do post-filtration.
+ * @param requiredColumns Optional set required fields to do trimming unused columns.
+ */
+ public IgniteKeyValueGet(
+ RelOptCluster cluster,
+ RelTraitSet traits,
+ RelOptTable table,
+ List<RelHint> hints,
+ List<RexNode> keyExpressions,
+ @Nullable List<RexNode> proj,
+ @Nullable RexNode cond,
+ @Nullable ImmutableBitSet requiredColumns
+ ) {
+ super(cluster, traits, hints, table, proj, cond, requiredColumns);
+
+ this.keyExpressions = keyExpressions;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <T> T accept(IgniteRelVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+ return new IgniteKeyValueGet(cluster, getTraitSet(), getTable(), getHints(), keyExpressions, projects, condition, requiredColumns);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public IgniteKeyValueGet withHints(List<RelHint> hintList) {
+ return new IgniteKeyValueGet(
+ getCluster(), getTraitSet(), getTable(), hintList, keyExpressions, projects, condition, requiredColumns
+ );
+ }
+
+ @Override
+ protected RelWriter explainTerms0(RelWriter pw) {
+ return super.explainTerms0(pw)
+ .item("key", keyExpressions);
+ }
+
+ /**
+ * Returns a list of expressions in the order of a primary key columns of related table
+ * to use as lookup key.
+ *
+ * @return List of expressions representing lookup key.
+ */
+ public List<RexNode> keyExpressions() {
+ return keyExpressions;
+ }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteKeyValueModify.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteKeyValueModify.java
new file mode 100644
index 0000000000..57d991e15a
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteKeyValueModify.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.rel;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.AbstractRelNode;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.ignite.internal.sql.engine.exec.TxAttributes;
+import org.apache.ignite.internal.sql.engine.exec.mapping.MappingService;
+import org.apache.ignite.internal.tx.InternalTransaction;
+
+/**
+ * Relational operator that represents simple modification that can be transacted
+ * to Key-Value operation.
+ *
+ * <p>Note: at the moment, KV api requires an actual transaction which is object
+ * of type {@link InternalTransaction} while distributed execution has access to
+ * only certain {@link TxAttributes attributes} of a transaction. Given that node is
+ * not supposed to be a part of distributed query plan, the following parts were
+ * deliberately omitted:<ul>
+ * <li>this class doesn't implement {@link SourceAwareIgniteRel}, making it impossible
+ * to map properly by {@link MappingService}</li>
+ * <li>de-serialisation constructor is omitted (see {@link IgniteTableModify#IgniteTableModify(RelInput)}
+ * as example)</li>
+ * </ul>
+ */
+public class IgniteKeyValueModify extends AbstractRelNode implements IgniteRel {
+ /** Enumeration of supported modification operations. */
+ public enum Operation {
+ PUT
+ }
+
+ private final RelOptTable table;
+ private final Operation operation;
+ private final List<RexNode> expressions;
+
+ /**
+ * Constructor.
+ *
+ * @param cluster A cluster this relation belongs to.
+ * @param traits A set of traits this node satisfies.
+ * @param table A target table.
+ * @param operation Type of the modification operation.
+ * @param expressions List of expressions representing either full row or only a key
+ * depending on particular operation.
+ */
+ public IgniteKeyValueModify(
+ RelOptCluster cluster,
+ RelTraitSet traits,
+ RelOptTable table,
+ Operation operation,
+ List<RexNode> expressions
+ ) {
+ super(cluster, traits);
+
+ this.table = table;
+ this.operation = operation;
+ this.expressions = expressions;
+ }
+
+ @Override public RelDataType deriveRowType() {
+ return RelOptUtil.createDmlRowType(
+ SqlKind.INSERT, getCluster().getTypeFactory());
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public RelOptTable getTable() {
+ return table;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <T> T accept(IgniteRelVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+ assert inputs.isEmpty() : inputs;
+
+ return new IgniteKeyValueModify(cluster, getTraitSet(), table, operation, expressions);
+ }
+
+ @Override public RelWriter explainTerms(RelWriter pw) {
+ return super.explainTerms(pw)
+ .item("table", table.getQualifiedName())
+ .item("operation", operation)
+ .item("expressions", expressions);
+ }
+
+ /**
+ * Returns a list of expressions representing either full row or only a key
+ * depending on particular operation.
+ *
+ * @return List of expressions.
+ */
+ public List<RexNode> expressions() {
+ return expressions;
+ }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteRelVisitor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteRelVisitor.java
index 3bf9f9fafe..ddaf73d6fb 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteRelVisitor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteRelVisitor.java
@@ -169,6 +169,16 @@ public interface IgniteRelVisitor<T> {
*/
T visit(IgniteTableFunctionScan rel);
+ /**
+ * See {@link IgniteRelVisitor#visit(IgniteRel)}.
+ */
+ T visit(IgniteKeyValueGet rel);
+
+ /**
+ * See {@link IgniteRelVisitor#visit(IgniteRel)}.
+ */
+ T visit(IgniteKeyValueModify rel);
+
/**
* Visits a relational node and calculates a result on the basis of node meta information.
*
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyToKeyValuePutRule.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyToKeyValuePutRule.java
new file mode 100644
index 0000000000..b19e25eb97
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyToKeyValuePutRule.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.rule;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexVisitor;
+import org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
+import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueModify;
+import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueModify.Operation;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.immutables.value.Value;
+
+/**
+ * Rule that converts {@link TableModify} representing INSERT operation with a determined source
+ * to a Key-Value PUT operation.
+ *
+ * <p>Note: at the moment, this rule support only single row insert.
+ */
+@Value.Enclosing
+public class TableModifyToKeyValuePutRule extends RelRule<TableModifyToKeyValuePutRule.Config> {
+ public static final RelOptRule VALUES = Config.VALUES.toRule();
+ public static final RelOptRule PROJECT = Config.PROJECT.toRule();
+
+ private TableModifyToKeyValuePutRule(Config cfg) {
+ super(cfg);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ List<RelNode> operands = call.getRelList();
+
+ TableModify modify = cast(operands.get(0));
+
+ assert modify.getOperation() == TableModify.Operation.INSERT : modify.getOperation();
+
+ List<RexNode> expressions;
+ if (operands.size() == 2) {
+ Values values = cast(operands.get(1));
+
+ assert values.getTuples().size() == 1 : "Expected exactly one tuple, but was " + values.getTuples().size();
+
+ expressions = List.copyOf(values.getTuples().get(0));
+ } else {
+ assert operands.size() == 3 : operands;
+
+ Values values = cast(operands.get(2));
+
+ assert values.getTuples().size() == 1 : "Expected exactly one tuple, but was " + values.getTuples().size();
+
+ List<RexNode> inputExpressions = List.copyOf(values.getTuples().get(0));
+
+ RexVisitor<RexNode> inputInliner = new RexShuttle() {
+ @Override
+ public RexNode visitInputRef(RexInputRef inputRef) {
+ return inputExpressions.get(inputRef.getIndex());
+ }
+ };
+
+ Project project = cast(operands.get(1));
+
+ expressions = inputInliner.visitList(project.getProjects());
+ }
+
+ call.transformTo(
+ new IgniteKeyValueModify(
+ modify.getCluster(),
+ modify.getTraitSet()
+ .replace(IgniteConvention.INSTANCE)
+ .replace(IgniteDistributions.single()),
+ modify.getTable(),
+ Operation.PUT,
+ expressions
+ )
+ );
+ }
+
+ private static <T extends RelNode> T cast(RelNode node) {
+ return (T) node;
+ }
+
+ /**
+ * Configuration.
+ */
+ @SuppressWarnings({"ClassNameSameAsAncestorName", "InnerClassFieldHidesOuterClassField"})
+ @Value.Immutable
+ public interface Config extends RelRule.Config {
+ Config VALUES = ImmutableTableModifyToKeyValuePutRule.Config.of()
+ .withDescription("TableModifyToKeyValuePutRule:VALUES")
+ .withOperandSupplier(o0 ->
+ o0.operand(TableModify.class)
+ .predicate(TableModify::isInsert)
+ .oneInput(o1 ->
+ o1.operand(Values.class)
+ .predicate(values -> values.getTuples().size() == 1)
+ .noInputs()))
+ .as(Config.class);
+
+ Config PROJECT = ImmutableTableModifyToKeyValuePutRule.Config.of()
+ .withDescription("TableModifyToKeyValuePutRule:PROJECT")
+ .withOperandSupplier(o0 ->
+ o0.operand(TableModify.class)
+ .predicate(TableModify::isInsert)
+ .oneInput(o1 ->
+ o1.operand(Project.class).oneInput(o2 ->
+ o2.operand(Values.class)
+ .predicate(values -> values.getTuples().size() == 1)
+ .noInputs())))
+ .as(Config.class);
+
+ /** {@inheritDoc} */
+ @Override
+ default TableModifyToKeyValuePutRule toRule() {
+ return new TableModifyToKeyValuePutRule(this);
+ }
+ }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgnitePkLookup.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableScanToKeyValueGetRule.java
similarity index 52%
rename from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgnitePkLookup.java
rename to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableScanToKeyValueGetRule.java
index 4e25a0f08f..b555ca997a 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgnitePkLookup.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableScanToKeyValueGetRule.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.sql.engine.rel;
+package org.apache.ignite.internal.sql.engine.rule;
import static org.apache.ignite.internal.sql.engine.util.RexUtils.builder;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
@@ -25,118 +25,58 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.RelCollation;
-import org.apache.calcite.rel.RelInput;
-import org.apache.calcite.rel.RelWriter;
-import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.mapping.Mappings;
-import org.apache.ignite.internal.sql.engine.exec.TxAttributes;
-import org.apache.ignite.internal.sql.engine.exec.mapping.MappingService;
import org.apache.ignite.internal.sql.engine.prepare.bounds.ExactBounds;
import org.apache.ignite.internal.sql.engine.prepare.bounds.SearchBounds;
+import org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
+import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueGet;
import org.apache.ignite.internal.sql.engine.rel.logical.IgniteLogicalTableScan;
import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.sql.engine.util.RexUtils;
-import org.apache.ignite.internal.tx.InternalTransaction;
+import org.immutables.value.Value;
import org.jetbrains.annotations.Nullable;
/**
- * Relational operator that represents lookup by a primary key.
+ * Tries to convert given scan node to physical node representing primary key lookup.
*
- * <p>Note: at the moment, KV api requires an actual transaction which is object
- * of type {@link InternalTransaction} while distributed execution has access to
- * only certain {@link TxAttributes attributes} of a transaction. Given that node is
- * not supposed to be a part of distributed query plan, the following parts were
- * deliberately omitted:<ul>
- * <li>this class doesn't implement {@link SourceAwareIgniteRel}, making it impossible
- * to map properly by {@link MappingService}</li>
- * <li>{@link IgniteRelVisitor} was not extended with this class</li>
- * <li>de-serialisation constructor is omitted (see {@link ProjectableFilterableTableScan#ProjectableFilterableTableScan(RelInput)}
- * as example)</li>
- * </ul>
+ * <p>Conversion will be successful if: <ol>
+ * <li>there is condition</li>
+ * <li>table has primary key index</li>
+ * <li>condition covers all columns of primary key index</li>
+ * <li>only single search key is derived from condition</li>
+ * </ol>
*/
-public class IgnitePkLookup extends ProjectableFilterableTableScan implements IgniteRel {
- private final List<RexNode> keyExpressions;
-
- private IgnitePkLookup(
- RelOptCluster cluster,
- RelTraitSet traits,
- RelOptTable tbl,
- List<RelHint> hints,
- List<RexNode> keyExpressions,
- @Nullable List<RexNode> proj,
- @Nullable RexNode cond,
- @Nullable ImmutableBitSet requiredColumns
- ) {
- super(cluster, traits, hints, tbl, proj, cond, requiredColumns);
-
- this.keyExpressions = keyExpressions;
- }
+@Value.Enclosing
+public class TableScanToKeyValueGetRule extends RelRule<TableScanToKeyValueGetRule.Config> {
+ public static final RelOptRule INSTANCE = Config.INSTANCE.toRule();
- /** {@inheritDoc} */
- @Override
- public <T> T accept(IgniteRelVisitor<T> visitor) {
- return visitor.visit(this);
+ private TableScanToKeyValueGetRule(Config cfg) {
+ super(cfg);
}
/** {@inheritDoc} */
@Override
- public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
- return new IgnitePkLookup(cluster, getTraitSet(), getTable(), getHints(), keyExpressions, projects, condition, requiredColumns);
- }
-
- /** {@inheritDoc} */
- @Override
- public IgnitePkLookup withHints(List<RelHint> hintList) {
- return new IgnitePkLookup(getCluster(), getTraitSet(), getTable(), hintList, keyExpressions, projects, condition, requiredColumns);
- }
-
- @Override
- protected RelWriter explainTerms0(RelWriter pw) {
- return super.explainTerms0(pw)
- .item("key", keyExpressions);
- }
+ public void onMatch(RelOptRuleCall call) {
+ IgniteLogicalTableScan scan = cast(call.rel(0));
- /**
- * Returns a list of expressions in the order of a primary key columns of related table
- * to use as lookup key.
- *
- * @return List of expressions representing lookup key.
- */
- public List<RexNode> keyExpressions() {
- return keyExpressions;
- }
-
- /**
- * Tries to convert given scan node to physical node representing primary key lookup.
- *
- * <p>Conversion will be successful if: <ol>
- * <li>there is condition</li>
- * <li>table has primary key index</li>
- * <li>condition covers all columns of primary key index</li>
- * <li>only single search key is derived from condition</li>
- * </ol>
- *
- * @param scan A scan node to analyze.
- * @return A physical node representing optimized look up by primary key or {@code null}
- * if we were unable to derive all necessary information.
- */
- public static @Nullable IgnitePkLookup convert(IgniteLogicalTableScan scan) {
List<SearchBounds> bounds = deriveSearchBounds(scan);
if (nullOrEmpty(bounds)) {
- return null;
+ return;
}
List<RexNode> expressions = new ArrayList<>(bounds.size());
@@ -150,7 +90,7 @@ public class IgnitePkLookup extends ProjectableFilterableTableScan implements Ig
// iteration over a number of search keys are not supported yet,
// thus we need to make sure only single key was derived
if (!(bound instanceof ExactBounds)) {
- return null;
+ return;
}
condition.remove(bound.condition());
@@ -158,7 +98,7 @@ public class IgnitePkLookup extends ProjectableFilterableTableScan implements Ig
}
if (nullOrEmpty(expressions)) {
- return null;
+ return;
}
RexNode resultingCondition = RexUtil.composeConjunction(rexBuilder, condition);
@@ -166,17 +106,19 @@ public class IgnitePkLookup extends ProjectableFilterableTableScan implements Ig
resultingCondition = null;
}
- return new IgnitePkLookup(
- cluster,
- scan.getTraitSet()
- .replace(IgniteConvention.INSTANCE)
- .replace(IgniteDistributions.single()),
- scan.getTable(),
- scan.getHints(),
- expressions,
- scan.projects(),
- resultingCondition,
- scan.requiredColumns()
+ call.transformTo(
+ new IgniteKeyValueGet(
+ cluster,
+ scan.getTraitSet()
+ .replace(IgniteConvention.INSTANCE)
+ .replace(IgniteDistributions.single()),
+ scan.getTable(),
+ scan.getHints(),
+ expressions,
+ scan.projects(),
+ resultingCondition,
+ scan.requiredColumns()
+ )
);
}
@@ -220,4 +162,27 @@ public class IgnitePkLookup extends ProjectableFilterableTableScan implements Ig
requiredColumns
);
}
+
+ private static <T extends RelNode> T cast(RelNode node) {
+ return (T) node;
+ }
+
+ /** Configuration. */
+ @SuppressWarnings({"ClassNameSameAsAncestorName", "InnerClassFieldHidesOuterClassField"})
+ @Value.Immutable
+ public interface Config extends RelRule.Config {
+ Config INSTANCE = ImmutableTableScanToKeyValueGetRule.Config.of()
+ .withDescription("TableScanToKeyValueGetRule")
+ .withOperandSupplier(o0 ->
+ o0.operand(IgniteLogicalTableScan.class)
+ .predicate(scan -> scan.condition() != null)
+ .noInputs())
+ .as(Config.class);
+
+ /** {@inheritDoc} */
+ @Override
+ default TableScanToKeyValueGetRule toRule() {
+ return new TableScanToKeyValueGetRule(this);
+ }
+ }
}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 514a17b3f6..06bfda53aa 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -588,7 +588,7 @@ public class ExecutionServiceImplTest extends BaseIgniteAbstractTest {
public void exceptionArrivingBeforeRootFragmentExecutesDoesNotLeaveQueryHanging() {
ExecutionService execService = executionServices.get(0);
BaseQueryContext ctx = createContext();
- QueryPlan plan = prepare("INSERT INTO test_tbl(ID, VAL) VALUES (1, 1)", ctx);
+ QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
CountDownLatch queryFailedLatch = new CountDownLatch(1);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
index 69b1159a0d..44d6e0be4d 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.sql.engine.framework;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
-import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.ArrayList;
import java.util.Collections;
@@ -198,8 +197,6 @@ public class TestNode implements LifecycleAware {
ParsedResult parsedResult = parserService.parse(query);
BaseQueryContext ctx = createContext();
- assertEquals(ctx.parameters().length, parsedResult.dynamicParamsCount(), "Invalid number of dynamic parameters");
-
return await(prepareService.prepareAsync(parsedResult, ctx));
}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
index 6f18f64f52..62e64d3d98 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
@@ -91,6 +91,7 @@ import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
import org.apache.ignite.internal.sql.engine.rel.IgniteSystemViewScan;
import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
+import org.apache.ignite.internal.sql.engine.rule.TableModifyToKeyValuePutRule;
import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
import org.apache.ignite.internal.sql.engine.schema.DefaultValueStrategy;
import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation;
@@ -114,6 +115,11 @@ import org.jetbrains.annotations.Nullable;
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public abstract class AbstractPlannerTest extends IgniteAbstractTest {
+ protected static final String[] DISABLE_KEY_VALUE_MODIFY_RULES = {
+ TableModifyToKeyValuePutRule.VALUES.toString(),
+ TableModifyToKeyValuePutRule.PROJECT.toString(),
+ };
+
protected static final IgniteTypeFactory TYPE_FACTORY = Commons.typeFactory();
protected static final int DEFAULT_TBL_SIZE = 500_000;
@@ -981,10 +987,9 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest {
* An implementation of {@link PlanChecker} with initialized {@link SqlPrepare} to test plans.
*/
public class PlanChecker extends StatementChecker {
-
PlanChecker() {
- super((schema, sql, params) -> {
- PlanningContext planningContext = plannerCtx(sql, List.of(schema), HintStrategyTable.EMPTY, params);
+ super((schema, sql, params, rulesToDisable) -> {
+ PlanningContext planningContext = plannerCtx(sql, List.of(schema), HintStrategyTable.EMPTY, params, rulesToDisable);
IgnitePlanner planner = planningContext.planner();
IgniteRel igniteRel = physicalPlan(planner, sql);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DmlPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DmlPlannerTest.java
index 47b1d55517..ad25745a61 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DmlPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DmlPlannerTest.java
@@ -37,10 +37,9 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
/**
- * Tests to verify DML plans.
+ * Tests to verify multi-step versions of DML plans.
*/
public class DmlPlannerTest extends AbstractPlannerTest {
-
/**
* Test for INSERT .. VALUES when table has a single distribution.
*/
@@ -51,7 +50,9 @@ public class DmlPlannerTest extends AbstractPlannerTest {
// There should be no exchanges and other operations.
assertPlan("INSERT INTO TEST1 (C1, C2) VALUES(1, 2)", schema,
- isInstanceOf(IgniteTableModify.class).and(input(isInstanceOf(IgniteValues.class))));
+ isInstanceOf(IgniteTableModify.class).and(input(isInstanceOf(IgniteValues.class))),
+ DISABLE_KEY_VALUE_MODIFY_RULES
+ );
}
/**
@@ -68,7 +69,8 @@ public class DmlPlannerTest extends AbstractPlannerTest {
nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
.and(e -> e.distribution().equals(IgniteDistributions.single())))
.and(nodeOrAnyChild(isInstanceOf(IgniteTableModify.class))
- .and(hasChildThat(isInstanceOf(IgniteExchange.class).and(e -> distribution.equals(e.distribution())))))
+ .and(hasChildThat(isInstanceOf(IgniteExchange.class).and(e -> distribution.equals(e.distribution()))))),
+ DISABLE_KEY_VALUE_MODIFY_RULES
);
}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DynamicParametersTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DynamicParametersTest.java
index bda29a8abb..917039ca19 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DynamicParametersTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DynamicParametersTest.java
@@ -483,24 +483,28 @@ public class DynamicParametersTest extends AbstractPlannerTest {
public Stream<DynamicTest> testInsertDynamicParams() {
return Stream.of(
checkStatement()
+ .disableRules(DISABLE_KEY_VALUE_MODIFY_RULES)
.table("t1", "c1", NativeTypes.INT32)
.sql("INSERT INTO t1 VALUES (?)", 1)
.parameterTypes(nullable(NativeTypes.INT32))
.project("?0"),
checkStatement()
+ .disableRules(DISABLE_KEY_VALUE_MODIFY_RULES)
.table("t1", "c1", NativeTypes.INT64)
.sql("INSERT INTO t1 VALUES (?)", 1)
.parameterTypes(nullable(NativeTypes.INT32))
.project("CAST(?0):BIGINT"),
checkStatement()
+ .disableRules(DISABLE_KEY_VALUE_MODIFY_RULES)
.table("t1", "c1", NativeTypes.INT64)
.sql("INSERT INTO t1 VALUES (?)", Unspecified.UNKNOWN)
.parameterTypes(nullable(NativeTypes.INT64))
.project("?0"),
checkStatement()
+ .disableRules(DISABLE_KEY_VALUE_MODIFY_RULES)
.table("t1", "c1", NativeTypes.INT64)
.sql("INSERT INTO t1 VALUES (?)", new Object[]{null})
.parameterTypes(new NativeType[]{null})
@@ -513,6 +517,7 @@ public class DynamicParametersTest extends AbstractPlannerTest {
.project("?0"),
checkStatement()
+ .disableRules(DISABLE_KEY_VALUE_MODIFY_RULES)
.table("t1", "c1", NativeTypes.INT32)
.sql("INSERT INTO t1 VALUES (?), (2), (?)", Unspecified.UNKNOWN, Unspecified.UNKNOWN)
.parameterTypes(nullable(NativeTypes.INT32), nullable(NativeTypes.INT32))
@@ -521,6 +526,7 @@ public class DynamicParametersTest extends AbstractPlannerTest {
// compatible type
checkStatement()
+ .disableRules(DISABLE_KEY_VALUE_MODIFY_RULES)
.table("t1", "c1", NativeTypes.INT64)
.sql("INSERT INTO t1 VALUES (?)", 1)
.parameterTypes(nullable(NativeTypes.INT32))
@@ -530,6 +536,7 @@ public class DynamicParametersTest extends AbstractPlannerTest {
// Incompatible types in dynamic params
checkStatement()
+ .disableRules(DISABLE_KEY_VALUE_MODIFY_RULES)
.table("t1", "c1", NativeTypes.INT32)
.sql("INSERT INTO t1 VALUES (?)", "10")
.fails("Values passed to VALUES operator must have compatible types"),
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ImplicitCastsTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ImplicitCastsTest.java
index d876e3a46a..061ae5aec2 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ImplicitCastsTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ImplicitCastsTest.java
@@ -63,7 +63,6 @@ import org.junit.jupiter.params.provider.MethodSource;
* Type coercion related tests that ensure that the necessary casts are placed where it is necessary.
*/
public class ImplicitCastsTest extends AbstractPlannerTest {
-
private static TestTable tableWithColumn(String tableName, String columnName, RelDataType columnType) {
return TestBuilders.table()
.name(tableName)
@@ -318,6 +317,7 @@ public class ImplicitCastsTest extends AbstractPlannerTest {
// DEFAULT is not coerced
checkStatement()
+ .disableRules(DISABLE_KEY_VALUE_MODIFY_RULES)
.table("t1", (table) -> {
return table.name("T1")
.addColumn("INT_COL", NativeTypes.INT32)
@@ -476,6 +476,7 @@ public class ImplicitCastsTest extends AbstractPlannerTest {
return Stream.of(
checkStatement(setup)
+ .disableRules(DISABLE_KEY_VALUE_MODIFY_RULES)
.table("t3", "str_col", NativeTypes.stringOf(36))
.sql("INSERT INTO t3 VALUES('1111'::UUID)")
.project("CAST(CAST(_UTF-8'1111'):UUID NOT NULL):VARCHAR(36) CHARACTER SET \"UTF-8\" NOT NULL"),
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/KeyValueModifyPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/KeyValueModifyPlannerTest.java
new file mode 100644
index 0000000000..08c79c5796
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/KeyValueModifyPlannerTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.planner;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.rex.RexNode;
+import org.apache.ignite.internal.catalog.CatalogCommand;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.commands.DropTableCommand;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import org.apache.ignite.internal.sql.engine.framework.TestCluster;
+import org.apache.ignite.internal.sql.engine.framework.TestNode;
+import org.apache.ignite.internal.sql.engine.prepare.KeyValueModifyPlan;
+import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * Test cases to very KV modify optimized plans.
+ */
+public class KeyValueModifyPlannerTest extends AbstractPlannerTest {
+ private static final String NODE_NAME = "N1";
+
+ private static final TestCluster CLUSTER = TestBuilders.cluster()
+ .nodes(NODE_NAME)
+ .build();
+
+ private final TestNode node = CLUSTER.node(NODE_NAME);
+
+ @BeforeAll
+ static void start() {
+ CLUSTER.start();
+ }
+
+ @AfterAll
+ static void stop() throws Exception {
+ CLUSTER.stop();
+ }
+
+ @AfterEach
+ void clearCatalog() {
+ int version = CLUSTER.catalogManager().latestCatalogVersion();
+
+ List<CatalogCommand> commands = new ArrayList<>();
+ for (CatalogTableDescriptor table : CLUSTER.catalogManager().tables(version)) {
+ commands.add(
+ DropTableCommand.builder()
+ .schemaName(CatalogService.DEFAULT_SCHEMA_NAME)
+ .tableName(table.name())
+ .build()
+ );
+ }
+
+ await(CLUSTER.catalogManager().execute(commands));
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {
+ "INSERT INTO test VALUES (10, 20)",
+ "INSERT INTO test(id, val) VALUES (10, 20)",
+ "INSERT INTO test(val, id) VALUES (20, 10)",
+ })
+ void optimizedInsertUsedForLiterals(String insertStatement) {
+ node.initSchema("CREATE TABLE test (id INT PRIMARY KEY, val INT)");
+
+ {
+ QueryPlan plan = node.prepare(insertStatement);
+
+ assertThat(plan, instanceOf(KeyValueModifyPlan.class));
+ assertExpressions((KeyValueModifyPlan) plan, "10", "20");
+ }
+ }
+
+ @Test
+ void optimizedInsertUsedForDynamicParams() {
+ node.initSchema("CREATE TABLE test (id INT PRIMARY KEY, val INT)");
+
+ {
+ QueryPlan plan = node.prepare("INSERT INTO test VALUES (?, ?)");
+
+ assertThat(plan, instanceOf(KeyValueModifyPlan.class));
+ assertExpressions((KeyValueModifyPlan) plan, "?0", "?1");
+ }
+
+ {
+ QueryPlan plan = node.prepare("INSERT INTO test(id, val) VALUES (?, ?)");
+
+ assertThat(plan, instanceOf(KeyValueModifyPlan.class));
+ assertExpressions((KeyValueModifyPlan) plan, "?0", "?1");
+ }
+
+ {
+ QueryPlan plan = node.prepare("INSERT INTO test(val, id) VALUES (?, ?)");
+
+ assertThat(plan, instanceOf(KeyValueModifyPlan.class));
+ assertExpressions((KeyValueModifyPlan) plan, "?1", "?0");
+ }
+ }
+
+ @Test
+ void optimizedInsertUsedForMixedCase() {
+ node.initSchema("CREATE TABLE test (id INT PRIMARY KEY, int_val INT, str_val VARCHAR(128))");
+
+ QueryPlan plan = node.prepare("INSERT INTO test VALUES (?, 1, CAST(CURRENT_DATE as VARCHAR(128)))");
+
+ assertThat(plan, instanceOf(KeyValueModifyPlan.class));
+ assertExpressions((KeyValueModifyPlan) plan, "?0", "1", "CAST(CURRENT_DATE):VARCHAR(128) CHARACTER SET \"UTF-8\" NOT NULL");
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {
+ "INSERT INTO test(id) VALUES (1)",
+ "INSERT INTO test(id, int_val) VALUES (1, DEFAULT)"
+ })
+ void optimizedInsertUsedWithDefaults(String insertStatement) {
+ node.initSchema("CREATE TABLE test (id INT PRIMARY KEY, int_val INT DEFAULT 10, str_val VARCHAR(128) DEFAULT 'a')");
+
+ QueryPlan plan = node.prepare(insertStatement);
+
+ assertThat(plan, instanceOf(KeyValueModifyPlan.class));
+ assertExpressions((KeyValueModifyPlan) plan, "1", "10", "_UTF-8'a'");
+ }
+
+ @Test
+ void optimizedInsertNotUsedForMultiInsert() {
+ node.initSchema("CREATE TABLE test (id INT PRIMARY KEY, val INT)");
+
+ QueryPlan plan = node.prepare("INSERT INTO test VALUES (1, 1), (2, 2)");
+
+ assertThat(plan, not(instanceOf(KeyValueModifyPlan.class)));
+ }
+
+ @Test
+ void optimizedInsertNotUsedForInsertFromOtherTable() {
+ node.initSchema("CREATE TABLE source_t (id INT PRIMARY KEY, val INT);"
+ + "CREATE TABLE target_t (id INT PRIMARY KEY, val INT)");
+
+ QueryPlan plan = node.prepare("INSERT INTO target_t SELECT * FROM source_t");
+
+ assertThat(plan, not(instanceOf(KeyValueModifyPlan.class)));
+ }
+
+ private static void assertExpressions(KeyValueModifyPlan plan, String... expectedExpressions) {
+ List<String> keyExpressions = plan.modifyNode().expressions().stream()
+ .map(RexNode::toString)
+ .collect(toList());
+
+ assertThat(
+ keyExpressions,
+ equalTo(List.of(expectedExpressions))
+ );
+ }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/StatementChecker.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/StatementChecker.java
index e2d2ef252d..25ecc02855 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/StatementChecker.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/StatementChecker.java
@@ -124,6 +124,8 @@ public class StatementChecker {
private boolean dumpPlan;
+ private String[] rulesToDisable = new String[0];
+
private Consumer<StatementChecker> setup = (checker) -> {};
private List<RelDataType> expectedParameterTypes;
@@ -148,8 +150,11 @@ public class StatementChecker {
* @param schema A schema.
* @param sql An SQL statement.
* @param params A list of dynamic parameters.
+ * @param rulesToDisable A list of rules to exclude from optimisation.
*/
- Pair<IgniteRel, IgnitePlanner> prepare(IgniteSchema schema, String sql, List<Object> params) throws Exception;
+ Pair<IgniteRel, IgnitePlanner> prepare(
+ IgniteSchema schema, String sql, List<Object> params, String... rulesToDisable
+ ) throws Exception;
}
/** Sets a function that is going to be called prior to test run. */
@@ -158,6 +163,13 @@ public class StatementChecker {
return this;
}
+ /** Sets rules to exclude from optimisation. */
+ public StatementChecker disableRules(String... rulesToDisable) {
+ this.rulesToDisable = rulesToDisable;
+
+ return this;
+ }
+
/**
* Updates schema to include a table with 1 column.
*/
@@ -419,7 +431,7 @@ public class StatementChecker {
IgnitePlanner planner;
try {
- Pair<IgniteRel, IgnitePlanner> result = sqlPrepare.prepare(schema, sqlStatement, dynamicParams);
+ Pair<IgniteRel, IgnitePlanner> result = sqlPrepare.prepare(schema, sqlStatement, dynamicParams, rulesToDisable);
root = result.getFirst();
planner = result.getSecond();
@@ -478,7 +490,7 @@ public class StatementChecker {
Throwable err = null;
IgniteRel unexpectedPlan = null;
try {
- Pair<IgniteRel, ?> unexpected = sqlPrepare.prepare(schema, sqlStatement, dynamicParams);
+ Pair<IgniteRel, ?> unexpected = sqlPrepare.prepare(schema, sqlStatement, dynamicParams, rulesToDisable);
unexpectedPlan = unexpected.getFirst();
} catch (Throwable t) {
err = t;
diff --git a/modules/sql-engine/src/test/resources/mapping/dml.test b/modules/sql-engine/src/test/resources/mapping/dml.test
index b977e15515..fed5b9ceac 100644
--- a/modules/sql-engine/src/test/resources/mapping/dml.test
+++ b/modules/sql-engine/src/test/resources/mapping/dml.test
@@ -1,5 +1,5 @@
N0
-INSERT INTO t1_n1 VALUES(1, 1, 1)
+INSERT INTO t1_n1 VALUES (1, 1, 1), (2, 2, 2)
---
Fragment#0 root
executionNodes: [N0]