You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/18 07:13:28 UTC

[GitHub] [flink] twalthr commented on a change in pull request #19113: [FLINK-26519][table] Remove FlinkTypeFactory singleton

twalthr commented on a change in pull request #19113:
URL: https://github.com/apache/flink/pull/19113#discussion_r829735651



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java
##########
@@ -180,7 +180,7 @@ public int getFieldCount() {
 
     public int getFieldIndex(String fieldName) {
         for (int i = 0; i < fields.size(); i++) {
-            if (fields.get(i).getName().equals(fieldName)) {
+            if (fields.get(i).getName().equalsIgnoreCase(fieldName)) {

Review comment:
       why this?

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeSystem.scala
##########
@@ -28,7 +28,7 @@ import org.apache.calcite.sql.`type`.{SqlTypeName, SqlTypeUtil}
 /**
   * Custom type system for Flink.
   */
-class FlinkTypeSystem extends RelDataTypeSystemImpl {
+class FlinkTypeSystem private extends RelDataTypeSystemImpl {

Review comment:
       can we implement this class in Java  and have a proper `INSTANCE` constant instead of a method

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/CorrelateUtil.scala
##########
@@ -59,11 +59,11 @@ object CorrelateUtil {
 
   def projectCorrelateOutputType(
       originalType: RelDataType,
-      projectableFieldSet: Set[Int]): (RelDataType, ListBuffer[Int]) = {
+      projectableFieldSet: Set[Int],
+      flinkTypeFactory: FlinkTypeFactory): (RelDataType, ListBuffer[Int]) = {

Review comment:
       nit: `flinkTypeFactory` -> `typeFactory` here and at all other locations

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
##########
@@ -712,16 +712,14 @@ object AggregateUtil extends Enumeration {
     }
 
     // count(*) not exist in aggregateCalls, insert a count(*) in it.
-    val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem)
     if (indexOfCountStar.isEmpty) {
-
       val count1 = AggregateCall.create(
         SqlStdOperatorTable.COUNT,
         false,
         false,
         new util.ArrayList[Integer](),
         -1,
-        typeFactory.createFieldTypeFromLogicalType(new BigIntType()),
+        new BasicSqlType(FlinkTypeSystem.INSTANCE, SqlTypeName.BIGINT),

Review comment:
       This is not possible. Types need to be `canonized` and need to go through the type factory once in order to allow reference equality.

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala
##########
@@ -24,6 +24,7 @@ import org.apache.flink.table.planner.codegen.ExpressionReducer
 import org.apache.flink.table.planner.plan.nodes.calcite.Rank
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRank
 import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDeduplicate
+import org.apache.flink.table.planner.utils.ShortcutUtils

Review comment:
       unnecessary change

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilityContext.java
##########
@@ -45,12 +45,16 @@
  * </ul>
  */
 public class SourceAbilityContext implements FlinkContext {
+
     private final RowType sourceRowType;
     private final FlinkContext context;
+    private final FlinkTypeFactory flinkTypeFactory;
 
-    public SourceAbilityContext(FlinkContext context, RowType sourceRowType) {
+    public SourceAbilityContext(
+            FlinkContext context, RowType sourceRowType, FlinkTypeFactory flinkTypeFactory) {

Review comment:
       nit: `flinkTypeFactory` -> `typeFactory`

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalHashAggRule.scala
##########
@@ -25,6 +25,8 @@ import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate
 import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalHashAggregate
 import org.apache.flink.table.planner.plan.utils.PythonUtil.isPythonAggregate
 import org.apache.flink.table.planner.plan.utils.{AggregateUtil, OperatorType}
+import org.apache.flink.table.planner.utils.ShortcutUtils

Review comment:
       all changes in this class are unrelated to the actual issue

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala
##########
@@ -27,10 +27,12 @@ import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
 import org.apache.flink.table.planner.plan.logical.{LogicalWindow, SlidingGroupWindow, TumblingGroupWindow}
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWindowAggregate
-import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalLocalSortWindowAggregate, BatchPhysicalSortWindowAggregate, BatchPhysicalHashWindowAggregate, BatchPhysicalLocalHashWindowAggregate}
+import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalHashWindowAggregate, BatchPhysicalLocalHashWindowAggregate, BatchPhysicalLocalSortWindowAggregate, BatchPhysicalSortWindowAggregate}

Review comment:
       all changes in this class are unrelated to the actual issue

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSourceSpec.java
##########
@@ -66,7 +67,8 @@ public DynamicTableSourceSpec(
         this.sourceAbilities = sourceAbilities;
     }
 
-    private DynamicTableSource getTableSource(FlinkContext flinkContext) {
+    private DynamicTableSource getTableSource(
+            FlinkContext flinkContext, FlinkTypeFactory flinkTypeFactory) {

Review comment:
       nit: remove `flink` prefixes, everything is `flink` in this code base

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.scala
##########
@@ -24,6 +24,8 @@ import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkTypeFactory}
 import org.apache.flink.table.planner.plan.PartialFinalType
 import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalExchange, StreamPhysicalGlobalGroupAggregate, StreamPhysicalIncrementalGroupAggregate, StreamPhysicalLocalGroupAggregate}
 import org.apache.flink.table.planner.plan.utils.AggregateUtil
+import org.apache.flink.table.planner.utils.ShortcutUtils

Review comment:
       all changes in this class are unrelated to the actual issue




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org