You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "sujun (Jira)" <ji...@apache.org> on 2020/01/13 03:11:00 UTC

[jira] [Updated] (FLINK-15563) When using ParquetTableSource, The program hangs until OOM

     [ https://issues.apache.org/jira/browse/FLINK-15563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sujun updated FLINK-15563:
--------------------------
    Description: 
This is my code:
{code:java}
def main(args: Array[String]): Unit = {    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tableEnv = StreamTableEnvironment.create(env)    val schema = "{\"type\":\"record\",\"name\":\"root\",\"fields\":[{\"name\":\"log_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"log_from\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ip\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"type\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"data_source\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"is_scan\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"result\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timelong\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"is_sec\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"event_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"time_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"device\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timestamp_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"occur_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"row_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}]}"
    
    val parquetTableSource: ParquetTableSource = ParquetTableSource
            .builder
            .forParquetSchema(new org.apache.parquet.avro.AvroSchemaConverter().convert(
                org.apache.avro.Schema.parse(schema, true)))
            .path("/Users/sujun/Documents/tmp/login_data")
            .build    
    tableEnv.registerTableSource("source",parquetTableSource)
    val t1 = tableEnv.sqlQuery("select log_id,city from source where city = '274' ")
    tableEnv.registerTable("t1",t1)    
    val t2 = tableEnv.sqlQuery("select * from t1 where log_id='5927070661978133'")
    t2.toAppendStream[Row].print()    
    env.execute()}
{code}
 
When the two SQL queries each have a WHERE condition, the main program hangs until an OOM occurs. When the filter push-down code of ParquetTableSource is removed, the program runs normally.
 
Through my debugging, I found that the program hangs in the org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp method
 
This may be a bug in the Calcite optimizer triggered by the filter push-down code.
 

  was:
def main(args: Array[String]): Unit = \{
val env = StreamExecutionEnvironment.getExecutionEnvironment
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 val tableEnv = StreamTableEnvironment.create(env)
val schema = "{\"type\":\"record\",\"name\":\"root\",\"fields\":[{\"name\":\"log_id\",\"type\":[\"null\",\"string\"],\"default\":null},\{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null},\{\"name\":\"log_from\",\"type\":[\"null\",\"string\"],\"default\":null},\{\"name\":\"ip\",\"type\":[\"null\",\"string\"],\"default\":null},\{\"name\":\"type\",\"type\":[\"null\",\"string\"],\"default\":null},\{\"name\":\"data_source\",\"type\":[\"null\",\"string\"],\"default\":null},\{\"name\":\"is_scan\",\"type\":[\"null\",\"string\"],\"default\":null},\{\"name\":\"result\",\"type\":[\"null\",\"string\"],\"default\":null},\{\"name\":\"timelong\",\"type\":[\"null\",\"long\"],\"default\":null},\{\"name\":\"is_sec\",\"type\":[\"null\",\"string\"],\"default\":null},\{\"name\":\"event_name\",\"type\":[\"null\",\"string\"],\"default\":null},\{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null},\{\"name\":\"time_string\",\"type\":[\"null\",\"string\"],\"default\":null},\{\"name\":\"device\",\"type\":[\"null\",\"string\"],\"default\":null},\{\"name\":\"timestamp_string\",\"type\":[\"null\",\"string\"],\"default\":null},\{\"name\":\"occur_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},\{\"name\":\"row_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}]}"**
 val parquetTableSource: ParquetTableSource = ParquetTableSource
 .builder
 .forParquetSchema(new org.apache.parquet.avro.AvroSchemaConverter().convert(
 org.apache.avro.Schema.parse(schema, true)))
 .path("/path/to/login_data")
 .build
tableEnv.registerTableSource("source",parquetTableSource)

 val t1 = tableEnv.sqlQuery("select log_id,city from source where city = '274' ")
 tableEnv.registerTable("t1",t1)
val t4 = tableEnv.sqlQuery("select * from t1 where log_id='5927070661978133'")
 t1.toAppendStream[Row].print()
env.execute()
}


>  When using ParquetTableSource, The program hangs until OOM
> -----------------------------------------------------------
>
>                 Key: FLINK-15563
>                 URL: https://issues.apache.org/jira/browse/FLINK-15563
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.8.1, 1.9.1
>            Reporter: sujun
>            Priority: Critical
>
> This is my code:
> {code:java}
> def main(args: Array[String]): Unit = {    
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     val tableEnv = StreamTableEnvironment.create(env)    val schema = "{\"type\":\"record\",\"name\":\"root\",\"fields\":[{\"name\":\"log_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"log_from\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ip\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"type\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"data_source\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"is_scan\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"result\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timelong\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"is_sec\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"event_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"time_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"device\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timestamp_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"occur_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"row_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}]}"
>     
>     val parquetTableSource: ParquetTableSource = ParquetTableSource
>             .builder
>             .forParquetSchema(new org.apache.parquet.avro.AvroSchemaConverter().convert(
>                 org.apache.avro.Schema.parse(schema, true)))
>             .path("/Users/sujun/Documents/tmp/login_data")
>             .build    
>     tableEnv.registerTableSource("source",parquetTableSource)
>     val t1 = tableEnv.sqlQuery("select log_id,city from source where city = '274' ")
>     tableEnv.registerTable("t1",t1)    
>     val t2 = tableEnv.sqlQuery("select * from t1 where log_id='5927070661978133'")
>     t2.toAppendStream[Row].print()    
>     env.execute()}
> {code}
>  
> When the two SQL queries each have a WHERE condition, the main program hangs until an OOM occurs. When the filter push-down code of ParquetTableSource is removed, the program runs normally.
>  
> Through my debugging, I found that the program hangs in the org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp method
>  
> This may be a bug in the Calcite optimizer triggered by the filter push-down code.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)