Posted to issues@flink.apache.org by "sujun (Jira)" <ji...@apache.org> on 2020/12/18 02:17:00 UTC

[jira] [Updated] (FLINK-20660) Time window operator with computed column triggers an exception in batch mode

     [ https://issues.apache.org/jira/browse/FLINK-20660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sujun updated FLINK-20660:
--------------------------
    Description: 
A time window operator with a computed column triggers an exception in batch mode.

My test code:
{code:java}
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;

public class WindowAggWithBigintTest {

      public static void main(String[] args) throws Exception {
            EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
            TableEnvironment tEnv = TableEnvironment.create(settings);
            tEnv.registerFunction("longToTimestamp", new LongToTimestamp());

            // "rowtime" is a computed column derived from the bigint column via a UDF.
            String ddl = "CREATE TABLE source(occur_time bigint, rowtime AS longToTimestamp(occur_time)) "
                        + "WITH ('connector' = 'filesystem', 'format' = 'orc', 'path' = '/path/to/orc')";
            tEnv.executeSql(ddl);

            // Tumbling window on the computed column; planning this query fails in batch mode.
            Table table = tEnv.sqlQuery("select TUMBLE_START(rowtime, INTERVAL '1' HOUR) as ts, count(1) as ct "
                        + "from source group by TUMBLE(rowtime, INTERVAL '1' HOUR)");

            DiscardingOutputFormat<String> outputFormat = new DiscardingOutputFormat<>();
            TableResultSink tableResultSink = new TableResultSink(table.getSchema(), outputFormat);
            tEnv.registerTableSink("sink", tableResultSink);
            table.insertInto("sink");
            tEnv.execute("test");
      }

      // Minimal sink that discards all results; only needed so the job can be submitted.
      private static class TableResultSink implements StreamTableSink<String> {
            private final TableSchema schema;
            private final DataType rowType;
            private final OutputFormat<String> outputFormat;

            TableResultSink(TableSchema schema, OutputFormat<String> outputFormat) {
                  this.schema = schema;
                  this.rowType = schema.toRowDataType();
                  this.outputFormat = outputFormat;
            }

            @Override
            public DataType getConsumedDataType() {
                  return rowType;
            }

            @Override
            public TableSchema getTableSchema() {
                  return schema;
            }

            @Override
            public TableSink<String> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
                  throw new UnsupportedOperationException(
                              "This sink is configured by passing a static schema when initiating");
            }

            @Override
            public DataStreamSink<?> consumeDataStream(DataStream<String> dataStream) {
                  return dataStream.writeUsingOutputFormat(outputFormat).setParallelism(1).name("tableResult");
            }
      }
}
{code}
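
The LongToTimestamp function used above is a user-defined scalar function whose implementation is not included in this report. A minimal sketch of what such a UDF might look like, assuming occur_time holds epoch milliseconds (the actual implementation may differ):
{code:java}
import java.sql.Timestamp;

import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical stand-in for the UDF referenced above: converts an
// epoch-millisecond bigint into a SQL TIMESTAMP(3) value.
public class LongToTimestamp extends ScalarFunction {
      public Timestamp eval(Long epochMillis) {
            return epochMillis == null ? null : new Timestamp(epochMillis);
      }
}
{code}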
 

Exception:

 
{code:java}
Exception in thread "main" java.lang.RuntimeException: Error while applying rule BatchExecWindowAggregateRule, args [rel#264:FlinkLogicalWindowAggregate.LOGICAL.any.[](input=RelSubset#263,group={},ct=COUNT(),window=TumblingGroupWindow('w$, $f0, 3600000),properties=w$start, w$end, w$rowtime), rel#250:FlinkLogicalLegacyTableSourceScan.LOGICAL.any.[](table=[default_catalog, default_database, source, source: [FileSystemTableSource(occur_time, rowtime)]],fields=occur_time, rowtime)]
      at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:244)
      at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:636)
      at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327)
      at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
      at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
      at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
      at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
      at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
      at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
      at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
      at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
      at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
      at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
      at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:86)
      at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:57)
      at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
      at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
      at scala.collection.immutable.List.foreach(List.scala:381)
      at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
      at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
      at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
      at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
      at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
      at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1240)
      at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
      at example.WindowAggWithBigintTest.main(WindowAggWithBigintTest.java:34)
Caused by: java.lang.IllegalArgumentException: field [$f0] not found; input fields are: [occur_time]
      at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:402)
      at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:385)
      at org.apache.flink.table.planner.plan.utils.AggregateUtil$.timeFieldIndex(AggregateUtil.scala:720)
      at org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecWindowAggregateRule.transformTimeSlidingWindow(BatchExecWindowAggregateRule.scala:161)
      at org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecWindowAggregateRule.onMatch(BatchExecWindowAggregateRule.scala:111)
      at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:217)
      ... 27 more
{code}
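
Judging from the stack trace, BatchExecWindowAggregateRule.transformTimeSlidingWindow resolves the window's time attribute by field name: the window definition references the projected expression $f0 (the computed rowtime column), but AggregateUtil.timeFieldIndex looks that name up against an input that only exposes occur_time, which raises the IllegalArgumentException above.

A possible workaround sketch (my assumption, untested against this bug): materialize the computed column in a derived table, so the window groups on an ordinary projected timestamp field rather than the table's computed column:
{code:java}
// Untested workaround sketch: compute the timestamp in a subquery instead of
// relying on the table's computed column, then window on the plain "ts" field.
Table table = tEnv.sqlQuery(
      "SELECT TUMBLE_START(ts, INTERVAL '1' HOUR) AS ts, COUNT(1) AS ct "
      + "FROM (SELECT longToTimestamp(occur_time) AS ts FROM source) t "
      + "GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)");
{code}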


> Time window operator with computed column triggers an exception in batch mode
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-20660
>                 URL: https://issues.apache.org/jira/browse/FLINK-20660
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.11.0
>            Reporter: sujun
>            Priority: Minor
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)