Posted to issues@flink.apache.org by "twalthr (via GitHub)" <gi...@apache.org> on 2023/09/29 23:59:53 UTC

[GitHub] [flink] twalthr commented on a diff in pull request #23477: [FLINK-33169] Consider descriptor information during system column expansion

twalthr commented on code in PR #23477:
URL: https://github.com/apache/flink/pull/23477#discussion_r1341864501


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java:
##########
@@ -321,4 +332,131 @@ protected void addToSelectList(
         // Always add to list
         super.addToSelectList(list, aliases, fieldList, exp, scope, includeSystemVars);
     }
+
+    @Override
+    protected @PolyNull SqlNode performUnconditionalRewrites(
+            @PolyNull SqlNode node, boolean underFrom) {
+
+        // Special case for window TVFs like:
+        // TUMBLE(TABLE t, DESCRIPTOR(metadata_virtual), INTERVAL '1' MINUTE)
+        //
+        // "TABLE t" is translated into an implicit "SELECT * FROM t". This would ignore columns
+        // that are not expanded by default. However, the descriptor explicitly states the need
+        // for this column. Therefore, explicit table expressions (for window TVFs at most one)
+        // are captured before rewriting and replaced with a "marker" SqlSelect that contains the
+        // descriptor information. The "marker" SqlSelect is considered during column expansion.
+        final List<SqlIdentifier> explicitTableArgs = getExplicitTableOperands(node);
+
+        final SqlNode rewritten = super.performUnconditionalRewrites(node, underFrom);
+
+        if (!(node instanceof SqlBasicCall)) {
+            return rewritten;
+        }
+        final SqlBasicCall call = (SqlBasicCall) node;
+        final SqlOperator operator = call.getOperator();
+
+        if (operator instanceof SqlWindowTableFunction) {
+            if (explicitTableArgs.stream().allMatch(Objects::isNull)) {
+                return rewritten;
+            }
+
+            final List<SqlIdentifier> descriptors =
+                    call.getOperandList().stream()
+                            .filter(op -> op.getKind() == SqlKind.DESCRIPTOR)
+                            .flatMap(
+                                    desc ->
+                                            ((SqlBasicCall) desc)
+                                                    .getOperandList().stream()
+                                                            .filter(SqlIdentifier.class::isInstance)
+                                                            .map(SqlIdentifier.class::cast))
+                            .collect(Collectors.toList());
+
+            for (int i = 0; i < call.operandCount(); i++) {
+                final SqlIdentifier tableArg = explicitTableArgs.get(i);
+                if (tableArg != null) {
+                    call.setOperand(i, new ExplicitTableSqlSelect(tableArg, descriptors));
+                }
+            }
+        }
+
+        return rewritten;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Column expansion
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * A special {@link SqlSelect} to capture the origin of a {@link SqlKind#EXPLICIT_TABLE} within
+     * TVF operands.
+     */
+    private static class ExplicitTableSqlSelect extends SqlSelect {
+
+        private final List<SqlIdentifier> descriptors;
+
+        public ExplicitTableSqlSelect(SqlIdentifier table, List<SqlIdentifier> descriptors) {
+            super(
+                    SqlParserPos.ZERO,
+                    null,
+                    SqlNodeList.of(SqlIdentifier.star(SqlParserPos.ZERO)),
+                    table,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null);
+            this.descriptors = descriptors;
+        }
+    }
+
+    /**
+     * Returns whether the given column has been declared in a {@link SqlKind#DESCRIPTOR} next to a
+     * {@link SqlKind#EXPLICIT_TABLE} within TVF operands.
+     */
+    private static boolean declaredDescriptorColumn(SelectScope scope, Column column) {
+        if (!(scope.getNode() instanceof ExplicitTableSqlSelect)) {
+            return false;
+        }
+        final ExplicitTableSqlSelect select = (ExplicitTableSqlSelect) scope.getNode();
+        return select.descriptors.stream()
+                .map(SqlIdentifier::getSimple)
+                .anyMatch(id -> id.equals(column.getName()));
+    }
+
+    /**
+     * Returns all {@link SqlKind#EXPLICIT_TABLE} operands within TVF operands. A list entry is
+     * {@code null} if the operand is not an {@link SqlKind#EXPLICIT_TABLE}.
+     */
+    private static List<SqlIdentifier> getExplicitTableOperands(SqlNode node) {
+        if (!(node instanceof SqlBasicCall)) {
+            return null;
+        }
+        final SqlBasicCall call = (SqlBasicCall) node;
+
+        if (!(call.getOperator() instanceof SqlFunction)) {
+            return null;
+        }
+        final SqlFunction function = (SqlFunction) call.getOperator();
+
+        if (function.getFunctionType() != SqlFunctionCategory.USER_DEFINED_TABLE_FUNCTION) {

Review Comment:
   For performance, I want to exit this method early. Currently, all PTFs have this category.
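
   To illustrate the early-exit idea, here is a minimal, self-contained sketch. It does not use the Calcite classes: `Category`, `Call`, and `explicitTableOperands` are hypothetical stand-ins for `SqlFunctionCategory`, `SqlBasicCall`, and `getExplicitTableOperands`, and operands are plain strings. It only shows that the cheap category check runs before any operand is scanned.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class EarlyExitSketch {

    /** Hypothetical stand-in for a function category; not the Calcite enum. */
    public enum Category { SCALAR, USER_DEFINED_TABLE_FUNCTION }

    /** Hypothetical stand-in for a call node: a category plus string operands. */
    public static final class Call {
        final Category category;
        final List<String> operands;

        public Call(Category category, String... operands) {
            this.category = category;
            this.operands = Arrays.asList(operands);
        }
    }

    /**
     * Returns one entry per operand: the table name for "TABLE x" operands, null otherwise.
     * Returns null immediately if the node cannot contain explicit table operands.
     */
    public static List<String> explicitTableOperands(Object node) {
        if (!(node instanceof Call)) {
            return null; // cheap exit: not a call at all
        }
        final Call call = (Call) node;
        if (call.category != Category.USER_DEFINED_TABLE_FUNCTION) {
            return null; // cheap exit: ordinary functions skip the operand scan
        }
        final List<String> result = new ArrayList<>();
        for (String operand : call.operands) {
            // Assumption for this sketch: explicit tables are written as "TABLE <name>".
            result.add(operand.startsWith("TABLE ") ? operand.substring("TABLE ".length()) : null);
        }
        return result;
    }
}
```

   Most nodes the validator visits are not table functions, so under this assumption they return at one of the first two checks without allocating a list.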


