Posted to issues@flink.apache.org by "Lsw_aka_laplace (Jira)" <ji...@apache.org> on 2020/10/14 03:57:00 UTC

[jira] [Commented] (FLINK-19630) Sink data in [ORC] format into Hive By using Legacy Table API caused unexpected Exception

    [ https://issues.apache.org/jira/browse/FLINK-19630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17213579#comment-17213579 ] 

Lsw_aka_laplace commented on FLINK-19630:
-----------------------------------------

[~jark]

[~lzljs3620320]

 

Would you mind taking a look at this?

> Sink data in [ORC] format into Hive By using Legacy Table API  caused unexpected Exception
> ------------------------------------------------------------------------------------------
>
>                 Key: FLINK-19630
>                 URL: https://issues.apache.org/jira/browse/FLINK-19630
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive, Table SQL / Ecosystem
>    Affects Versions: 1.11.2
>            Reporter: Lsw_aka_laplace
>            Priority: Critical
>         Attachments: image-2020-10-14-11-36-48-086.png, image-2020-10-14-11-41-53-379.png, image-2020-10-14-11-42-57-353.png, image-2020-10-14-11-48-51-310.png
>
>
> *ENV:*
> *Flink version 1.11.2*
> *Hive exec version: 2.0.1*
> *Hive file storage format: ORC*
> *SQL or DataStream: SQL API*
> *Kafka Connector: custom Kafka connector based on the Legacy API (TableSource/`org.apache.flink.types.Row`)*
> *Hive Connector: follows the Flink Hive connector closely (we only added a thin wrapper around it)*
> *Using StreamingFileCommitter: YES*
>  
>  
> *Description:*
>    Executing the following SQL:
>     """
>       insert into hive_table (select * from kafka_table)
>     """
>    The Hive table DDL looks like this:
>     """
> CREATE TABLE `hive_table`(
>  -- some fields
> )
> PARTITIONED BY (
>  `dt` string,
>  `hour` string)
> STORED AS orc
> TBLPROPERTIES (
>  'orc.compress'='SNAPPY',
>  'type'='HIVE',
>  'sink.partition-commit.trigger'='process-time',
>  'sink.partition-commit.delay'='1 h',
>  'sink.partition-commit.policy.kind'='metastore,success-file'
> )
>    """
> When the job starts to take a checkpoint snapshot, the following unexpected exception is thrown:
> !image-2020-10-14-11-36-48-086.png|width=882,height=395!
> As the message shows, the owner thread is expected to be the [Legacy Source Thread], but the thread actually found is the StreamTask thread, which represents the whole first stage.
> So I checked the thread dump right away.
> !image-2020-10-14-11-41-53-379.png|width=801,height=244!
>                                                                      The legacy Source Thread
>  
> !image-2020-10-14-11-42-57-353.png|width=846,height=226!
>                                                                The StreamTask Thread
>  
>    Based on the thread dump and the exception message, I read through the relevant source code and then *{color:#ffab00}DID A TEST{color}*.
>  
> {color:#172b4d}   Since the Kafka connector is custom, I made the KafkaSource a separate stage by changing the TaskChainStrategy to Never. The resulting task topology is as follows:{color}
> {color:#172b4d}!image-2020-10-14-11-48-51-310.png|width=753,height=208!{color}
>  
> {color:#505f79}*Fortunately, it worked! No exception was thrown and the checkpoint snapshot completed successfully!*{color}
>  
>  
> So, from my perspective, something goes wrong when the HiveWritingTask and the LegacySourceTask are chained together. The legacy source runs in a separate thread, which may be the cause of the exception mentioned above.
>  
>                                                                 
>  
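
The suspected cause described above (a resource owned by the legacy source thread being touched by the StreamTask thread during a snapshot) can be illustrated with a minimal, self-contained Java sketch. This is not Flink or ORC code: `OwnedWriter`, `useFromOtherThread`, the thread names, and the message wording are all hypothetical and only model the general "owner thread" check pattern, assuming the writer remembers the thread that created it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class OwnerThreadSketch {

    /** Hypothetical stand-in for a writer that may only be used by the thread that created it. */
    static final class OwnedWriter {
        // Assumption: ownership is fixed to the creating thread.
        private final Thread owner = Thread.currentThread();

        void write(String record) {
            Thread caller = Thread.currentThread();
            if (caller != owner) {
                throw new IllegalStateException(
                        "Owner thread expected " + owner.getName() + ", got " + caller.getName());
            }
            // A real writer would buffer the record here.
        }
    }

    /** Calls the writer from a second thread; returns the failure message, or null on success. */
    static String useFromOtherThread(OwnedWriter writer) throws Exception {
        ExecutorService other =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "task-thread"));
        try {
            Future<String> result = other.submit(() -> {
                try {
                    writer.write("snapshot");
                    return null;
                } catch (IllegalStateException e) {
                    return e.getMessage();
                }
            });
            return result.get();
        } finally {
            other.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Thread.currentThread().setName("legacy-source-thread");
        OwnedWriter writer = new OwnedWriter();
        writer.write("record");                          // same thread: accepted
        System.out.println(useFromOtherThread(writer));  // different thread: rejected
    }
}
```

In this model, keeping the writer on a single thread avoids the check failing, which is consistent with the observation that separating the source into its own stage made the exception disappear. Whether the real failure follows the same mechanism is still an open question in this issue.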



--
This message was sent by Atlassian Jira
(v8.3.4#803005)