You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Leonard Xu (Jira)" <ji...@apache.org> on 2020/04/23 08:44:00 UTC

[jira] [Closed] (FLINK-17335) JDBCUpsertTableSink Upsert mysql exception No value specified for parameter 1

     [ https://issues.apache.org/jira/browse/FLINK-17335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Leonard Xu closed FLINK-17335.
------------------------------
    Resolution: Cannot Reproduce

Hi, [~yutaochina]

I closed the issue because it can not reproduce and may import from the code that you modified in your local branch as we communicated in `user-zh` mail list.

> JDBCUpsertTableSink Upsert mysql exception No value specified for parameter 1
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-17335
>                 URL: https://issues.apache.org/jira/browse/FLINK-17335
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: 1.10.0
>            Reporter: yutao
>            Priority: Major
>
> JDBCUpsertTableSink build = JDBCUpsertTableSink.builder()  
> .setTableSchema(results.getSchema())
> .setOptions(JDBCOptions.builder()
> .setDBUrl("。。。。MultiQueries=true&useUnicode=true&characterEncoding=UTF-8")
> .setDriverName("com.mysql.jdbc.Driver")
> .setUsername("***")
> .setPassword("***")
> .setTableName("xkf_join_result")
> .build())
> .setFlushIntervalMills(1000)
> .setFlushMaxSize(100)
> .setMaxRetryTimes(3)
> .build();
> DataStream<Tuple2<Boolean, Row>> retract = bsTableEnv.toRetractStream(results, Row.class);
> retract.print();
> build.emitDataStream(retract);
>  java.sql.SQLException: No value specified for parameter 1
> at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
> at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898)
> at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887)
> at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861)
> at com.mysql.jdbc.PreparedStatement.checkAllParametersSet(PreparedStatement.java:2211)
> at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2191)
> at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2121)
> at com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1162)
> at org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:118)
> at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
> at org.apache.flink.api.java.io.jdbc.JDBCUpsertSinkFunction.snapshotState(JDBCUpsertSinkFunction.java:56)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354)
> in code  UpsertWriter you can see executeBatch() method ;when only one record 
> and tuple2’s  first element is true then  end  for round ;deleteStatement.executeBatch(); get error
> @Override
> public void executeBatch() throws SQLException {
> if (keyToRows.size() > 0) {
> for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) {
> Row pk = entry.getKey();
> Tuple2<Boolean, Row> tuple = entry.getValue();
> if (tuple.f0) {
> processOneRowInBatch(pk, tuple.f1);
> } else {
> setRecordToStatement(deleteStatement, pkTypes, pk);
> deleteStatement.addBatch();
> }
> }
> internalExecuteBatch();
> deleteStatement.executeBatch();
> keyToRows.clear();
> }
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)