Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/30 09:16:31 UTC

[GitHub] [flink] godfreyhe commented on a diff in pull request #20359: [FLINK-28682][table-planner] support join hint in batch rules

godfreyhe commented on code in PR #20359:
URL: https://github.com/apache/flink/pull/20359#discussion_r929877615


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/alias/SubQueryAliasNodeClearShuttle.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.alias;
+
+import org.apache.flink.table.planner.hint.FlinkHints;
+import org.apache.flink.table.planner.plan.nodes.calcite.SubQueryAlias;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.hint.Hintable;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexSubQuery;
+
+import java.util.Collections;
+
+/** A shuttle to remove the sub-query alias node and add the alias hint to its child node. */
+public class SubQueryAliasNodeClearShuttle extends RelShuttleImpl {

Review Comment:
   ClearSubQueryAliasNodeShuttle



##########
flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java:
##########
@@ -0,0 +1,5983 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.calcite.sql2rel;
+
+import org.apache.flink.table.planner.alias.SubQueryAliasNodeClearShuttle;
+import org.apache.flink.table.planner.plan.nodes.calcite.LogicalSubQueryAlias;
+
+import org.apache.calcite.avatica.util.Spaces;
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptSamplingParameters;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.ViewExpanders;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.prepare.RelOptTableImpl;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Collect;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.core.Sample;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.hint.HintStrategyTable;
+import org.apache.calcite.rel.hint.Hintable;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalIntersect;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalMatch;
+import org.apache.calcite.rel.logical.LogicalMinus;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rel.logical.LogicalValues;
+import org.apache.calcite.rel.metadata.RelColumnMapping;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.stream.Delta;
+import org.apache.calcite.rel.stream.LogicalDelta;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexCallBinding;
+import org.apache.calcite.rex.RexCorrelVariable;
+import org.apache.calcite.rex.RexDynamicParam;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexFieldCollation;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexPatternFieldRef;
+import org.apache.calcite.rex.RexRangeRef;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexSubQuery;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.rex.RexWindowBound;
+import org.apache.calcite.rex.RexWindowBounds;
+import org.apache.calcite.schema.ColumnStrategy;
+import org.apache.calcite.schema.ModifiableTable;
+import org.apache.calcite.schema.ModifiableView;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.schema.Wrapper;
+import org.apache.calcite.sql.JoinConditionType;
+import org.apache.calcite.sql.JoinType;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlDelete;
+import org.apache.calcite.sql.SqlDynamicParam;
+import org.apache.calcite.sql.SqlExplainFormat;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlIntervalQualifier;
+import org.apache.calcite.sql.SqlJoin;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlMatchRecognize;
+import org.apache.calcite.sql.SqlMerge;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlNumericLiteral;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlPivot;
+import org.apache.calcite.sql.SqlSampleSpec;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlSelectKeyword;
+import org.apache.calcite.sql.SqlSetOperator;
+import org.apache.calcite.sql.SqlSnapshot;
+import org.apache.calcite.sql.SqlUnnestOperator;
+import org.apache.calcite.sql.SqlUpdate;
+import org.apache.calcite.sql.SqlUtil;
+import org.apache.calcite.sql.SqlValuesOperator;
+import org.apache.calcite.sql.SqlWindow;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.SqlWithItem;
+import org.apache.calcite.sql.fun.SqlCase;
+import org.apache.calcite.sql.fun.SqlCountAggFunction;
+import org.apache.calcite.sql.fun.SqlInOperator;
+import org.apache.calcite.sql.fun.SqlQuantifyOperator;
+import org.apache.calcite.sql.fun.SqlRowOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlReturnTypeInference;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.sql.type.TableFunctionReturnTypeInference;
+import org.apache.calcite.sql.util.SqlBasicVisitor;
+import org.apache.calcite.sql.util.SqlVisitor;
+import org.apache.calcite.sql.validate.AggregatingSelectScope;
+import org.apache.calcite.sql.validate.CollectNamespace;
+import org.apache.calcite.sql.validate.DelegatingScope;
+import org.apache.calcite.sql.validate.ListScope;
+import org.apache.calcite.sql.validate.MatchRecognizeScope;
+import org.apache.calcite.sql.validate.ParameterScope;
+import org.apache.calcite.sql.validate.SelectScope;
+import org.apache.calcite.sql.validate.SqlMonotonicity;
+import org.apache.calcite.sql.validate.SqlNameMatcher;
+import org.apache.calcite.sql.validate.SqlQualified;
+import org.apache.calcite.sql.validate.SqlUserDefinedTableFunction;
+import org.apache.calcite.sql.validate.SqlUserDefinedTableMacro;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+import org.apache.calcite.sql.validate.SqlValidatorNamespace;
+import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.sql.validate.SqlValidatorTable;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBeans;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Litmus;
+import org.apache.calcite.util.NlsString;
+import org.apache.calcite.util.NumberUtil;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+import org.apache.calcite.util.trace.CalciteTrace;
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.lang.reflect.Type;
+import java.math.BigDecimal;
+import java.util.AbstractList;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+
+import static org.apache.calcite.sql.SqlUtil.stripAs;
+
+/**
+ * Converts a SQL parse tree (consisting of {@link SqlNode} objects) into a relational algebra
+ * expression (consisting of {@link RelNode} objects).
+ *
+ * <p>The public entry points are: {@link #convertQuery}, {@link #convertExpression(SqlNode)}.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class SqlToRelConverter {
+    // ~ Static fields/initializers ---------------------------------------------
+
+    /** Default configuration. */
+    private static final Config CONFIG =
+            ImmutableBeans.create(Config.class)
+                    .withRelBuilderFactory(RelFactories.LOGICAL_BUILDER)
+                    .withRelBuilderConfigTransform(c -> c.withPushJoinCondition(true))
+                    .withHintStrategyTable(HintStrategyTable.EMPTY);
+
+    protected static final Logger SQL2REL_LOGGER = CalciteTrace.getSqlToRelTracer();
+
+    /** Size of the smallest IN list that will be converted to a semijoin to a static table. */
+    public static final int DEFAULT_IN_SUB_QUERY_THRESHOLD = 20;
+
+    @Deprecated // to be removed before 2.0
+    public static final int DEFAULT_IN_SUBQUERY_THRESHOLD = DEFAULT_IN_SUB_QUERY_THRESHOLD;
+
+    // ~ Instance fields --------------------------------------------------------
+
+    protected final SqlValidator validator;
+    protected final RexBuilder rexBuilder;
+    protected final Prepare.CatalogReader catalogReader;
+    protected final RelOptCluster cluster;
+    private SubQueryConverter subQueryConverter;
+    protected final Map<RelNode, Integer> leaves = new HashMap<>();
+    private final List<SqlDynamicParam> dynamicParamSqlNodes = new ArrayList<>();
+    private final SqlOperatorTable opTab;
+    protected final RelDataTypeFactory typeFactory;
+    private final SqlNodeToRexConverter exprConverter;
+    private final HintStrategyTable hintStrategies;
+    private int explainParamCount;
+    public final Config config;
+    private final RelBuilder relBuilder;
+
+    /** Fields used in name resolution for correlated sub-queries. */
+    private final Map<CorrelationId, DeferredLookup> mapCorrelToDeferred = new HashMap<>();
+
+    /**
+     * Stack of names of datasets requested by the <code>
+     * TABLE(SAMPLE(&lt;datasetName&gt;, &lt;query&gt;))</code> construct.
+     */
+    private final Deque<String> datasetStack = new ArrayDeque<>();
+
+    /**
+     * Mapping of non-correlated sub-queries that have been converted to their equivalent constants.
+     * Used to avoid re-evaluating the sub-query if it's already been evaluated.
+     */
+    private final Map<SqlNode, RexNode> mapConvertedNonCorrSubqs = new HashMap<>();
+
+    public final RelOptTable.ViewExpander viewExpander;
+
+    // ~ Constructors -----------------------------------------------------------
+    /**
+     * Creates a converter.
+     *
+     * @param viewExpander Preparing statement
+     * @param validator Validator
+     * @param catalogReader Schema
+     * @param planner Planner
+     * @param rexBuilder Rex builder
+     * @param convertletTable Expression converter
+     */
+    @Deprecated // to be removed before 2.0
+    public SqlToRelConverter(
+            RelOptTable.ViewExpander viewExpander,
+            SqlValidator validator,
+            Prepare.CatalogReader catalogReader,
+            RelOptPlanner planner,
+            RexBuilder rexBuilder,
+            SqlRexConvertletTable convertletTable) {
+        this(
+                viewExpander,
+                validator,
+                catalogReader,
+                RelOptCluster.create(planner, rexBuilder),
+                convertletTable,
+                SqlToRelConverter.config());
+    }
+
+    @Deprecated // to be removed before 2.0
+    public SqlToRelConverter(
+            RelOptTable.ViewExpander viewExpander,
+            SqlValidator validator,
+            Prepare.CatalogReader catalogReader,
+            RelOptCluster cluster,
+            SqlRexConvertletTable convertletTable) {
+        this(
+                viewExpander,
+                validator,
+                catalogReader,
+                cluster,
+                convertletTable,
+                SqlToRelConverter.config());
+    }
+
+    /* Creates a converter. */
+    public SqlToRelConverter(
+            RelOptTable.ViewExpander viewExpander,
+            SqlValidator validator,
+            Prepare.CatalogReader catalogReader,
+            RelOptCluster cluster,
+            SqlRexConvertletTable convertletTable,
+            Config config) {
+        this.viewExpander = viewExpander;
+        this.opTab =
+                (validator == null) ? SqlStdOperatorTable.instance() : validator.getOperatorTable();
+        this.validator = validator;
+        this.catalogReader = catalogReader;
+        this.subQueryConverter = new NoOpSubQueryConverter();
+        this.rexBuilder = cluster.getRexBuilder();
+        this.typeFactory = rexBuilder.getTypeFactory();
+        this.exprConverter = new SqlNodeToRexConverterImpl(convertletTable);
+        this.explainParamCount = 0;
+        this.config = Objects.requireNonNull(config);
+        this.relBuilder =
+                config.getRelBuilderFactory()
+                        .create(cluster, null)
+                        .transform(config.getRelBuilderConfigTransform());
+        this.hintStrategies = config.getHintStrategyTable();
+
+        cluster.setHintStrategies(this.hintStrategies);
+        this.cluster = Objects.requireNonNull(cluster);
+    }
+
+    // ~ Methods ----------------------------------------------------------------
+
+    /** Returns the RelOptCluster in use. */
+    public RelOptCluster getCluster() {
+        return cluster;
+    }
+
+    /** Returns the row-expression builder. */
+    public RexBuilder getRexBuilder() {
+        return rexBuilder;
+    }
+
+    /**
+     * Returns the number of dynamic parameters encountered during translation; this must only be
+     * called after {@link #convertQuery}.
+     *
+     * @return number of dynamic parameters
+     */
+    public int getDynamicParamCount() {
+        return dynamicParamSqlNodes.size();
+    }
+
+    /**
+     * Returns the type inferred for a dynamic parameter.
+     *
+     * @param index 0-based index of dynamic parameter
+     * @return inferred type, never null
+     */
+    public RelDataType getDynamicParamType(int index) {
+        SqlNode sqlNode = dynamicParamSqlNodes.get(index);
+        if (sqlNode == null) {
+            throw Util.needToImplement("dynamic param type inference");
+        }
+        return validator.getValidatedNodeType(sqlNode);
+    }
+
+    /**
+     * Returns the current count of the number of dynamic parameters in an EXPLAIN PLAN statement.
+     *
+     * @param increment if true, increment the count
+     * @return the current count before the optional increment
+     */
+    public int getDynamicParamCountInExplain(boolean increment) {
+        int retVal = explainParamCount;
+        if (increment) {
+            ++explainParamCount;
+        }
+        return retVal;
+    }
+
+    /**
+     * Returns the mapping of non-correlated sub-queries that have been converted to the constants
+     * that they evaluate to.
+     */
+    public Map<SqlNode, RexNode> getMapConvertedNonCorrSubqs() {
+        return mapConvertedNonCorrSubqs;
+    }
+
+    /**
+     * Adds to the current map of non-correlated converted sub-queries the elements from another map
+     * that contains non-correlated sub-queries that have been converted by another
+     * SqlToRelConverter.
+     *
+     * @param alreadyConvertedNonCorrSubqs the other map
+     */
+    public void addConvertedNonCorrSubqs(Map<SqlNode, RexNode> alreadyConvertedNonCorrSubqs) {
+        mapConvertedNonCorrSubqs.putAll(alreadyConvertedNonCorrSubqs);
+    }
+
+    /**
+     * Sets a new SubQueryConverter. To have any effect, this must be called before any convert
+     * method.
+     *
+     * @param converter new SubQueryConverter
+     */
+    public void setSubQueryConverter(SubQueryConverter converter) {
+        subQueryConverter = converter;
+    }
+
+    /**
+     * Sets the number of dynamic parameters in the current EXPLAIN PLAN statement.
+     *
+     * @param explainParamCount number of dynamic parameters in the statement
+     */
+    public void setDynamicParamCountInExplain(int explainParamCount) {
+        assert config.isExplain();
+        this.explainParamCount = explainParamCount;
+    }
+
+    private void checkConvertedType(SqlNode query, RelNode result) {
+        if (query.isA(SqlKind.DML)) {
+            return;
+        }
+        // Verify that conversion from SQL to relational algebra did
+        // not perturb any type information.  (We can't do this if the
+        // SQL statement is something like an INSERT which has no
+        // validator type information associated with its result,
+        // hence the namespace check above.)
+        final List<RelDataTypeField> validatedFields =
+                validator.getValidatedNodeType(query).getFieldList();
+        final RelDataType validatedRowType =
+                validator
+                        .getTypeFactory()
+                        .createStructType(
+                                Pair.right(validatedFields),
+                                SqlValidatorUtil.uniquify(
+                                        Pair.left(validatedFields),
+                                        catalogReader.nameMatcher().isCaseSensitive()));
+
+        final List<RelDataTypeField> convertedFields =
+                result.getRowType().getFieldList().subList(0, validatedFields.size());
+        final RelDataType convertedRowType =
+                validator.getTypeFactory().createStructType(convertedFields);
+
+        if (!RelOptUtil.equal(
+                "validated row type",
+                validatedRowType,
+                "converted row type",
+                convertedRowType,
+                Litmus.IGNORE)) {
+            throw new AssertionError(
+                    "Conversion to relational algebra failed to "
+                            + "preserve datatypes:\n"
+                            + "validated type:\n"
+                            + validatedRowType.getFullTypeString()
+                            + "\nconverted type:\n"
+                            + convertedRowType.getFullTypeString()
+                            + "\nrel:\n"
+                            + RelOptUtil.toString(result));
+        }
+    }
+
+    public RelNode flattenTypes(RelNode rootRel, boolean restructure) {
+        RelStructuredTypeFlattener typeFlattener =
+                new RelStructuredTypeFlattener(
+                        relBuilder,
+                        rexBuilder,
+                        createToRelContext(com.google.common.collect.ImmutableList.of()),
+                        restructure);
+        return typeFlattener.rewrite(rootRel);
+    }
+
+    /**
+     * If sub-query is correlated and decorrelation is enabled, performs decorrelation.
+     *
+     * @param query Query
+     * @param rootRel Root relational expression
+     * @return New root relational expression after decorrelation
+     */
+    public RelNode decorrelate(SqlNode query, RelNode rootRel) {
+        if (!config.isDecorrelationEnabled()) {
+            return rootRel;
+        }
+        final RelNode result = decorrelateQuery(rootRel);
+        if (result != rootRel) {
+            checkConvertedType(query, result);
+        }
+        return result;
+    }
+
+    /**
+     * Walks over a tree of relational expressions, replacing each {@link RelNode} with a 'slimmed
+     * down' relational expression that projects only the fields required by its consumer.
+     *
+     * <p>This may make things easier for the optimizer, by removing crud that would expand the
+     * search space, but is difficult for the optimizer itself to do it, because optimizer rules
+     * must preserve the number and type of fields. Hence, this transform that operates on the
+     * entire tree, similar to the {@link RelStructuredTypeFlattener type-flattening transform}.
+     *
+     * <p>Currently this functionality is disabled in farrago/luciddb; the default implementation of
+     * this method does nothing.
+     *
+     * @param ordered Whether the relational expression must produce results in a particular order
+     *     (typically because it has an ORDER BY at top level)
+     * @param rootRel Relational expression that is at the root of the tree
+     * @return Trimmed relational expression
+     */
+    public RelNode trimUnusedFields(boolean ordered, RelNode rootRel) {
+        // Trim fields that are not used by their consumer.
+        if (isTrimUnusedFields()) {
+            final RelFieldTrimmer trimmer = newFieldTrimmer();
+            final List<RelCollation> collations =
+                    rootRel.getTraitSet().getTraits(RelCollationTraitDef.INSTANCE);
+            rootRel = trimmer.trim(rootRel);
+            if (!ordered
+                    && collations != null
+                    && !collations.isEmpty()
+                    && !collations.equals(
+                            com.google.common.collect.ImmutableList.of(RelCollations.EMPTY))) {
+                final RelTraitSet traitSet =
+                        rootRel.getTraitSet().replace(RelCollationTraitDef.INSTANCE, collations);
+                rootRel = rootRel.copy(traitSet, rootRel.getInputs());
+            }
+            if (SQL2REL_LOGGER.isDebugEnabled()) {
+                SQL2REL_LOGGER.debug(
+                        RelOptUtil.dumpPlan(
+                                "Plan after trimming unused fields",
+                                rootRel,
+                                SqlExplainFormat.TEXT,
+                                SqlExplainLevel.EXPPLAN_ATTRIBUTES));
+            }
+        }
+        return rootRel;
+    }
+
+    /**
+     * Creates a RelFieldTrimmer.
+     *
+     * @return Field trimmer
+     */
+    protected RelFieldTrimmer newFieldTrimmer() {
+        return new RelFieldTrimmer(validator, relBuilder);
+    }
+
+    /**
+     * Converts an unvalidated query's parse tree into a relational expression.
+     *
+     * @param query Query to convert
+     * @param needsValidation Whether to validate the query before converting; <code>false</code> if
+     *     the query has already been validated.
+     * @param top Whether the query is top-level, say if its result will become a JDBC result set;
+     *     <code>false</code> if the query will be part of a view.
+     */
+    public RelRoot convertQuery(SqlNode query, final boolean needsValidation, final boolean top) {
+        if (needsValidation) {
+            query = validator.validate(query);
+        }
+
+        RelNode result = convertQueryRecursive(query, top, null).rel;
+        if (top) {
+            if (isStream(query)) {
+                result = new LogicalDelta(cluster, result.getTraitSet(), result);
+            }
+        }
+        RelCollation collation = RelCollations.EMPTY;
+        if (!query.isA(SqlKind.DML)) {
+            if (isOrdered(query)) {
+                collation = requiredCollation(result);
+            }
+        }
+        checkConvertedType(query, result);
+
+        if (SQL2REL_LOGGER.isDebugEnabled()) {
+            SQL2REL_LOGGER.debug(
+                    RelOptUtil.dumpPlan(
+                            "Plan after converting SqlNode to RelNode",
+                            result,
+                            SqlExplainFormat.TEXT,
+                            SqlExplainLevel.EXPPLAN_ATTRIBUTES));
+        }
+
+        final RelDataType validatedRowType = validator.getValidatedNodeType(query);
+        List<RelHint> hints = new ArrayList<>();
+        if (query.getKind() == SqlKind.SELECT) {
+            final SqlSelect select = (SqlSelect) query;
+            if (select.hasHints()) {
+                hints = SqlUtil.getRelHint(hintStrategies, select.getHints());
+            }
+        }
+        // propagate the hints.
+        result = RelOptUtil.propagateRelHints(result, false);
+
+        // clear node SubQueryAlias.

Review Comment:
   Please add the FLINK change flag:
   
   // ----- FLINK MODIFICATION BEGIN -----
   
   
   // ----- FLINK MODIFICATION END -----
   
   and add some comments to explain why we need to change it
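   
   A minimal sketch of what that could look like around the new shuttle call, assuming the shuttle
   is applied to `result` right after hint propagation (the exact statement and the wording of the
   explanatory comment are only illustrative here, not taken from the PR):
   
       // ----- FLINK MODIFICATION BEGIN -----
       // The SubQueryAlias nodes only exist to stop join hints from leaking into sub-queries;
       // once the hints have been propagated they must be removed again, otherwise later rules
       // would encounter an unexpected RelNode type.
       result = result.accept(new SubQueryAliasNodeClearShuttle());
       // ----- FLINK MODIFICATION END -----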
   



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java:
##########
@@ -305,7 +305,8 @@ public RexNode visitInputRef(RexInputRef inputRef) {
                                             }
                                         }
                                     });
-            return FlinkLogicalJoin.create(newLeft, newRight, newCondition, join.getJoinType());
+            return FlinkLogicalJoin.create(

Review Comment:
   The digest should contain the hint info
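   
   The new argument list is truncated in this hunk; a hedged guess at the intended change is that
   the join's hints are passed through to the new join so they can show up in its digest (whether
   FlinkLogicalJoin.create actually takes the hints in this position is an assumption here):
   
       return FlinkLogicalJoin.create(
               newLeft, newRight, newCondition, join.getHints(), join.getJoinType());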



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalJoinRuleBase.scala:
##########
@@ -20,19 +20,42 @@ package org.apache.flink.table.planner.plan.rules.physical.batch
 import org.apache.flink.annotation.Experimental
 import org.apache.flink.configuration.ConfigOption
 import org.apache.flink.configuration.ConfigOptions.key
+import org.apache.flink.table.api.{TableConfig, TableException, ValidationException}
+import org.apache.flink.table.api.config.OptimizerConfigOptions
+import org.apache.flink.table.planner.{JArrayList, JDouble, JList}
+import org.apache.flink.table.planner.hint.JoinStrategy
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalLocalHashAggregate
-import org.apache.flink.table.planner.plan.utils.FlinkRelMdUtil
+import org.apache.flink.table.planner.plan.utils.{FlinkRelMdUtil, OperatorType}
 import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
+import org.apache.flink.table.planner.utils.TableConfigUtils.isOperatorDisabled
 
-import org.apache.calcite.plan.RelOptRule
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{Join, JoinRelType}
 import org.apache.calcite.util.ImmutableBitSet
 
 import java.lang.{Boolean => JBoolean, Double => JDouble}
 
+import scala.collection.JavaConversions._
+
 trait BatchPhysicalJoinRuleBase {
 
+  protected def checkMatchJoinStrategy(
+      call: RelOptRuleCall,
+      joinStrategy: JoinStrategy): Boolean = {
+    val join: Join = call.rel(0)
+    val tableConfig = unwrapTableConfig(call)
+    val validJoinHints = collectValidJoinHints(join, tableConfig)
+    if (!validJoinHints.isEmpty) {

Review Comment:
   nit: validJoinHints.nonEmpty



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalHashJoinRule.scala:
##########
@@ -89,19 +71,49 @@ class BatchPhysicalHashJoinRule
       case _ => (join.getRight, false)
     }
 
-    val leftSize = binaryRowRelNodeSize(left)
-    val rightSize = binaryRowRelNodeSize(right)
-
-    val (isBroadcast, leftIsBroadcast) = canBroadcast(joinType, leftSize, rightSize, tableConfig)
+    var isBroadcast = false

Review Comment:
   nit: avoid `var`



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalJoinRuleBase.scala:
##########
@@ -72,6 +95,231 @@ trait BatchPhysicalJoinRuleBase {
       rowCount * FlinkRelMdUtil.binaryRowAverageSize(relNode)
     }
   }
+
+  protected def collectValidJoinHints(join: Join, tableConfig: TableConfig): JList[JoinStrategy] = {
+    val allHints = join.getHints
+    val validHints = new JArrayList[JoinStrategy]
+
+    allHints.forEach(
+      relHint => {
+        val joinHint = JoinStrategy.getJoinStrategy(relHint.hintName)
+        if (checkJoinStrategyValid(join, tableConfig, joinHint, withHint = true)._1) {
+          validHints.add(joinHint)
+        }
+      })
+
+    validHints
+  }
+
+  /**
+   * Check whether the join strategy is valid.
+   *
+   * @param join
+   *   the join node
+   * @param tableConfig
+   *   the table config
+   * @param triedJoinStrategy
+   *   the join strategy checked
+   * @param withHint
+   *   whether this check is called with hint
+   * @return
+   *   a Tuple2 instance. The first element of the tuple is true if the join is valid, false
+   *   otherwise. The second element is true if the left side is used as the build side, false
+   *   otherwise.
+   */
+  def checkJoinStrategyValid(
+      join: Join,
+      tableConfig: TableConfig,
+      triedJoinStrategy: JoinStrategy,
+      withHint: Boolean): (Boolean, Boolean) = {
+
+    // TODO currently join hint is not supported with semi/anti join
+    if (withHint && !join.getJoinType.projectsRight()) {
+      return (false, false)
+    }
+
+    triedJoinStrategy match {
+      case JoinStrategy.BROADCAST =>
+        checkBroadcast(join, tableConfig, withHint)
+
+      case JoinStrategy.SHUFFLE_HASH =>
+        checkShuffleHash(join, tableConfig, withHint)
+
+      case JoinStrategy.SHUFFLE_MERGE =>
+        // for SortMergeJoin, there is no difference between with hint and without hint,
+        // so the second arg should be ignored
+        (checkSortMergeJoin(join, tableConfig), false)
+
+      case JoinStrategy.NEST_LOOP =>
+        checkNestLoopJoin(join, tableConfig, withHint)
+
+      case _ =>
+        throw new ValidationException("Unknown join strategy : " + triedJoinStrategy)
+    }
+  }
+
+  private def isEquivJoin(join: Join): Boolean = {
+    val joinInfo = join.analyzeCondition
+    !joinInfo.pairs().isEmpty
+  }
+
+  /**
+   * Decides whether the join can convert to BroadcastHashJoin.
+   *
+   * @param join
+   *   the join node
+   * @return
+   *   a Tuple2 instance. The first element of the tuple is true if the join can be converted to a
+   *   broadcast hash join, false otherwise. The second element is true if the left side is used as
+   *   the broadcast side, false otherwise.
+   */
+  protected def checkBroadcast(
+      join: Join,
+      tableConfig: TableConfig,
+      withHint: Boolean): (Boolean, Boolean) = {

Review Comment:
   withHint -> withBroadcastHint



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/join/joinhint/JoinHintTestBase.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.batch.sql.join.joinhint;
+
+import org.apache.flink.table.api.SqlParserException;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.hint.JoinStrategy;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.apache.flink.shaded.curator5.org.apache.curator.shaded.com.google.common.collect.Lists;
+
+import org.apache.logging.log4j.util.Strings;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** A test base for join hint. */
+public abstract class JoinHintTestBase extends TableTestBase {

Review Comment:
   Please add some tests:
   1. operator disabled
   2. complex queries where some query blocks have hints and other query blocks do not, such as union (see the sketch below)
   3. queries with project and hints
   4. more cases in which the hints cannot be propagated
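   
   A hedged sketch of case 2, assuming the test base exposes a BatchTableTestUtil field named
   `util` with a `verifyRelPlan` method and that tables T1/T2/T3 with a column `a` are registered
   (all of these names are assumptions for illustration only):
   
       @Test
       public void testJoinHintOnlyAffectsItsOwnQueryBlockInUnion() {
           // Only the first branch of the UNION ALL carries a BROADCAST hint; the join in the
           // second branch should be planned as if no hint were given.
           String sql =
                   "SELECT /*+ BROADCAST(T1) */ T1.a FROM T1 JOIN T2 ON T1.a = T2.a "
                           + "UNION ALL "
                           + "SELECT T1.a FROM T1 JOIN T3 ON T1.a = T3.a";
           util.verifyRelPlan(sql);
       }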



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/calcite/LogicalSubQueryAlias.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.calcite;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.sql.SqlIdentifier;
+
+import java.util.List;
+
+/**
+ * Sub-class of {@link SubQueryAlias} that is a relational operator which records the name of the
+ * sub-query. This class corresponds to a Calcite logical rel.
+ */
+public class LogicalSubQueryAlias extends SubQueryAlias {
+
+    public LogicalSubQueryAlias(
+            RelOptCluster cluster, RelTraitSet traits, RelNode input, String aliasName) {
+        super(cluster, traits, input, aliasName, com.google.common.collect.ImmutableList.of());
+    }
+
+    public LogicalSubQueryAlias(
+            RelOptCluster cluster,
+            RelTraitSet traits,
+            RelNode input,
+            String aliasName,
+            com.google.common.collect.ImmutableList<RelHint> hints) {
+        super(cluster, traits, input, aliasName, hints);
+    }
+
+    public static LogicalSubQueryAlias create(RelNode input, SqlIdentifier aliasName) {
+        return new LogicalSubQueryAlias(
+                input.getCluster(), input.getTraitSet(), input, aliasName.toString());
+    }
+
+    @Override
+    public RelWriter explainTerms(RelWriter pw) {
+        return super.explainTerms(pw).item("aliasName", getAliasName());

Review Comment:
   simplify `aliasName` to `alias`
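   
   That would presumably be a one-line change (sketch):
   
       return super.explainTerms(pw).item("alias", getAliasName());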



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/calcite/SubQueryAlias.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.calcite;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.hint.Hintable;
+import org.apache.calcite.rel.hint.RelHint;
+
+/**
+ * Relational operator that records the name of the sub-query.
+ *
+ * <p>This class implements {@link Hintable} to access the function {@link
+ * org.apache.calcite.plan.RelOptUtil#visitHintable}, so the join hints in the current query level
+ * will not be propagated into the sub-query.
+ */
+public abstract class SubQueryAlias extends SingleRel implements Hintable {

Review Comment:
   It's better to rename sub-query to query-block here to avoid concept conflicts with the existing RexSubQuery



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalJoinRuleBase.scala:
##########
@@ -20,19 +20,42 @@ package org.apache.flink.table.planner.plan.rules.physical.batch
 import org.apache.flink.annotation.Experimental
 import org.apache.flink.configuration.ConfigOption
 import org.apache.flink.configuration.ConfigOptions.key
+import org.apache.flink.table.api.{TableConfig, TableException, ValidationException}
+import org.apache.flink.table.api.config.OptimizerConfigOptions
+import org.apache.flink.table.planner.{JArrayList, JDouble, JList}
+import org.apache.flink.table.planner.hint.JoinStrategy
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalLocalHashAggregate
-import org.apache.flink.table.planner.plan.utils.FlinkRelMdUtil
+import org.apache.flink.table.planner.plan.utils.{FlinkRelMdUtil, OperatorType}
 import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
+import org.apache.flink.table.planner.utils.TableConfigUtils.isOperatorDisabled
 
-import org.apache.calcite.plan.RelOptRule
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{Join, JoinRelType}
 import org.apache.calcite.util.ImmutableBitSet
 
 import java.lang.{Boolean => JBoolean, Double => JDouble}
 
+import scala.collection.JavaConversions._
+
 trait BatchPhysicalJoinRuleBase {
 
+  protected def checkMatchJoinStrategy(
+      call: RelOptRuleCall,

Review Comment:
   the parameter should be Join instead of RelOptRuleCall



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/JoinHintResolver.scala:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.optimize
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.planner.JHashSet
+import org.apache.flink.table.planner.hint.{FlinkHints, JoinStrategy}
+
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.rel.{BiRel, RelNode, RelShuttleImpl}
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.hint.{Hintable, RelHint}
+import org.apache.calcite.rel.logical.{LogicalCorrelate, LogicalJoin}
+import org.apache.calcite.util.Util
+
+import java.util.Collections
+
+import scala.collection.JavaConversions._
+
+/**
+ * Resolves and validates hints; currently only join hints are supported.
+ *
+ * Duplicated join hints are not checked here.
+ */
+class JoinHintResolver {

Review Comment:
   We should use Java instead of Scala for new classes
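   
   A hedged Java skeleton of what the port could look like. The body below only collects hint
   names and is purely illustrative; the real resolver also validates the hints (e.g. against
   JoinStrategy and the table names they reference) and reports invalid ones:
   
       package org.apache.flink.table.planner.plan.optimize;
   
       import org.apache.calcite.rel.RelNode;
       import org.apache.calcite.rel.RelShuttleImpl;
       import org.apache.calcite.rel.logical.LogicalJoin;
   
       import java.util.ArrayList;
       import java.util.List;
   
       /** Illustrative Java skeleton of a resolver that walks the plan and inspects join hints. */
       public class JoinHintResolver extends RelShuttleImpl {
   
           private final List<String> seenJoinHintNames = new ArrayList<>();
   
           @Override
           public RelNode visit(LogicalJoin join) {
               // Record the hint names attached to this join; a real resolver would validate
               // them here instead of merely collecting them.
               join.getHints().forEach(hint -> seenJoinHintNames.add(hint.hintName));
               return super.visit(join);
           }
   
           public List<String> getSeenJoinHintNames() {
               return seenJoinHintNames;
           }
       }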



##########
flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java:
##########
@@ -0,0 +1,5983 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.calcite.sql2rel;
+
+import org.apache.flink.table.planner.alias.SubQueryAliasNodeClearShuttle;
+import org.apache.flink.table.planner.plan.nodes.calcite.LogicalSubQueryAlias;
+
+import org.apache.calcite.avatica.util.Spaces;
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptSamplingParameters;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.ViewExpanders;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.prepare.RelOptTableImpl;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Collect;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.core.Sample;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.hint.HintStrategyTable;
+import org.apache.calcite.rel.hint.Hintable;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalIntersect;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalMatch;
+import org.apache.calcite.rel.logical.LogicalMinus;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rel.logical.LogicalValues;
+import org.apache.calcite.rel.metadata.RelColumnMapping;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.stream.Delta;
+import org.apache.calcite.rel.stream.LogicalDelta;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexCallBinding;
+import org.apache.calcite.rex.RexCorrelVariable;
+import org.apache.calcite.rex.RexDynamicParam;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexFieldCollation;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexPatternFieldRef;
+import org.apache.calcite.rex.RexRangeRef;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexSubQuery;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.rex.RexWindowBound;
+import org.apache.calcite.rex.RexWindowBounds;
+import org.apache.calcite.schema.ColumnStrategy;
+import org.apache.calcite.schema.ModifiableTable;
+import org.apache.calcite.schema.ModifiableView;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.schema.Wrapper;
+import org.apache.calcite.sql.JoinConditionType;
+import org.apache.calcite.sql.JoinType;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlDelete;
+import org.apache.calcite.sql.SqlDynamicParam;
+import org.apache.calcite.sql.SqlExplainFormat;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlIntervalQualifier;
+import org.apache.calcite.sql.SqlJoin;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlMatchRecognize;
+import org.apache.calcite.sql.SqlMerge;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlNumericLiteral;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlPivot;
+import org.apache.calcite.sql.SqlSampleSpec;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlSelectKeyword;
+import org.apache.calcite.sql.SqlSetOperator;
+import org.apache.calcite.sql.SqlSnapshot;
+import org.apache.calcite.sql.SqlUnnestOperator;
+import org.apache.calcite.sql.SqlUpdate;
+import org.apache.calcite.sql.SqlUtil;
+import org.apache.calcite.sql.SqlValuesOperator;
+import org.apache.calcite.sql.SqlWindow;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.SqlWithItem;
+import org.apache.calcite.sql.fun.SqlCase;
+import org.apache.calcite.sql.fun.SqlCountAggFunction;
+import org.apache.calcite.sql.fun.SqlInOperator;
+import org.apache.calcite.sql.fun.SqlQuantifyOperator;
+import org.apache.calcite.sql.fun.SqlRowOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlReturnTypeInference;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.sql.type.TableFunctionReturnTypeInference;
+import org.apache.calcite.sql.util.SqlBasicVisitor;
+import org.apache.calcite.sql.util.SqlVisitor;
+import org.apache.calcite.sql.validate.AggregatingSelectScope;
+import org.apache.calcite.sql.validate.CollectNamespace;
+import org.apache.calcite.sql.validate.DelegatingScope;
+import org.apache.calcite.sql.validate.ListScope;
+import org.apache.calcite.sql.validate.MatchRecognizeScope;
+import org.apache.calcite.sql.validate.ParameterScope;
+import org.apache.calcite.sql.validate.SelectScope;
+import org.apache.calcite.sql.validate.SqlMonotonicity;
+import org.apache.calcite.sql.validate.SqlNameMatcher;
+import org.apache.calcite.sql.validate.SqlQualified;
+import org.apache.calcite.sql.validate.SqlUserDefinedTableFunction;
+import org.apache.calcite.sql.validate.SqlUserDefinedTableMacro;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+import org.apache.calcite.sql.validate.SqlValidatorNamespace;
+import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.sql.validate.SqlValidatorTable;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBeans;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Litmus;
+import org.apache.calcite.util.NlsString;
+import org.apache.calcite.util.NumberUtil;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+import org.apache.calcite.util.trace.CalciteTrace;
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.lang.reflect.Type;
+import java.math.BigDecimal;
+import java.util.AbstractList;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+
+import static org.apache.calcite.sql.SqlUtil.stripAs;
+
+/**
+ * Converts a SQL parse tree (consisting of {@link SqlNode} objects) into a relational algebra
+ * expression (consisting of {@link RelNode} objects).
+ *
+ * <p>The public entry points are: {@link #convertQuery}, {@link #convertExpression(SqlNode)}.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class SqlToRelConverter {
+    // ~ Static fields/initializers ---------------------------------------------
+
+    /** Default configuration. */
+    private static final Config CONFIG =
+            ImmutableBeans.create(Config.class)
+                    .withRelBuilderFactory(RelFactories.LOGICAL_BUILDER)
+                    .withRelBuilderConfigTransform(c -> c.withPushJoinCondition(true))
+                    .withHintStrategyTable(HintStrategyTable.EMPTY);
+
+    protected static final Logger SQL2REL_LOGGER = CalciteTrace.getSqlToRelTracer();
+
+    /** Size of the smallest IN list that will be converted to a semijoin to a static table. */
+    public static final int DEFAULT_IN_SUB_QUERY_THRESHOLD = 20;
+
+    @Deprecated // to be removed before 2.0
+    public static final int DEFAULT_IN_SUBQUERY_THRESHOLD = DEFAULT_IN_SUB_QUERY_THRESHOLD;
+
+    // ~ Instance fields --------------------------------------------------------
+
+    protected final SqlValidator validator;
+    protected final RexBuilder rexBuilder;
+    protected final Prepare.CatalogReader catalogReader;
+    protected final RelOptCluster cluster;
+    private SubQueryConverter subQueryConverter;
+    protected final Map<RelNode, Integer> leaves = new HashMap<>();
+    private final List<SqlDynamicParam> dynamicParamSqlNodes = new ArrayList<>();
+    private final SqlOperatorTable opTab;
+    protected final RelDataTypeFactory typeFactory;
+    private final SqlNodeToRexConverter exprConverter;
+    private final HintStrategyTable hintStrategies;
+    private int explainParamCount;
+    public final Config config;
+    private final RelBuilder relBuilder;
+
+    /** Fields used in name resolution for correlated sub-queries. */
+    private final Map<CorrelationId, DeferredLookup> mapCorrelToDeferred = new HashMap<>();
+
+    /**
+     * Stack of names of datasets requested by the <code>
+     * TABLE(SAMPLE(&lt;datasetName&gt;, &lt;query&gt;))</code> construct.
+     */
+    private final Deque<String> datasetStack = new ArrayDeque<>();
+
+    /**
+     * Mapping of non-correlated sub-queries that have been converted to their equivalent constants.
+     * Used to avoid re-evaluating the sub-query if it's already been evaluated.
+     */
+    private final Map<SqlNode, RexNode> mapConvertedNonCorrSubqs = new HashMap<>();
+
+    public final RelOptTable.ViewExpander viewExpander;
+
+    // ~ Constructors -----------------------------------------------------------
+    /**
+     * Creates a converter.
+     *
+     * @param viewExpander Preparing statement
+     * @param validator Validator
+     * @param catalogReader Schema
+     * @param planner Planner
+     * @param rexBuilder Rex builder
+     * @param convertletTable Expression converter
+     */
+    @Deprecated // to be removed before 2.0
+    public SqlToRelConverter(
+            RelOptTable.ViewExpander viewExpander,
+            SqlValidator validator,
+            Prepare.CatalogReader catalogReader,
+            RelOptPlanner planner,
+            RexBuilder rexBuilder,
+            SqlRexConvertletTable convertletTable) {
+        this(
+                viewExpander,
+                validator,
+                catalogReader,
+                RelOptCluster.create(planner, rexBuilder),
+                convertletTable,
+                SqlToRelConverter.config());
+    }
+
+    @Deprecated // to be removed before 2.0
+    public SqlToRelConverter(
+            RelOptTable.ViewExpander viewExpander,
+            SqlValidator validator,
+            Prepare.CatalogReader catalogReader,
+            RelOptCluster cluster,
+            SqlRexConvertletTable convertletTable) {
+        this(
+                viewExpander,
+                validator,
+                catalogReader,
+                cluster,
+                convertletTable,
+                SqlToRelConverter.config());
+    }
+
+    /* Creates a converter. */
+    public SqlToRelConverter(
+            RelOptTable.ViewExpander viewExpander,
+            SqlValidator validator,
+            Prepare.CatalogReader catalogReader,
+            RelOptCluster cluster,
+            SqlRexConvertletTable convertletTable,
+            Config config) {
+        this.viewExpander = viewExpander;
+        this.opTab =
+                (validator == null) ? SqlStdOperatorTable.instance() : validator.getOperatorTable();
+        this.validator = validator;
+        this.catalogReader = catalogReader;
+        this.subQueryConverter = new NoOpSubQueryConverter();
+        this.rexBuilder = cluster.getRexBuilder();
+        this.typeFactory = rexBuilder.getTypeFactory();
+        this.exprConverter = new SqlNodeToRexConverterImpl(convertletTable);
+        this.explainParamCount = 0;
+        this.config = Objects.requireNonNull(config);
+        this.relBuilder =
+                config.getRelBuilderFactory()
+                        .create(cluster, null)
+                        .transform(config.getRelBuilderConfigTransform());
+        this.hintStrategies = config.getHintStrategyTable();
+
+        cluster.setHintStrategies(this.hintStrategies);
+        this.cluster = Objects.requireNonNull(cluster);
+    }
+
+    // ~ Methods ----------------------------------------------------------------
+
+    /** Returns the RelOptCluster in use. */
+    public RelOptCluster getCluster() {
+        return cluster;
+    }
+
+    /** Returns the row-expression builder. */
+    public RexBuilder getRexBuilder() {
+        return rexBuilder;
+    }
+
+    /**
+     * Returns the number of dynamic parameters encountered during translation; this must only be
+     * called after {@link #convertQuery}.
+     *
+     * @return number of dynamic parameters
+     */
+    public int getDynamicParamCount() {
+        return dynamicParamSqlNodes.size();
+    }
+
+    /**
+     * Returns the type inferred for a dynamic parameter.
+     *
+     * @param index 0-based index of dynamic parameter
+     * @return inferred type, never null
+     */
+    public RelDataType getDynamicParamType(int index) {
+        SqlNode sqlNode = dynamicParamSqlNodes.get(index);
+        if (sqlNode == null) {
+            throw Util.needToImplement("dynamic param type inference");
+        }
+        return validator.getValidatedNodeType(sqlNode);
+    }
+
+    /**
+     * Returns the current count of the number of dynamic parameters in an EXPLAIN PLAN statement.
+     *
+     * @param increment if true, increment the count
+     * @return the current count before the optional increment
+     */
+    public int getDynamicParamCountInExplain(boolean increment) {
+        int retVal = explainParamCount;
+        if (increment) {
+            ++explainParamCount;
+        }
+        return retVal;
+    }
+
+    /**
+     * Returns the mapping of non-correlated sub-queries that have been converted to the constants
+     * that they evaluate to.
+     */
+    public Map<SqlNode, RexNode> getMapConvertedNonCorrSubqs() {
+        return mapConvertedNonCorrSubqs;
+    }
+
+    /**
+     * Adds to the current map of non-correlated converted sub-queries the elements from another map
+     * that contains non-correlated sub-queries that have been converted by another
+     * SqlToRelConverter.
+     *
+     * @param alreadyConvertedNonCorrSubqs the other map
+     */
+    public void addConvertedNonCorrSubqs(Map<SqlNode, RexNode> alreadyConvertedNonCorrSubqs) {
+        mapConvertedNonCorrSubqs.putAll(alreadyConvertedNonCorrSubqs);
+    }
+
+    /**
+     * Sets a new SubQueryConverter. To have any effect, this must be called before any convert
+     * method.
+     *
+     * @param converter new SubQueryConverter
+     */
+    public void setSubQueryConverter(SubQueryConverter converter) {
+        subQueryConverter = converter;
+    }
+
+    /**
+     * Sets the number of dynamic parameters in the current EXPLAIN PLAN statement.
+     *
+     * @param explainParamCount number of dynamic parameters in the statement
+     */
+    public void setDynamicParamCountInExplain(int explainParamCount) {
+        assert config.isExplain();
+        this.explainParamCount = explainParamCount;
+    }
+
+    private void checkConvertedType(SqlNode query, RelNode result) {
+        if (query.isA(SqlKind.DML)) {
+            return;
+        }
+        // Verify that conversion from SQL to relational algebra did
+        // not perturb any type information.  (We can't do this if the
+        // SQL statement is something like an INSERT which has no
+        // validator type information associated with its result,
+        // hence the namespace check above.)
+        final List<RelDataTypeField> validatedFields =
+                validator.getValidatedNodeType(query).getFieldList();
+        final RelDataType validatedRowType =
+                validator
+                        .getTypeFactory()
+                        .createStructType(
+                                Pair.right(validatedFields),
+                                SqlValidatorUtil.uniquify(
+                                        Pair.left(validatedFields),
+                                        catalogReader.nameMatcher().isCaseSensitive()));
+
+        final List<RelDataTypeField> convertedFields =
+                result.getRowType().getFieldList().subList(0, validatedFields.size());
+        final RelDataType convertedRowType =
+                validator.getTypeFactory().createStructType(convertedFields);
+
+        if (!RelOptUtil.equal(
+                "validated row type",
+                validatedRowType,
+                "converted row type",
+                convertedRowType,
+                Litmus.IGNORE)) {
+            throw new AssertionError(
+                    "Conversion to relational algebra failed to "
+                            + "preserve datatypes:\n"
+                            + "validated type:\n"
+                            + validatedRowType.getFullTypeString()
+                            + "\nconverted type:\n"
+                            + convertedRowType.getFullTypeString()
+                            + "\nrel:\n"
+                            + RelOptUtil.toString(result));
+        }
+    }
+
+    public RelNode flattenTypes(RelNode rootRel, boolean restructure) {
+        RelStructuredTypeFlattener typeFlattener =
+                new RelStructuredTypeFlattener(
+                        relBuilder,
+                        rexBuilder,
+                        createToRelContext(com.google.common.collect.ImmutableList.of()),
+                        restructure);
+        return typeFlattener.rewrite(rootRel);
+    }
+
+    /**
+     * If sub-query is correlated and decorrelation is enabled, performs decorrelation.
+     *
+     * @param query Query
+     * @param rootRel Root relational expression
+     * @return New root relational expression after decorrelation
+     */
+    public RelNode decorrelate(SqlNode query, RelNode rootRel) {
+        if (!config.isDecorrelationEnabled()) {
+            return rootRel;
+        }
+        final RelNode result = decorrelateQuery(rootRel);
+        if (result != rootRel) {
+            checkConvertedType(query, result);
+        }
+        return result;
+    }
+
+    /**
+     * Walks over a tree of relational expressions, replacing each {@link RelNode} with a 'slimmed
+     * down' relational expression that projects only the fields required by its consumer.
+     *
+     * <p>This may make things easier for the optimizer, by removing crud that would expand the
+     * search space, but is difficult for the optimizer itself to do it, because optimizer rules
+     * must preserve the number and type of fields. Hence, this transform that operates on the
+     * entire tree, similar to the {@link RelStructuredTypeFlattener type-flattening transform}.
+     *
+     * <p>Currently this functionality is disabled in farrago/luciddb; the default implementation of
+     * this method does nothing.
+     *
+     * @param ordered Whether the relational expression must produce results in a particular order
+     *     (typically because it has an ORDER BY at top level)
+     * @param rootRel Relational expression that is at the root of the tree
+     * @return Trimmed relational expression
+     */
+    public RelNode trimUnusedFields(boolean ordered, RelNode rootRel) {
+        // Trim fields that are not used by their consumer.
+        if (isTrimUnusedFields()) {
+            final RelFieldTrimmer trimmer = newFieldTrimmer();
+            final List<RelCollation> collations =
+                    rootRel.getTraitSet().getTraits(RelCollationTraitDef.INSTANCE);
+            rootRel = trimmer.trim(rootRel);
+            if (!ordered
+                    && collations != null
+                    && !collations.isEmpty()
+                    && !collations.equals(
+                            com.google.common.collect.ImmutableList.of(RelCollations.EMPTY))) {
+                final RelTraitSet traitSet =
+                        rootRel.getTraitSet().replace(RelCollationTraitDef.INSTANCE, collations);
+                rootRel = rootRel.copy(traitSet, rootRel.getInputs());
+            }
+            if (SQL2REL_LOGGER.isDebugEnabled()) {
+                SQL2REL_LOGGER.debug(
+                        RelOptUtil.dumpPlan(
+                                "Plan after trimming unused fields",
+                                rootRel,
+                                SqlExplainFormat.TEXT,
+                                SqlExplainLevel.EXPPLAN_ATTRIBUTES));
+            }
+        }
+        return rootRel;
+    }
+
+    /**
+     * Creates a RelFieldTrimmer.
+     *
+     * @return Field trimmer
+     */
+    protected RelFieldTrimmer newFieldTrimmer() {
+        return new RelFieldTrimmer(validator, relBuilder);
+    }
+
+    /**
+     * Converts an unvalidated query's parse tree into a relational expression.
+     *
+     * @param query Query to convert
+     * @param needsValidation Whether to validate the query before converting; <code>false</code> if
+     *     the query has already been validated.
+     * @param top Whether the query is top-level, say if its result will become a JDBC result set;
+     *     <code>false</code> if the query will be part of a view.
+     */
+    public RelRoot convertQuery(SqlNode query, final boolean needsValidation, final boolean top) {
+        if (needsValidation) {
+            query = validator.validate(query);
+        }
+
+        RelNode result = convertQueryRecursive(query, top, null).rel;
+        if (top) {
+            if (isStream(query)) {
+                result = new LogicalDelta(cluster, result.getTraitSet(), result);
+            }
+        }
+        RelCollation collation = RelCollations.EMPTY;
+        if (!query.isA(SqlKind.DML)) {
+            if (isOrdered(query)) {
+                collation = requiredCollation(result);
+            }
+        }
+        checkConvertedType(query, result);
+
+        if (SQL2REL_LOGGER.isDebugEnabled()) {
+            SQL2REL_LOGGER.debug(
+                    RelOptUtil.dumpPlan(
+                            "Plan after converting SqlNode to RelNode",
+                            result,
+                            SqlExplainFormat.TEXT,
+                            SqlExplainLevel.EXPPLAN_ATTRIBUTES));
+        }
+
+        final RelDataType validatedRowType = validator.getValidatedNodeType(query);
+        List<RelHint> hints = new ArrayList<>();
+        if (query.getKind() == SqlKind.SELECT) {
+            final SqlSelect select = (SqlSelect) query;
+            if (select.hasHints()) {
+                hints = SqlUtil.getRelHint(hintStrategies, select.getHints());
+            }
+        }
+        // propagate the hints.
+        result = RelOptUtil.propagateRelHints(result, false);

Review Comment:
   Since we have already copied the SqlToRelConverter class, we can just implement a new method in FlinkRelOptUtil. That way, RelOptUtil does not need to be copied.
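   For illustration only, a rough sketch of such a helper (the object and method below are assumptions for this comment, not existing Flink API; it just delegates to Calcite's RelOptUtil.propagateRelHints, and any Flink-specific handling would go in this one place):

   // hypothetical helper on the Flink side, so the copied SqlToRelConverter can
   // call this instead of relying on a copied RelOptUtil
   object FlinkRelOptUtilSketch {
     import org.apache.calcite.plan.RelOptUtil
     import org.apache.calcite.rel.RelNode

     def propagateRelHints(root: RelNode, resetHints: Boolean): RelNode = {
       // delegate to the original Calcite implementation; any extra logic would go here
       RelOptUtil.propagateRelHints(root, resetHints)
     }
   }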



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalJoinRuleBase.scala:
##########
@@ -20,19 +20,42 @@ package org.apache.flink.table.planner.plan.rules.physical.batch
 import org.apache.flink.annotation.Experimental
 import org.apache.flink.configuration.ConfigOption
 import org.apache.flink.configuration.ConfigOptions.key
+import org.apache.flink.table.api.{TableConfig, TableException, ValidationException}
+import org.apache.flink.table.api.config.OptimizerConfigOptions
+import org.apache.flink.table.planner.{JArrayList, JDouble, JList}
+import org.apache.flink.table.planner.hint.JoinStrategy
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalLocalHashAggregate
-import org.apache.flink.table.planner.plan.utils.FlinkRelMdUtil
+import org.apache.flink.table.planner.plan.utils.{FlinkRelMdUtil, OperatorType}
 import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
+import org.apache.flink.table.planner.utils.TableConfigUtils.isOperatorDisabled
 
-import org.apache.calcite.plan.RelOptRule
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{Join, JoinRelType}
 import org.apache.calcite.util.ImmutableBitSet
 
 import java.lang.{Boolean => JBoolean, Double => JDouble}
 
+import scala.collection.JavaConversions._
+
 trait BatchPhysicalJoinRuleBase {
 
+  protected def checkMatchJoinStrategy(
+      call: RelOptRuleCall,
+      joinStrategy: JoinStrategy): Boolean = {
+    val join: Join = call.rel(0)
+    val tableConfig = unwrapTableConfig(call)
+    val validJoinHints = collectValidJoinHints(join, tableConfig)
+    if (!validJoinHints.isEmpty) {
+      // if there are join hints, the first hint must be this one, otherwise it is invalid
+      validJoinHints.head == joinStrategy
+    } else {
+      // if there are no join hints, treat as non-join-hints
+      checkJoinStrategyValid(join, tableConfig, joinStrategy, withHint = false)._1

Review Comment:
   use named expressions instead of `_1` and `_2`
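   For example, in the `else` branch (the value name below is just illustrative):

     } else {
       // if there are no join hints, treat as a non-hinted join and destructure
       // the result tuple into a named value instead of using _1
       val (isValidJoinStrategy, _) =
         checkJoinStrategyValid(join, tableConfig, joinStrategy, withHint = false)
       isValidJoinStrategy
     }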



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/alias/SubQueryAliasNodeClearShuttle.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.alias;
+
+import org.apache.flink.table.planner.hint.FlinkHints;
+import org.apache.flink.table.planner.plan.nodes.calcite.SubQueryAlias;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.hint.Hintable;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexSubQuery;
+
+import java.util.Collections;
+
+/** A shuttle to remove sub-query alias node and add the alias hint on the child node. */
+public class SubQueryAliasNodeClearShuttle extends RelShuttleImpl {
+    @Override
+    public RelNode visit(RelNode node) {
+        if (node instanceof SubQueryAlias) {
+            RelHint aliasTag =
+                    RelHint.builder(FlinkHints.HINT_ALIAS)
+                            .hintOption(((SubQueryAlias) node).getAliasName())
+                            .build();
+            RelNode newNode =
+                    ((Hintable) ((SubQueryAlias) node).getInput())
+                            .attachHints(Collections.singletonList(aliasTag));
+            return super.visit(newNode);
+        }
+
+        return super.visit(node);
+    }
+
+    @Override
+    public RelNode visit(LogicalFilter filter) {

Review Comment:
   The expressions in LogicalProject and LogicalJoin may also contain RexSubQuery, so those nodes should be handled here as well



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/JoinStrategy.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.hint;
+
+import org.apache.flink.table.api.ValidationException;
+
+import java.util.List;
+
+/** Currently available join strategies and corresponding join hint names. */
+public enum JoinStrategy {
+    /**
+     * Instructs the optimizer to use broadcast hash join strategy. If both sides are specified in
+     * this hint, the side that is first written will be broadcast.
+     */
+    BROADCAST("BROADCAST"),
+
+    /**
+     * Instructs the optimizer to use shuffle hash join strategy. If both sides are specified in
+     * this hint, the side that is first written will be treated as the build side.
+     */
+    SHUFFLE_HASH("SHUFFLE_HASH"),
+
+    /**
+     * Instructs the optimizer to use shuffle sort merge join strategy. As long as one of the side
+     * is specified in this hint, it will be tried.
+     */
+    SHUFFLE_MERGE("SHUFFLE_MERGE"),
+
+    /**
+     * Instructs the optimizer to use nest loop join strategy. If both sides are specified in this
+     * hint, the side that is first written will be treated as the build side.
+     */
+    NEST_LOOP("NEST_LOOP");
+
+    private final String joinHintName;
+
+    JoinStrategy(String joinHintName) {
+        this.joinHintName = joinHintName;
+    }
+
+    // ~ option name for join hint
+    public static final String LEFT_INPUT = "LEFT";
+    public static final String RIGHT_INPUT = "RIGHT";
+
+    public static JoinStrategy getJoinStrategy(String joinHintName) {
+        for (JoinStrategy joinStrategy : JoinStrategy.values()) {
+            if (joinStrategy.getJoinHintName().equalsIgnoreCase(joinHintName)) {
+                return joinStrategy;
+            }
+        }
+
+        throw new ValidationException(String.format("Unknown join hint : %s", joinHintName));
+    }
+
+    public static boolean isJoinStrategy(String hintName) {
+        try {
+            getJoinStrategy(hintName);
+            return true;
+        } catch (ValidationException e) {
+            return false;
+        }
+    }
+
+    public String getJoinHintName() {
+        return joinHintName;
+    }
+
+    public static boolean validOptions(String hintName, List<String> options) {
+        if (!isJoinStrategy(hintName)) {
+            return false;
+        }
+
+        JoinStrategy strategy = JoinStrategy.getJoinStrategy(hintName);

Review Comment:
   JoinStrategy.valueOf(hintName)



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalJoinRuleBase.scala:
##########
@@ -72,6 +95,231 @@ trait BatchPhysicalJoinRuleBase {
       rowCount * FlinkRelMdUtil.binaryRowAverageSize(relNode)
     }
   }
+
+  protected def collectValidJoinHints(join: Join, tableConfig: TableConfig): JList[JoinStrategy] = {
+    val allHints = join.getHints
+    val validHints = new JArrayList[JoinStrategy]
+
+    allHints.forEach(
+      relHint => {
+        val joinHint = JoinStrategy.getJoinStrategy(relHint.hintName)
+        if (checkJoinStrategyValid(join, tableConfig, joinHint, withHint = true)._1) {
+          validHints.add(joinHint)
+        }
+      })
+
+    validHints
+  }
+
+  /**
+   * Check whether the join strategy is valid.
+   *
+   * @param join
+   *   the join node
+   * @param tableConfig
+   *   the table config
+   * @param triedJoinStrategy
+   *   the join strategy checked
+   * @param withHint
+   *   whether this check is called with hint
+   * @return
+   *   an Tuple2 instance. The first element of tuple is true if join is valid, false else. The
+   *   second element of tuple is true if left side used as build side, false else.
+   */
+  def checkJoinStrategyValid(
+      join: Join,
+      tableConfig: TableConfig,
+      triedJoinStrategy: JoinStrategy,
+      withHint: Boolean): (Boolean, Boolean) = {
+
+    // TODO currently join hint is not supported with semi/anti join
+    if (withHint && !join.getJoinType.projectsRight()) {
+      return (false, false)
+    }
+
+    triedJoinStrategy match {
+      case JoinStrategy.BROADCAST =>
+        checkBroadcast(join, tableConfig, withHint)
+
+      case JoinStrategy.SHUFFLE_HASH =>
+        checkShuffleHash(join, tableConfig, withHint)
+
+      case JoinStrategy.SHUFFLE_MERGE =>
+        // for SortMergeJoin, there is no diff between with hint or without hint
+        // the second arg should be ignored
+        (checkSortMergeJoin(join, tableConfig), false)
+
+      case JoinStrategy.NEST_LOOP =>
+        checkNestLoopJoin(join, tableConfig, withHint)
+
+      case _ =>
+        throw new ValidationException("Unknown join strategy : " + triedJoinStrategy)
+    }
+  }
+
+  private def isEquivJoin(join: Join): Boolean = {
+    val joinInfo = join.analyzeCondition
+    !joinInfo.pairs().isEmpty
+  }
+
+  /**
+   * Decides whether the join can convert to BroadcastHashJoin.
+   *
+   * @param join
+   *   the join node
+   * @return
+   *   an Tuple2 instance. The first element of tuple is true if join can convert to broadcast hash
+   *   join, false else. The second element of tuple is true if left side used as broadcast side,
+   *   false else.
+   */
+  protected def checkBroadcast(
+      join: Join,
+      tableConfig: TableConfig,
+      withHint: Boolean): (Boolean, Boolean) = {
+
+    if (!isEquivJoin(join) || isOperatorDisabled(tableConfig, OperatorType.BroadcastHashJoin)) {
+      return (false, false)
+    }
+
+    val leftSize = binaryRowRelNodeSize(join.getLeft)
+    val rightSize = binaryRowRelNodeSize(join.getRight)
+
+    // if it is with hint, try best to use it and only check the join type
+    if (withHint) {
+      // BROADCAST use first arg as the broadcast side
+      val isLeftToBroadcastInHint =
+        getFirstArgInJoinHint(join, JoinStrategy.BROADCAST.getJoinHintName)
+          .equals(JoinStrategy.LEFT_INPUT)
+
+      join.getJoinType match {
+        // if left join, must broadcast right side
+        case JoinRelType.LEFT => (!isLeftToBroadcastInHint, false)
+        // if right join, must broadcast left side
+        case JoinRelType.RIGHT => (isLeftToBroadcastInHint, true)
+        case JoinRelType.FULL => (false, false)
+        case JoinRelType.INNER =>
+          (true, isLeftToBroadcastInHint)
+        case JoinRelType.SEMI | JoinRelType.ANTI =>
+          // TODO currently join hint is not supported with semi/anti join
+          (false, false)
+      }
+    } else {
+      // if it is not with hint, just check size of left and right side by statistic and config
+      // if leftSize or rightSize is unknown, cannot use broadcast
+      if (leftSize == null || rightSize == null) {
+        return (false, false)
+      }
+
+      val threshold =
+        tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD)
+
+      val rightSizeSmallerThanThreshold = rightSize <= threshold
+      val leftSizeSmallerThanThreshold = leftSize <= threshold
+      val leftSmallerThanRight = leftSize <= rightSize
+
+      join.getJoinType match {
+        case JoinRelType.LEFT => (rightSizeSmallerThanThreshold, false)
+        case JoinRelType.RIGHT => (leftSizeSmallerThanThreshold, true)
+        case JoinRelType.FULL => (false, false)
+        case JoinRelType.INNER =>
+          (
+            leftSizeSmallerThanThreshold
+              || rightSizeSmallerThanThreshold,
+            leftSmallerThanRight)
+        // left side cannot be used as build side in SEMI/ANTI join.
+        case JoinRelType.SEMI | JoinRelType.ANTI =>
+          (rightSizeSmallerThanThreshold, false)
+      }
+    }
+  }
+
+  protected def checkShuffleHash(
+      join: Join,
+      tableConfig: TableConfig,
+      withHint: Boolean): (Boolean, Boolean) = {

Review Comment:
   nit: withShuffleHashHint



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalJoinRuleBase.scala:
##########
@@ -72,6 +95,231 @@ trait BatchPhysicalJoinRuleBase {
       rowCount * FlinkRelMdUtil.binaryRowAverageSize(relNode)
     }
   }
+
+  protected def collectValidJoinHints(join: Join, tableConfig: TableConfig): JList[JoinStrategy] = {
+    val allHints = join.getHints
+    val validHints = new JArrayList[JoinStrategy]
+
+    allHints.forEach(
+      relHint => {
+        val joinHint = JoinStrategy.getJoinStrategy(relHint.hintName)
+        if (checkJoinStrategyValid(join, tableConfig, joinHint, withHint = true)._1) {
+          validHints.add(joinHint)
+        }
+      })
+
+    validHints
+  }
+
+  /**
+   * Check whether the join strategy is valid.
+   *
+   * @param join
+   *   the join node
+   * @param tableConfig
+   *   the table config
+   * @param triedJoinStrategy
+   *   the join strategy checked
+   * @param withHint
+   *   whether this check is called with hint
+   * @return
+   *   an Tuple2 instance. The first element of tuple is true if join is valid, false else. The
+   *   second element of tuple is true if left side used as build side, false else.
+   */
+  def checkJoinStrategyValid(
+      join: Join,
+      tableConfig: TableConfig,
+      triedJoinStrategy: JoinStrategy,
+      withHint: Boolean): (Boolean, Boolean) = {
+
+    // TODO currently join hint is not supported with semi/anti join
+    if (withHint && !join.getJoinType.projectsRight()) {
+      return (false, false)
+    }
+
+    triedJoinStrategy match {
+      case JoinStrategy.BROADCAST =>
+        checkBroadcast(join, tableConfig, withHint)
+
+      case JoinStrategy.SHUFFLE_HASH =>
+        checkShuffleHash(join, tableConfig, withHint)
+
+      case JoinStrategy.SHUFFLE_MERGE =>
+        // for SortMergeJoin, there is no diff between with hint or without hint
+        // the second arg should be ignored
+        (checkSortMergeJoin(join, tableConfig), false)
+
+      case JoinStrategy.NEST_LOOP =>
+        checkNestLoopJoin(join, tableConfig, withHint)
+
+      case _ =>
+        throw new ValidationException("Unknown join strategy : " + triedJoinStrategy)
+    }
+  }
+
+  private def isEquivJoin(join: Join): Boolean = {
+    val joinInfo = join.analyzeCondition
+    !joinInfo.pairs().isEmpty
+  }
+
+  /**
+   * Decides whether the join can convert to BroadcastHashJoin.
+   *
+   * @param join
+   *   the join node
+   * @return
+   *   an Tuple2 instance. The first element of tuple is true if join can convert to broadcast hash
+   *   join, false else. The second element of tuple is true if left side used as broadcast side,
+   *   false else.
+   */
+  protected def checkBroadcast(
+      join: Join,
+      tableConfig: TableConfig,
+      withHint: Boolean): (Boolean, Boolean) = {
+
+    if (!isEquivJoin(join) || isOperatorDisabled(tableConfig, OperatorType.BroadcastHashJoin)) {
+      return (false, false)
+    }
+
+    val leftSize = binaryRowRelNodeSize(join.getLeft)
+    val rightSize = binaryRowRelNodeSize(join.getRight)

Review Comment:
   leftSize and rightSize can be calculated in the `else` branch, since they are only used when there is no hint



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalHashJoinRule.scala:
##########
@@ -89,19 +71,49 @@ class BatchPhysicalHashJoinRule
       case _ => (join.getRight, false)
     }
 
-    val leftSize = binaryRowRelNodeSize(left)
-    val rightSize = binaryRowRelNodeSize(right)
-
-    val (isBroadcast, leftIsBroadcast) = canBroadcast(joinType, leftSize, rightSize, tableConfig)
+    var isBroadcast = false
+
+    var isLeftToBroadcast = false
+    var isLeftToBuild = false
+
+    val validJoinHints = collectValidJoinHints(join, tableConfig)
+    if (

Review Comment:
   why not:
   if (validJoinHints.nonEmpty) {
     if (validJoinHints.head.equals(JoinStrategy.BROADCAST)
         || validJoinHints.head.equals(JoinStrategy.SHUFFLE_HASH)) {
       // ...
     } else {
       throw new TableException(...)
     }
   } else {
     // ...
   }



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalJoinRuleBase.scala:
##########
@@ -72,6 +95,231 @@ trait BatchPhysicalJoinRuleBase {
       rowCount * FlinkRelMdUtil.binaryRowAverageSize(relNode)
     }
   }
+
+  protected def collectValidJoinHints(join: Join, tableConfig: TableConfig): JList[JoinStrategy] = {
+    val allHints = join.getHints
+    val validHints = new JArrayList[JoinStrategy]
+
+    allHints.forEach(
+      relHint => {
+        val joinHint = JoinStrategy.getJoinStrategy(relHint.hintName)
+        if (checkJoinStrategyValid(join, tableConfig, joinHint, withHint = true)._1) {
+          validHints.add(joinHint)
+        }
+      })
+
+    validHints
+  }
+
+  /**
+   * Check whether the join strategy is valid.
+   *
+   * @param join
+   *   the join node
+   * @param tableConfig
+   *   the table config
+   * @param triedJoinStrategy
+   *   the join strategy checked
+   * @param withHint
+   *   whether this check is called with hint
+   * @return
+   *   an Tuple2 instance. The first element of tuple is true if join is valid, false else. The
+   *   second element of tuple is true if left side used as build side, false else.
+   */
+  def checkJoinStrategyValid(
+      join: Join,
+      tableConfig: TableConfig,
+      triedJoinStrategy: JoinStrategy,
+      withHint: Boolean): (Boolean, Boolean) = {
+
+    // TODO currently join hint is not supported with semi/anti join
+    if (withHint && !join.getJoinType.projectsRight()) {
+      return (false, false)
+    }
+
+    triedJoinStrategy match {
+      case JoinStrategy.BROADCAST =>
+        checkBroadcast(join, tableConfig, withHint)
+
+      case JoinStrategy.SHUFFLE_HASH =>
+        checkShuffleHash(join, tableConfig, withHint)
+
+      case JoinStrategy.SHUFFLE_MERGE =>
+        // for SortMergeJoin, there is no diff between with hint or without hint
+        // the second arg should be ignored
+        (checkSortMergeJoin(join, tableConfig), false)
+
+      case JoinStrategy.NEST_LOOP =>
+        checkNestLoopJoin(join, tableConfig, withHint)
+
+      case _ =>
+        throw new ValidationException("Unknown join strategy : " + triedJoinStrategy)
+    }
+  }
+
+  private def isEquivJoin(join: Join): Boolean = {
+    val joinInfo = join.analyzeCondition
+    !joinInfo.pairs().isEmpty
+  }
+
+  /**
+   * Decides whether the join can convert to BroadcastHashJoin.
+   *
+   * @param join
+   *   the join node
+   * @return
+   *   an Tuple2 instance. The first element of tuple is true if join can convert to broadcast hash
+   *   join, false else. The second element of tuple is true if left side used as broadcast side,
+   *   false else.
+   */
+  protected def checkBroadcast(
+      join: Join,
+      tableConfig: TableConfig,
+      withHint: Boolean): (Boolean, Boolean) = {
+
+    if (!isEquivJoin(join) || isOperatorDisabled(tableConfig, OperatorType.BroadcastHashJoin)) {
+      return (false, false)
+    }
+
+    val leftSize = binaryRowRelNodeSize(join.getLeft)
+    val rightSize = binaryRowRelNodeSize(join.getRight)
+
+    // if it is with hint, try best to use it and only check the join type
+    if (withHint) {
+      // BROADCAST use first arg as the broadcast side
+      val isLeftToBroadcastInHint =
+        getFirstArgInJoinHint(join, JoinStrategy.BROADCAST.getJoinHintName)
+          .equals(JoinStrategy.LEFT_INPUT)
+
+      join.getJoinType match {
+        // if left join, must broadcast right side
+        case JoinRelType.LEFT => (!isLeftToBroadcastInHint, false)
+        // if right join, must broadcast left side
+        case JoinRelType.RIGHT => (isLeftToBroadcastInHint, true)
+        case JoinRelType.FULL => (false, false)
+        case JoinRelType.INNER =>
+          (true, isLeftToBroadcastInHint)
+        case JoinRelType.SEMI | JoinRelType.ANTI =>
+          // TODO currently join hint is not supported with semi/anti join
+          (false, false)
+      }
+    } else {
+      // if it is not with hint, just check size of left and right side by statistic and config
+      // if leftSize or rightSize is unknown, cannot use broadcast
+      if (leftSize == null || rightSize == null) {
+        return (false, false)
+      }
+
+      val threshold =
+        tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD)
+
+      val rightSizeSmallerThanThreshold = rightSize <= threshold
+      val leftSizeSmallerThanThreshold = leftSize <= threshold
+      val leftSmallerThanRight = leftSize <= rightSize
+
+      join.getJoinType match {
+        case JoinRelType.LEFT => (rightSizeSmallerThanThreshold, false)
+        case JoinRelType.RIGHT => (leftSizeSmallerThanThreshold, true)
+        case JoinRelType.FULL => (false, false)
+        case JoinRelType.INNER =>
+          (
+            leftSizeSmallerThanThreshold
+              || rightSizeSmallerThanThreshold,
+            leftSmallerThanRight)
+        // left side cannot be used as build side in SEMI/ANTI join.
+        case JoinRelType.SEMI | JoinRelType.ANTI =>
+          (rightSizeSmallerThanThreshold, false)
+      }
+    }
+  }
+
+  protected def checkShuffleHash(
+      join: Join,
+      tableConfig: TableConfig,
+      withHint: Boolean): (Boolean, Boolean) = {
+    if (!isEquivJoin(join) || isOperatorDisabled(tableConfig, OperatorType.ShuffleHashJoin)) {
+      return (false, false)
+    }
+    if (!withHint) {
+      val leftSize = binaryRowRelNodeSize(join.getLeft)
+      val rightSize = binaryRowRelNodeSize(join.getRight)
+      val leftIsBuild = if (leftSize == null || rightSize == null || leftSize == rightSize) {
+        // use left to build hash table if leftSize or rightSize is unknown or equal size.
+        // choose right to build if join is SEMI/ANTI.
+        !join.getJoinType.projectsRight
+      } else {
+        leftSize < rightSize
+      }
+      (true, leftIsBuild)
+    } else {
+      val isLeftToBuild = getFirstArgInJoinHint(join, JoinStrategy.SHUFFLE_HASH.getJoinHintName)
+        .equals(JoinStrategy.LEFT_INPUT)
+      (true, isLeftToBuild)
+    }
+  }
+
+  // the sort merge join doesn't distinct the build side
+  protected def checkSortMergeJoin(join: Join, tableConfig: TableConfig): Boolean = {
+    if (!isEquivJoin(join) || isOperatorDisabled(tableConfig, OperatorType.ShuffleHashJoin)) {
+      false
+    } else {
+      true
+    }
+  }
+
+  protected def checkNestLoopJoin(
+      join: Join,
+      tableConfig: TableConfig,
+      withHint: Boolean): (Boolean, Boolean) = {
+
+    if (isOperatorDisabled(tableConfig, OperatorType.NestedLoopJoin)) {
+      return (false, false)
+    }
+
+    val isLeftToBuild = if (!withHint) {

Review Comment:
   `if (!withHint)` is clearer



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalJoinRuleBase.scala:
##########
@@ -72,6 +95,231 @@ trait BatchPhysicalJoinRuleBase {
       rowCount * FlinkRelMdUtil.binaryRowAverageSize(relNode)
     }
   }
+
+  protected def collectValidJoinHints(join: Join, tableConfig: TableConfig): JList[JoinStrategy] = {
+    val allHints = join.getHints
+    val validHints = new JArrayList[JoinStrategy]
+
+    allHints.forEach(
+      relHint => {
+        val joinHint = JoinStrategy.getJoinStrategy(relHint.hintName)
+        if (checkJoinStrategyValid(join, tableConfig, joinHint, withHint = true)._1) {
+          validHints.add(joinHint)
+        }
+      })
+
+    validHints
+  }
+
+  /**
+   * Check whether the join strategy is valid.
+   *
+   * @param join
+   *   the join node
+   * @param tableConfig
+   *   the table config
+   * @param triedJoinStrategy
+   *   the join strategy checked
+   * @param withHint
+   *   whether this check is called with hint
+   * @return
+   *   an Tuple2 instance. The first element of tuple is true if join is valid, false else. The
+   *   second element of tuple is true if left side used as build side, false else.
+   */
+  def checkJoinStrategyValid(
+      join: Join,
+      tableConfig: TableConfig,
+      triedJoinStrategy: JoinStrategy,
+      withHint: Boolean): (Boolean, Boolean) = {
+
+    // TODO currently join hint is not supported with semi/anti join
+    if (withHint && !join.getJoinType.projectsRight()) {
+      return (false, false)
+    }
+
+    triedJoinStrategy match {
+      case JoinStrategy.BROADCAST =>
+        checkBroadcast(join, tableConfig, withHint)
+
+      case JoinStrategy.SHUFFLE_HASH =>
+        checkShuffleHash(join, tableConfig, withHint)
+
+      case JoinStrategy.SHUFFLE_MERGE =>
+        // for SortMergeJoin, there is no diff between with hint or without hint
+        // the second arg should be ignored
+        (checkSortMergeJoin(join, tableConfig), false)
+
+      case JoinStrategy.NEST_LOOP =>
+        checkNestLoopJoin(join, tableConfig, withHint)
+
+      case _ =>
+        throw new ValidationException("Unknown join strategy : " + triedJoinStrategy)
+    }
+  }
+
+  private def isEquivJoin(join: Join): Boolean = {
+    val joinInfo = join.analyzeCondition
+    !joinInfo.pairs().isEmpty
+  }
+
+  /**
+   * Decides whether the join can convert to BroadcastHashJoin.
+   *
+   * @param join
+   *   the join node
+   * @return
+   *   an Tuple2 instance. The first element of tuple is true if join can convert to broadcast hash
+   *   join, false else. The second element of tuple is true if left side used as broadcast side,
+   *   false else.
+   */
+  protected def checkBroadcast(
+      join: Join,
+      tableConfig: TableConfig,
+      withHint: Boolean): (Boolean, Boolean) = {
+
+    if (!isEquivJoin(join) || isOperatorDisabled(tableConfig, OperatorType.BroadcastHashJoin)) {
+      return (false, false)
+    }
+
+    val leftSize = binaryRowRelNodeSize(join.getLeft)
+    val rightSize = binaryRowRelNodeSize(join.getRight)
+
+    // if it is with hint, try best to use it and only check the join type
+    if (withHint) {
+      // BROADCAST use first arg as the broadcast side
+      val isLeftToBroadcastInHint =
+        getFirstArgInJoinHint(join, JoinStrategy.BROADCAST.getJoinHintName)
+          .equals(JoinStrategy.LEFT_INPUT)
+
+      join.getJoinType match {
+        // if left join, must broadcast right side
+        case JoinRelType.LEFT => (!isLeftToBroadcastInHint, false)
+        // if right join, must broadcast left side
+        case JoinRelType.RIGHT => (isLeftToBroadcastInHint, true)
+        case JoinRelType.FULL => (false, false)
+        case JoinRelType.INNER =>
+          (true, isLeftToBroadcastInHint)
+        case JoinRelType.SEMI | JoinRelType.ANTI =>
+          // TODO currently join hint is not supported with semi/anti join
+          (false, false)
+      }
+    } else {
+      // if it is not with hint, just check size of left and right side by statistic and config
+      // if leftSize or rightSize is unknown, cannot use broadcast
+      if (leftSize == null || rightSize == null) {
+        return (false, false)
+      }
+
+      val threshold =
+        tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD)
+
+      val rightSizeSmallerThanThreshold = rightSize <= threshold
+      val leftSizeSmallerThanThreshold = leftSize <= threshold
+      val leftSmallerThanRight = leftSize <= rightSize
+
+      join.getJoinType match {
+        case JoinRelType.LEFT => (rightSizeSmallerThanThreshold, false)
+        case JoinRelType.RIGHT => (leftSizeSmallerThanThreshold, true)
+        case JoinRelType.FULL => (false, false)
+        case JoinRelType.INNER =>
+          (
+            leftSizeSmallerThanThreshold
+              || rightSizeSmallerThanThreshold,
+            leftSmallerThanRight)
+        // left side cannot be used as build side in SEMI/ANTI join.
+        case JoinRelType.SEMI | JoinRelType.ANTI =>
+          (rightSizeSmallerThanThreshold, false)
+      }
+    }
+  }
+
+  protected def checkShuffleHash(
+      join: Join,
+      tableConfig: TableConfig,
+      withHint: Boolean): (Boolean, Boolean) = {
+    if (!isEquivJoin(join) || isOperatorDisabled(tableConfig, OperatorType.ShuffleHashJoin)) {
+      return (false, false)
+    }
+    if (!withHint) {
+      val leftSize = binaryRowRelNodeSize(join.getLeft)
+      val rightSize = binaryRowRelNodeSize(join.getRight)
+      val leftIsBuild = if (leftSize == null || rightSize == null || leftSize == rightSize) {
+        // use left to build hash table if leftSize or rightSize is unknown or equal size.
+        // choose right to build if join is SEMI/ANTI.
+        !join.getJoinType.projectsRight
+      } else {
+        leftSize < rightSize
+      }
+      (true, leftIsBuild)
+    } else {
+      val isLeftToBuild = getFirstArgInJoinHint(join, JoinStrategy.SHUFFLE_HASH.getJoinHintName)
+        .equals(JoinStrategy.LEFT_INPUT)
+      (true, isLeftToBuild)
+    }
+  }
+
+  // the sort merge join doesn't distinct the build side
+  protected def checkSortMergeJoin(join: Join, tableConfig: TableConfig): Boolean = {
+    if (!isEquivJoin(join) || isOperatorDisabled(tableConfig, OperatorType.ShuffleHashJoin)) {

Review Comment:
   Wrong operator type: `ShuffleHashJoin` should be `SortMergeJoin` here



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalJoinRuleBase.scala:
##########
@@ -72,6 +95,231 @@ trait BatchPhysicalJoinRuleBase {
       rowCount * FlinkRelMdUtil.binaryRowAverageSize(relNode)
     }
   }
+
+  protected def collectValidJoinHints(join: Join, tableConfig: TableConfig): JList[JoinStrategy] = {
+    val allHints = join.getHints
+    val validHints = new JArrayList[JoinStrategy]
+
+    allHints.forEach(
+      relHint => {
+        val joinHint = JoinStrategy.getJoinStrategy(relHint.hintName)

Review Comment:
   why not just find the first valid join strategy and return it?
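   For example, a rough sketch that returns only the first valid hint (the method name is illustrative):

   import scala.collection.JavaConverters._

   protected def findFirstValidJoinHint(
       join: Join,
       tableConfig: TableConfig): Option[JoinStrategy] = {
     join.getHints.asScala
       .map(hint => JoinStrategy.getJoinStrategy(hint.hintName))
       .find {
         strategy =>
           // keep only the validity flag; the build-side flag is not needed here
           val (isValid, _) = checkJoinStrategyValid(join, tableConfig, strategy, withHint = true)
           isValid
       }
   }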



##########
flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java:
##########
@@ -0,0 +1,5983 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.calcite.sql2rel;
+
+import org.apache.flink.table.planner.alias.SubQueryAliasNodeClearShuttle;
+import org.apache.flink.table.planner.plan.nodes.calcite.LogicalSubQueryAlias;
+
+import org.apache.calcite.avatica.util.Spaces;
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptSamplingParameters;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.ViewExpanders;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.prepare.RelOptTableImpl;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Collect;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.core.Sample;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.hint.HintStrategyTable;
+import org.apache.calcite.rel.hint.Hintable;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalIntersect;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalMatch;
+import org.apache.calcite.rel.logical.LogicalMinus;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rel.logical.LogicalValues;
+import org.apache.calcite.rel.metadata.RelColumnMapping;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.stream.Delta;
+import org.apache.calcite.rel.stream.LogicalDelta;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexCallBinding;
+import org.apache.calcite.rex.RexCorrelVariable;
+import org.apache.calcite.rex.RexDynamicParam;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexFieldCollation;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexPatternFieldRef;
+import org.apache.calcite.rex.RexRangeRef;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexSubQuery;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.rex.RexWindowBound;
+import org.apache.calcite.rex.RexWindowBounds;
+import org.apache.calcite.schema.ColumnStrategy;
+import org.apache.calcite.schema.ModifiableTable;
+import org.apache.calcite.schema.ModifiableView;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.schema.Wrapper;
+import org.apache.calcite.sql.JoinConditionType;
+import org.apache.calcite.sql.JoinType;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlDelete;
+import org.apache.calcite.sql.SqlDynamicParam;
+import org.apache.calcite.sql.SqlExplainFormat;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlIntervalQualifier;
+import org.apache.calcite.sql.SqlJoin;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlMatchRecognize;
+import org.apache.calcite.sql.SqlMerge;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlNumericLiteral;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlPivot;
+import org.apache.calcite.sql.SqlSampleSpec;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlSelectKeyword;
+import org.apache.calcite.sql.SqlSetOperator;
+import org.apache.calcite.sql.SqlSnapshot;
+import org.apache.calcite.sql.SqlUnnestOperator;
+import org.apache.calcite.sql.SqlUpdate;
+import org.apache.calcite.sql.SqlUtil;
+import org.apache.calcite.sql.SqlValuesOperator;
+import org.apache.calcite.sql.SqlWindow;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.SqlWithItem;
+import org.apache.calcite.sql.fun.SqlCase;
+import org.apache.calcite.sql.fun.SqlCountAggFunction;
+import org.apache.calcite.sql.fun.SqlInOperator;
+import org.apache.calcite.sql.fun.SqlQuantifyOperator;
+import org.apache.calcite.sql.fun.SqlRowOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlReturnTypeInference;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.sql.type.TableFunctionReturnTypeInference;
+import org.apache.calcite.sql.util.SqlBasicVisitor;
+import org.apache.calcite.sql.util.SqlVisitor;
+import org.apache.calcite.sql.validate.AggregatingSelectScope;
+import org.apache.calcite.sql.validate.CollectNamespace;
+import org.apache.calcite.sql.validate.DelegatingScope;
+import org.apache.calcite.sql.validate.ListScope;
+import org.apache.calcite.sql.validate.MatchRecognizeScope;
+import org.apache.calcite.sql.validate.ParameterScope;
+import org.apache.calcite.sql.validate.SelectScope;
+import org.apache.calcite.sql.validate.SqlMonotonicity;
+import org.apache.calcite.sql.validate.SqlNameMatcher;
+import org.apache.calcite.sql.validate.SqlQualified;
+import org.apache.calcite.sql.validate.SqlUserDefinedTableFunction;
+import org.apache.calcite.sql.validate.SqlUserDefinedTableMacro;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+import org.apache.calcite.sql.validate.SqlValidatorNamespace;
+import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.sql.validate.SqlValidatorTable;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBeans;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Litmus;
+import org.apache.calcite.util.NlsString;
+import org.apache.calcite.util.NumberUtil;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+import org.apache.calcite.util.trace.CalciteTrace;
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.lang.reflect.Type;
+import java.math.BigDecimal;
+import java.util.AbstractList;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+
+import static org.apache.calcite.sql.SqlUtil.stripAs;
+
+/**
+ * Converts a SQL parse tree (consisting of {@link SqlNode} objects) into a relational algebra
+ * expression (consisting of {@link RelNode} objects).
+ *
+ * <p>The public entry points are: {@link #convertQuery}, {@link #convertExpression(SqlNode)}.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class SqlToRelConverter {
+    // ~ Static fields/initializers ---------------------------------------------
+
+    /** Default configuration. */
+    private static final Config CONFIG =
+            ImmutableBeans.create(Config.class)
+                    .withRelBuilderFactory(RelFactories.LOGICAL_BUILDER)
+                    .withRelBuilderConfigTransform(c -> c.withPushJoinCondition(true))
+                    .withHintStrategyTable(HintStrategyTable.EMPTY);
+
+    protected static final Logger SQL2REL_LOGGER = CalciteTrace.getSqlToRelTracer();
+
+    /** Size of the smallest IN list that will be converted to a semijoin to a static table. */
+    public static final int DEFAULT_IN_SUB_QUERY_THRESHOLD = 20;
+
+    @Deprecated // to be removed before 2.0
+    public static final int DEFAULT_IN_SUBQUERY_THRESHOLD = DEFAULT_IN_SUB_QUERY_THRESHOLD;
+
+    // ~ Instance fields --------------------------------------------------------
+
+    protected final SqlValidator validator;
+    protected final RexBuilder rexBuilder;
+    protected final Prepare.CatalogReader catalogReader;
+    protected final RelOptCluster cluster;
+    private SubQueryConverter subQueryConverter;
+    protected final Map<RelNode, Integer> leaves = new HashMap<>();
+    private final List<SqlDynamicParam> dynamicParamSqlNodes = new ArrayList<>();
+    private final SqlOperatorTable opTab;
+    protected final RelDataTypeFactory typeFactory;
+    private final SqlNodeToRexConverter exprConverter;
+    private final HintStrategyTable hintStrategies;
+    private int explainParamCount;
+    public final Config config;
+    private final RelBuilder relBuilder;
+
+    /** Fields used in name resolution for correlated sub-queries. */
+    private final Map<CorrelationId, DeferredLookup> mapCorrelToDeferred = new HashMap<>();
+
+    /**
+     * Stack of names of datasets requested by the <code>
+     * TABLE(SAMPLE(&lt;datasetName&gt;, &lt;query&gt;))</code> construct.
+     */
+    private final Deque<String> datasetStack = new ArrayDeque<>();
+
+    /**
+     * Mapping of non-correlated sub-queries that have been converted to their equivalent constants.
+     * Used to avoid re-evaluating the sub-query if it's already been evaluated.
+     */
+    private final Map<SqlNode, RexNode> mapConvertedNonCorrSubqs = new HashMap<>();
+
+    public final RelOptTable.ViewExpander viewExpander;
+
+    // ~ Constructors -----------------------------------------------------------
+    /**
+     * Creates a converter.
+     *
+     * @param viewExpander Preparing statement
+     * @param validator Validator
+     * @param catalogReader Schema
+     * @param planner Planner
+     * @param rexBuilder Rex builder
+     * @param convertletTable Expression converter
+     */
+    @Deprecated // to be removed before 2.0
+    public SqlToRelConverter(
+            RelOptTable.ViewExpander viewExpander,
+            SqlValidator validator,
+            Prepare.CatalogReader catalogReader,
+            RelOptPlanner planner,
+            RexBuilder rexBuilder,
+            SqlRexConvertletTable convertletTable) {
+        this(
+                viewExpander,
+                validator,
+                catalogReader,
+                RelOptCluster.create(planner, rexBuilder),
+                convertletTable,
+                SqlToRelConverter.config());
+    }
+
+    @Deprecated // to be removed before 2.0
+    public SqlToRelConverter(
+            RelOptTable.ViewExpander viewExpander,
+            SqlValidator validator,
+            Prepare.CatalogReader catalogReader,
+            RelOptCluster cluster,
+            SqlRexConvertletTable convertletTable) {
+        this(
+                viewExpander,
+                validator,
+                catalogReader,
+                cluster,
+                convertletTable,
+                SqlToRelConverter.config());
+    }
+
+    /** Creates a converter. */
+    public SqlToRelConverter(
+            RelOptTable.ViewExpander viewExpander,
+            SqlValidator validator,
+            Prepare.CatalogReader catalogReader,
+            RelOptCluster cluster,
+            SqlRexConvertletTable convertletTable,
+            Config config) {
+        this.viewExpander = viewExpander;
+        this.opTab =
+                (validator == null) ? SqlStdOperatorTable.instance() : validator.getOperatorTable();
+        this.validator = validator;
+        this.catalogReader = catalogReader;
+        this.subQueryConverter = new NoOpSubQueryConverter();
+        this.rexBuilder = cluster.getRexBuilder();
+        this.typeFactory = rexBuilder.getTypeFactory();
+        this.exprConverter = new SqlNodeToRexConverterImpl(convertletTable);
+        this.explainParamCount = 0;
+        this.config = Objects.requireNonNull(config);
+        this.relBuilder =
+                config.getRelBuilderFactory()
+                        .create(cluster, null)
+                        .transform(config.getRelBuilderConfigTransform());
+        this.hintStrategies = config.getHintStrategyTable();
+
+        cluster.setHintStrategies(this.hintStrategies);
+        this.cluster = Objects.requireNonNull(cluster);
+    }
+
+    // ~ Methods ----------------------------------------------------------------
+
+    /** Returns the RelOptCluster in use. */
+    public RelOptCluster getCluster() {
+        return cluster;
+    }
+
+    /** Returns the row-expression builder. */
+    public RexBuilder getRexBuilder() {
+        return rexBuilder;
+    }
+
+    /**
+     * Returns the number of dynamic parameters encountered during translation; this must only be
+     * called after {@link #convertQuery}.
+     *
+     * @return number of dynamic parameters
+     */
+    public int getDynamicParamCount() {
+        return dynamicParamSqlNodes.size();
+    }
+
+    /**
+     * Returns the type inferred for a dynamic parameter.
+     *
+     * @param index 0-based index of dynamic parameter
+     * @return inferred type, never null
+     */
+    public RelDataType getDynamicParamType(int index) {
+        SqlNode sqlNode = dynamicParamSqlNodes.get(index);
+        if (sqlNode == null) {
+            throw Util.needToImplement("dynamic param type inference");
+        }
+        return validator.getValidatedNodeType(sqlNode);
+    }
+
+    /**
+     * Returns the current count of the number of dynamic parameters in an EXPLAIN PLAN statement.
+     *
+     * @param increment if true, increment the count
+     * @return the current count before the optional increment
+     */
+    public int getDynamicParamCountInExplain(boolean increment) {
+        int retVal = explainParamCount;
+        if (increment) {
+            ++explainParamCount;
+        }
+        return retVal;
+    }
+
+    /**
+     * Returns the mapping of non-correlated sub-queries that have been converted to the constants
+     * that they evaluate to.
+     */
+    public Map<SqlNode, RexNode> getMapConvertedNonCorrSubqs() {
+        return mapConvertedNonCorrSubqs;
+    }
+
+    /**
+     * Adds to the current map of non-correlated converted sub-queries the elements from another map
+     * that contains non-correlated sub-queries that have been converted by another
+     * SqlToRelConverter.
+     *
+     * @param alreadyConvertedNonCorrSubqs the other map
+     */
+    public void addConvertedNonCorrSubqs(Map<SqlNode, RexNode> alreadyConvertedNonCorrSubqs) {
+        mapConvertedNonCorrSubqs.putAll(alreadyConvertedNonCorrSubqs);
+    }
+
+    /**
+     * Sets a new SubQueryConverter. To have any effect, this must be called before any convert
+     * method.
+     *
+     * @param converter new SubQueryConverter
+     */
+    public void setSubQueryConverter(SubQueryConverter converter) {
+        subQueryConverter = converter;
+    }
+
+    /**
+     * Sets the number of dynamic parameters in the current EXPLAIN PLAN statement.
+     *
+     * @param explainParamCount number of dynamic parameters in the statement
+     */
+    public void setDynamicParamCountInExplain(int explainParamCount) {
+        assert config.isExplain();
+        this.explainParamCount = explainParamCount;
+    }
+
+    private void checkConvertedType(SqlNode query, RelNode result) {
+        if (query.isA(SqlKind.DML)) {
+            return;
+        }
+        // Verify that conversion from SQL to relational algebra did
+        // not perturb any type information.  (We can't do this if the
+        // SQL statement is something like an INSERT which has no
+        // validator type information associated with its result,
+        // hence the DML check above.)
+        final List<RelDataTypeField> validatedFields =
+                validator.getValidatedNodeType(query).getFieldList();
+        final RelDataType validatedRowType =
+                validator
+                        .getTypeFactory()
+                        .createStructType(
+                                Pair.right(validatedFields),
+                                SqlValidatorUtil.uniquify(
+                                        Pair.left(validatedFields),
+                                        catalogReader.nameMatcher().isCaseSensitive()));
+
+        final List<RelDataTypeField> convertedFields =
+                result.getRowType().getFieldList().subList(0, validatedFields.size());
+        final RelDataType convertedRowType =
+                validator.getTypeFactory().createStructType(convertedFields);
+
+        if (!RelOptUtil.equal(
+                "validated row type",
+                validatedRowType,
+                "converted row type",
+                convertedRowType,
+                Litmus.IGNORE)) {
+            throw new AssertionError(
+                    "Conversion to relational algebra failed to "
+                            + "preserve datatypes:\n"
+                            + "validated type:\n"
+                            + validatedRowType.getFullTypeString()
+                            + "\nconverted type:\n"
+                            + convertedRowType.getFullTypeString()
+                            + "\nrel:\n"
+                            + RelOptUtil.toString(result));
+        }
+    }
+
+    public RelNode flattenTypes(RelNode rootRel, boolean restructure) {
+        RelStructuredTypeFlattener typeFlattener =
+                new RelStructuredTypeFlattener(
+                        relBuilder,
+                        rexBuilder,
+                        createToRelContext(com.google.common.collect.ImmutableList.of()),
+                        restructure);
+        return typeFlattener.rewrite(rootRel);
+    }
+
+    /**
+     * If sub-query is correlated and decorrelation is enabled, performs decorrelation.
+     *
+     * @param query Query
+     * @param rootRel Root relational expression
+     * @return New root relational expression after decorrelation
+     */
+    public RelNode decorrelate(SqlNode query, RelNode rootRel) {
+        if (!config.isDecorrelationEnabled()) {
+            return rootRel;
+        }
+        final RelNode result = decorrelateQuery(rootRel);
+        if (result != rootRel) {
+            checkConvertedType(query, result);
+        }
+        return result;
+    }
+
+    /**
+     * Walks over a tree of relational expressions, replacing each {@link RelNode} with a 'slimmed
+     * down' relational expression that projects only the fields required by its consumer.
+     *
+     * <p>This may make things easier for the optimizer by removing crud that would expand the
+     * search space, but it is difficult for the optimizer itself to do, because optimizer rules
+     * must preserve the number and type of fields. Hence this transform, which operates on the
+     * entire tree, similar to the {@link RelStructuredTypeFlattener type-flattening transform}.
+     *
+     * <p>Currently this functionality is disabled in farrago/luciddb; the default implementation of
+     * this method does nothing.
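+     *
+     * <p>For example (illustrative), given {@code SELECT a FROM (SELECT a, b, c FROM t)}, only
+     * column {@code a} is required by the outer query, so {@code b} and {@code c} can be trimmed
+     * from the inner project.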
+     *
+     * @param ordered Whether the relational expression must produce results in a particular order
+     *     (typically because it has an ORDER BY at top level)
+     * @param rootRel Relational expression that is at the root of the tree
+     * @return Trimmed relational expression
+     */
+    public RelNode trimUnusedFields(boolean ordered, RelNode rootRel) {
+        // Trim fields that are not used by their consumer.
+        if (isTrimUnusedFields()) {
+            final RelFieldTrimmer trimmer = newFieldTrimmer();
+            final List<RelCollation> collations =
+                    rootRel.getTraitSet().getTraits(RelCollationTraitDef.INSTANCE);
+            rootRel = trimmer.trim(rootRel);
+            if (!ordered
+                    && collations != null
+                    && !collations.isEmpty()
+                    && !collations.equals(
+                            com.google.common.collect.ImmutableList.of(RelCollations.EMPTY))) {
+                final RelTraitSet traitSet =
+                        rootRel.getTraitSet().replace(RelCollationTraitDef.INSTANCE, collations);
+                rootRel = rootRel.copy(traitSet, rootRel.getInputs());
+            }
+            if (SQL2REL_LOGGER.isDebugEnabled()) {
+                SQL2REL_LOGGER.debug(
+                        RelOptUtil.dumpPlan(
+                                "Plan after trimming unused fields",
+                                rootRel,
+                                SqlExplainFormat.TEXT,
+                                SqlExplainLevel.EXPPLAN_ATTRIBUTES));
+            }
+        }
+        return rootRel;
+    }
+
+    /**
+     * Creates a RelFieldTrimmer.
+     *
+     * @return Field trimmer
+     */
+    protected RelFieldTrimmer newFieldTrimmer() {
+        return new RelFieldTrimmer(validator, relBuilder);
+    }
+
+    /**
+     * Converts an unvalidated query's parse tree into a relational expression.
+     *
+     * @param query Query to convert
+     * @param needsValidation Whether to validate the query before converting; <code>false</code> if
+     *     the query has already been validated.
+     * @param top Whether the query is top-level, say if its result will become a JDBC result set;
+     *     <code>false</code> if the query will be part of a view.
+     */
+    public RelRoot convertQuery(SqlNode query, final boolean needsValidation, final boolean top) {
+        if (needsValidation) {
+            query = validator.validate(query);
+        }
+
+        RelNode result = convertQueryRecursive(query, top, null).rel;
+        if (top) {
+            if (isStream(query)) {
+                result = new LogicalDelta(cluster, result.getTraitSet(), result);
+            }
+        }
+        RelCollation collation = RelCollations.EMPTY;
+        if (!query.isA(SqlKind.DML)) {
+            if (isOrdered(query)) {
+                collation = requiredCollation(result);
+            }
+        }
+        checkConvertedType(query, result);
+
+        if (SQL2REL_LOGGER.isDebugEnabled()) {
+            SQL2REL_LOGGER.debug(
+                    RelOptUtil.dumpPlan(
+                            "Plan after converting SqlNode to RelNode",
+                            result,
+                            SqlExplainFormat.TEXT,
+                            SqlExplainLevel.EXPPLAN_ATTRIBUTES));
+        }
+
+        final RelDataType validatedRowType = validator.getValidatedNodeType(query);
+        List<RelHint> hints = new ArrayList<>();
+        if (query.getKind() == SqlKind.SELECT) {
+            final SqlSelect select = (SqlSelect) query;
+            if (select.hasHints()) {
+                hints = SqlUtil.getRelHint(hintStrategies, select.getHints());
+            }
+        }
+        // propagate the hints.
+        result = RelOptUtil.propagateRelHints(result, false);
+
+        // Remove the SubQueryAlias nodes.
+        result = result.accept(new SubQueryAliasNodeClearShuttle());
+
+        return RelRoot.of(result, validatedRowType, query.getKind())
+                .withCollation(collation)
+                .withHints(hints);
+    }
+
+    private static boolean isStream(SqlNode query) {
+        return query instanceof SqlSelect
+                && ((SqlSelect) query).isKeywordPresent(SqlSelectKeyword.STREAM);
+    }
+
+    public static boolean isOrdered(SqlNode query) {
+        switch (query.getKind()) {
+            case SELECT:
+                return ((SqlSelect) query).getOrderList() != null
+                        && ((SqlSelect) query).getOrderList().size() > 0;
+            case WITH:
+                return isOrdered(((SqlWith) query).body);
+            case ORDER_BY:
+                return ((SqlOrderBy) query).orderList.size() > 0;
+            default:
+                return false;
+        }
+    }
+
+    private RelCollation requiredCollation(RelNode r) {
+        if (r instanceof Sort) {
+            return ((Sort) r).collation;
+        }
+        if (r instanceof Project) {
+            return requiredCollation(((Project) r).getInput());
+        }
+        if (r instanceof Delta) {
+            return requiredCollation(((Delta) r).getInput());
+        }
+        throw new AssertionError();
+    }
+
+    /** Converts a SELECT statement's parse tree into a relational expression. */
+    public RelNode convertSelect(SqlSelect select, boolean top) {
+        final SqlValidatorScope selectScope = validator.getWhereScope(select);
+        final Blackboard bb = createBlackboard(selectScope, null, top);
+        convertSelectImpl(bb, select);
+        return bb.root;
+    }
+
+    /** Factory method for creating translation workspace. */
+    protected Blackboard createBlackboard(
+            SqlValidatorScope scope, Map<String, RexNode> nameToNodeMap, boolean top) {
+        return new Blackboard(scope, nameToNodeMap, top);
+    }
+
+    /** Implementation of {@link #convertSelect(SqlSelect, boolean)}; derived class may override. */
+    protected void convertSelectImpl(final Blackboard bb, SqlSelect select) {
+        convertFrom(bb, select.getFrom());
+        convertWhere(bb, select.getWhere());
+
+        final List<SqlNode> orderExprList = new ArrayList<>();
+        final List<RelFieldCollation> collationList = new ArrayList<>();
+        gatherOrderExprs(bb, select, select.getOrderList(), orderExprList, collationList);
+        final RelCollation collation = cluster.traitSet().canonize(RelCollations.of(collationList));
+
+        if (validator.isAggregate(select)) {
+            convertAgg(bb, select, orderExprList);
+        } else {
+            convertSelectList(bb, select, orderExprList);
+        }
+
+        if (select.isDistinct()) {
+            distinctify(bb, true);
+        }
+
+        convertOrder(select, bb, collation, orderExprList, select.getOffset(), select.getFetch());
+
+        if (select.hasHints()) {
+            final List<RelHint> hints = SqlUtil.getRelHint(hintStrategies, select.getHints());
+            // Attach the hints to the first Hintable node we found from the root node.
+            bb.setRoot(
+                    bb.root.accept(
+                            new RelShuttleImpl() {
+                                boolean attached = false;
+
+                                @Override
+                                public RelNode visitChild(RelNode parent, int i, RelNode child) {
+                                    if (parent instanceof Hintable && !attached) {
+                                        attached = true;
+                                        return ((Hintable) parent).attachHints(hints);
+                                    } else {
+                                        return super.visitChild(parent, i, child);
+                                    }
+                                }
+                            }),
+                    true);
+        } else {
+            bb.setRoot(bb.root, true);
+        }
+    }
+
+    /**
+     * Having translated 'SELECT ... FROM ... [GROUP BY ...] [HAVING ...]', adds a relational
+     * expression to make the results unique.
+     *
+     * <p>If the SELECT clause contains duplicate expressions, adds {@link LogicalProject}s so that
+     * we are grouping on the minimal set of keys. The performance gain isn't huge, but it is
+     * difficult to detect these duplicate expressions later.
+     *
+     * @param bb Blackboard
+     * @param checkForDupExprs Check for duplicate expressions
+     */
+    private void distinctify(Blackboard bb, boolean checkForDupExprs) {
+        // Look for duplicate expressions in the project.
+        // Say we have 'select x, y, x, z'.
+        // Then dups will be {[2, 0]}
+        // and oldToNew will be {[0, 0], [1, 1], [2, 0], [3, 2]}
+        RelNode rel = bb.root;
+        if (checkForDupExprs && (rel instanceof LogicalProject)) {
+            LogicalProject project = (LogicalProject) rel;
+            final List<RexNode> projectExprs = project.getProjects();
+            final List<Integer> origins = new ArrayList<>();
+            int dupCount = 0;
+            for (int i = 0; i < projectExprs.size(); i++) {
+                int x = projectExprs.indexOf(projectExprs.get(i));
+                if (x >= 0 && x < i) {
+                    origins.add(x);
+                    ++dupCount;
+                } else {
+                    origins.add(i);
+                }
+            }
+            if (dupCount == 0) {
+                distinctify(bb, false);
+                return;
+            }
+
+            final Map<Integer, Integer> squished = new HashMap<>();
+            final List<RelDataTypeField> fields = rel.getRowType().getFieldList();
+            final List<Pair<RexNode, String>> newProjects = new ArrayList<>();
+            for (int i = 0; i < fields.size(); i++) {
+                if (origins.get(i) == i) {
+                    squished.put(i, newProjects.size());
+                    newProjects.add(RexInputRef.of2(i, fields));
+                }
+            }
+            rel =
+                    LogicalProject.create(
+                            rel,
+                            com.google.common.collect.ImmutableList.of(),
+                            Pair.left(newProjects),
+                            Pair.right(newProjects));
+            bb.root = rel;
+            distinctify(bb, false);
+            rel = bb.root;
+
+            // Create the expressions to reverse the mapping.
+            // Project($0, $1, $0, $2).
+            final List<Pair<RexNode, String>> undoProjects = new ArrayList<>();
+            for (int i = 0; i < fields.size(); i++) {
+                final int origin = origins.get(i);
+                RelDataTypeField field = fields.get(i);
+                undoProjects.add(
+                        Pair.of(
+                                new RexInputRef(squished.get(origin), field.getType()),
+                                field.getName()));
+            }
+
+            rel =
+                    LogicalProject.create(
+                            rel,
+                            com.google.common.collect.ImmutableList.of(),
+                            Pair.left(undoProjects),
+                            Pair.right(undoProjects));
+            bb.setRoot(rel, false);
+
+            return;
+        }
+
+        // Usual case: all of the expressions in the SELECT clause are
+        // different.
+        final ImmutableBitSet groupSet = ImmutableBitSet.range(rel.getRowType().getFieldCount());
+        rel =
+                createAggregate(
+                        bb,
+                        groupSet,
+                        com.google.common.collect.ImmutableList.of(groupSet),
+                        com.google.common.collect.ImmutableList.of());
+
+        bb.setRoot(rel, false);
+    }
+
+    /**
+     * Converts a query's ORDER BY clause, if any.
+     *
+     * <p>Ignores the ORDER BY clause if the query is not top-level and FETCH or OFFSET are not
+     * present.
+     *
+     * @param select Query
+     * @param bb Blackboard
+     * @param collation Collation list
+     * @param orderExprList Method populates this list with orderBy expressions not present in
+     *     selectList
+     * @param offset Expression for number of rows to discard before returning first row
+     * @param fetch Expression for number of rows to fetch
+     */
+    protected void convertOrder(
+            SqlSelect select,
+            Blackboard bb,
+            RelCollation collation,
+            List<SqlNode> orderExprList,
+            SqlNode offset,
+            SqlNode fetch) {
+        if (removeSortInSubQuery(bb.top)
+                || select.getOrderList() == null
+                || select.getOrderList().getList().isEmpty()) {
+            assert removeSortInSubQuery(bb.top) || collation.getFieldCollations().isEmpty();
+            if ((offset == null
+                            || (offset instanceof SqlLiteral
+                                    && ((SqlLiteral) offset)
+                                            .bigDecimalValue()
+                                            .equals(BigDecimal.ZERO)))
+                    && fetch == null) {
+                return;
+            }
+        }
+
+        // Create a sorter using the previously constructed collations.
+        bb.setRoot(
+                LogicalSort.create(
+                        bb.root,
+                        collation,
+                        offset == null ? null : convertExpression(offset),
+                        fetch == null ? null : convertExpression(fetch)),
+                false);
+
+        // If extra expressions were added to the project list for sorting,
+        // add another project to remove them. But make the collation empty, because
+        // we can't represent the real collation.
+        //
+        // If it is the top node, use the real collation, but don't trim fields.
+        if (orderExprList.size() > 0 && !bb.top) {
+            final List<RexNode> exprs = new ArrayList<>();
+            final RelDataType rowType = bb.root.getRowType();
+            final int fieldCount = rowType.getFieldCount() - orderExprList.size();
+            for (int i = 0; i < fieldCount; i++) {
+                exprs.add(rexBuilder.makeInputRef(bb.root, i));
+            }
+            bb.setRoot(
+                    LogicalProject.create(
+                            bb.root,
+                            com.google.common.collect.ImmutableList.of(),
+                            exprs,
+                            rowType.getFieldNames().subList(0, fieldCount)),
+                    false);
+        }
+    }
+
+    /**
+     * Returns whether we should remove the sort for the subsequent query conversion.
+     *
+     * @param top Whether the rel to convert is the root of the query
+     */
+    private boolean removeSortInSubQuery(boolean top) {
+        return config.isRemoveSortInSubQuery() && !top;
+    }
+
+    /**
+     * Returns whether a given node contains a {@link SqlInOperator}.
+     *
+     * @param node a SqlNode tree
+     */
+    private static boolean containsInOperator(SqlNode node) {
+        try {
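+            // Uses Util.FoundOne as control flow: the visitor throws as soon as an IN operator
+            // is found, and the catch block below turns that into a "true" result.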
+            SqlVisitor<Void> visitor =
+                    new SqlBasicVisitor<Void>() {
+                        public Void visit(SqlCall call) {
+                            if (call.getOperator() instanceof SqlInOperator) {
+                                throw new Util.FoundOne(call);
+                            }
+                            return super.visit(call);
+                        }
+                    };
+            node.accept(visitor);
+            return false;
+        } catch (Util.FoundOne e) {
+            Util.swallow(e, null);
+            return true;
+        }
+    }
+
+    /**
+     * Push down all the NOT logical operators into any IN/NOT IN operators.
+     *
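+     * <p>For example, {@code NOT (a IN (1, 2) AND b IN (3, 4))} is rewritten to
+     * {@code a NOT IN (1, 2) OR b NOT IN (3, 4)}.
+     *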
+     * @param scope Scope where {@code sqlNode} occurs
+     * @param sqlNode the root node from which to look for NOT operators
+     * @return the transformed SqlNode representation with NOT pushed down.
+     */
+    private static SqlNode pushDownNotForIn(SqlValidatorScope scope, SqlNode sqlNode) {
+        if (!(sqlNode instanceof SqlCall) || !containsInOperator(sqlNode)) {
+            return sqlNode;
+        }
+        final SqlCall sqlCall = (SqlCall) sqlNode;
+        switch (sqlCall.getKind()) {
+            case AND:
+            case OR:
+                final List<SqlNode> operands = new ArrayList<>();
+                for (SqlNode operand : sqlCall.getOperandList()) {
+                    operands.add(pushDownNotForIn(scope, operand));
+                }
+                final SqlCall newCall =
+                        sqlCall.getOperator().createCall(sqlCall.getParserPosition(), operands);
+                return reg(scope, newCall);
+
+            case NOT:
+                assert sqlCall.operand(0) instanceof SqlCall;
+                final SqlCall call = sqlCall.operand(0);
+                switch (sqlCall.operand(0).getKind()) {
+                    case CASE:
+                        final SqlCase caseNode = (SqlCase) call;
+                        final SqlNodeList thenOperands = new SqlNodeList(SqlParserPos.ZERO);
+
+                        for (SqlNode thenOperand : caseNode.getThenOperands()) {
+                            final SqlCall not =
+                                    SqlStdOperatorTable.NOT.createCall(
+                                            SqlParserPos.ZERO, thenOperand);
+                            thenOperands.add(pushDownNotForIn(scope, reg(scope, not)));
+                        }
+                        SqlNode elseOperand = caseNode.getElseOperand();
+                        if (!SqlUtil.isNull(elseOperand)) {
+                            // "not(unknown)" is "unknown", so no need to simplify
+                            final SqlCall not =
+                                    SqlStdOperatorTable.NOT.createCall(
+                                            SqlParserPos.ZERO, elseOperand);
+                            elseOperand = pushDownNotForIn(scope, reg(scope, not));
+                        }
+
+                        return reg(
+                                scope,
+                                SqlStdOperatorTable.CASE.createCall(
+                                        SqlParserPos.ZERO,
+                                        caseNode.getValueOperand(),
+                                        caseNode.getWhenOperands(),
+                                        thenOperands,
+                                        elseOperand));
+
+                    case AND:
+                        final List<SqlNode> orOperands = new ArrayList<>();
+                        for (SqlNode operand : call.getOperandList()) {
+                            orOperands.add(
+                                    pushDownNotForIn(
+                                            scope,
+                                            reg(
+                                                    scope,
+                                                    SqlStdOperatorTable.NOT.createCall(
+                                                            SqlParserPos.ZERO, operand))));
+                        }
+                        return reg(
+                                scope,
+                                SqlStdOperatorTable.OR.createCall(SqlParserPos.ZERO, orOperands));
+
+                    case OR:
+                        final List<SqlNode> andOperands = new ArrayList<>();
+                        for (SqlNode operand : call.getOperandList()) {
+                            andOperands.add(
+                                    pushDownNotForIn(
+                                            scope,
+                                            reg(
+                                                    scope,
+                                                    SqlStdOperatorTable.NOT.createCall(
+                                                            SqlParserPos.ZERO, operand))));
+                        }
+                        return reg(
+                                scope,
+                                SqlStdOperatorTable.AND.createCall(SqlParserPos.ZERO, andOperands));
+
+                    case NOT:
+                        assert call.operandCount() == 1;
+                        return pushDownNotForIn(scope, call.operand(0));
+
+                    case NOT_IN:
+                        return reg(
+                                scope,
+                                SqlStdOperatorTable.IN.createCall(
+                                        SqlParserPos.ZERO, call.getOperandList()));
+
+                    case IN:
+                        return reg(
+                                scope,
+                                SqlStdOperatorTable.NOT_IN.createCall(
+                                        SqlParserPos.ZERO, call.getOperandList()));
+                }
+        }
+        return sqlNode;
+    }
+
+    /**
+     * Registers with the validator a {@link SqlNode} that has been created during the Sql-to-Rel
+     * process.
+     */
+    private static SqlNode reg(SqlValidatorScope scope, SqlNode e) {
+        scope.getValidator().deriveType(scope, e);
+        return e;
+    }
+
+    /**
+     * Converts a WHERE clause.
+     *
+     * @param bb Blackboard
+     * @param where WHERE clause, may be null
+     */
+    private void convertWhere(final Blackboard bb, final SqlNode where) {
+        if (where == null) {
+            return;
+        }
+        SqlNode newWhere = pushDownNotForIn(bb.scope, where);
+        replaceSubQueries(bb, newWhere, RelOptUtil.Logic.UNKNOWN_AS_FALSE);
+        final RexNode convertedWhere = bb.convertExpression(newWhere);
+        final RexNode convertedWhere2 = RexUtil.removeNullabilityCast(typeFactory, convertedWhere);
+
+        // only allocate filter if the condition is not TRUE
+        if (convertedWhere2.isAlwaysTrue()) {
+            return;
+        }
+
+        final RelFactories.FilterFactory filterFactory = RelFactories.DEFAULT_FILTER_FACTORY;
+        final RelNode filter =
+                filterFactory.createFilter(
+                        bb.root, convertedWhere2, com.google.common.collect.ImmutableSet.of());
+        final RelNode r;
+        final CorrelationUse p = getCorrelationUse(bb, filter);
+        if (p != null) {
+            assert p.r instanceof Filter;
+            Filter f = (Filter) p.r;
+            r =
+                    LogicalFilter.create(
+                            f.getInput(),
+                            f.getCondition(),
+                            com.google.common.collect.ImmutableSet.of(p.id));
+        } else {
+            r = filter;
+        }
+
+        bb.setRoot(r, false);
+    }
+
+    private void replaceSubQueries(
+            final Blackboard bb, final SqlNode expr, RelOptUtil.Logic logic) {
+        findSubQueries(bb, expr, logic, false);
+        for (SubQuery node : bb.subQueryList) {
+            substituteSubQuery(bb, node);
+        }
+    }
+
+    private void substituteSubQuery(Blackboard bb, SubQuery subQuery) {
+        final RexNode expr = subQuery.expr;
+        if (expr != null) {
+            // Already done.
+            return;
+        }
+
+        final SqlBasicCall call;
+        final RelNode rel;
+        final SqlNode query;
+        final RelOptUtil.Exists converted;
+        switch (subQuery.node.getKind()) {
+            case CURSOR:
+                convertCursor(bb, subQuery);
+                return;
+
+            case MULTISET_QUERY_CONSTRUCTOR:
+            case MULTISET_VALUE_CONSTRUCTOR:
+            case ARRAY_QUERY_CONSTRUCTOR:
+                rel =
+                        convertMultisets(
+                                com.google.common.collect.ImmutableList.of(subQuery.node), bb);
+                subQuery.expr = bb.register(rel, JoinRelType.INNER);
+                return;
+
+            case IN:
+            case NOT_IN:
+            case SOME:
+            case ALL:
+                call = (SqlBasicCall) subQuery.node;
+                query = call.operand(1);
+                if (!config.isExpand() && !(query instanceof SqlNodeList)) {
+                    return;
+                }
+                final SqlNode leftKeyNode = call.operand(0);
+
+                final List<RexNode> leftKeys;
+                switch (leftKeyNode.getKind()) {
+                    case ROW:
+                        leftKeys = new ArrayList<>();
+                        for (SqlNode sqlExpr : ((SqlBasicCall) leftKeyNode).getOperandList()) {
+                            leftKeys.add(bb.convertExpression(sqlExpr));
+                        }
+                        break;
+                    default:
+                        leftKeys =
+                                com.google.common.collect.ImmutableList.of(
+                                        bb.convertExpression(leftKeyNode));
+                }
+
+                if (query instanceof SqlNodeList) {
+                    SqlNodeList valueList = (SqlNodeList) query;
+                    if (!containsNullLiteral(valueList)
+                            && valueList.size() < config.getInSubQueryThreshold()) {
+                        // We're under the threshold, so convert to OR.
+                        subQuery.expr =
+                                convertInToOr(
+                                        bb,
+                                        leftKeys,
+                                        valueList,
+                                        (SqlInOperator) call.getOperator());
+                        return;
+                    }
+
+                    // Otherwise, let convertExists translate
+                    // values list into an inline table for the
+                    // reference to Q below.
+                }
+
+                // Project out the search columns from the left side
+
+                // Q1:
+                // "select from emp where emp.deptno in (select col1 from T)"
+                //
+                // is converted to
+                //
+                // "select from
+                //   emp inner join (select distinct col1 from T)) q
+                //   on emp.deptno = q.col1
+                //
+                // Q2:
+                // "select from emp where emp.deptno not in (Q)"
+                //
+                // is converted to
+                //
+                // "select from
+                //   emp left outer join (select distinct col1, TRUE from T) q
+                //   on emp.deptno = q.col1
+                //   where emp.deptno <> null
+                //         and q.indicator <> TRUE"
+                //
+                // Note: Sub-query can be used as SqlUpdate#condition like below:
+                //
+                //   UPDATE emp
+                //   SET empno = 1 WHERE emp.empno IN (
+                //     SELECT emp.empno FROM emp WHERE emp.empno = 2)
+                //
+                // In such case, when converting SqlUpdate#condition, bb.root is null
+                // and it makes no sense to do the sub-query substitution.
+                if (bb.root == null) {
+                    return;
+                }
+                final RelDataType targetRowType =
+                        SqlTypeUtil.promoteToRowType(
+                                typeFactory, validator.getValidatedNodeType(leftKeyNode), null);
+                final boolean notIn = call.getOperator().kind == SqlKind.NOT_IN;
+                converted =
+                        convertExists(
+                                query,
+                                RelOptUtil.SubQueryType.IN,
+                                subQuery.logic,
+                                notIn,
+                                targetRowType);
+                if (converted.indicator) {
+                    // Generate
+                    //    emp CROSS JOIN (SELECT COUNT(*) AS c,
+                    //                       COUNT(deptno) AS ck FROM dept)
+                    final RelDataType longType = typeFactory.createSqlType(SqlTypeName.BIGINT);
+                    final RelNode seek = converted.r.getInput(0); // fragile
+                    final int keyCount = leftKeys.size();
+                    final List<Integer> args = ImmutableIntList.range(0, keyCount);
+                    LogicalAggregate aggregate =
+                            LogicalAggregate.create(
+                                    seek,
+                                    com.google.common.collect.ImmutableList.of(),
+                                    ImmutableBitSet.of(),
+                                    null,
+                                    com.google.common.collect.ImmutableList.of(
+                                            AggregateCall.create(
+                                                    SqlStdOperatorTable.COUNT,
+                                                    false,
+                                                    false,
+                                                    false,
+                                                    com.google.common.collect.ImmutableList.of(),
+                                                    -1,
+                                                    RelCollations.EMPTY,
+                                                    longType,
+                                                    null),
+                                            AggregateCall.create(
+                                                    SqlStdOperatorTable.COUNT,
+                                                    false,
+                                                    false,
+                                                    false,
+                                                    args,
+                                                    -1,
+                                                    RelCollations.EMPTY,
+                                                    longType,
+                                                    null)));
+                    LogicalJoin join =
+                            LogicalJoin.create(
+                                    bb.root,
+                                    aggregate,
+                                    com.google.common.collect.ImmutableList.of(),
+                                    rexBuilder.makeLiteral(true),
+                                    com.google.common.collect.ImmutableSet.of(),
+                                    JoinRelType.INNER);
+                    bb.setRoot(join, false);
+                }
+                final RexNode rex =
+                        bb.register(
+                                converted.r,
+                                converted.outerJoin ? JoinRelType.LEFT : JoinRelType.INNER,
+                                leftKeys);
+
+                RelOptUtil.Logic logic = subQuery.logic;
+                switch (logic) {
+                    case TRUE_FALSE_UNKNOWN:
+                    case UNKNOWN_AS_TRUE:
+                        if (!converted.indicator) {
+                            logic = RelOptUtil.Logic.TRUE_FALSE;
+                        }
+                }
+                subQuery.expr = translateIn(logic, bb.root, rex);
+                if (notIn) {
+                    subQuery.expr = rexBuilder.makeCall(SqlStdOperatorTable.NOT, subQuery.expr);
+                }
+                return;
+
+            case EXISTS:
+                // "select from emp where exists (select a from T)"
+                //
+                // is converted to the following if the sub-query is correlated:
+                //
+                // "select from emp left outer join (select AGG_TRUE() as indicator
+                // from T group by corr_var) q where q.indicator is true"
+                //
+                // If there is no correlation, the expression is replaced with a
+                // boolean indicating whether the sub-query returned 0 or >= 1 row.
+                call = (SqlBasicCall) subQuery.node;
+                query = call.operand(0);
+                if (!config.isExpand()) {
+                    return;
+                }
+                final SqlValidatorScope seekScope =
+                        (query instanceof SqlSelect)
+                                ? validator.getSelectScope((SqlSelect) query)
+                                : null;
+                final Blackboard seekBb = createBlackboard(seekScope, null, false);
+                final RelNode seekRel = convertQueryOrInList(seekBb, query, null);
+                // An EXISTS sub-query whose inner child has at least 1 tuple
+                // (e.g. an Aggregate with no grouping columns or non-empty Values
+                // node) should be simplified to a Boolean constant expression.
+                final RelMetadataQuery mq = seekRel.getCluster().getMetadataQuery();
+                final Double minRowCount = mq.getMinRowCount(seekRel);
+                if (minRowCount != null && minRowCount >= 1D) {
+                    subQuery.expr = rexBuilder.makeLiteral(true);
+                    return;
+                }
+                converted =
+                        RelOptUtil.createExistsPlan(
+                                seekRel,
+                                RelOptUtil.SubQueryType.EXISTS,
+                                subQuery.logic,
+                                true,
+                                relBuilder);
+                assert !converted.indicator;
+                if (convertNonCorrelatedSubQuery(subQuery, bb, converted.r, true)) {
+                    return;
+                }
+                subQuery.expr = bb.register(converted.r, JoinRelType.LEFT);
+                return;
+
+            case SCALAR_QUERY:
+                // Convert the sub-query.  If it's non-correlated, convert it
+                // to a constant expression.
+                if (!config.isExpand()) {
+                    return;
+                }
+                call = (SqlBasicCall) subQuery.node;
+                query = call.operand(0);
+                converted =
+                        convertExists(
+                                query, RelOptUtil.SubQueryType.SCALAR, subQuery.logic, true, null);
+                assert !converted.indicator;
+                if (convertNonCorrelatedSubQuery(subQuery, bb, converted.r, false)) {
+                    return;
+                }
+                rel = convertToSingleValueSubq(query, converted.r);
+                subQuery.expr = bb.register(rel, JoinRelType.LEFT);
+                return;
+
+            case SELECT:
+                // This is used when converting multiset queries:
+                //
+                // select * from unnest(select multiset[deptno] from emps);
+                //
+                converted =
+                        convertExists(
+                                subQuery.node,
+                                RelOptUtil.SubQueryType.SCALAR,
+                                subQuery.logic,
+                                true,
+                                null);
+                assert !converted.indicator;
+                subQuery.expr = bb.register(converted.r, JoinRelType.LEFT);
+
+                // This is used when converting window table functions:
+                //
+                // select * from table(tumble(table emps, descriptor(deptno), interval '3' DAY))
+                //
+                bb.cursors.add(converted.r);
+                return;
+
+            default:
+                throw new AssertionError("unexpected kind of sub-query: " + subQuery.node);
+        }
+    }
+
+    private RexNode translateIn(RelOptUtil.Logic logic, RelNode root, final RexNode rex) {
+        switch (logic) {
+            case TRUE:
+                return rexBuilder.makeLiteral(true);
+
+            case TRUE_FALSE:
+            case UNKNOWN_AS_FALSE:
+                assert rex instanceof RexRangeRef;
+                final int fieldCount = rex.getType().getFieldCount();
+                RexNode rexNode = rexBuilder.makeFieldAccess(rex, fieldCount - 1);
+                rexNode = rexBuilder.makeCall(SqlStdOperatorTable.IS_TRUE, rexNode);
+
+                // Then append the IS NOT NULL(leftKeysForIn).
+                //
+                // RexRangeRef contains the following fields:
+                //   leftKeysForIn,
+                //   rightKeysForIn (the original sub-query select list),
+                //   nullIndicator
+                //
+                // The first two lists contain the same number of fields.
+                final int k = (fieldCount - 1) / 2;
+                for (int i = 0; i < k; i++) {
+                    rexNode =
+                            rexBuilder.makeCall(
+                                    SqlStdOperatorTable.AND,
+                                    rexNode,
+                                    rexBuilder.makeCall(
+                                            SqlStdOperatorTable.IS_NOT_NULL,
+                                            rexBuilder.makeFieldAccess(rex, i)));
+                }
+                return rexNode;
+
+            case TRUE_FALSE_UNKNOWN:
+            case UNKNOWN_AS_TRUE:
+                // select e.deptno,
+                //   case
+                //   when ct.c = 0 then false
+                //   when dt.i is not null then true
+                //   when e.deptno is null then null
+                //   when ct.ck < ct.c then null
+                //   else false
+                //   end
+                // from e
+                // cross join (select count(*) as c, count(deptno) as ck from v) as ct
+                // left join (select distinct deptno, true as i from v) as dt
+                //   on e.deptno = dt.deptno
+                final Join join = (Join) root;
+                final Project left = (Project) join.getLeft();
+                final RelNode leftLeft = ((Join) left.getInput()).getLeft();
+                final int leftLeftCount = leftLeft.getRowType().getFieldCount();
+                final RelDataType longType = typeFactory.createSqlType(SqlTypeName.BIGINT);
+                final RexNode cRef = rexBuilder.makeInputRef(root, leftLeftCount);
+                final RexNode ckRef = rexBuilder.makeInputRef(root, leftLeftCount + 1);
+                final RexNode iRef =
+                        rexBuilder.makeInputRef(root, root.getRowType().getFieldCount() - 1);
+
+                final RexLiteral zero = rexBuilder.makeExactLiteral(BigDecimal.ZERO, longType);
+                final RexLiteral trueLiteral = rexBuilder.makeLiteral(true);
+                final RexLiteral falseLiteral = rexBuilder.makeLiteral(false);
+                final RexNode unknownLiteral = rexBuilder.makeNullLiteral(trueLiteral.getType());
+
+                final com.google.common.collect.ImmutableList.Builder<RexNode> args =
+                        com.google.common.collect.ImmutableList.builder();
+                args.add(
+                        rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, cRef, zero),
+                        falseLiteral,
+                        rexBuilder.makeCall(SqlStdOperatorTable.IS_NOT_NULL, iRef),
+                        trueLiteral);
+                final JoinInfo joinInfo = join.analyzeCondition();
+                for (int leftKey : joinInfo.leftKeys) {
+                    final RexNode kRef = rexBuilder.makeInputRef(root, leftKey);
+                    args.add(
+                            rexBuilder.makeCall(SqlStdOperatorTable.IS_NULL, kRef), unknownLiteral);
+                }
+                args.add(
+                        rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN, ckRef, cRef),
+                        unknownLiteral,
+                        falseLiteral);
+
+                return rexBuilder.makeCall(SqlStdOperatorTable.CASE, args.build());
+
+            default:
+                throw new AssertionError(logic);
+        }
+    }
+
+    private static boolean containsNullLiteral(SqlNodeList valueList) {
+        for (SqlNode node : valueList.getList()) {
+            if (node instanceof SqlLiteral) {
+                SqlLiteral lit = (SqlLiteral) node;
+                if (lit.getValue() == null) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Determines if a sub-query is non-correlated and if so, converts it to a constant.
+     *
+     * @param subQuery the call that references the sub-query
+     * @param bb blackboard used to convert the sub-query
+     * @param converted RelNode tree corresponding to the sub-query
+     * @param isExists true if the sub-query is part of an EXISTS expression
+     * @return Whether the sub-query can be converted to a constant
+     */
+    private boolean convertNonCorrelatedSubQuery(
+            SubQuery subQuery, Blackboard bb, RelNode converted, boolean isExists) {
+        SqlCall call = (SqlBasicCall) subQuery.node;
+        if (subQueryConverter.canConvertSubQuery() && isSubQueryNonCorrelated(converted, bb)) {
+            // First check if the sub-query has already been converted
+            // because it's a nested sub-query.  If so, don't re-evaluate
+            // it again.
+            RexNode constExpr = mapConvertedNonCorrSubqs.get(call);
+            if (constExpr == null) {
+                constExpr =
+                        subQueryConverter.convertSubQuery(call, this, isExists, config.isExplain());
+            }
+            if (constExpr != null) {
+                subQuery.expr = constExpr;
+                mapConvertedNonCorrSubqs.put(call, constExpr);
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Converts the RelNode tree for a select statement to a select that produces a single value.
+     *
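+     * <p>For example (illustrative), {@code (SELECT deptno FROM emp LIMIT 1)} is already
+     * guaranteed to return at most one row and is returned unchanged, whereas an unrestricted
+     * {@code (SELECT deptno FROM emp)} has a SingleValueAgg added on top.
+     *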
+     * @param query the query
+     * @param plan the original RelNode tree corresponding to the statement
+     * @return the converted RelNode tree
+     */
+    public RelNode convertToSingleValueSubq(SqlNode query, RelNode plan) {
+        // Check whether query is guaranteed to produce a single value.
+        if (query instanceof SqlSelect) {
+            SqlSelect select = (SqlSelect) query;
+            SqlNodeList selectList = select.getSelectList();
+            SqlNodeList groupList = select.getGroup();
+
+            if ((selectList.size() == 1) && ((groupList == null) || (groupList.size() == 0))) {
+                SqlNode selectExpr = selectList.get(0);
+                if (selectExpr instanceof SqlCall) {
+                    SqlCall selectExprCall = (SqlCall) selectExpr;
+                    if (Util.isSingleValue(selectExprCall)) {
+                        return plan;
+                    }
+                }
+
+                // If there is a LIMIT of 0 or 1, the query is guaranteed
+                // to produce at most one row, i.e. a single value.
+                if (select.getFetch() != null && select.getFetch() instanceof SqlNumericLiteral) {
+                    SqlNumericLiteral limitNum = (SqlNumericLiteral) select.getFetch();
+                    if (((BigDecimal) limitNum.getValue()).intValue() < 2) {
+                        return plan;
+                    }
+                }
+            }
+        } else if (query instanceof SqlCall) {
+            // If the query is (values ...), we need to look at its operands
+            // to determine whether a SingleValueAgg is required.
+            SqlCall exprCall = (SqlCall) query;
+            if (exprCall.getOperator() instanceof SqlValuesOperator
+                    && Util.isSingleValue(exprCall)) {
+                return plan;
+            }
+        }
+
+        // If not, project SingleValueAgg
+        return RelOptUtil.createSingleValueAggRel(cluster, plan);
+    }
+
+    /**
+     * Converts "x IN (1, 2, ...)" to "x=1 OR x=2 OR ...".
+     *
+     * @param leftKeys LHS
+     * @param valuesList RHS
+     * @param op The operator (IN, NOT IN, &gt; SOME, ...)
+     * @return converted expression
+     */
+    private RexNode convertInToOr(
+            final Blackboard bb,
+            final List<RexNode> leftKeys,
+            SqlNodeList valuesList,
+            SqlInOperator op) {
+        final List<RexNode> comparisons = new ArrayList<>();
+        for (SqlNode rightVals : valuesList) {
+            RexNode rexComparison;
+            final SqlOperator comparisonOp;
+            if (op instanceof SqlQuantifyOperator) {
+                comparisonOp =
+                        RelOptUtil.op(
+                                ((SqlQuantifyOperator) op).comparisonKind,
+                                SqlStdOperatorTable.EQUALS);
+            } else {
+                comparisonOp = SqlStdOperatorTable.EQUALS;
+            }
+            if (leftKeys.size() == 1) {
+                rexComparison =
+                        rexBuilder.makeCall(
+                                comparisonOp,
+                                leftKeys.get(0),
+                                ensureSqlType(
+                                        leftKeys.get(0).getType(),
+                                        bb.convertExpression(rightVals)));
+            } else {
+                assert rightVals instanceof SqlCall;
+                final SqlBasicCall call = (SqlBasicCall) rightVals;
+                assert (call.getOperator() instanceof SqlRowOperator)
+                        && call.operandCount() == leftKeys.size();
+                rexComparison =
+                        RexUtil.composeConjunction(
+                                rexBuilder,
+                                Util.transform(
+                                        Pair.zip(leftKeys, call.getOperandList()),
+                                        pair ->
+                                                rexBuilder.makeCall(
+                                                        comparisonOp,
+                                                        pair.left,
+                                                        ensureSqlType(
+                                                                pair.left.getType(),
+                                                                bb.convertExpression(
+                                                                        pair.right)))));
+            }
+            comparisons.add(rexComparison);
+        }
+
+        switch (op.kind) {
+            case ALL:
+                return RexUtil.composeConjunction(rexBuilder, comparisons, true);
+            case NOT_IN:
+                return rexBuilder.makeCall(
+                        SqlStdOperatorTable.NOT,
+                        RexUtil.composeDisjunction(rexBuilder, comparisons, true));
+            case IN:
+            case SOME:
+                return RexUtil.composeDisjunction(rexBuilder, comparisons, true);
+            default:
+                throw new AssertionError();
+        }
+    }
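+    // Illustrative note (added comment, not part of the upstream Calcite sources): with
+    // leftKeys = [x] and valuesList = (1, 2, 3), the IN branch above yields
+    // OR(=(x, 1), =(x, 2), =(x, 3)); NOT IN wraps the same disjunction in NOT(...), and ALL
+    // composes a conjunction of the comparisons instead.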
+
+    /**
+     * Ensures that an expression has a given {@link SqlTypeName}, applying a cast if necessary. If
+     * the expression already has the right type family, returns the expression unchanged.
+     */
+    private RexNode ensureSqlType(RelDataType type, RexNode node) {
+        if (type.getSqlTypeName() == node.getType().getSqlTypeName()
+                || (type.getSqlTypeName() == SqlTypeName.VARCHAR
+                        && node.getType().getSqlTypeName() == SqlTypeName.CHAR)) {
+            return node;
+        }
+        return rexBuilder.ensureType(type, node, true);
+    }
+
+    /**
+     * Gets the list size threshold under which {@link #convertInToOr} is used. Lists of this size
+     * or greater will instead be converted to use a join against an inline table ({@link
+     * LogicalValues}) rather than a predicate. A threshold of 0 forces usage of an inline table in
+     * all cases; a threshold of Integer.MAX_VALUE forces usage of OR in all cases.
+     *
+     * @return threshold, default {@link #DEFAULT_IN_SUB_QUERY_THRESHOLD}
+     */
+    @Deprecated // to be removed before 2.0
+    protected int getInSubqueryThreshold() {
+        return config.getInSubQueryThreshold();
+    }
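+    // Illustrative note (added comment, not part of the upstream Calcite sources): IN lists
+    // shorter than this threshold keep the OR form produced by convertInToOr, while longer
+    // lists are instead joined against a LogicalValues inline table, as described above.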
+
+    /**
+     * Converts an EXISTS or IN predicate into a join. For EXISTS, the sub-query produces an
+     * indicator variable, and the result is a relational expression which outer joins that
+     * indicator to the original query. After performing the outer join, the condition will be TRUE
+     * if the EXISTS condition holds, NULL otherwise.
+     *
+     * @param seek A query, for example 'select * from emp' or 'values (1,2,3)' or '('Foo', 34)'.
+     * @param subQueryType Whether sub-query is IN, EXISTS or scalar
+     * @param logic Whether the answer needs to be in full 3-valued logic (TRUE, FALSE, UNKNOWN),
+     *     or whether we can accept an approximation (say representing UNKNOWN as FALSE)
+     * @param notIn Whether the operation is NOT IN
+     * @return join expression
+     */
+    private RelOptUtil.Exists convertExists(
+            SqlNode seek,
+            RelOptUtil.SubQueryType subQueryType,
+            RelOptUtil.Logic logic,
+            boolean notIn,
+            RelDataType targetDataType) {
+        final SqlValidatorScope seekScope =
+                (seek instanceof SqlSelect) ? validator.getSelectScope((SqlSelect) seek) : null;
+        final Blackboard seekBb = createBlackboard(seekScope, null, false);
+        RelNode seekRel = convertQueryOrInList(seekBb, seek, targetDataType);
+
+        return RelOptUtil.createExistsPlan(seekRel, subQueryType, logic, notIn, relBuilder);
+    }
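+    // Illustrative note (added comment, not part of the upstream Calcite sources): for a
+    // predicate such as "WHERE EXISTS (SELECT 1 FROM dept ...)", the seek query is converted
+    // on its own Blackboard and RelOptUtil.createExistsPlan wraps it so the caller can
+    // outer-join the resulting indicator back to the enclosing query.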
+
+    private RelNode convertQueryOrInList(Blackboard bb, SqlNode seek, RelDataType targetRowType) {
+        // NOTE: Once we start accepting single-row queries as row constructors,
+        // there will be an ambiguity here for a case like X IN ((SELECT Y FROM
+        // Z)).  The SQL standard resolves the ambiguity by saying that a lone
+        // select should be interpreted as a table expression, not a row
+        // expression.  The semantic difference is that a table expression can
+        // return multiple rows.
+        if (seek instanceof SqlNodeList) {
+            return convertRowValues(bb, seek, ((SqlNodeList) seek).getList(), false, targetRowType);
+        } else {
+            return convertQueryRecursive(seek, false, null).project();
+        }
+    }
+
+    private RelNode convertRowValues(
+            Blackboard bb,
+            SqlNode rowList,
+            Collection<SqlNode> rows,
+            boolean allowLiteralsOnly,
+            RelDataType targetRowType) {
+        // NOTE jvs 30-Apr-2006: We combine all rows consisting entirely of
+        // literals into a single LogicalValues; this gives the optimizer a smaller
+        // input tree.  For everything else (computed expressions, row
+        // sub-queries), we union each row in as a projection on top of a
+        // LogicalOneRow.
+
+        final com.google.common.collect.ImmutableList.Builder<
+                        com.google.common.collect.ImmutableList<RexLiteral>>
+                tupleList = com.google.common.collect.ImmutableList.builder();
+        final RelDataType rowType;
+        if (targetRowType != null) {
+            rowType = targetRowType;
+        } else {
+            rowType =
+                    SqlTypeUtil.promoteToRowType(
+                            typeFactory, validator.getValidatedNodeType(rowList), null);
+        }
+
+        final List<RelNode> unionInputs = new ArrayList<>();
+        for (SqlNode node : rows) {
+            SqlBasicCall call;
+            if (isRowConstructor(node)) {
+                call = (SqlBasicCall) node;
+                com.google.common.collect.ImmutableList.Builder<RexLiteral> tuple =
+                        com.google.common.collect.ImmutableList.builder();
+                for (Ord<SqlNode> operand : Ord.zip(call.operands)) {
+                    RexLiteral rexLiteral =
+                            convertLiteralInValuesList(operand.e, bb, rowType, operand.i);
+                    if ((rexLiteral == null) && allowLiteralsOnly) {
+                        return null;
+                    }
+                    if ((rexLiteral == null) || !config.isCreateValuesRel()) {
+                        // fallback to convertRowConstructor
+                        tuple = null;
+                        break;
+                    }
+                    tuple.add(rexLiteral);
+                }
+                if (tuple != null) {
+                    tupleList.add(tuple.build());
+                    continue;
+                }
+            } else {
+                RexLiteral rexLiteral = convertLiteralInValuesList(node, bb, rowType, 0);
+                if ((rexLiteral != null) && config.isCreateValuesRel()) {
+                    tupleList.add(com.google.common.collect.ImmutableList.of(rexLiteral));
+                    continue;
+                } else {
+                    if ((rexLiteral == null) && allowLiteralsOnly) {
+                        return null;
+                    }
+                }
+
+                // convert "1" to "row(1)"
+                call = (SqlBasicCall) SqlStdOperatorTable.ROW.createCall(SqlParserPos.ZERO, node);
+            }
+            unionInputs.add(convertRowConstructor(bb, call));
+        }
+        LogicalValues values = LogicalValues.create(cluster, rowType, tupleList.build());
+        RelNode resultRel;
+        if (unionInputs.isEmpty()) {
+            resultRel = values;
+        } else {
+            if (!values.getTuples().isEmpty()) {
+                unionInputs.add(values);
+            }
+            resultRel = LogicalUnion.create(unionInputs, true);
+        }
+        leaves.put(resultRel, resultRel.getRowType().getFieldCount());
+        return resultRel;
+    }
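+    // Illustrative note (added comment, not part of the upstream Calcite sources): for
+    // "VALUES (1, 2), (3, 4)" every row consists of literals, so all tuples collapse into a
+    // single LogicalValues; a computed row such as (1, RAND()) instead becomes a projection
+    // on top of a one-row relation and is combined with the rest via LogicalUnion.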
+
+    private RexLiteral convertLiteralInValuesList(
+            SqlNode sqlNode, Blackboard bb, RelDataType rowType, int iField) {
+        if (!(sqlNode instanceof SqlLiteral)) {
+            return null;
+        }
+        RelDataTypeField field = rowType.getFieldList().get(iField);
+        RelDataType type = field.getType();
+        if (type.isStruct()) {
+            // null literals for weird stuff like UDT's need
+            // special handling during type flattening, so
+            // don't use LogicalValues for those
+            return null;
+        }
+
+        RexNode literalExpr = exprConverter.convertLiteral(bb, (SqlLiteral) sqlNode);
+
+        if (!(literalExpr instanceof RexLiteral)) {
+            assert literalExpr.isA(SqlKind.CAST);
+            RexNode child = ((RexCall) literalExpr).getOperands().get(0);
+            assert RexLiteral.isNullLiteral(child);
+
+            // NOTE jvs 22-Nov-2006:  we preserve type info
+            // in LogicalValues digest, so it's OK to lose it here
+            return (RexLiteral) child;
+        }
+
+        RexLiteral literal = (RexLiteral) literalExpr;
+
+        Comparable value = literal.getValue();
+
+        if (SqlTypeUtil.isExactNumeric(type) && SqlTypeUtil.hasScale(type)) {
+            BigDecimal roundedValue =
+                    NumberUtil.rescaleBigDecimal((BigDecimal) value, type.getScale());
+            return rexBuilder.makeExactLiteral(roundedValue, type);
+        }
+
+        if ((value instanceof NlsString) && (type.getSqlTypeName() == SqlTypeName.CHAR)) {
+            // pad fixed character type
+            NlsString unpadded = (NlsString) value;
+            return rexBuilder.makeCharLiteral(
+                    new NlsString(
+                            Spaces.padRight(unpadded.getValue(), type.getPrecision()),
+                            unpadded.getCharsetName(),
+                            unpadded.getCollation()));
+        }
+        return literal;
+    }
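+    // Illustrative note (added comment, not part of the upstream Calcite sources): for a
+    // DECIMAL(5, 2) target field the literal 1.5 is rescaled to 1.50, and for a CHAR(3) field
+    // 'ab' is padded to 'ab ', so that every tuple stored in the LogicalValues matches the row
+    // type exactly.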
+
+    private boolean isRowConstructor(SqlNode node) {
+        if (!(node.getKind() == SqlKind.ROW)) {
+            return false;
+        }
+        SqlCall call = (SqlCall) node;
+        return call.getOperator().getName().equalsIgnoreCase("row");
+    }
+
+    /**
+     * Builds a list of all <code>IN</code> or <code>EXISTS</code> operators inside SQL parse tree.
+     * Does not traverse inside queries.
+     *
+     * @param bb blackboard
+     * @param node the SQL parse tree
+     * @param logic Whether the answer needs to be in full 3-valued logic (TRUE, FALSE, UNKNOWN),
+     *     or whether we can accept an approximation (say representing UNKNOWN as FALSE)
+     * @param registerOnlyScalarSubQueries if set to true and the parse tree corresponds to a
+     *     variation of a select node, only register it if it's a scalar sub-query
+     */
+    private void findSubQueries(
+            Blackboard bb,
+            SqlNode node,
+            RelOptUtil.Logic logic,
+            boolean registerOnlyScalarSubQueries) {
+        final SqlKind kind = node.getKind();
+        switch (kind) {
+            case EXISTS:
+            case SELECT:
+            case MULTISET_QUERY_CONSTRUCTOR:
+            case MULTISET_VALUE_CONSTRUCTOR:
+            case ARRAY_QUERY_CONSTRUCTOR:
+            case CURSOR:
+            case SCALAR_QUERY:
+                if (!registerOnlyScalarSubQueries || (kind == SqlKind.SCALAR_QUERY)) {
+                    bb.registerSubQuery(node, RelOptUtil.Logic.TRUE_FALSE);
+                }
+                return;
+            case IN:
+                break;
+            case NOT_IN:
+            case NOT:
+                logic = logic.negate();
+                break;
+        }
+        if (node instanceof SqlCall) {
+            switch (kind) {
+                    // Do not change logic for AND, IN and NOT IN expressions;
+                    // but do change logic for OR, NOT and others;
+                    // EXISTS was handled already.
+                case AND:
+                case IN:
+                case NOT_IN:
+                    break;
+                default:
+                    logic = RelOptUtil.Logic.TRUE_FALSE_UNKNOWN;
+                    break;
+            }
+            for (SqlNode operand : ((SqlCall) node).getOperandList()) {
+                if (operand != null) {
+                    // In the case of an IN expression, locate scalar
+                    // sub-queries so we can convert them to constants
+                    findSubQueries(
+                            bb,
+                            operand,
+                            logic,
+                            kind == SqlKind.IN
+                                    || kind == SqlKind.NOT_IN
+                                    || kind == SqlKind.SOME
+                                    || kind == SqlKind.ALL
+                                    || registerOnlyScalarSubQueries);
+                }
+            }
+        } else if (node instanceof SqlNodeList) {
+            for (SqlNode child : (SqlNodeList) node) {
+                findSubQueries(
+                        bb,
+                        child,
+                        logic,
+                        kind == SqlKind.IN
+                                || kind == SqlKind.NOT_IN
+                                || kind == SqlKind.SOME
+                                || kind == SqlKind.ALL
+                                || registerOnlyScalarSubQueries);
+            }
+        }
+
+        // Now that we've located any scalar sub-queries inside the IN
+        // expression, register the IN expression itself.  We need to
+        // register the scalar sub-queries first so they can be converted
+        // before the IN expression is converted.
+        switch (kind) {
+            case IN:
+            case NOT_IN:
+            case SOME:
+            case ALL:
+                switch (logic) {
+                    case TRUE_FALSE_UNKNOWN:
+                        RelDataType type = validator.getValidatedNodeTypeIfKnown(node);
+                        if (type == null) {
+                            // The node might not be validated if we still don't know the type
+                            // of the node. Therefore return directly.
+                            return;
+                        } else {
+                            break;
+                        }
+                        // fall through
+                    case UNKNOWN_AS_FALSE:
+                        logic = RelOptUtil.Logic.TRUE;
+                }
+                bb.registerSubQuery(node, logic);
+                break;
+        }
+    }
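+    // Illustrative note (added comment, not part of the upstream Calcite sources): for a
+    // predicate such as "NOT (x IN (SELECT ...))", NOT negates the logic before recursing,
+    // scalar sub-queries nested inside the IN are registered first, and the IN node itself is
+    // registered last so that it is converted after its scalar operands.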
+
+    /**
+     * Converts an expression from {@link SqlNode} to {@link RexNode} format.
+     *
+     * @param node Expression to translate
+     * @return Converted expression
+     */
+    public RexNode convertExpression(SqlNode node) {
+        Map<String, RelDataType> nameToTypeMap = Collections.emptyMap();
+        final ParameterScope scope =
+                new ParameterScope((SqlValidatorImpl) validator, nameToTypeMap);
+        final Blackboard bb = createBlackboard(scope, null, false);
+        return bb.convertExpression(node);
+    }
+
+    /**
+     * Converts an expression from {@link SqlNode} to {@link RexNode} format, mapping identifier
+     * references to predefined expressions.
+     *
+     * @param node Expression to translate
+     * @param nameToNodeMap map from String to {@link RexNode}; when an {@link SqlIdentifier} is
+     *     encountered, it is used as a key and translated to the corresponding value from this map
+     * @return Converted expression
+     */
+    public RexNode convertExpression(SqlNode node, Map<String, RexNode> nameToNodeMap) {
+        final Map<String, RelDataType> nameToTypeMap = new HashMap<>();
+        for (Map.Entry<String, RexNode> entry : nameToNodeMap.entrySet()) {
+            nameToTypeMap.put(entry.getKey(), entry.getValue().getType());
+        }
+        final ParameterScope scope =
+                new ParameterScope((SqlValidatorImpl) validator, nameToTypeMap);
+        final Blackboard bb = createBlackboard(scope, nameToNodeMap, false);
+        return bb.convertExpression(node);
+    }
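+    // Illustrative note (added comment, not part of the upstream Calcite sources): a caller
+    // that already has RexNodes for identifiers "a" and "b" can pass {"a" -> rexA, "b" -> rexB}
+    // and convert an expression such as "a + b" without any relational input; identifiers are
+    // resolved through the ParameterScope built from the types of the supplied nodes.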
+
+    /**
+     * Converts a non-standard expression.
+     *
+     * <p>This method is an extension-point that derived classes can override. If this method
+     * returns a null result, the normal expression translation process will proceed. The default
+     * implementation always returns null.
+     *
+     * @param node Expression
+     * @param bb Blackboard
+     * @return null to proceed with the usual expression translation process
+     */
+    protected RexNode convertExtendedExpression(SqlNode node, Blackboard bb) {
+        return null;
+    }
+
+    private RexNode convertOver(Blackboard bb, SqlNode node) {
+        SqlCall call = (SqlCall) node;
+        SqlCall aggCall = call.operand(0);
+        boolean ignoreNulls = false;
+        switch (aggCall.getKind()) {
+            case IGNORE_NULLS:
+                ignoreNulls = true;
+                // fall through
+            case RESPECT_NULLS:
+                aggCall = aggCall.operand(0);
+        }
+
+        SqlNode windowOrRef = call.operand(1);
+        final SqlWindow window = validator.resolveWindow(windowOrRef, bb.scope);
+
+        SqlNode sqlLowerBound = window.getLowerBound();
+        SqlNode sqlUpperBound = window.getUpperBound();
+        boolean rows = window.isRows();
+        SqlNodeList orderList = window.getOrderList();
+
+        if (!aggCall.getOperator().allowsFraming()) {
+            // If the operator does not allow framing, bracketing is implicitly
+            // everything up to the current row.
+            sqlLowerBound = SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO);
+            sqlUpperBound = SqlWindow.createCurrentRow(SqlParserPos.ZERO);
+            if (aggCall.getKind() == SqlKind.ROW_NUMBER) {
+                // ROW_NUMBER() expects specific kind of framing.
+                rows = true;
+            }
+        } else if (orderList.size() == 0) {
+            // Without ORDER BY, there must be no bracketing.
+            sqlLowerBound = SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO);
+            sqlUpperBound = SqlWindow.createUnboundedFollowing(SqlParserPos.ZERO);
+        } else if (sqlLowerBound == null && sqlUpperBound == null) {
+            sqlLowerBound = SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO);
+            sqlUpperBound = SqlWindow.createCurrentRow(SqlParserPos.ZERO);
+        } else if (sqlUpperBound == null) {
+            sqlUpperBound = SqlWindow.createCurrentRow(SqlParserPos.ZERO);
+        } else if (sqlLowerBound == null) {
+            sqlLowerBound = SqlWindow.createCurrentRow(SqlParserPos.ZERO);
+        }
+        final SqlNodeList partitionList = window.getPartitionList();
+        final com.google.common.collect.ImmutableList.Builder<RexNode> partitionKeys =
+                com.google.common.collect.ImmutableList.builder();
+        for (SqlNode partition : partitionList) {
+            partitionKeys.add(bb.convertExpression(partition));
+        }
+        final RexNode lowerBound = bb.convertExpression(sqlLowerBound);
+        final RexNode upperBound = bb.convertExpression(sqlUpperBound);
+        if (orderList.size() == 0 && !rows) {
+            // A logical range requires an ORDER BY clause. Use the implicit
+            // ordering of this relation. There must be one, otherwise it would
+            // have failed validation.
+            orderList = bb.scope.getOrderList();
+            if (orderList == null) {
+                throw new AssertionError("Relation should have sort key for implicit ORDER BY");
+            }
+        }
+
+        final com.google.common.collect.ImmutableList.Builder<RexFieldCollation> orderKeys =
+                com.google.common.collect.ImmutableList.builder();
+        for (SqlNode order : orderList) {
+            orderKeys.add(
+                    bb.convertSortExpression(
+                            order,
+                            RelFieldCollation.Direction.ASCENDING,
+                            RelFieldCollation.NullDirection.UNSPECIFIED));
+        }
+
+        try {
+            com.google.common.base.Preconditions.checkArgument(
+                    bb.window == null, "already in window agg mode");
+            bb.window = window;
+            RexNode rexAgg = exprConverter.convertCall(bb, aggCall);
+            rexAgg = rexBuilder.ensureType(validator.getValidatedNodeType(call), rexAgg, false);
+
+            // Walk over the tree and apply 'over' to all agg functions. This is
+            // necessary because the returned expression is not necessarily a call
+            // to an agg function. For example, AVG(x) becomes SUM(x) / COUNT(x).
+
+            final SqlLiteral q = aggCall.getFunctionQuantifier();
+            final boolean isDistinct = q != null && q.getValue() == SqlSelectKeyword.DISTINCT;
+
+            final RexShuttle visitor =
+                    new HistogramShuttle(
+                            partitionKeys.build(),
+                            orderKeys.build(),
+                            RexWindowBounds.create(sqlLowerBound, lowerBound),
+                            RexWindowBounds.create(sqlUpperBound, upperBound),
+                            rows,
+                            window.isAllowPartial(),
+                            isDistinct,
+                            ignoreNulls);
+            return rexAgg.accept(visitor);
+        } finally {
+            bb.window = null;
+        }
+    }
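+    // Illustrative note (added comment, not part of the upstream Calcite sources): for
+    // "AVG(x) OVER w" the conversion above may produce SUM(x) / COUNT(x), which is why the
+    // HistogramShuttle walks the resulting expression and applies the window (partition keys,
+    // order keys and bounds) to every aggregate call it finds.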
+
+    protected void convertFrom(Blackboard bb, SqlNode from) {
+        convertFrom(bb, from, Collections.emptyList());
+    }
+
+    /**
+     * Converts a FROM clause into a relational expression.
+     *
+     * @param bb Scope within which to resolve identifiers
+     * @param from FROM clause of a query. Examples include:
+     *     <ul>
+     *       <li>a single table ("SALES.EMP"),
+     *       <li>an aliased table ("EMP AS E"),
+     *       <li>a list of tables ("EMP, DEPT"),
+     *       <li>an ANSI Join expression ("EMP JOIN DEPT ON EMP.DEPTNO = DEPT.DEPTNO"),
+     *       <li>a VALUES clause ("VALUES ('Fred', 20)"),
+     *       <li>a query ("(SELECT * FROM EMP WHERE GENDER = 'F')"),
+     *       <li>or any combination of the above.
+     *     </ul>
+     *
+     * @param fieldNames Field aliases, usually come from AS clause
+     */
+    protected void convertFrom(Blackboard bb, SqlNode from, List<String> fieldNames) {
+        if (from == null) {
+            bb.setRoot(LogicalValues.createOneRow(cluster), false);
+            return;
+        }
+
+        final SqlCall call;
+        final SqlNode[] operands;
+        switch (from.getKind()) {
+            case AS:
+                call = (SqlCall) from;
+                SqlNode firstOperand = call.operand(0);
+                final List<String> fieldNameList = new ArrayList<>();
+                if (call.operandCount() > 2) {
+                    for (SqlNode node : Util.skip(call.getOperandList(), 2)) {
+                        fieldNameList.add(((SqlIdentifier) node).getSimple());
+                    }
+                }
+                convertFrom(bb, firstOperand, fieldNameList);
+                // flink modification: add a sub-query alias node to distinguish different query

Review Comment:
   ditto, add a unified change flag


