You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2022/09/28 13:18:39 UTC
[ignite-3] 02/02: WIP. Add test.
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch ignite-17655
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 9602cc3b7c8c038ea672ed5b32ab02ca6c767f1b
Author: amashenkov <an...@gmail.com>
AuthorDate: Wed Sep 28 16:18:27 2022 +0300
WIP. Add test.
---
.../sql/engine/exec/rel/IndexScanNode.java | 25 ++-
.../sql/engine/exec/rel/AbstractExecutionTest.java | 2 +
.../exec/rel/IndexScanNodeExecutionTest.java | 176 +++++++++++++++++++++
.../engine/exec/rel/MergeJoinExecutionTest.java | 4 +-
.../exec/rel/NestedLoopJoinExecutionTest.java | 2 -
5 files changed, 189 insertions(+), 20 deletions(-)
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
index 31c123b347..8de50a2e23 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.sql.engine.exec.rel;
import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Queue;
@@ -33,13 +32,8 @@ import java.util.function.Supplier;
import java.util.stream.IntStream;
import org.apache.calcite.rel.core.TableModify.Operation;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
-import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
-import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
import org.apache.ignite.internal.index.SortedIndex;
import org.apache.ignite.internal.schema.BinaryConverter;
-import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTupleSchema;
@@ -101,12 +95,12 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> {
/**
* Constructor.
*
- * @param ctx Execution context.
- * @param rowType Output type of the current node.
- * @param schemaTable The table this node should scan.
- * @param parts Partition numbers to scan.
- * @param filters Optional filter to filter out rows.
- * @param rowTransformer Optional projection function.
+ * @param ctx Execution context.
+ * @param rowType Output type of the current node.
+ * @param schemaTable The table this node should scan.
+ * @param parts Partition numbers to scan.
+ * @param filters Optional filter to filter out rows.
+ * @param rowTransformer Optional projection function.
* @param requiredColumns Optional set of column of interest.
*/
public IndexScanNode(
@@ -267,6 +261,7 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> {
private class SubscriberImpl implements Flow.Subscriber<BinaryTuple> {
private int received = 0; // HB guarded here.
+
/** {@inheritDoc} */
@Override
public void onSubscribe(Subscription subscription) {
@@ -318,17 +313,17 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> {
//TODO: Change table row layout -> index row layout.
@Contract("null -> null")
private @Nullable BinaryTuple extractIndexColumns(@Nullable Supplier<RowT> lowerCond) {
- if (lowerCond == null)
+ if (lowerCond == null) {
return null;
+ }
TableDescriptor desc = schemaTable.descriptor();
- BinaryTupleSchema binaryTupleSchema = BinaryTupleSchema.create( IntStream.range(0, desc.columnsCount())
+ BinaryTupleSchema binaryTupleSchema = BinaryTupleSchema.create(IntStream.range(0, desc.columnsCount())
.mapToObj(i -> desc.columnDescriptor(i))
.map(colDesc -> new Element(colDesc.physicalType(), colDesc.nullable()))
.toArray(Element[]::new));
-
BinaryRowEx binaryRowEx = schemaTable.toModifyRow(context(), lowerCond.get(), Operation.INSERT, null).getRow();
ByteBuffer tuple = BinaryConverter.forRow(((Row) binaryRowEx).schema()).toTuple(binaryRowEx);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
index a25b4c3c38..5f17133c0b 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
@@ -55,6 +55,8 @@ import org.junit.jupiter.api.BeforeEach;
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public class AbstractExecutionTest extends IgniteAbstractTest {
+ public static final Object[][] EMPTY = new Object[0][];
+
private Throwable lastE;
private QueryTaskExecutorImpl taskExecutor;
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
new file mode 100644
index 0000000000..8d16504976
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscription;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.index.SortedIndex;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest;
+import org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest.TestTable;
+import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+//TODO: check merge multiple tries.
+public class IndexScanNodeExecutionTest extends AbstractExecutionTest { // Runs IndexScanNode over a mocked SortedIndex.
+ @Test
+ public void sortedIndex() {
+ // Empty index: no table rows are supplied and no result rows are expected.
+ verifyIndexScan(
+ EMPTY,
+ ImmutableBitSet.of(0, 1),
+ EMPTY
+ );
+
+ verifyIndexScan(
+ new Object[][]{
+ {1, "Roman", null},
+ {2, "Igor", 4L},
+ {3, "Taras", 3L},
+ {4, "Alexey", 1L},
+ {5, "Ivan", 4L},
+ {6, "Andrey", 2L}
+ },
+ ImmutableBitSet.of(0, 2),
+ // TODO: sort data. NOTE(review): every expected row below has a null third column while the input does not - confirm.
+ new Object[][]{
+ {1, "Roman", null},
+ {2, "Igor", null},
+ {3, "Taras", null},
+ {4, "Alexey", null},
+ {5, "Ivan", null},
+ {6, "Andrey", null}
+ }
+ );
+ }
+
+ private void verifyIndexScan(Object[][] tableData, ImmutableBitSet requiredColumns, Object[][] expRes) {
+ BinaryTuple[] tableRows = createRows(tableData);
+
+ BinaryTuple[] part0Rows = Arrays.stream(tableRows).limit(tableRows.length / 2).toArray(BinaryTuple[]::new); // First half.
+ BinaryTuple[] part2Rows = Arrays.stream(tableRows).skip(tableRows.length / 2).toArray(BinaryTuple[]::new); // Remaining rows.
+
+ ExecutionContext<Object[]> ectx = executionContext(true);
+
+ RelDataType rowType = TypeUtils.createRowType(ectx.getTypeFactory(), long.class, String.class, int.class);
+
+ IgniteIndex indexMock = Mockito.mock(IgniteIndex.class);
+ SortedIndex sortedIndexMock = Mockito.mock(SortedIndex.class);
+
+ Mockito.doReturn(sortedIndexMock).when(indexMock).index();
+
+ Mockito.doReturn(dummyPublisher(part0Rows)).when(sortedIndexMock)
+ .scan(Mockito.eq(0), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyInt(), Mockito.any()); // scan(0, ...) -> first half.
+ Mockito.doReturn(dummyPublisher(part2Rows)).when(sortedIndexMock)
+ .scan(Mockito.eq(2), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyInt(), Mockito.any()); // scan(2, ...) -> the rest.
+
+ AbstractPlannerTest.TestTable table = new AbstractPlannerTest.TestTable(rowType) {
+ @Override
+ public IgniteDistribution distribution() {
+ return IgniteDistributions.any();
+ }
+ };
+
+ IndexScanNode<Object[]> scanNode = new IndexScanNode<>(
+ ectx,
+ rowType,
+ indexMock,
+ table,
+ new int[]{0, 2}, // Same values as the first-argument matchers stubbed on sortedIndexMock above.
+ null,
+ null,
+ null,
+ null,
+ requiredColumns.toBitSet()
+ );
+
+ RootNode<Object[]> node = new RootNode<>(ectx, rowType);
+ node.register(scanNode);
+
+ ArrayList<Object[]> res = new ArrayList<>();
+
+ while (node.hasNext()) { // Drain every row the root node produces.
+ res.add(node.next());
+ }
+
+ assertThat(res.toArray(EMPTY), equalTo(expRes));
+ }
+
+ private BinaryTuple[] createRows(Object[][] tableData) {
+ BinaryTupleSchema binaryTupleSchema = BinaryTupleSchema.createRowSchema(new SchemaDescriptor(
+ 1,
+ new Column[]{new Column("key", NativeTypes.INT32, false)},
+ new Column[]{
+ new Column("idxVal", NativeTypes.INT64, true),
+ new Column("val", NativeTypes.stringOf(Integer.MAX_VALUE), true)
+ }
+ ));
+
+ return Arrays.stream(tableData)
+ .map(row -> new BinaryTuple(binaryTupleSchema, new BinaryTupleBuilder(3, true)
+ .appendInt((int) row[0]) // "key" column.
+ .appendLong((Long) row[2]) // "idxVal" column: taken from the THIRD element of the input row.
+ .appendString((String) row[1]) // "val" column: taken from the SECOND element of the input row.
+ .build())
+ )
+ .toArray(BinaryTuple[]::new);
+ }
+
+ @NotNull
+ private static Publisher<BinaryTuple> dummyPublisher(BinaryTuple[] rows) {
+ return s -> { // On subscribe: onSubscribe, then onNext for every row, then onComplete - all synchronously.
+ s.onSubscribe(new Subscription() {
+ @Override
+ public void request(long n) {
+ // No-op: requested demand is ignored, all rows are pushed right after onSubscribe below.
+ }
+
+ @Override
+ public void cancel() {
+ // No-op: cancellation is not honoured, the loop below always emits every row.
+ }
+ });
+
+ for (int i = 0; i < rows.length; ++i) {
+ s.onNext(rows[i]);
+ }
+
+ s.onComplete();
+ };
+ }
+
+ //TODO: Add hash index.
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
index 078111b024..019e3fec03 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
@@ -12,7 +12,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
- * limitations under the License.
+ * limitations under the License.
*/
package org.apache.ignite.internal.sql.engine.exec.rel;
@@ -42,8 +42,6 @@ import org.junit.jupiter.api.Test;
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public class MergeJoinExecutionTest extends AbstractExecutionTest {
- public static final Object[][] EMPTY = new Object[0][];
-
@Test
public void joinEmptyTables() {
verifyJoin(EMPTY, EMPTY, INNER, EMPTY);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java
index 6f8c763959..d457b95306 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java
@@ -43,8 +43,6 @@ import org.junit.jupiter.api.Test;
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public class NestedLoopJoinExecutionTest extends AbstractExecutionTest {
- public static final Object[][] EMPTY = new Object[0][];
-
@Test
public void joinEmptyTables() {
verifyJoin(EMPTY, EMPTY, INNER, EMPTY);