Posted to issues@flink.apache.org by "huajiewang (Jira)" <ji...@apache.org> on 2021/01/15 06:17:00 UTC

[jira] [Comment Edited] (FLINK-20972) TwoPhaseCommitSinkFunction Output a large amount of EventData

    [ https://issues.apache.org/jira/browse/FLINK-20972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17265732#comment-17265732 ] 

huajiewang edited comment on FLINK-20972 at 1/15/21, 6:16 AM:
--------------------------------------------------------------

Hi [~gaoyunhaii], you are right. My Transaction class looks like this:

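Scala:
{code:scala}
import java.util.UUID
import scala.collection.mutable.ListBuffer

// `sql` buffers the SQL statements until the transaction is committed
case class Transaction(transactionId: String = UUID.randomUUID().toString, sql: ListBuffer[String] = ListBuffer.empty) extends Serializable {
  def +(text: String): Unit = sql += text
}{code}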
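Java:
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

class Transaction {
  // Random id identifying this transaction
  private String transactionId;
  // SQL statements buffered until the transaction is committed
  private List<String> sqlList;

  public Transaction() {
    this.transactionId = UUID.randomUUID().toString();
    this.sqlList = new ArrayList<>();
  }

  public void add(String sql) {
    this.sqlList.add(sql);
  }
}{code}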
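Note that the Scala case class gets a compiler-generated toString that prints every field, including the whole ListBuffer, which matches the "Transaction(..., ListBuffer(insert into ...))" text in the log; the Java class as written would fall back to Object.toString() (class name plus hash code). A minimal sketch of a user-side workaround, assuming only the transaction id and the number of buffered statements are worth logging, is to give Transaction an explicit, compact toString(). The getTransactionId()/getSqlList() accessors and the toString format below are hypothetical additions for illustration, not part of the original class or of Flink's API.

{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

class Transaction {
    private final String transactionId;
    // SQL statements buffered until the transaction is committed.
    private final List<String> sqlList;

    public Transaction() {
        this.transactionId = UUID.randomUUID().toString();
        this.sqlList = new ArrayList<>();
    }

    public void add(String sql) {
        this.sqlList.add(sql);
    }

    // Hypothetical accessor, added for illustration.
    public String getTransactionId() {
        return transactionId;
    }

    // Hypothetical accessor: read-only view of the buffered statements.
    public List<String> getSqlList() {
        return Collections.unmodifiableList(sqlList);
    }

    // Compact representation: only the id and the statement count, so that
    // "handle=" + handle in TransactionHolder.toString() stays small no
    // matter how many statements are buffered.
    @Override
    public String toString() {
        return "Transaction{id=" + transactionId + ", statements=" + sqlList.size() + "}";
    }
}{code}

In the Scala version, the same effect can be had by overriding toString inside the case class body.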

was (Author: benjobs):
You are right, my Transaction class is here:

Scala:
{code:scala}
case class Transaction(transactionId: String = UUID.randomUUID().toString, sql: ListBuffer[String] = ListBuffer.empty) extends Serializable {
  def +(text: String): Unit = sql += text
}{code}
Java:

> TwoPhaseCommitSinkFunction Output a large amount of EventData
> -------------------------------------------------------------
>
>                 Key: FLINK-20972
>                 URL: https://issues.apache.org/jira/browse/FLINK-20972
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.12.0
>         Environment: flink 1.4.0 +
>            Reporter: huajiewang
>            Priority: Minor
>              Labels: easyfix, pull-request-available
>         Attachments: 1610682498960.jpg, 1610682603148.jpg, Jdbc2PCSinkFunction.scala
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> In TwoPhaseCommitSinkFunction, a large amount of event data may be written to the log (LOG.info), which can cause an I/O bottleneck and waste disk space.
>  
> My code is in the attachment. A large amount of event data appears in the log output by Flink, e.g.:
> {code:java}
> Jdbc2PCSinkFunction 1/1 - checkpoint 4 complete, committing transaction TransactionHolder{handle=Transaction(b420c880a951403984f231dd7e33597b, ListBuffer(insert into table(field1,field2) value ('11','22') ... ... ), transactionStartTime=1610426158532} from checkpoint 4{code}
> The LOG.info call in TwoPhaseCommitSinkFunction is as follows:
> !1610682498960.jpg|width=838,height=630!
> {code:java}
> LOG.info(
>         "{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
>         name(),
>         checkpointId,
>         pendingTransaction,
>         pendingTransactionCheckpointId); {code}
> This call invokes pendingTransaction's toString method (pendingTransaction is an instance of TransactionHolder).
> The TransactionHolder.toString method is:
> !1610682603148.jpg|width=859,height=327!
> {code:java}
> @Override
> public String toString() {
>     return "TransactionHolder{"
>             + "handle="
>             +  handle
>             + ", transactionStartTime="
>             + transactionStartTime
>             + '}';
> }{code}
> handle is an instance of my own Transaction class. My Transaction has a List-typed field that buffers the incoming data, so all of that data is printed by LOG.info.
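A minimal, self-contained sketch of the mechanism described in the issue. HolderSketch is a hypothetical stand-in that only copies the shape of the TransactionHolder.toString() shown above; it is not Flink's class. String.format is used here in place of the SLF4J-style placeholders, and the handle is a plain List<String> standing in for a Transaction whose toString exposes its list (as the Scala case class does). Because handle is concatenated into a String, its toString() is called, and ArrayList renders every element, so the log line grows linearly with the number of buffered statements.

{code:java}
import java.util.ArrayList;
import java.util.List;

class HolderDemo {
    // Hypothetical stand-in: mirrors only the shape of TransactionHolder.toString().
    static final class HolderSketch<TXN> {
        private final TXN handle;
        private final long transactionStartTime;

        HolderSketch(TXN handle, long transactionStartTime) {
            this.handle = handle;
            this.transactionStartTime = transactionStartTime;
        }

        @Override
        public String toString() {
            // String concatenation implicitly calls handle.toString().
            return "TransactionHolder{"
                    + "handle="
                    + handle
                    + ", transactionStartTime="
                    + transactionStartTime
                    + '}';
        }
    }

    // Builds the message the LOG.info call would produce for a handle that
    // buffers n SQL statements (String.format stands in for SLF4J placeholders).
    static String logLine(int n) {
        List<String> sql = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            sql.add("insert into t(field1, field2) values ('" + i + "', '" + i + "')");
        }
        HolderSketch<List<String>> holder = new HolderSketch<>(sql, 1610426158532L);
        return String.format(
                "%s - checkpoint %d complete, committing transaction %s from checkpoint %d",
                "Jdbc2PCSinkFunction 1/1", 4L, holder, 4L);
    }

    public static void main(String[] args) {
        System.out.println("1 statement:     " + logLine(1).length() + " chars");
        System.out.println("1000 statements: " + logLine(1000).length() + " chars");
    }
}{code}

Comparing the two printed lengths shows that the size of each commit log line is driven by the handle's toString(), not by the logging call itself.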



--
This message was sent by Atlassian Jira
(v8.3.4#803005)