Posted to issues@flink.apache.org by "Flink Jira Bot (Jira)" <ji...@apache.org> on 2022/01/03 10:40:00 UTC
[jira] [Updated] (FLINK-18652) JDBCAppendTableSink to ClickHouse (data always repeating)
[ https://issues.apache.org/jira/browse/FLINK-18652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-18652:
-----------------------------------
Labels: auto-deprioritized-critical auto-deprioritized-major auto-deprioritized-minor (was: auto-deprioritized-critical auto-deprioritized-major stale-minor)
Priority: Not a Priority (was: Minor)
This issue was labeled "stale-minor" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Minor, please raise the priority and ask a committer to assign you the issue or revive the public discussion.
> JDBCAppendTableSink to ClickHouse (data always repeating)
> --------------------------------------------------------------
>
> Key: FLINK-18652
> URL: https://issues.apache.org/jira/browse/FLINK-18652
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC, Table SQL / Ecosystem
> Affects Versions: 1.10.0
> Reporter: mzz
> Priority: Not a Priority
> Labels: auto-deprioritized-critical, auto-deprioritized-major, auto-deprioritized-minor
> Attachments: FLINK-UI.png, checkpoint-failed.png
>
>
> Hi all,
> The data pipeline is: Kafka -> Flink SQL -> ClickHouse.
> The window is 15 minutes, but starting 15 minutes after the first window, the same data keeps being written to ClickHouse repeatedly. Please help, thanks.
> {code:java}
> // data source from Kafka
> streamTableEnvironment.sqlUpdate(createTableSql)
> LOG.info("kafka source table has created !")
> val groupTable = streamTableEnvironment.sqlQuery(tempSql)
> streamTableEnvironment.createTemporaryView("aggs_temp_table", groupTable)
> // this is the window SQL; it uses processing time
> val re_table = streamTableEnvironment.sqlQuery(windowSql)
> re_table.printSchema()
> // groupTable.printSchema()
> val rr = streamTableEnvironment.toAppendStream[Result](re_table)
> // The data here is printed normally
> rr.print()
> streamTableEnvironment.createTemporaryView("result_table", rr)
> val s = streamTableEnvironment.sqlQuery(sql)
> // sink to ClickHouse
> val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
> .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
> .setDBUrl(URL)
> .setQuery(insertCKSql)
> .setUsername(USERNAME)
> .setPassword(PASSWORD)
> .setBatchSize(10000)
> .setParameterTypes(
> Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.STRING, Types.STRING,
> Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG, Types.FLOAT,
> Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG
> )
> .build()
> streamTableEnvironment.registerTableSink("ckResult", Array[String]("data_date", "point", "platform", "page_name", "component_name", "booth_name", "position1", "advertiser",
> "adv_code", "request_num", "return_num", "fill_rate", "expose_num", "expose_rate", "click_num", "click_rate", "ecpm", "income", "created_at"),
> Array[TypeInformation[_]](Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG),
> sink)
> // insert into TableSink
> s.insertInto("ckResult")
> {code}