You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Zhou Zach <wa...@163.com> on 2020/06/03 10:11:38 UTC

flink sink to mysql

hi all,
 flink sink to mysql 是推荐 继承RichSinkFunction的方式,还是 通过JDBCOutputFormat的方式?

Re:Re:Re:flink sink to mysql

Posted by Zhou Zach <wa...@163.com>.



code 代码乱码,重新截图一下:














在 2020-06-08 17:20:54,"Zhou Zach" <wa...@163.com> 写道:
>
>
>
>使用JDBCOutputFormat的方式,一直没成功啊
>
>
>code:
>object FromKafkaSinkJdbcByJdbcOutputFormat { def main(args: Array[String]): Unit = { val env = getEnv() val topic = "t4" val consumer = getFlinkKafkaConsumer(topic) consumer.setStartFromLatest() val sourceStream = env .addSource(consumer) .setParallelism(1) val mapDS = sourceStream.map(message => { try { JSON.parseObject(message, classOf[BehaviorData]) } catch { case _ => { println("read json failed") BehaviorData("", "", "", 0) } } }) val rowDS = mapDS .map(behaviorData => { println(s"behaviorData: ****************** $behaviorData") val row: Row = new Row(6) row.setField(0, behaviorData.uid.getBytes("UTF-8")) row.setField(1, behaviorData.time.getBytes("UTF-8")) row.setField(2, behaviorData.phoneType.getBytes("UTF-8")) row.setField(3, behaviorData.clickCount.intValue()) row }) rowDS.print() val sql = "query = INSERT INTO user_behavior (uid, time, phoneType, clickCount) VALUES (?, ?, ?, ?)" val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat() .setDrivername("com.mysql.cj.jdbc.Driver") .setDBUrl("jdbc:mysql://localhost:3306/dashboard?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&autoReconnect=true") .setUsername("root") .setPassword("") .setQuery(sql) // .setSqlTypes(Array(Types.STRING, Types.STRING, Types.STRING, Types.LONG)) .finish() rowDS.writeUsingOutputFormat( jdbcOutput ) env.execute() } }
>
>
>
>
>当注释掉sink代码:
>rowDS.writeUsingOutputFormat( jdbcOutput )
>可以看到打印的日志:
>behaviorData: ****************** BehaviorData(6b67c8c700427dee7552f81f3228c927,1591607608894,iOS,7) 3> [54, 98, 54, 55, 99, 56, 99, 55, 48, 48, 52, 50, 55, 100, 101, 101, 55, 53, 53, 50, 102, 56, 49, 102, 51, 50, 50, 56, 99, 57, 50, 55],[49, 53, 57, 49, 54, 48, 55, 54, 48, 56, 56, 57, 52],[105, 79, 83],7,null,null behaviorData: ****************** BehaviorData(a95f22eabc4fd4b580c011a3161a9d9d,1591607609394,iOS,1) 4> [97, 57, 53, 102, 50, 50, 101, 97, 98, 99, 52, 102, 100, 52, 98, 53, 56, 48, 99, 48, 49, 49, 97, 51, 49, 54, 49, 97, 57, 100, 57, 100],[49, 53, 57, 49, 54, 48, 55, 54, 48, 57, 51, 57, 52],[105, 79, 83],1,null,null
>
>
>不注释掉sink代码:
>rowDS.writeUsingOutputFormat( jdbcOutput )
>
>就看不到日志,是不是定义的jdbcOutput不对啊
>
>
>在 2020-06-03 19:16:47,"chaojianok" <ch...@163.com> 写道:
>>推荐 JDBCOutputFormat 吧,简单易用。
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>在 2020-06-03 18:11:38,"Zhou Zach" <wa...@163.com> 写道:
>>>hi all,
>>> flink sink to mysql 是推荐 继承RichSinkFunction的方式,还是 通过JDBCOutputFormat的方式?

Re:Re:flink sink to mysql

Posted by Zhou Zach <wa...@163.com>.


使用JDBCOutputFormat的方式,一直没成功啊


code:
object FromKafkaSinkJdbcByJdbcOutputFormat { def main(args: Array[String]): Unit = { val env = getEnv() val topic = "t4" val consumer = getFlinkKafkaConsumer(topic) consumer.setStartFromLatest() val sourceStream = env .addSource(consumer) .setParallelism(1) val mapDS = sourceStream.map(message => { try { JSON.parseObject(message, classOf[BehaviorData]) } catch { case _ => { println("read json failed") BehaviorData("", "", "", 0) } } }) val rowDS = mapDS .map(behaviorData => { println(s"behaviorData: ****************** $behaviorData") val row: Row = new Row(6) row.setField(0, behaviorData.uid.getBytes("UTF-8")) row.setField(1, behaviorData.time.getBytes("UTF-8")) row.setField(2, behaviorData.phoneType.getBytes("UTF-8")) row.setField(3, behaviorData.clickCount.intValue()) row }) rowDS.print() val sql = "query = INSERT INTO user_behavior (uid, time, phoneType, clickCount) VALUES (?, ?, ?, ?)" val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat() .setDrivername("com.mysql.cj.jdbc.Driver") .setDBUrl("jdbc:mysql://localhost:3306/dashboard?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&autoReconnect=true") .setUsername("root") .setPassword("") .setQuery(sql) // .setSqlTypes(Array(Types.STRING, Types.STRING, Types.STRING, Types.LONG)) .finish() rowDS.writeUsingOutputFormat( jdbcOutput ) env.execute() } }




当注释掉sink代码:
rowDS.writeUsingOutputFormat( jdbcOutput )
可以看到打印的日志:
behaviorData: ****************** BehaviorData(6b67c8c700427dee7552f81f3228c927,1591607608894,iOS,7) 3> [54, 98, 54, 55, 99, 56, 99, 55, 48, 48, 52, 50, 55, 100, 101, 101, 55, 53, 53, 50, 102, 56, 49, 102, 51, 50, 50, 56, 99, 57, 50, 55],[49, 53, 57, 49, 54, 48, 55, 54, 48, 56, 56, 57, 52],[105, 79, 83],7,null,null behaviorData: ****************** BehaviorData(a95f22eabc4fd4b580c011a3161a9d9d,1591607609394,iOS,1) 4> [97, 57, 53, 102, 50, 50, 101, 97, 98, 99, 52, 102, 100, 52, 98, 53, 56, 48, 99, 48, 49, 49, 97, 51, 49, 54, 49, 97, 57, 100, 57, 100],[49, 53, 57, 49, 54, 48, 55, 54, 48, 57, 51, 57, 52],[105, 79, 83],1,null,null


不注释掉sink代码:
rowDS.writeUsingOutputFormat( jdbcOutput )

就看不到日志,是不是定义的jdbcOutput不对啊


在 2020-06-03 19:16:47,"chaojianok" <ch...@163.com> 写道:
>推荐 JDBCOutputFormat 吧,简单易用。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-06-03 18:11:38,"Zhou Zach" <wa...@163.com> 写道:
>>hi all,
>> flink sink to mysql 是推荐 继承RichSinkFunction的方式,还是 通过JDBCOutputFormat的方式?

Re:flink sink to mysql

Posted by chaojianok <ch...@163.com>.
推荐 JDBCOutputFormat 吧,简单易用。

















在 2020-06-03 18:11:38,"Zhou Zach" <wa...@163.com> 写道:
>hi all,
> flink sink to mysql 是推荐 继承RichSinkFunction的方式,还是 通过JDBCOutputFormat的方式?