You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/12/27 19:56:22 UTC

[GitHub] [ignite-3] korlov42 commented on a diff in pull request #1469: IGNITE-18227: refactoring scan nodes and add support RO index scans.

korlov42 commented on code in PR #1469:
URL: https://github.com/apache/ignite-3/pull/1469#discussion_r1057867902


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java:
##########
@@ -343,37 +346,59 @@ public void checkTransactionsWithDml() throws Exception {
         assertEquals(txManagerInternal.finished(), states.size());
     }
 
-    /** Check correctness of rw and ro transactions. */
+    /** Check correctness of rw and ro transactions for index scan. */
     @Test
-    public void checkMixedTransactions() throws Exception {
+    public void checkMixedTransactionsForIndex() throws Exception {
+        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+
+        Matcher<String> planMatcher = containsTableScan("PUBLIC", "TEST");
+
+        checkMixedTransactions(planMatcher);
+    }
+
+
+    /** Check correctness of rw and ro transactions for table scan. */
+    @Test
+    public void checkMixedTransactionsForTable() throws Exception {
+        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+        sql("CREATE INDEX TEST_IDX ON TEST(VAL0)");
+
+        Matcher<String> planMatcher = containsIndexScan("PUBLIC", "TEST", "TEST_IDX");
+
+        checkMixedTransactions(planMatcher);
+    }
+
+    private void checkMixedTransactions(Matcher<String> planMatcher) throws Exception {
         IgniteSql sql = igniteSql();
 
         if (sql instanceof ClientSql) {
             return;
         }
 
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
         Session ses = sql.createSession();
 
         for (int i = 0; i < ROW_COUNT; ++i) {
             sql("INSERT INTO TEST VALUES (?, ?)", i, i);
         }
 
-        checkTx(ses, true, false, true);
-        checkTx(ses, true, false, false);
-        checkTx(ses, true, true, true);
-        checkTx(ses, true, true, false);
-        checkTx(ses, false, true, true);
-        checkTx(ses, false, true, false);
-        checkTx(ses, false, false, true);
-        checkTx(ses, false, false, false);
+        checkTx(ses, true, false, true, planMatcher);
+        checkTx(ses, true, false, false, planMatcher);
+        checkTx(ses, true, true, true, planMatcher);
+        checkTx(ses, true, true, false, planMatcher);
+        checkTx(ses, false, true, true, planMatcher);
+        checkTx(ses, false, true, false, planMatcher);
+        checkTx(ses, false, false, true, planMatcher);
+        checkTx(ses, false, false, false, planMatcher);
     }
 
-    private void checkTx(Session ses, boolean readOnly, boolean commit, boolean explicit) throws Exception {
+    private void checkTx(Session ses, boolean readOnly, boolean commit, boolean explicit, Matcher<String> planMatcher) throws Exception {
         Transaction outerTx = explicit ? (readOnly ? igniteTx().readOnly().begin() : igniteTx().begin()) : null;
 
-        AsyncResultSet rs = ses.executeAsync(outerTx, "SELECT VAL0 FROM TEST ORDER BY VAL0").get();
+        String query = "SELECT VAL0 FROM TEST ORDER BY VAL0";
+
+        assertQuery(query).matches(planMatcher).check();

Review Comment:
   `assertQuery(query).matches(planMatcher).check();` and `ses.executeAsync(outerTx, query).get();` do not return quite consistent results. The former still uses `queryAsync` under the hood, whereas the latter uses `querySingleAsync`. I think we need to update QueryChecker. Besides, `outerTx` should be passed to the checker in order to provide a valid plan.



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import java.util.BitSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Based abstract scan node.

Review Comment:
   Could you please add more details to the javadoc? I would shed some light on what ScanNode is for, and what logic it encapsulates



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import java.util.BitSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Based abstract scan node.
+ */
+public abstract class StorageScanNode<RowT> extends AbstractNode<RowT> {
+    /** Special value to highlights that all row were received and we are not waiting any more. */
+    private static final int NOT_WAITING = -1;
+
+    private final Queue<RowT> inBuff = new LinkedBlockingQueue<>(inBufSize);
+
+    private final @Nullable Predicate<RowT> filters;
+
+    private final @Nullable Function<RowT, RowT> rowTransformer;
+
+    private final Function<BinaryRow, RowT> tableRowConverter;
+
+    private int requested;
+
+    private int waiting;
+
+    private boolean inLoop;
+
+    private Subscription activeSubscription;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Execution context.
+     * @param rowFactory Row factory.
+     * @param schemaTable The table this node should scan.
+     * @param filters Optional filter to filter out rows.
+     * @param rowTransformer Optional projection function.
+     */
+    public StorageScanNode(
+            ExecutionContext<RowT> ctx,
+            RowHandler.RowFactory<RowT> rowFactory,
+            InternalIgniteTable schemaTable,
+            @Nullable Predicate<RowT> filters,
+            @Nullable Function<RowT, RowT> rowTransformer,
+            @Nullable BitSet requiredColumns
+    ) {
+        super(ctx);
+
+        assert context().transaction() != null || context().transactionTime() != null : "Transaction not initialized.";
+
+        tableRowConverter = row -> schemaTable.toRow(context(), row, rowFactory, requiredColumns);
+
+        this.filters = filters;
+        this.rowTransformer = rowTransformer;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void request(int rowsCnt) throws Exception {
+        assert rowsCnt > 0 && requested == 0 : "rowsCnt=" + rowsCnt + ", requested=" + requested;
+
+        checkState();
+
+        requested = rowsCnt;
+
+        if (!inLoop) {
+            context().execute(this::push, this::onError);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void closeInternal() {
+        super.closeInternal();
+
+        if (activeSubscription != null) {
+            activeSubscription.cancel();
+
+            activeSubscription = null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected void rewindInternal() {
+        requested = 0;
+        waiting = 0;
+
+        if (activeSubscription != null) {
+            activeSubscription.cancel();
+
+            activeSubscription = null;
+        }
+    }
+
+    /**
+     *  Publisher of scan node.
+     *
+     *  @return Publisher of scan node or {@code null} in case nothing to scan.
+     */
+    protected abstract Publisher<RowT> scan();
+
+    /**
+     * Proxy publisher with singe goal convert rows from {@code BinaryRow} to {@code RowT}.
+     *
+     * @param pub {@code BinaryRow} Publisher.
+     *
+     * @return Proxy publisher with conversion from {@code BinaryRow} to {@code RowT}.
+     */
+    @NotNull

Review Comment:
   as far as I know, we are not using `@NotNull`



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import java.util.BitSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Based abstract scan node.
+ */
+public abstract class StorageScanNode<RowT> extends AbstractNode<RowT> {
+    /** Special value to highlights that all row were received and we are not waiting any more. */
+    private static final int NOT_WAITING = -1;
+
+    private final Queue<RowT> inBuff = new LinkedBlockingQueue<>(inBufSize);
+
+    private final @Nullable Predicate<RowT> filters;
+
+    private final @Nullable Function<RowT, RowT> rowTransformer;
+
+    private final Function<BinaryRow, RowT> tableRowConverter;
+
+    private int requested;
+
+    private int waiting;
+
+    private boolean inLoop;
+
+    private Subscription activeSubscription;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Execution context.
+     * @param rowFactory Row factory.
+     * @param schemaTable The table this node should scan.
+     * @param filters Optional filter to filter out rows.
+     * @param rowTransformer Optional projection function.
+     */
+    public StorageScanNode(
+            ExecutionContext<RowT> ctx,
+            RowHandler.RowFactory<RowT> rowFactory,
+            InternalIgniteTable schemaTable,
+            @Nullable Predicate<RowT> filters,
+            @Nullable Function<RowT, RowT> rowTransformer,
+            @Nullable BitSet requiredColumns
+    ) {
+        super(ctx);
+
+        assert context().transaction() != null || context().transactionTime() != null : "Transaction not initialized.";
+
+        tableRowConverter = row -> schemaTable.toRow(context(), row, rowFactory, requiredColumns);
+
+        this.filters = filters;
+        this.rowTransformer = rowTransformer;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void request(int rowsCnt) throws Exception {
+        assert rowsCnt > 0 && requested == 0 : "rowsCnt=" + rowsCnt + ", requested=" + requested;
+
+        checkState();
+
+        requested = rowsCnt;
+
+        if (!inLoop) {
+            context().execute(this::push, this::onError);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void closeInternal() {
+        super.closeInternal();
+
+        if (activeSubscription != null) {
+            activeSubscription.cancel();
+
+            activeSubscription = null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected void rewindInternal() {
+        requested = 0;
+        waiting = 0;
+
+        if (activeSubscription != null) {
+            activeSubscription.cancel();
+
+            activeSubscription = null;
+        }
+    }
+
+    /**
+     *  Publisher of scan node.
+     *
+     *  @return Publisher of scan node or {@code null} in case nothing to scan.
+     */
+    protected abstract Publisher<RowT> scan();
+
+    /**
+     * Proxy publisher with singe goal convert rows from {@code BinaryRow} to {@code RowT}.
+     *
+     * @param pub {@code BinaryRow} Publisher.
+     *
+     * @return Proxy publisher with conversion from {@code BinaryRow} to {@code RowT}.
+     */
+    @NotNull
+    public Publisher<RowT> convertPublisher(Publisher<BinaryRow> pub) {

Review Comment:
   do we really need to have this method `public`?



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import java.util.BitSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Based abstract scan node.
+ */
+public abstract class StorageScanNode<RowT> extends AbstractNode<RowT> {
+    /** Special value to highlights that all row were received and we are not waiting any more. */
+    private static final int NOT_WAITING = -1;
+
+    private final Queue<RowT> inBuff = new LinkedBlockingQueue<>(inBufSize);
+
+    private final @Nullable Predicate<RowT> filters;
+
+    private final @Nullable Function<RowT, RowT> rowTransformer;
+
+    private final Function<BinaryRow, RowT> tableRowConverter;
+
+    private int requested;
+
+    private int waiting;
+
+    private boolean inLoop;
+
+    private Subscription activeSubscription;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Execution context.
+     * @param rowFactory Row factory.
+     * @param schemaTable The table this node should scan.
+     * @param filters Optional filter to filter out rows.
+     * @param rowTransformer Optional projection function.
+     */
+    public StorageScanNode(
+            ExecutionContext<RowT> ctx,
+            RowHandler.RowFactory<RowT> rowFactory,
+            InternalIgniteTable schemaTable,
+            @Nullable Predicate<RowT> filters,
+            @Nullable Function<RowT, RowT> rowTransformer,
+            @Nullable BitSet requiredColumns

Review Comment:
   param description is missing



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java:
##########
@@ -343,37 +346,59 @@ public void checkTransactionsWithDml() throws Exception {
         assertEquals(txManagerInternal.finished(), states.size());
     }
 
-    /** Check correctness of rw and ro transactions. */
+    /** Check correctness of rw and ro transactions for index scan. */
     @Test
-    public void checkMixedTransactions() throws Exception {
+    public void checkMixedTransactionsForIndex() throws Exception {
+        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+
+        Matcher<String> planMatcher = containsTableScan("PUBLIC", "TEST");
+
+        checkMixedTransactions(planMatcher);
+    }
+
+
+    /** Check correctness of rw and ro transactions for table scan. */
+    @Test
+    public void checkMixedTransactionsForTable() throws Exception {

Review Comment:
   Looks like the names of this test and the one above should be swapped.



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import java.util.BitSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Based abstract scan node.
+ */
+public abstract class StorageScanNode<RowT> extends AbstractNode<RowT> {
+    /** Special value to highlights that all row were received and we are not waiting any more. */
+    private static final int NOT_WAITING = -1;
+
+    private final Queue<RowT> inBuff = new LinkedBlockingQueue<>(inBufSize);
+
+    private final @Nullable Predicate<RowT> filters;
+
+    private final @Nullable Function<RowT, RowT> rowTransformer;
+
+    private final Function<BinaryRow, RowT> tableRowConverter;
+
+    private int requested;
+
+    private int waiting;
+
+    private boolean inLoop;
+
+    private Subscription activeSubscription;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Execution context.
+     * @param rowFactory Row factory.
+     * @param schemaTable The table this node should scan.
+     * @param filters Optional filter to filter out rows.
+     * @param rowTransformer Optional projection function.
+     */
+    public StorageScanNode(
+            ExecutionContext<RowT> ctx,
+            RowHandler.RowFactory<RowT> rowFactory,
+            InternalIgniteTable schemaTable,
+            @Nullable Predicate<RowT> filters,
+            @Nullable Function<RowT, RowT> rowTransformer,
+            @Nullable BitSet requiredColumns
+    ) {
+        super(ctx);
+
+        assert context().transaction() != null || context().transactionTime() != null : "Transaction not initialized.";
+
+        tableRowConverter = row -> schemaTable.toRow(context(), row, rowFactory, requiredColumns);
+
+        this.filters = filters;
+        this.rowTransformer = rowTransformer;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void request(int rowsCnt) throws Exception {
+        assert rowsCnt > 0 && requested == 0 : "rowsCnt=" + rowsCnt + ", requested=" + requested;
+
+        checkState();
+
+        requested = rowsCnt;
+
+        if (!inLoop) {
+            context().execute(this::push, this::onError);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void closeInternal() {
+        super.closeInternal();
+
+        if (activeSubscription != null) {
+            activeSubscription.cancel();
+
+            activeSubscription = null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected void rewindInternal() {
+        requested = 0;
+        waiting = 0;
+
+        if (activeSubscription != null) {
+            activeSubscription.cancel();
+
+            activeSubscription = null;
+        }
+    }
+
+    /**
+     *  Publisher of scan node.
+     *
+     *  @return Publisher of scan node or {@code null} in case nothing to scan.
+     */
+    protected abstract Publisher<RowT> scan();

Review Comment:
   ```suggestion
       protected abstract @Nullable Publisher<RowT> scan();
   ```
   
   The contract of this method is not clear. The fact that you need to scan several times in order to scan the whole storage once is counterintuitive.



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java:
##########
@@ -19,66 +19,44 @@
 
 import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty;
 
+import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
-import java.util.Queue;
 import java.util.concurrent.Flow;
-import java.util.concurrent.Flow.Subscription;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Flow.Publisher;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
+import org.apache.ignite.internal.sql.engine.util.CompositePublisher;
 import org.apache.ignite.internal.table.InternalTable;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Scan node.
+ * Table scan node.
  */
-public class TableScanNode<RowT> extends AbstractNode<RowT> {
-    /** Special value to highlights that all row were received and we are not waiting any more. */
-    private static final int NOT_WAITING = -1;
+public class TableScanNode<RowT> extends StorageScanNode<RowT> {
 
     /** Table that provides access to underlying data. */
     private final InternalTable physTable;
 
-    /** Table that is an object in SQL schema. */
-    private final InternalIgniteTable schemaTable;
-
-    private final RowHandler.RowFactory<RowT> factory;
-
     private final int[] parts;
 
-    private final Queue<RowT> inBuff = new LinkedBlockingQueue<>(inBufSize);
-
-    private final @Nullable Predicate<RowT> filters;
-
-    private final @Nullable Function<RowT, RowT> rowTransformer;
-
-    /** Participating columns. */
-    private final @Nullable BitSet requiredColumns;
-
-    private int requested;
-
-    private int waiting;
-
-    private boolean inLoop;
-
-    private Subscription activeSubscription;
+    boolean dataRequested;

Review Comment:
   this may be `private`, as well as `curPartIdx`



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import java.util.BitSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Based abstract scan node.
+ */
+public abstract class StorageScanNode<RowT> extends AbstractNode<RowT> {
+    /** Special value to highlights that all row were received and we are not waiting any more. */
+    private static final int NOT_WAITING = -1;
+
+    private final Queue<RowT> inBuff = new LinkedBlockingQueue<>(inBufSize);
+
+    private final @Nullable Predicate<RowT> filters;
+
+    private final @Nullable Function<RowT, RowT> rowTransformer;
+
+    private final Function<BinaryRow, RowT> tableRowConverter;
+
+    private int requested;
+
+    private int waiting;
+
+    private boolean inLoop;
+
+    private Subscription activeSubscription;

Review Comment:
   ```suggestion
       private @Nullable Subscription activeSubscription;
   ```



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java:
##########
@@ -19,66 +19,44 @@
 
 import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty;
 
+import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
-import java.util.Queue;
 import java.util.concurrent.Flow;
-import java.util.concurrent.Flow.Subscription;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Flow.Publisher;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
+import org.apache.ignite.internal.sql.engine.util.CompositePublisher;
 import org.apache.ignite.internal.table.InternalTable;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Scan node.
+ * Table scan node.

Review Comment:
   The same applies here: please add more details to the javadoc.



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import java.util.BitSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Based abstract scan node.
+ */
+public abstract class StorageScanNode<RowT> extends AbstractNode<RowT> {
+    /** Special value to highlights that all row were received and we are not waiting any more. */
+    private static final int NOT_WAITING = -1;
+
+    private final Queue<RowT> inBuff = new LinkedBlockingQueue<>(inBufSize);
+
+    private final @Nullable Predicate<RowT> filters;
+
+    private final @Nullable Function<RowT, RowT> rowTransformer;
+
+    private final Function<BinaryRow, RowT> tableRowConverter;
+
+    private int requested;
+
+    private int waiting;
+
+    private boolean inLoop;
+
+    private Subscription activeSubscription;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Execution context.
+     * @param rowFactory Row factory.
+     * @param schemaTable The table this node should scan.
+     * @param filters Optional filter to filter out rows.
+     * @param rowTransformer Optional projection function.
+     */
+    public StorageScanNode(
+            ExecutionContext<RowT> ctx,
+            RowHandler.RowFactory<RowT> rowFactory,
+            InternalIgniteTable schemaTable,
+            @Nullable Predicate<RowT> filters,
+            @Nullable Function<RowT, RowT> rowTransformer,
+            @Nullable BitSet requiredColumns
+    ) {
+        super(ctx);
+
+        assert context().transaction() != null || context().transactionTime() != null : "Transaction not initialized.";
+
+        tableRowConverter = row -> schemaTable.toRow(context(), row, rowFactory, requiredColumns);
+
+        this.filters = filters;
+        this.rowTransformer = rowTransformer;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void request(int rowsCnt) throws Exception {
+        assert rowsCnt > 0 && requested == 0 : "rowsCnt=" + rowsCnt + ", requested=" + requested;
+
+        checkState();
+
+        requested = rowsCnt;
+
+        if (!inLoop) {
+            context().execute(this::push, this::onError);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void closeInternal() {
+        super.closeInternal();
+
+        if (activeSubscription != null) {
+            activeSubscription.cancel();
+
+            activeSubscription = null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected void rewindInternal() {
+        requested = 0;
+        waiting = 0;
+
+        if (activeSubscription != null) {
+            activeSubscription.cancel();
+
+            activeSubscription = null;
+        }
+    }
+
+    /**
+     *  Publisher of scan node.
+     *
+     *  @return Publisher of scan node or {@code null} in case nothing to scan.
+     */
+    protected abstract Publisher<RowT> scan();

Review Comment:
   ```suggestion
       protected abstract @Nullable Publisher<RowT> scan();
   ```



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java:
##########
@@ -52,13 +48,9 @@
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Scan node.
- * TODO: merge with {@link TableScanNode}
+ * Index scan node.

Review Comment:
   could you please add more details to the javadoc?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org