You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by zs...@apache.org on 2022/01/26 15:28:48 UTC

[ignite-3] branch main updated: IGNITE-16364 Sql. Adopt IGNITE-14991, IGNITE-15235, IGNITE-15526 - Fixes #581.

This is an automated email from the ASF dual-hosted git repository.

zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new de98821  IGNITE-16364 Sql. Adopt IGNITE-14991, IGNITE-15235, IGNITE-15526 - Fixes #581.
de98821 is described below

commit de98821d6053af7e6e74e15e32c12c5b81adc068
Author: zstan <st...@gmail.com>
AuthorDate: Wed Jan 26 18:26:14 2022 +0300

    IGNITE-16364 Sql. Adopt IGNITE-14991, IGNITE-15235, IGNITE-15526 - Fixes #581.
    
    Supports regexp operators.
    Remove tableSpoolBroadcastNotRewindable test.
    Incorrect grouping reset during rewind.
    
    Signed-off-by: zstan <st...@gmail.com>
---
 .../internal/sql/engine/ItFunctionsTest.java       |  53 ++++++++---
 modules/sql-engine/src/main/codegen/config.fmpp    |   2 +-
 .../src/main/codegen/includes/parserImpls.ftl      |   6 ++
 .../sql/engine/exec/QueryTaskExecutorImpl.java     |   2 +-
 .../internal/sql/engine/exec/exp/RexImpTable.java  |  43 +++++++--
 .../sql/engine/exec/rel/HashAggregateNode.java     |  12 ++-
 .../sql/engine/exec/rel/AbstractExecutionTest.java | 103 ++++++++++++++++++++-
 .../sql/engine/exec/rel/BaseAggregateTest.java     |  12 ++-
 .../sql/engine/exec/rel/ExecutionTest.java         |  18 ++--
 .../exec/rel/HashAggregateExecutionTest.java       | 101 +++++++++++++++++---
 .../rel/HashAggregateSingleGroupExecutionTest.java |   2 +
 .../exec/rel/HashIndexSpoolExecutionTest.java      |   2 +-
 .../engine/exec/rel/MergeJoinExecutionTest.java    |   2 +-
 .../exec/rel/NestedLoopJoinExecutionTest.java      |   2 +-
 .../engine/exec/rel/TableSpoolExecutionTest.java   |  21 ++---
 .../sql/engine/planner/TableSpoolPlannerTest.java  |  60 ------------
 16 files changed, 313 insertions(+), 128 deletions(-)

diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItFunctionsTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItFunctionsTest.java
index 46da467..4cd013c 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItFunctionsTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItFunctionsTest.java
@@ -42,10 +42,12 @@ import org.junit.jupiter.api.Test;
  */
 @Disabled("https://issues.apache.org/jira/browse/IGNITE-15655")
 public class ItFunctionsTest extends AbstractBasicIntegrationTest {
+    private static final Object[] NULL_RESULT = new Object[] { null };
+
     @Test
     public void testLength() {
         assertQuery("SELECT LENGTH('TEST')").returns(4).check();
-        assertQuery("SELECT LENGTH(NULL)").returns(new Object[]{null}).check();
+        assertQuery("SELECT LENGTH(NULL)").returns(NULL_RESULT).check();
     }
 
     @Test
@@ -203,21 +205,21 @@ public class ItFunctionsTest extends AbstractBasicIntegrationTest {
     public void testPercentRemainder() {
         assertQuery("SELECT 3 % 2").returns(1).check();
         assertQuery("SELECT 4 % 2").returns(0).check();
-        assertQuery("SELECT NULL % 2").returns(new Object[]{null}).check();
-        assertQuery("SELECT 3 % NULL::int").returns(new Object[]{null}).check();
-        assertQuery("SELECT 3 % NULL").returns(new Object[] { null }).check();
+        assertQuery("SELECT NULL % 2").returns(NULL_RESULT).check();
+        assertQuery("SELECT 3 % NULL::int").returns(NULL_RESULT).check();
+        assertQuery("SELECT 3 % NULL").returns(NULL_RESULT).check();
     }
 
     @Test
     public void testNullFunctionArguments() {
         // Don't infer result data type from arguments (result is always INTEGER_NULLABLE).
-        assertQuery("SELECT ASCII(NULL)").returns(new Object[] { null }).check();
+        assertQuery("SELECT ASCII(NULL)").returns(NULL_RESULT).check();
         // Inferring result data type from first STRING argument.
-        assertQuery("SELECT REPLACE(NULL, '1', '2')").returns(new Object[] { null }).check();
+        assertQuery("SELECT REPLACE(NULL, '1', '2')").returns(NULL_RESULT).check();
         // Inferring result data type from both arguments.
-        assertQuery("SELECT MOD(1, null)").returns(new Object[] { null }).check();
+        assertQuery("SELECT MOD(1, null)").returns(NULL_RESULT).check();
         // Inferring result data type from first NUMERIC argument.
-        assertQuery("SELECT TRUNCATE(NULL, 0)").returns(new Object[] { null }).check();
+        assertQuery("SELECT TRUNCATE(NULL, 0)").returns(NULL_RESULT).check();
         // Inferring arguments data types and then inferring result data type from all arguments.
         assertQuery("SELECT FALSE AND NULL").returns(false).check();
     }
@@ -225,9 +227,9 @@ public class ItFunctionsTest extends AbstractBasicIntegrationTest {
     @Test
     public void testReplace() {
         assertQuery("SELECT REPLACE('12341234', '1', '55')").returns("5523455234").check();
-        assertQuery("SELECT REPLACE(NULL, '1', '5')").returns(new Object[] { null }).check();
-        assertQuery("SELECT REPLACE('1', NULL, '5')").returns(new Object[] { null }).check();
-        assertQuery("SELECT REPLACE('11', '1', NULL)").returns(new Object[] { null }).check();
+        assertQuery("SELECT REPLACE(NULL, '1', '5')").returns(NULL_RESULT).check();
+        assertQuery("SELECT REPLACE('1', NULL, '5')").returns(NULL_RESULT).check();
+        assertQuery("SELECT REPLACE('11', '1', NULL)").returns(NULL_RESULT).check();
         assertQuery("SELECT REPLACE('11', '1', '')").returns("").check();
     }
 
@@ -236,4 +238,33 @@ public class ItFunctionsTest extends AbstractBasicIntegrationTest {
         assertQuery("SELECT MONTHNAME(DATE '2021-01-01')").returns("January").check();
         assertQuery("SELECT DAYNAME(DATE '2021-01-01')").returns("Friday").check();
     }
+
+    @Test
+    public void testRegex() {
+        assertQuery("SELECT 'abcd' ~ 'ab[cd]'").returns(true).check();
+        assertQuery("SELECT 'abcd' ~ 'ab[cd]$'").returns(false).check();
+        assertQuery("SELECT 'abcd' ~ 'ab[CD]'").returns(false).check();
+        assertQuery("SELECT 'abcd' ~* 'ab[cd]'").returns(true).check();
+        assertQuery("SELECT 'abcd' ~* 'ab[cd]$'").returns(false).check();
+        assertQuery("SELECT 'abcd' ~* 'ab[CD]'").returns(true).check();
+        assertQuery("SELECT 'abcd' !~ 'ab[cd]'").returns(false).check();
+        assertQuery("SELECT 'abcd' !~ 'ab[cd]$'").returns(true).check();
+        assertQuery("SELECT 'abcd' !~ 'ab[CD]'").returns(true).check();
+        assertQuery("SELECT 'abcd' !~* 'ab[cd]'").returns(false).check();
+        assertQuery("SELECT 'abcd' !~* 'ab[cd]$'").returns(true).check();
+        assertQuery("SELECT 'abcd' !~* 'ab[CD]'").returns(false).check();
+        assertQuery("SELECT null ~ 'ab[cd]'").returns(NULL_RESULT).check();
+        assertQuery("SELECT 'abcd' ~ null").returns(NULL_RESULT).check();
+        assertQuery("SELECT null ~ null").returns(NULL_RESULT).check();
+        assertQuery("SELECT null ~* 'ab[cd]'").returns(NULL_RESULT).check();
+        assertQuery("SELECT 'abcd' ~* null").returns(NULL_RESULT).check();
+        assertQuery("SELECT null ~* null").returns(NULL_RESULT).check();
+        assertQuery("SELECT null !~ 'ab[cd]'").returns(NULL_RESULT).check();
+        assertQuery("SELECT 'abcd' !~ null").returns(NULL_RESULT).check();
+        assertQuery("SELECT null !~ null").returns(NULL_RESULT).check();
+        assertQuery("SELECT null !~* 'ab[cd]'").returns(NULL_RESULT).check();
+        assertQuery("SELECT 'abcd' !~* null").returns(NULL_RESULT).check();
+        assertQuery("SELECT null !~* null").returns(NULL_RESULT).check();
+        assertThrows(IgniteException.class, () -> sql("SELECT 'abcd' ~ '[a-z'"));
+    }
 }
diff --git a/modules/sql-engine/src/main/codegen/config.fmpp b/modules/sql-engine/src/main/codegen/config.fmpp
index 45fd12c..ada5e57 100644
--- a/modules/sql-engine/src/main/codegen/config.fmpp
+++ b/modules/sql-engine/src/main/codegen/config.fmpp
@@ -627,7 +627,7 @@ data: {
       "parserImpls.ftl"
     ]
 
-    includePosixOperators: false
+    includePosixOperators: true
     includeCompoundIdentifier: true
     includeBraces: true
     includeAdditionalDeclarations: false
diff --git a/modules/sql-engine/src/main/codegen/includes/parserImpls.ftl b/modules/sql-engine/src/main/codegen/includes/parserImpls.ftl
index 586062b..666b762 100644
--- a/modules/sql-engine/src/main/codegen/includes/parserImpls.ftl
+++ b/modules/sql-engine/src/main/codegen/includes/parserImpls.ftl
@@ -330,3 +330,9 @@ SqlNode SqlAlterTable() :
         }
     )
 }
+
+<DEFAULT, DQID, BTID> TOKEN :
+{
+< NEGATE: "!" >
+|   < TILDE: "~" >
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/QueryTaskExecutorImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/QueryTaskExecutorImpl.java
index 726a6ef..feccb22 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/QueryTaskExecutorImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/QueryTaskExecutorImpl.java
@@ -49,7 +49,7 @@ public class QueryTaskExecutorImpl implements QueryTaskExecutor, Thread.Uncaught
     public void start() {
         this.stripedThreadPoolExecutor = new StripedThreadPoolExecutor(
                 4,
-                NamedThreadFactory.threadPrefix(nodeName, "calciteQry"),
+                NamedThreadFactory.threadPrefix(nodeName, "sqlExec"),
                 null,
                 false,
                 0
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RexImpTable.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RexImpTable.java
index d9a002b..d93f59a 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RexImpTable.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RexImpTable.java
@@ -413,17 +413,18 @@ public class RexImpTable {
         map.put(NOT_SIMILAR_TO, NotImplementor.of(similarImplementor));
 
         // POSIX REGEX
-        final MethodImplementor posixRegexImplementor =
-                new MethodImplementor(BuiltInMethod.POSIX_REGEX.method,
-                        NullPolicy.STRICT, false);
+        final MethodImplementor posixRegexImplementorCaseSensitive =
+                new PosixRegexMethodImplementor(true);
+        final MethodImplementor posixRegexImplementorCaseInsensitive =
+                new PosixRegexMethodImplementor(false);
         map.put(SqlStdOperatorTable.POSIX_REGEX_CASE_INSENSITIVE,
-                posixRegexImplementor);
+                posixRegexImplementorCaseInsensitive);
         map.put(SqlStdOperatorTable.POSIX_REGEX_CASE_SENSITIVE,
-                posixRegexImplementor);
+                posixRegexImplementorCaseSensitive);
         map.put(SqlStdOperatorTable.NEGATED_POSIX_REGEX_CASE_INSENSITIVE,
-                NotImplementor.of(posixRegexImplementor));
+                NotImplementor.of(posixRegexImplementorCaseInsensitive));
         map.put(SqlStdOperatorTable.NEGATED_POSIX_REGEX_CASE_SENSITIVE,
-                NotImplementor.of(posixRegexImplementor));
+                NotImplementor.of(posixRegexImplementorCaseSensitive));
         map.put(REGEXP_REPLACE, new RegexpReplaceImplementor());
 
         // Multisets & arrays
@@ -994,6 +995,28 @@ public class RexImpTable {
     }
 
     /**
+     * Implementor for {@link org.apache.calcite.sql.fun.SqlPosixRegexOperator}s.
+     */
+    private static class PosixRegexMethodImplementor extends MethodImplementor {
+        protected final boolean caseSensitive;
+
+        PosixRegexMethodImplementor(boolean caseSensitive) {
+            super(BuiltInMethod.POSIX_REGEX.method, NullPolicy.STRICT, false);
+            this.caseSensitive = caseSensitive;
+        }
+
+        /** {@inheritDoc} */
+        @Override Expression implementSafe(RexToLixTranslator translator,
+                RexCall call, List<Expression> argValueList) {
+            assert argValueList.size() == 2;
+            // Add extra parameter (caseSensitive boolean flag), required by SqlFunctions#posixRegex.
+            final List<Expression> newOperands = new ArrayList<>(argValueList);
+            newOperands.add(Expressions.constant(caseSensitive));
+            return super.implementSafe(translator, call, newOperands);
+        }
+    }
+
+    /**
      * Implementor for JSON_VALUE function, convert to solid format "JSON_VALUE(json_doc, path, empty_behavior, empty_default,
      * error_behavior, error default)" in order to simplify the runtime implementation.
      *
@@ -1708,13 +1731,13 @@ public class RexImpTable {
     private static class NotImplementor extends AbstractRexCallImplementor {
         private AbstractRexCallImplementor implementor;
 
-        private NotImplementor(AbstractRexCallImplementor implementor) {
-            super(null, false);
+        private NotImplementor(NullPolicy nullPolicy, AbstractRexCallImplementor implementor) {
+            super(nullPolicy, false);
             this.implementor = implementor;
         }
 
         static AbstractRexCallImplementor of(AbstractRexCallImplementor implementor) {
-            return new NotImplementor(implementor);
+            return new NotImplementor(implementor.nullPolicy, implementor);
         }
 
         /** {@inheritDoc} */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java
index 722d2c1..5bef6cd 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java
@@ -150,7 +150,7 @@ public class HashAggregateNode<RowT> extends AbstractNode<RowT> implements Singl
     protected void rewindInternal() {
         requested = 0;
         waiting = 0;
-        groupings.forEach(grouping -> grouping.groups.clear());
+        groupings.forEach(Grouping::reset);
     }
 
     /** {@inheritDoc} */
@@ -237,6 +237,10 @@ public class HashAggregateNode<RowT> extends AbstractNode<RowT> implements Singl
 
             handler = context().rowHandler();
 
+            init();
+        }
+
+        private void init() {
             // Initializes aggregates for case when no any rows will be added into the aggregate to have 0 as result.
             // Doesn't do it for MAP type due to we don't want send from MAP node zero results because it looks redundant.
             if (grpFields.isEmpty() && (type == AggregateType.REDUCE || type == AggregateType.SINGLE)) {
@@ -244,6 +248,12 @@ public class HashAggregateNode<RowT> extends AbstractNode<RowT> implements Singl
             }
         }
 
+        private void reset() {
+            groups.clear();
+
+            init();
+        }
+
         private void add(RowT row) {
             if (type == AggregateType.REDUCE) {
                 addOnReducer(row);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
index f7861e4..cc35f1d 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
@@ -18,8 +18,17 @@
 package org.apache.ignite.internal.sql.engine.exec.rel;
 
 import com.google.common.collect.ImmutableMap;
+import java.util.ArrayDeque;
+import java.util.Deque;
 import java.util.Iterator;
+import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.LockSupport;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.calcite.rel.type.RelDataType;
@@ -32,6 +41,9 @@ import org.apache.ignite.internal.sql.engine.metadata.FragmentDescription;
 import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
+import org.apache.ignite.internal.util.Pair;
 import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -45,6 +57,8 @@ public class AbstractExecutionTest extends IgniteAbstractTest {
 
     private QueryTaskExecutorImpl taskExecutor;
 
+    private List<UUID> nodes;
+
     @BeforeEach
     public void beforeTest() {
         taskExecutor = new QueryTaskExecutorImpl("no_node");
@@ -65,6 +79,19 @@ public class AbstractExecutionTest extends IgniteAbstractTest {
     }
 
     protected ExecutionContext<Object[]> executionContext() {
+        return executionContext(false);
+    }
+
+    protected ExecutionContext<Object[]> executionContext(boolean withDelays) {
+        if (withDelays) {
+            StripedThreadPoolExecutor testExecutor = new IgniteTestStripedThreadPoolExecutor(8,
+                    NamedThreadFactory.threadPrefix("fake-test-node", "sqlTestExec"),
+                    null,
+                    false,
+                    0);
+            IgniteTestUtils.setFieldValue(taskExecutor, "stripedThreadPoolExecutor", testExecutor);
+        }
+
         FragmentDescription fragmentDesc = new FragmentDescription(0, null, null, null);
         return new ExecutionContext<>(
                 BaseQueryContext.builder()
@@ -90,6 +117,80 @@ public class AbstractExecutionTest extends IgniteAbstractTest {
         return fields;
     }
 
+    /** Task reordering executor. */
+    private static class IgniteTestStripedThreadPoolExecutor extends StripedThreadPoolExecutor {
+        final Deque<Pair<Runnable, Integer>> tasks = new ArrayDeque<>();
+
+        /** Internal stop flag. */
+        AtomicBoolean stop = new AtomicBoolean();
+
+        /** Inner execution service. */
+        ExecutorService exec = Executors.newWorkStealingPool();
+
+        CompletableFuture fut;
+
+        /** {@inheritDoc} */
+        public IgniteTestStripedThreadPoolExecutor(
+                int concurrentLvl,
+                String threadNamePrefix,
+                Thread.UncaughtExceptionHandler exHnd,
+                boolean allowCoreThreadTimeOut,
+                long keepAliveTime
+        ) {
+            super(concurrentLvl, threadNamePrefix, exHnd, allowCoreThreadTimeOut, keepAliveTime);
+
+            fut = IgniteTestUtils.runAsync(() -> {
+                while (!stop.get()) {
+                    synchronized (tasks) {
+                        while (tasks.isEmpty()) {
+                            try {
+                                tasks.wait();
+                            } catch (InterruptedException e) {
+                                // no op.
+                            }
+                        }
+
+                        Pair<Runnable, Integer> r = tasks.pollFirst();
+
+                        exec.execute(() -> {
+                            LockSupport.parkNanos(ThreadLocalRandom.current().nextLong(0, 10_000));
+                            super.execute(r.getFirst(), r.getSecond());
+                        });
+
+                        tasks.notifyAll();
+                    }
+                }
+            });
+        }
+
+        /** {@inheritDoc} */
+        @Override public void execute(Runnable task, int idx) {
+            synchronized (tasks) {
+                tasks.add(new Pair<>(task, idx));
+
+                tasks.notifyAll();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void shutdown() {
+            stop.set(true);
+
+            fut.cancel(true);
+
+            super.shutdown();
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<Runnable> shutdownNow() {
+            stop.set(true);
+
+            fut.cancel(true);
+
+            return super.shutdownNow();
+        }
+    }
+
     /**
      * TestTable.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
@@ -155,7 +256,7 @@ public class AbstractExecutionTest extends IgniteAbstractTest {
         @NotNull
         @Override
         public Iterator<Object[]> iterator() {
-            return new Iterator<Object[]>() {
+            return new Iterator<>() {
                 private int curRow;
 
                 @Override
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/BaseAggregateTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/BaseAggregateTest.java
index 8c945c7..5f0977e 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/BaseAggregateTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/BaseAggregateTest.java
@@ -59,7 +59,7 @@ public abstract class BaseAggregateTest extends AbstractExecutionTest {
     @ParameterizedTest
     @EnumSource
     public void count(TestAggregateType testAgg) {
-        ExecutionContext<Object[]> ctx = executionContext();
+        ExecutionContext<Object[]> ctx = executionContext(true);
         IgniteTypeFactory tf = ctx.getTypeFactory();
         RelDataType rowType = TypeUtils.createRowType(tf, int.class, int.class);
         ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
@@ -76,6 +76,7 @@ public abstract class BaseAggregateTest extends AbstractExecutionTest {
                 false,
                 ImmutableIntList.of(),
                 -1,
+                null,
                 RelCollations.EMPTY,
                 tf.createJavaType(int.class),
                 null);
@@ -320,7 +321,12 @@ public abstract class BaseAggregateTest extends AbstractExecutionTest {
      * @param mustFail  {@code true} If expression must throw exception.
      **/
     @SuppressWarnings("ThrowableNotThrown")
-    public void singleAggr(TestAggregateType testAgg, List<Object[]> scanInput, Object[] output, boolean mustFail) {
+    public void singleAggr(
+            TestAggregateType testAgg,
+            List<Object[]> scanInput,
+            Object[] output,
+            boolean mustFail
+    ) {
         ExecutionContext<Object[]> ctx = executionContext();
         IgniteTypeFactory tf = ctx.getTypeFactory();
         RelDataType rowType = TypeUtils.createRowType(tf, int.class, int.class);
@@ -505,7 +511,7 @@ public abstract class BaseAggregateTest extends AbstractExecutionTest {
         }
     }
 
-    private SingleNode<Object[]> createAggregateNodesChain(
+    protected SingleNode<Object[]> createAggregateNodesChain(
             TestAggregateType testAgg,
             ExecutionContext<Object[]> ctx,
             List<ImmutableBitSet> grpSets,
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
index 073dd45..d58d10f 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
@@ -64,7 +64,7 @@ public class ExecutionTest extends AbstractExecutionTest {
         // ON P.ID = PR.RESP_ID
         // WHERE P.ID >= 2
 
-        ExecutionContext<Object[]> ctx = executionContext();
+        ExecutionContext<Object[]> ctx = executionContext(true);
         IgniteTypeFactory tf = ctx.getTypeFactory();
         RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, String.class);
 
@@ -119,7 +119,7 @@ public class ExecutionTest extends AbstractExecutionTest {
 
     @Test
     public void testUnionAll() {
-        ExecutionContext<Object[]> ctx = executionContext();
+        ExecutionContext<Object[]> ctx = executionContext(true);
         IgniteTypeFactory tf = ctx.getTypeFactory();
         RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
 
@@ -168,7 +168,7 @@ public class ExecutionTest extends AbstractExecutionTest {
         // left join dep d
         //        on e.depno = d.depno
 
-        ExecutionContext<Object[]> ctx = executionContext();
+        ExecutionContext<Object[]> ctx = executionContext(true);
 
         IgniteTypeFactory tf = ctx.getTypeFactory();
         RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, Integer.class);
@@ -227,7 +227,7 @@ public class ExecutionTest extends AbstractExecutionTest {
         // right join emp e
         //         on e.depno = d.depno
 
-        ExecutionContext<Object[]> ctx = executionContext();
+        ExecutionContext<Object[]> ctx = executionContext(true);
         IgniteTypeFactory tf = ctx.getTypeFactory();
         RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, Integer.class);
 
@@ -286,7 +286,7 @@ public class ExecutionTest extends AbstractExecutionTest {
         // full outer join dep d
         //              on e.depno = d.depno
 
-        ExecutionContext<Object[]> ctx = executionContext();
+        ExecutionContext<Object[]> ctx = executionContext(true);
         IgniteTypeFactory tf = ctx.getTypeFactory();
         RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, Integer.class);
 
@@ -346,7 +346,7 @@ public class ExecutionTest extends AbstractExecutionTest {
         // semi join emp e
         //        on e.depno = d.depno
 
-        ExecutionContext<Object[]> ctx = executionContext();
+        ExecutionContext<Object[]> ctx = executionContext(true);
         IgniteTypeFactory tf = ctx.getTypeFactory();
         RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, Integer.class);
 
@@ -402,7 +402,7 @@ public class ExecutionTest extends AbstractExecutionTest {
         // anti join emp e
         //        on e.depno = d.depno
 
-        ExecutionContext<Object[]> ctx = executionContext();
+        ExecutionContext<Object[]> ctx = executionContext(true);
         IgniteTypeFactory tf = ctx.getTypeFactory();
         RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, Integer.class);
 
@@ -457,7 +457,7 @@ public class ExecutionTest extends AbstractExecutionTest {
     @ParameterizedTest
     @MethodSource("provideArgumentsForCnlJtest")
     public void testCorrelatedNestedLoopJoin(int leftSize, int rightSize, int rightBufSize, JoinRelType joinType) {
-        ExecutionContext<Object[]> ctx = executionContext();
+        ExecutionContext<Object[]> ctx = executionContext(true);
         IgniteTypeFactory tf = ctx.getTypeFactory();
         RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, int.class);
 
@@ -502,7 +502,7 @@ public class ExecutionTest extends AbstractExecutionTest {
 
     @Test
     public void testMergeJoin() {
-        ExecutionContext<Object[]> ctx = executionContext();
+        ExecutionContext<Object[]> ctx = executionContext(true);
         IgniteTypeFactory tf = ctx.getTypeFactory();
         RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, int.class);
 
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateExecutionTest.java
index c3ba33e..f2f2066 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateExecutionTest.java
@@ -21,19 +21,29 @@ import static org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType.M
 import static org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType.REDUCE;
 import static org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType.SINGLE;
 import static org.apache.ignite.internal.util.CollectionUtils.first;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import com.google.common.collect.ImmutableList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Objects;
 import java.util.stream.IntStream;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 /**
  * HashAggregateExecutionTest.
@@ -64,13 +74,7 @@ public class HashAggregateExecutionTest extends BaseAggregateTest {
 
         agg.register(scan);
 
-        // Collation of the first fields emulates planner behavior:
-        // The group's keys placed on the begin of the output row.
-        RelCollation collation = RelCollations.of(
-                ImmutableIntList.copyOf(
-                        IntStream.range(0, first(grpSets).cardinality()).boxed().collect(Collectors.toList())
-                )
-        );
+        RelCollation collation = createOutCollation(grpSets);
 
         Comparator<Object[]> cmp = ctx.expressionFactory().comparator(collation);
 
@@ -82,6 +86,21 @@ public class HashAggregateExecutionTest extends BaseAggregateTest {
         return sort;
     }
 
+    private RelCollation createOutCollation(List<ImmutableBitSet> grpSets) {
+        RelCollation collation;
+
+        if (!grpSets.isEmpty() && grpSets.stream().anyMatch(set -> !set.isEmpty())) {
+            // Sort by group to simplify compare results with expected results.
+            collation = RelCollations.of(
+                    ImmutableIntList.of(IntStream.range(0, Objects.requireNonNull(first(grpSets)).cardinality()).toArray()));
+        } else {
+            // Sort for the first column if there are no groups.
+            collation = RelCollations.of(0);
+        }
+
+        return collation;
+    }
+
     /** {@inheritDoc} */
     @Override
     protected SingleNode<Object[]> createMapReduceAggregateNodesChain(
@@ -117,13 +136,7 @@ public class HashAggregateExecutionTest extends BaseAggregateTest {
 
         aggRdc.register(aggMap);
 
-        // Collation of the first fields emulates planner behavior:
-        // The group's keys placed on the begin of the output row.
-        RelCollation collation = RelCollations.of(
-                ImmutableIntList.copyOf(
-                        IntStream.range(0, first(grpSets).cardinality()).boxed().collect(Collectors.toList())
-                )
-        );
+        RelCollation collation = createOutCollation(grpSets);
 
         Comparator<Object[]> cmp = ctx.expressionFactory().comparator(collation);
 
@@ -134,4 +147,62 @@ public class HashAggregateExecutionTest extends BaseAggregateTest {
 
         return sort;
     }
+
+
+    /**
+     * Test verifies that after rewind all groups are properly initialized.
+     */
+    @ParameterizedTest
+    @EnumSource
+    public void countOfEmptyWithRewind(TestAggregateType testAgg) {
+        ExecutionContext<Object[]> ctx = executionContext();
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, int.class, int.class);
+        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Collections.emptyList());
+
+        AggregateCall call = AggregateCall.create(
+                SqlStdOperatorTable.COUNT,
+                false,
+                false,
+                false,
+                ImmutableIntList.of(),
+                -1,
+                null,
+                RelCollations.EMPTY,
+                tf.createJavaType(int.class),
+                null
+        );
+
+        ImmutableList<ImmutableBitSet> grpSets = ImmutableList.of(ImmutableBitSet.of());
+
+        RelDataType aggRowType = TypeUtils.createRowType(tf, int.class);
+
+        SingleNode<Object[]> aggChain = createAggregateNodesChain(
+                testAgg,
+                ctx,
+                grpSets,
+                call,
+                rowType,
+                aggRowType,
+                rowFactory(),
+                scan
+        );
+
+        for (int i = 0; i < 2; i++) {
+            RootNode<Object[]> root = new RootNode<>(ctx, aggRowType) {
+                /** {@inheritDoc} */
+                @Override public void close() {
+                    // NO-OP
+                }
+            };
+
+            root.register(aggChain);
+
+            assertTrue(root.hasNext());
+            assertArrayEquals(row(0), root.next());
+            assertFalse(root.hasNext());
+
+            aggChain.rewind();
+        }
+    }
 }
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateSingleGroupExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateSingleGroupExecutionTest.java
index 3f3aa51..b1d2f29 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateSingleGroupExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateSingleGroupExecutionTest.java
@@ -66,6 +66,7 @@ public class HashAggregateSingleGroupExecutionTest extends AbstractExecutionTest
                 false,
                 ImmutableIntList.of(1),
                 -1,
+                null,
                 RelCollations.EMPTY,
                 tf.createJavaType(double.class),
                 null);
@@ -447,6 +448,7 @@ public class HashAggregateSingleGroupExecutionTest extends AbstractExecutionTest
                 false,
                 ImmutableIntList.of(),
                 -1,
+                null,
                 RelCollations.EMPTY,
                 tf.createJavaType(int.class),
                 null);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashIndexSpoolExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashIndexSpoolExecutionTest.java
index cc86cdc..00a9338 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashIndexSpoolExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashIndexSpoolExecutionTest.java
@@ -39,7 +39,7 @@ import org.junit.jupiter.api.Test;
 public class HashIndexSpoolExecutionTest extends AbstractExecutionTest {
     @Test
     public void testIndexSpool() {
-        ExecutionContext<Object[]> ctx = executionContext();
+        ExecutionContext<Object[]> ctx = executionContext(true);
         IgniteTypeFactory tf = ctx.getTypeFactory();
         RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, int.class);
 
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
index e550dde..bf3ddd5 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
@@ -306,7 +306,7 @@ public class MergeJoinExecutionTest extends AbstractExecutionTest {
      * @param expRes   Expected result.
      */
     private void verifyJoin(Object[][] left, Object[][] right, JoinRelType joinType, Object[][] expRes) {
-        ExecutionContext<Object[]> ctx = executionContext();
+        ExecutionContext<Object[]> ctx = executionContext(true);
 
         RelDataType leftType = TypeUtils.createRowType(ctx.getTypeFactory(), int.class, String.class, Integer.class);
         ScanNode<Object[]> leftNode = new ScanNode<>(ctx, leftType, Arrays.asList(left));
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java
index 0d48aca..da50810 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java
@@ -307,7 +307,7 @@ public class NestedLoopJoinExecutionTest extends AbstractExecutionTest {
      * @param expRes   Expected result.
      */
     private void verifyJoin(Object[][] left, Object[][] right, JoinRelType joinType, Object[][] expRes) {
-        ExecutionContext<Object[]> ctx = executionContext();
+        ExecutionContext<Object[]> ctx = executionContext(true);
 
         RelDataType leftType = TypeUtils.createRowType(ctx.getTypeFactory(), int.class, String.class, Integer.class);
         ScanNode<Object[]> leftNode = new ScanNode<>(ctx, leftType, Arrays.asList(left));
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolExecutionTest.java
index e5960b5..837c1c3 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolExecutionTest.java
@@ -71,18 +71,14 @@ public class TableSpoolExecutionTest extends AbstractExecutionTest {
 
             AtomicReference<Iterator<Object[]>> itRef = new AtomicReference<>();
 
-            ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, new Iterable<Object[]>() {
-                @NotNull
-                @Override
-                public Iterator<Object[]> iterator() {
-                    if (itRef.get() != null) {
-                        throw new AssertionError();
-                    }
+            ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, () -> {
+                if (itRef.get() != null) {
+                    throw new AssertionError();
+                }
 
-                    itRef.set(IntStream.range(0, size).boxed().map(i -> new Object[]{i}).iterator());
+                itRef.set(IntStream.range(0, size).boxed().map(i -> new Object[]{i}).iterator());
 
-                    return itRef.get();
-                }
+                return itRef.get();
             });
 
             TableSpoolNode<Object[]> spool = new TableSpoolNode<>(ctx, rowType, false);
@@ -104,8 +100,7 @@ public class TableSpoolExecutionTest extends AbstractExecutionTest {
      * CheckTableSpool.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
-    public void checkTableSpool(
-            BiFunction<ExecutionContext<Object[]>, RelDataType, TableSpoolNode<Object[]>> spoolFactory) {
+    public void checkTableSpool(BiFunction<ExecutionContext<Object[]>, RelDataType, TableSpoolNode<Object[]>> spoolFactory) {
         ExecutionContext<Object[]> ctx = executionContext();
         IgniteTypeFactory tf = ctx.getTypeFactory();
         RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, int.class);
@@ -113,7 +108,7 @@ public class TableSpoolExecutionTest extends AbstractExecutionTest {
         int inBufSize = Commons.IN_BUFFER_SIZE;
 
         int[] sizes = {1, inBufSize / 2 - 1, inBufSize / 2, inBufSize / 2 + 1, inBufSize, inBufSize + 1, inBufSize * 4};
-        //        int[] sizes = {inBufSize * 4};
+
         int rewindCnts = 32;
 
         for (int size : sizes) {
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/TableSpoolPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/TableSpoolPlannerTest.java
index c739c53..75ab749 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/TableSpoolPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/TableSpoolPlannerTest.java
@@ -28,7 +28,6 @@ import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeSystem;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -89,63 +88,4 @@ public class TableSpoolPlannerTest extends AbstractPlannerTest {
 
         assertNotNull(tblSpool, "Invalid plan:\n" + RelOptUtil.toString(phys));
     }
-
-    /**
-     * TableSpoolBroadcastNotRewindable.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
-     *
-     * @throws Exception If failed.
-     */
-    @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-15235")
-    public void tableSpoolBroadcastNotRewindable() throws Exception {
-        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
-
-        TestTable t0 = new TestTable(
-                new RelDataTypeFactory.Builder(f)
-                        .add("ID", f.createJavaType(Integer.class))
-                        .add("JID", f.createJavaType(Integer.class))
-                        .add("VAL", f.createJavaType(String.class))
-                        .build()
-                ) {
-
-            @Override
-            public IgniteDistribution distribution() {
-                return IgniteDistributions.broadcast();
-            }
-        };
-
-        TestTable t1 = new TestTable(
-                new RelDataTypeFactory.Builder(f)
-                        .add("ID", f.createJavaType(Integer.class))
-                        .add("JID", f.createJavaType(Integer.class))
-                        .add("VAL", f.createJavaType(String.class))
-                        .build()
-                ) {
-
-            @Override
-            public IgniteDistribution distribution() {
-                return IgniteDistributions.broadcast();
-            }
-        };
-
-        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
-
-        publicSchema.addTable("T0", t0);
-        publicSchema.addTable("T1", t1);
-
-        String sql = "select * "
-                + "from t0 "
-                + "join t1 on t0.jid = t1.jid";
-
-        RelNode phys = physicalPlan(sql, publicSchema,
-                "MergeJoinConverter", "NestedLoopJoinConverter",
-                "FilterSpoolMergeToHashIndexSpoolRule", "FilterSpoolMergeToSortIndexSpoolRule");
-
-        assertNotNull(phys);
-
-        IgniteTableSpool tblSpool = findFirstNode(phys, byClass(IgniteTableSpool.class));
-
-        assertNotNull(tblSpool, "Invalid plan:\n" + RelOptUtil.toString(phys));
-    }
 }