You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by zs...@apache.org on 2022/01/26 15:28:48 UTC
[ignite-3] branch main updated: IGNITE-16364 Sql. Adopt IGNITE-14991, IGNITE-15235, IGNITE-15526 - Fixes #581.
This is an automated email from the ASF dual-hosted git repository.
zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new de98821 IGNITE-16364 Sql. Adopt IGNITE-14991, IGNITE-15235, IGNITE-15526 - Fixes #581.
de98821 is described below
commit de98821d6053af7e6e74e15e32c12c5b81adc068
Author: zstan <st...@gmail.com>
AuthorDate: Wed Jan 26 18:26:14 2022 +0300
IGNITE-16364 Sql. Adopt IGNITE-14991, IGNITE-15235, IGNITE-15526 - Fixes #581.
Supports regexp operators.
Remove tableSpoolBroadcastNotRewindable test.
Incorrect grouping reset during rewind.
Signed-off-by: zstan <st...@gmail.com>
---
.../internal/sql/engine/ItFunctionsTest.java | 53 ++++++++---
modules/sql-engine/src/main/codegen/config.fmpp | 2 +-
.../src/main/codegen/includes/parserImpls.ftl | 6 ++
.../sql/engine/exec/QueryTaskExecutorImpl.java | 2 +-
.../internal/sql/engine/exec/exp/RexImpTable.java | 43 +++++++--
.../sql/engine/exec/rel/HashAggregateNode.java | 12 ++-
.../sql/engine/exec/rel/AbstractExecutionTest.java | 103 ++++++++++++++++++++-
.../sql/engine/exec/rel/BaseAggregateTest.java | 12 ++-
.../sql/engine/exec/rel/ExecutionTest.java | 18 ++--
.../exec/rel/HashAggregateExecutionTest.java | 101 +++++++++++++++++---
.../rel/HashAggregateSingleGroupExecutionTest.java | 2 +
.../exec/rel/HashIndexSpoolExecutionTest.java | 2 +-
.../engine/exec/rel/MergeJoinExecutionTest.java | 2 +-
.../exec/rel/NestedLoopJoinExecutionTest.java | 2 +-
.../engine/exec/rel/TableSpoolExecutionTest.java | 21 ++---
.../sql/engine/planner/TableSpoolPlannerTest.java | 60 ------------
16 files changed, 313 insertions(+), 128 deletions(-)
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItFunctionsTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItFunctionsTest.java
index 46da467..4cd013c 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItFunctionsTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItFunctionsTest.java
@@ -42,10 +42,12 @@ import org.junit.jupiter.api.Test;
*/
@Disabled("https://issues.apache.org/jira/browse/IGNITE-15655")
public class ItFunctionsTest extends AbstractBasicIntegrationTest {
+ private static final Object[] NULL_RESULT = new Object[] { null };
+
@Test
public void testLength() {
assertQuery("SELECT LENGTH('TEST')").returns(4).check();
- assertQuery("SELECT LENGTH(NULL)").returns(new Object[]{null}).check();
+ assertQuery("SELECT LENGTH(NULL)").returns(NULL_RESULT).check();
}
@Test
@@ -203,21 +205,21 @@ public class ItFunctionsTest extends AbstractBasicIntegrationTest {
public void testPercentRemainder() {
assertQuery("SELECT 3 % 2").returns(1).check();
assertQuery("SELECT 4 % 2").returns(0).check();
- assertQuery("SELECT NULL % 2").returns(new Object[]{null}).check();
- assertQuery("SELECT 3 % NULL::int").returns(new Object[]{null}).check();
- assertQuery("SELECT 3 % NULL").returns(new Object[] { null }).check();
+ assertQuery("SELECT NULL % 2").returns(NULL_RESULT).check();
+ assertQuery("SELECT 3 % NULL::int").returns(NULL_RESULT).check();
+ assertQuery("SELECT 3 % NULL").returns(NULL_RESULT).check();
}
@Test
public void testNullFunctionArguments() {
// Don't infer result data type from arguments (result is always INTEGER_NULLABLE).
- assertQuery("SELECT ASCII(NULL)").returns(new Object[] { null }).check();
+ assertQuery("SELECT ASCII(NULL)").returns(NULL_RESULT).check();
// Inferring result data type from first STRING argument.
- assertQuery("SELECT REPLACE(NULL, '1', '2')").returns(new Object[] { null }).check();
+ assertQuery("SELECT REPLACE(NULL, '1', '2')").returns(NULL_RESULT).check();
// Inferring result data type from both arguments.
- assertQuery("SELECT MOD(1, null)").returns(new Object[] { null }).check();
+ assertQuery("SELECT MOD(1, null)").returns(NULL_RESULT).check();
// Inferring result data type from first NUMERIC argument.
- assertQuery("SELECT TRUNCATE(NULL, 0)").returns(new Object[] { null }).check();
+ assertQuery("SELECT TRUNCATE(NULL, 0)").returns(NULL_RESULT).check();
// Inferring arguments data types and then inferring result data type from all arguments.
assertQuery("SELECT FALSE AND NULL").returns(false).check();
}
@@ -225,9 +227,9 @@ public class ItFunctionsTest extends AbstractBasicIntegrationTest {
@Test
public void testReplace() {
assertQuery("SELECT REPLACE('12341234', '1', '55')").returns("5523455234").check();
- assertQuery("SELECT REPLACE(NULL, '1', '5')").returns(new Object[] { null }).check();
- assertQuery("SELECT REPLACE('1', NULL, '5')").returns(new Object[] { null }).check();
- assertQuery("SELECT REPLACE('11', '1', NULL)").returns(new Object[] { null }).check();
+ assertQuery("SELECT REPLACE(NULL, '1', '5')").returns(NULL_RESULT).check();
+ assertQuery("SELECT REPLACE('1', NULL, '5')").returns(NULL_RESULT).check();
+ assertQuery("SELECT REPLACE('11', '1', NULL)").returns(NULL_RESULT).check();
assertQuery("SELECT REPLACE('11', '1', '')").returns("").check();
}
@@ -236,4 +238,33 @@ public class ItFunctionsTest extends AbstractBasicIntegrationTest {
assertQuery("SELECT MONTHNAME(DATE '2021-01-01')").returns("January").check();
assertQuery("SELECT DAYNAME(DATE '2021-01-01')").returns("Friday").check();
}
+
+ @Test
+ public void testRegex() {
+ assertQuery("SELECT 'abcd' ~ 'ab[cd]'").returns(true).check();
+ assertQuery("SELECT 'abcd' ~ 'ab[cd]$'").returns(false).check();
+ assertQuery("SELECT 'abcd' ~ 'ab[CD]'").returns(false).check();
+ assertQuery("SELECT 'abcd' ~* 'ab[cd]'").returns(true).check();
+ assertQuery("SELECT 'abcd' ~* 'ab[cd]$'").returns(false).check();
+ assertQuery("SELECT 'abcd' ~* 'ab[CD]'").returns(true).check();
+ assertQuery("SELECT 'abcd' !~ 'ab[cd]'").returns(false).check();
+ assertQuery("SELECT 'abcd' !~ 'ab[cd]$'").returns(true).check();
+ assertQuery("SELECT 'abcd' !~ 'ab[CD]'").returns(true).check();
+ assertQuery("SELECT 'abcd' !~* 'ab[cd]'").returns(false).check();
+ assertQuery("SELECT 'abcd' !~* 'ab[cd]$'").returns(true).check();
+ assertQuery("SELECT 'abcd' !~* 'ab[CD]'").returns(false).check();
+ assertQuery("SELECT null ~ 'ab[cd]'").returns(NULL_RESULT).check();
+ assertQuery("SELECT 'abcd' ~ null").returns(NULL_RESULT).check();
+ assertQuery("SELECT null ~ null").returns(NULL_RESULT).check();
+ assertQuery("SELECT null ~* 'ab[cd]'").returns(NULL_RESULT).check();
+ assertQuery("SELECT 'abcd' ~* null").returns(NULL_RESULT).check();
+ assertQuery("SELECT null ~* null").returns(NULL_RESULT).check();
+ assertQuery("SELECT null !~ 'ab[cd]'").returns(NULL_RESULT).check();
+ assertQuery("SELECT 'abcd' !~ null").returns(NULL_RESULT).check();
+ assertQuery("SELECT null !~ null").returns(NULL_RESULT).check();
+ assertQuery("SELECT null !~* 'ab[cd]'").returns(NULL_RESULT).check();
+ assertQuery("SELECT 'abcd' !~* null").returns(NULL_RESULT).check();
+ assertQuery("SELECT null !~* null").returns(NULL_RESULT).check();
+ assertThrows(IgniteException.class, () -> sql("SELECT 'abcd' ~ '[a-z'"));
+ }
}
diff --git a/modules/sql-engine/src/main/codegen/config.fmpp b/modules/sql-engine/src/main/codegen/config.fmpp
index 45fd12c..ada5e57 100644
--- a/modules/sql-engine/src/main/codegen/config.fmpp
+++ b/modules/sql-engine/src/main/codegen/config.fmpp
@@ -627,7 +627,7 @@ data: {
"parserImpls.ftl"
]
- includePosixOperators: false
+ includePosixOperators: true
includeCompoundIdentifier: true
includeBraces: true
includeAdditionalDeclarations: false
diff --git a/modules/sql-engine/src/main/codegen/includes/parserImpls.ftl b/modules/sql-engine/src/main/codegen/includes/parserImpls.ftl
index 586062b..666b762 100644
--- a/modules/sql-engine/src/main/codegen/includes/parserImpls.ftl
+++ b/modules/sql-engine/src/main/codegen/includes/parserImpls.ftl
@@ -330,3 +330,9 @@ SqlNode SqlAlterTable() :
}
)
}
+
+<DEFAULT, DQID, BTID> TOKEN :
+{
+< NEGATE: "!" >
+| < TILDE: "~" >
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/QueryTaskExecutorImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/QueryTaskExecutorImpl.java
index 726a6ef..feccb22 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/QueryTaskExecutorImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/QueryTaskExecutorImpl.java
@@ -49,7 +49,7 @@ public class QueryTaskExecutorImpl implements QueryTaskExecutor, Thread.Uncaught
public void start() {
this.stripedThreadPoolExecutor = new StripedThreadPoolExecutor(
4,
- NamedThreadFactory.threadPrefix(nodeName, "calciteQry"),
+ NamedThreadFactory.threadPrefix(nodeName, "sqlExec"),
null,
false,
0
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RexImpTable.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RexImpTable.java
index d9a002b..d93f59a 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RexImpTable.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RexImpTable.java
@@ -413,17 +413,18 @@ public class RexImpTable {
map.put(NOT_SIMILAR_TO, NotImplementor.of(similarImplementor));
// POSIX REGEX
- final MethodImplementor posixRegexImplementor =
- new MethodImplementor(BuiltInMethod.POSIX_REGEX.method,
- NullPolicy.STRICT, false);
+ final MethodImplementor posixRegexImplementorCaseSensitive =
+ new PosixRegexMethodImplementor(true);
+ final MethodImplementor posixRegexImplementorCaseInsensitive =
+ new PosixRegexMethodImplementor(false);
map.put(SqlStdOperatorTable.POSIX_REGEX_CASE_INSENSITIVE,
- posixRegexImplementor);
+ posixRegexImplementorCaseInsensitive);
map.put(SqlStdOperatorTable.POSIX_REGEX_CASE_SENSITIVE,
- posixRegexImplementor);
+ posixRegexImplementorCaseSensitive);
map.put(SqlStdOperatorTable.NEGATED_POSIX_REGEX_CASE_INSENSITIVE,
- NotImplementor.of(posixRegexImplementor));
+ NotImplementor.of(posixRegexImplementorCaseInsensitive));
map.put(SqlStdOperatorTable.NEGATED_POSIX_REGEX_CASE_SENSITIVE,
- NotImplementor.of(posixRegexImplementor));
+ NotImplementor.of(posixRegexImplementorCaseSensitive));
map.put(REGEXP_REPLACE, new RegexpReplaceImplementor());
// Multisets & arrays
@@ -994,6 +995,28 @@ public class RexImpTable {
}
/**
+ * Implementor for {@link org.apache.calcite.sql.fun.SqlPosixRegexOperator}s.
+ */
+ private static class PosixRegexMethodImplementor extends MethodImplementor {
+ protected final boolean caseSensitive;
+
+ PosixRegexMethodImplementor(boolean caseSensitive) {
+ super(BuiltInMethod.POSIX_REGEX.method, NullPolicy.STRICT, false);
+ this.caseSensitive = caseSensitive;
+ }
+
+ /** {@inheritDoc} */
+ @Override Expression implementSafe(RexToLixTranslator translator,
+ RexCall call, List<Expression> argValueList) {
+ assert argValueList.size() == 2;
+ // Add extra parameter (caseSensitive boolean flag), required by SqlFunctions#posixRegex.
+ final List<Expression> newOperands = new ArrayList<>(argValueList);
+ newOperands.add(Expressions.constant(caseSensitive));
+ return super.implementSafe(translator, call, newOperands);
+ }
+ }
+
+ /**
* Implementor for JSON_VALUE function, convert to solid format "JSON_VALUE(json_doc, path, empty_behavior, empty_default,
* error_behavior, error default)" in order to simplify the runtime implementation.
*
@@ -1708,13 +1731,13 @@ public class RexImpTable {
private static class NotImplementor extends AbstractRexCallImplementor {
private AbstractRexCallImplementor implementor;
- private NotImplementor(AbstractRexCallImplementor implementor) {
- super(null, false);
+ private NotImplementor(NullPolicy nullPolicy, AbstractRexCallImplementor implementor) {
+ super(nullPolicy, false);
this.implementor = implementor;
}
static AbstractRexCallImplementor of(AbstractRexCallImplementor implementor) {
- return new NotImplementor(implementor);
+ return new NotImplementor(implementor.nullPolicy, implementor);
}
/** {@inheritDoc} */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java
index 722d2c1..5bef6cd 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java
@@ -150,7 +150,7 @@ public class HashAggregateNode<RowT> extends AbstractNode<RowT> implements Singl
protected void rewindInternal() {
requested = 0;
waiting = 0;
- groupings.forEach(grouping -> grouping.groups.clear());
+ groupings.forEach(Grouping::reset);
}
/** {@inheritDoc} */
@@ -237,6 +237,10 @@ public class HashAggregateNode<RowT> extends AbstractNode<RowT> implements Singl
handler = context().rowHandler();
+ init();
+ }
+
+ private void init() {
// Initializes aggregates for case when no any rows will be added into the aggregate to have 0 as result.
// Doesn't do it for MAP type due to we don't want send from MAP node zero results because it looks redundant.
if (grpFields.isEmpty() && (type == AggregateType.REDUCE || type == AggregateType.SINGLE)) {
@@ -244,6 +248,12 @@ public class HashAggregateNode<RowT> extends AbstractNode<RowT> implements Singl
}
}
+ private void reset() {
+ groups.clear();
+
+ init();
+ }
+
private void add(RowT row) {
if (type == AggregateType.REDUCE) {
addOnReducer(row);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
index f7861e4..cc35f1d 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
@@ -18,8 +18,17 @@
package org.apache.ignite.internal.sql.engine.exec.rel;
import com.google.common.collect.ImmutableMap;
+import java.util.ArrayDeque;
+import java.util.Deque;
import java.util.Iterator;
+import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.LockSupport;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.calcite.rel.type.RelDataType;
@@ -32,6 +41,9 @@ import org.apache.ignite.internal.sql.engine.metadata.FragmentDescription;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
+import org.apache.ignite.internal.util.Pair;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -45,6 +57,8 @@ public class AbstractExecutionTest extends IgniteAbstractTest {
private QueryTaskExecutorImpl taskExecutor;
+ private List<UUID> nodes;
+
@BeforeEach
public void beforeTest() {
taskExecutor = new QueryTaskExecutorImpl("no_node");
@@ -65,6 +79,19 @@ public class AbstractExecutionTest extends IgniteAbstractTest {
}
protected ExecutionContext<Object[]> executionContext() {
+ return executionContext(false);
+ }
+
+ protected ExecutionContext<Object[]> executionContext(boolean withDelays) {
+ if (withDelays) {
+ StripedThreadPoolExecutor testExecutor = new IgniteTestStripedThreadPoolExecutor(8,
+ NamedThreadFactory.threadPrefix("fake-test-node", "sqlTestExec"),
+ null,
+ false,
+ 0);
+ IgniteTestUtils.setFieldValue(taskExecutor, "stripedThreadPoolExecutor", testExecutor);
+ }
+
FragmentDescription fragmentDesc = new FragmentDescription(0, null, null, null);
return new ExecutionContext<>(
BaseQueryContext.builder()
@@ -90,6 +117,80 @@ public class AbstractExecutionTest extends IgniteAbstractTest {
return fields;
}
+ /** Task reordering executor. */
+ private static class IgniteTestStripedThreadPoolExecutor extends StripedThreadPoolExecutor {
+ final Deque<Pair<Runnable, Integer>> tasks = new ArrayDeque<>();
+
+ /** Internal stop flag. */
+ AtomicBoolean stop = new AtomicBoolean();
+
+ /** Inner execution service. */
+ ExecutorService exec = Executors.newWorkStealingPool();
+
+ CompletableFuture fut;
+
+ /** {@inheritDoc} */
+ public IgniteTestStripedThreadPoolExecutor(
+ int concurrentLvl,
+ String threadNamePrefix,
+ Thread.UncaughtExceptionHandler exHnd,
+ boolean allowCoreThreadTimeOut,
+ long keepAliveTime
+ ) {
+ super(concurrentLvl, threadNamePrefix, exHnd, allowCoreThreadTimeOut, keepAliveTime);
+
+ fut = IgniteTestUtils.runAsync(() -> {
+ while (!stop.get()) {
+ synchronized (tasks) {
+ while (tasks.isEmpty()) {
+ try {
+ tasks.wait();
+ } catch (InterruptedException e) {
+ // no op.
+ }
+ }
+
+ Pair<Runnable, Integer> r = tasks.pollFirst();
+
+ exec.execute(() -> {
+ LockSupport.parkNanos(ThreadLocalRandom.current().nextLong(0, 10_000));
+ super.execute(r.getFirst(), r.getSecond());
+ });
+
+ tasks.notifyAll();
+ }
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public void execute(Runnable task, int idx) {
+ synchronized (tasks) {
+ tasks.add(new Pair<>(task, idx));
+
+ tasks.notifyAll();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void shutdown() {
+ stop.set(true);
+
+ fut.cancel(true);
+
+ super.shutdown();
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Runnable> shutdownNow() {
+ stop.set(true);
+
+ fut.cancel(true);
+
+ return super.shutdownNow();
+ }
+ }
+
/**
* TestTable.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
@@ -155,7 +256,7 @@ public class AbstractExecutionTest extends IgniteAbstractTest {
@NotNull
@Override
public Iterator<Object[]> iterator() {
- return new Iterator<Object[]>() {
+ return new Iterator<>() {
private int curRow;
@Override
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/BaseAggregateTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/BaseAggregateTest.java
index 8c945c7..5f0977e 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/BaseAggregateTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/BaseAggregateTest.java
@@ -59,7 +59,7 @@ public abstract class BaseAggregateTest extends AbstractExecutionTest {
@ParameterizedTest
@EnumSource
public void count(TestAggregateType testAgg) {
- ExecutionContext<Object[]> ctx = executionContext();
+ ExecutionContext<Object[]> ctx = executionContext(true);
IgniteTypeFactory tf = ctx.getTypeFactory();
RelDataType rowType = TypeUtils.createRowType(tf, int.class, int.class);
ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
@@ -76,6 +76,7 @@ public abstract class BaseAggregateTest extends AbstractExecutionTest {
false,
ImmutableIntList.of(),
-1,
+ null,
RelCollations.EMPTY,
tf.createJavaType(int.class),
null);
@@ -320,7 +321,12 @@ public abstract class BaseAggregateTest extends AbstractExecutionTest {
* @param mustFail {@code true} If expression must throw exception.
**/
@SuppressWarnings("ThrowableNotThrown")
- public void singleAggr(TestAggregateType testAgg, List<Object[]> scanInput, Object[] output, boolean mustFail) {
+ public void singleAggr(
+ TestAggregateType testAgg,
+ List<Object[]> scanInput,
+ Object[] output,
+ boolean mustFail
+ ) {
ExecutionContext<Object[]> ctx = executionContext();
IgniteTypeFactory tf = ctx.getTypeFactory();
RelDataType rowType = TypeUtils.createRowType(tf, int.class, int.class);
@@ -505,7 +511,7 @@ public abstract class BaseAggregateTest extends AbstractExecutionTest {
}
}
- private SingleNode<Object[]> createAggregateNodesChain(
+ protected SingleNode<Object[]> createAggregateNodesChain(
TestAggregateType testAgg,
ExecutionContext<Object[]> ctx,
List<ImmutableBitSet> grpSets,
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
index 073dd45..d58d10f 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
@@ -64,7 +64,7 @@ public class ExecutionTest extends AbstractExecutionTest {
// ON P.ID = PR.RESP_ID
// WHERE P.ID >= 2
- ExecutionContext<Object[]> ctx = executionContext();
+ ExecutionContext<Object[]> ctx = executionContext(true);
IgniteTypeFactory tf = ctx.getTypeFactory();
RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, String.class);
@@ -119,7 +119,7 @@ public class ExecutionTest extends AbstractExecutionTest {
@Test
public void testUnionAll() {
- ExecutionContext<Object[]> ctx = executionContext();
+ ExecutionContext<Object[]> ctx = executionContext(true);
IgniteTypeFactory tf = ctx.getTypeFactory();
RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
@@ -168,7 +168,7 @@ public class ExecutionTest extends AbstractExecutionTest {
// left join dep d
// on e.depno = d.depno
- ExecutionContext<Object[]> ctx = executionContext();
+ ExecutionContext<Object[]> ctx = executionContext(true);
IgniteTypeFactory tf = ctx.getTypeFactory();
RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, Integer.class);
@@ -227,7 +227,7 @@ public class ExecutionTest extends AbstractExecutionTest {
// right join emp e
// on e.depno = d.depno
- ExecutionContext<Object[]> ctx = executionContext();
+ ExecutionContext<Object[]> ctx = executionContext(true);
IgniteTypeFactory tf = ctx.getTypeFactory();
RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, Integer.class);
@@ -286,7 +286,7 @@ public class ExecutionTest extends AbstractExecutionTest {
// full outer join dep d
// on e.depno = d.depno
- ExecutionContext<Object[]> ctx = executionContext();
+ ExecutionContext<Object[]> ctx = executionContext(true);
IgniteTypeFactory tf = ctx.getTypeFactory();
RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, Integer.class);
@@ -346,7 +346,7 @@ public class ExecutionTest extends AbstractExecutionTest {
// semi join emp e
// on e.depno = d.depno
- ExecutionContext<Object[]> ctx = executionContext();
+ ExecutionContext<Object[]> ctx = executionContext(true);
IgniteTypeFactory tf = ctx.getTypeFactory();
RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, Integer.class);
@@ -402,7 +402,7 @@ public class ExecutionTest extends AbstractExecutionTest {
// anti join emp e
// on e.depno = d.depno
- ExecutionContext<Object[]> ctx = executionContext();
+ ExecutionContext<Object[]> ctx = executionContext(true);
IgniteTypeFactory tf = ctx.getTypeFactory();
RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, Integer.class);
@@ -457,7 +457,7 @@ public class ExecutionTest extends AbstractExecutionTest {
@ParameterizedTest
@MethodSource("provideArgumentsForCnlJtest")
public void testCorrelatedNestedLoopJoin(int leftSize, int rightSize, int rightBufSize, JoinRelType joinType) {
- ExecutionContext<Object[]> ctx = executionContext();
+ ExecutionContext<Object[]> ctx = executionContext(true);
IgniteTypeFactory tf = ctx.getTypeFactory();
RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, int.class);
@@ -502,7 +502,7 @@ public class ExecutionTest extends AbstractExecutionTest {
@Test
public void testMergeJoin() {
- ExecutionContext<Object[]> ctx = executionContext();
+ ExecutionContext<Object[]> ctx = executionContext(true);
IgniteTypeFactory tf = ctx.getTypeFactory();
RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, int.class);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateExecutionTest.java
index c3ba33e..f2f2066 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateExecutionTest.java
@@ -21,19 +21,29 @@ import static org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType.M
import static org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType.REDUCE;
import static org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType.SINGLE;
import static org.apache.ignite.internal.util.CollectionUtils.first;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import com.google.common.collect.ImmutableList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Objects;
import java.util.stream.IntStream;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
/**
* HashAggregateExecutionTest.
@@ -64,13 +74,7 @@ public class HashAggregateExecutionTest extends BaseAggregateTest {
agg.register(scan);
- // Collation of the first fields emulates planner behavior:
- // The group's keys placed on the begin of the output row.
- RelCollation collation = RelCollations.of(
- ImmutableIntList.copyOf(
- IntStream.range(0, first(grpSets).cardinality()).boxed().collect(Collectors.toList())
- )
- );
+ RelCollation collation = createOutCollation(grpSets);
Comparator<Object[]> cmp = ctx.expressionFactory().comparator(collation);
@@ -82,6 +86,21 @@ public class HashAggregateExecutionTest extends BaseAggregateTest {
return sort;
}
+ private RelCollation createOutCollation(List<ImmutableBitSet> grpSets) {
+ RelCollation collation;
+
+ if (!grpSets.isEmpty() && grpSets.stream().anyMatch(set -> !set.isEmpty())) {
+ // Sort by group to simplify compare results with expected results.
+ collation = RelCollations.of(
+ ImmutableIntList.of(IntStream.range(0, Objects.requireNonNull(first(grpSets)).cardinality()).toArray()));
+ } else {
+ // Sort for the first column if there are no groups.
+ collation = RelCollations.of(0);
+ }
+
+ return collation;
+ }
+
/** {@inheritDoc} */
@Override
protected SingleNode<Object[]> createMapReduceAggregateNodesChain(
@@ -117,13 +136,7 @@ public class HashAggregateExecutionTest extends BaseAggregateTest {
aggRdc.register(aggMap);
- // Collation of the first fields emulates planner behavior:
- // The group's keys placed on the begin of the output row.
- RelCollation collation = RelCollations.of(
- ImmutableIntList.copyOf(
- IntStream.range(0, first(grpSets).cardinality()).boxed().collect(Collectors.toList())
- )
- );
+ RelCollation collation = createOutCollation(grpSets);
Comparator<Object[]> cmp = ctx.expressionFactory().comparator(collation);
@@ -134,4 +147,62 @@ public class HashAggregateExecutionTest extends BaseAggregateTest {
return sort;
}
+
+
+ /**
+ * Test verifies that after rewind all groups are properly initialized.
+ */
+ @ParameterizedTest
+ @EnumSource
+ public void countOfEmptyWithRewind(TestAggregateType testAgg) {
+ ExecutionContext<Object[]> ctx = executionContext();
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, int.class, int.class);
+ ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Collections.emptyList());
+
+ AggregateCall call = AggregateCall.create(
+ SqlStdOperatorTable.COUNT,
+ false,
+ false,
+ false,
+ ImmutableIntList.of(),
+ -1,
+ null,
+ RelCollations.EMPTY,
+ tf.createJavaType(int.class),
+ null
+ );
+
+ ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
+
+ RelDataType aggRowType = TypeUtils.createRowType(tf, int.class);
+
+ SingleNode<Object[]> aggChain = createAggregateNodesChain(
+ testAgg,
+ ctx,
+ grpSets,
+ call,
+ rowType,
+ aggRowType,
+ rowFactory(),
+ scan
+ );
+
+ for (int i = 0; i < 2; i++) {
+ RootNode<Object[]> root = new RootNode<>(ctx, aggRowType) {
+ /** {@inheritDoc} */
+ @Override public void close() {
+ // NO-OP
+ }
+ };
+
+ root.register(aggChain);
+
+ assertTrue(root.hasNext());
+ assertArrayEquals(row(0), root.next());
+ assertFalse(root.hasNext());
+
+ aggChain.rewind();
+ }
+ }
}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateSingleGroupExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateSingleGroupExecutionTest.java
index 3f3aa51..b1d2f29 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateSingleGroupExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateSingleGroupExecutionTest.java
@@ -66,6 +66,7 @@ public class HashAggregateSingleGroupExecutionTest extends AbstractExecutionTest
false,
ImmutableIntList.of(1),
-1,
+ null,
RelCollations.EMPTY,
tf.createJavaType(double.class),
null);
@@ -447,6 +448,7 @@ public class HashAggregateSingleGroupExecutionTest extends AbstractExecutionTest
false,
ImmutableIntList.of(),
-1,
+ null,
RelCollations.EMPTY,
tf.createJavaType(int.class),
null);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashIndexSpoolExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashIndexSpoolExecutionTest.java
index cc86cdc..00a9338 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashIndexSpoolExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashIndexSpoolExecutionTest.java
@@ -39,7 +39,7 @@ import org.junit.jupiter.api.Test;
public class HashIndexSpoolExecutionTest extends AbstractExecutionTest {
@Test
public void testIndexSpool() {
- ExecutionContext<Object[]> ctx = executionContext();
+ ExecutionContext<Object[]> ctx = executionContext(true);
IgniteTypeFactory tf = ctx.getTypeFactory();
RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, int.class);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
index e550dde..bf3ddd5 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
@@ -306,7 +306,7 @@ public class MergeJoinExecutionTest extends AbstractExecutionTest {
* @param expRes Expected result.
*/
private void verifyJoin(Object[][] left, Object[][] right, JoinRelType joinType, Object[][] expRes) {
- ExecutionContext<Object[]> ctx = executionContext();
+ ExecutionContext<Object[]> ctx = executionContext(true);
RelDataType leftType = TypeUtils.createRowType(ctx.getTypeFactory(), int.class, String.class, Integer.class);
ScanNode<Object[]> leftNode = new ScanNode<>(ctx, leftType, Arrays.asList(left));
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java
index 0d48aca..da50810 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java
@@ -307,7 +307,7 @@ public class NestedLoopJoinExecutionTest extends AbstractExecutionTest {
* @param expRes Expected result.
*/
private void verifyJoin(Object[][] left, Object[][] right, JoinRelType joinType, Object[][] expRes) {
- ExecutionContext<Object[]> ctx = executionContext();
+ ExecutionContext<Object[]> ctx = executionContext(true);
RelDataType leftType = TypeUtils.createRowType(ctx.getTypeFactory(), int.class, String.class, Integer.class);
ScanNode<Object[]> leftNode = new ScanNode<>(ctx, leftType, Arrays.asList(left));
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolExecutionTest.java
index e5960b5..837c1c3 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolExecutionTest.java
@@ -71,18 +71,14 @@ public class TableSpoolExecutionTest extends AbstractExecutionTest {
AtomicReference<Iterator<Object[]>> itRef = new AtomicReference<>();
- ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, new Iterable<Object[]>() {
- @NotNull
- @Override
- public Iterator<Object[]> iterator() {
- if (itRef.get() != null) {
- throw new AssertionError();
- }
+ ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, () -> {
+ if (itRef.get() != null) {
+ throw new AssertionError();
+ }
- itRef.set(IntStream.range(0, size).boxed().map(i -> new Object[]{i}).iterator());
+ itRef.set(IntStream.range(0, size).boxed().map(i -> new Object[]{i}).iterator());
- return itRef.get();
- }
+ return itRef.get();
});
TableSpoolNode<Object[]> spool = new TableSpoolNode<>(ctx, rowType, false);
@@ -104,8 +100,7 @@ public class TableSpoolExecutionTest extends AbstractExecutionTest {
* CheckTableSpool.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
- public void checkTableSpool(
- BiFunction<ExecutionContext<Object[]>, RelDataType, TableSpoolNode<Object[]>> spoolFactory) {
+ public void checkTableSpool(BiFunction<ExecutionContext<Object[]>, RelDataType, TableSpoolNode<Object[]>> spoolFactory) {
ExecutionContext<Object[]> ctx = executionContext();
IgniteTypeFactory tf = ctx.getTypeFactory();
RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, int.class);
@@ -113,7 +108,7 @@ public class TableSpoolExecutionTest extends AbstractExecutionTest {
int inBufSize = Commons.IN_BUFFER_SIZE;
int[] sizes = {1, inBufSize / 2 - 1, inBufSize / 2, inBufSize / 2 + 1, inBufSize, inBufSize + 1, inBufSize * 4};
- // int[] sizes = {inBufSize * 4};
+
int rewindCnts = 32;
for (int size : sizes) {
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/TableSpoolPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/TableSpoolPlannerTest.java
index c739c53..75ab749 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/TableSpoolPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/TableSpoolPlannerTest.java
@@ -28,7 +28,6 @@ import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeSystem;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
@@ -89,63 +88,4 @@ public class TableSpoolPlannerTest extends AbstractPlannerTest {
assertNotNull(tblSpool, "Invalid plan:\n" + RelOptUtil.toString(phys));
}
-
- /**
- * TableSpoolBroadcastNotRewindable.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
- *
- * @throws Exception If failed.
- */
- @Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-15235")
- public void tableSpoolBroadcastNotRewindable() throws Exception {
- IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
-
- TestTable t0 = new TestTable(
- new RelDataTypeFactory.Builder(f)
- .add("ID", f.createJavaType(Integer.class))
- .add("JID", f.createJavaType(Integer.class))
- .add("VAL", f.createJavaType(String.class))
- .build()
- ) {
-
- @Override
- public IgniteDistribution distribution() {
- return IgniteDistributions.broadcast();
- }
- };
-
- TestTable t1 = new TestTable(
- new RelDataTypeFactory.Builder(f)
- .add("ID", f.createJavaType(Integer.class))
- .add("JID", f.createJavaType(Integer.class))
- .add("VAL", f.createJavaType(String.class))
- .build()
- ) {
-
- @Override
- public IgniteDistribution distribution() {
- return IgniteDistributions.broadcast();
- }
- };
-
- IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
-
- publicSchema.addTable("T0", t0);
- publicSchema.addTable("T1", t1);
-
- String sql = "select * "
- + "from t0 "
- + "join t1 on t0.jid = t1.jid";
-
- RelNode phys = physicalPlan(sql, publicSchema,
- "MergeJoinConverter", "NestedLoopJoinConverter",
- "FilterSpoolMergeToHashIndexSpoolRule", "FilterSpoolMergeToSortIndexSpoolRule");
-
- assertNotNull(phys);
-
- IgniteTableSpool tblSpool = findFirstNode(phys, byClass(IgniteTableSpool.class));
-
- assertNotNull(tblSpool, "Invalid plan:\n" + RelOptUtil.toString(phys));
- }
}