You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "mzz (Jira)" <ji...@apache.org> on 2020/07/21 02:07:00 UTC

[jira] [Created] (FLINK-18652) JDBCAppendTableSink to ClickHouse (data always repeating)

mzz created FLINK-18652:
---------------------------

             Summary: JDBCAppendTableSink  to  ClickHouse  (data  always  repeating)
                 Key: FLINK-18652
                 URL: https://issues.apache.org/jira/browse/FLINK-18652
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.10.0
            Reporter: mzz


Hi all,
   data stream is : kafka->flinkSQL->clickhouse。
   The  window is 15 min,but,15 minutes after the first time, the data kepping repeat sink to ClickHouse, plz  help me ,thx。
{code:java}
*// data source from kafka 
* streamTableEnvironment.sqlUpdate(createTableSql)
    LOG.info("kafka source table has created !")
    val groupTable = streamTableEnvironment.sqlQuery(tempSql)
    streamTableEnvironment.createTemporaryView("aggs_temp_table", groupTable)
*// this is window sql  ,use ProcessingTime
*    val re_table = streamTableEnvironment.sqlQuery(windowSql)
    re_table.printSchema()
    //    groupTable.printSchema()
    val rr = streamTableEnvironment.toAppendStream[Result](re_table)
* // The data here is printed normally
*    rr.print()

    streamTableEnvironment.createTemporaryView("result_table", rr)
    val s = streamTableEnvironment.sqlQuery(sql)

*// sink to clickhouse*
    val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
      .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
      .setDBUrl(URL)
      .setQuery(insertCKSql)
      .setUsername(USERNAME)
      .setPassword(PASSWORD)
      .setBatchSize(10000)
      .setParameterTypes(
        Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.STRING, Types.STRING,
        Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG, Types.FLOAT,
        Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG()
      )
      .build()
    streamTableEnvironment.registerTableSink("ckResult", Array[String]("data_date", "point", "platform", "page_name", "component_name", "booth_name", "position1", "advertiser",
      "adv_code", "request_num", "return_num", "fill_rate", "expose_num", "expose_rate", "click_num", "click_rate", "ecpm", "income", "created_at"),
      Array[TypeInformation[_]](Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG()),
      sink)


// insert into TableSink
    s.insertInto("ckResult")
{code}






--
This message was sent by Atlassian Jira
(v8.3.4#803005)