You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Bhagavan (Jira)" <ji...@apache.org> on 2020/01/21 18:17:00 UTC

[jira] [Created] (FLINK-15728) JDBCUpsertOutputFormat does not set bind parameter keyFields in updateStaement

Bhagavan created FLINK-15728:
--------------------------------

             Summary: JDBCUpsertOutputFormat does not set bind parameter keyFields in updateStaement
                 Key: FLINK-15728
                 URL: https://issues.apache.org/jira/browse/FLINK-15728
             Project: Flink
          Issue Type: Bug
          Components: Connectors / JDBC
    Affects Versions: 1.9.1
            Reporter: Bhagavan


When using JDBCUpsertOutputFormat custom dialect e.g. H2/Oracle which uses UpsertWriterUsingInsertUpdateStatement, code fails with below error.
{code:java}
Caused by: org.h2.jdbc.JdbcSQLDataException: Parameter "#6" is not set [90012-200]Caused by: org.h2.jdbc.JdbcSQLDataException: Parameter "#6" is not set [90012-200] at org.h2.message.DbException.getJdbcSQLException(DbException.java:590) at org.h2.message.DbException.getJdbcSQLException(DbException.java:429) at org.h2.message.DbException.get(DbException.java:205) at org.h2.message.DbException.get(DbException.java:181) at org.h2.expression.Parameter.checkSet(Parameter.java:83) at org.h2.jdbc.JdbcPreparedStatement.addBatch(JdbcPreparedStatement.java:1275) at org.apache.flink.api.java.io.jdbc.writer.UpsertWriter$UpsertWriterUsingInsertUpdateStatement.processOneRowInBatch(UpsertWriter.java:233) at org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:111) {code}
This is due UpsertWriterUsingInsertUpdateStatement#processOneRowInBatch
 does not set all bind paramters in case of Update.

This bug does get surfaced while using Derby DB. 
 In JDBCUpsertOutputFormatTest if we replace Derby with H2 we can reproduce the bug.

The fix is trivial. Happy to raise PR.
{code:java}
//for update case replace below
setRecordToStatement(updateStatement, fieldTypes, row); 
//with
setRecordToStatement(updateStatement, fieldTypes + pkTypes, row  + pkRow);
//NOTE:  as prepared updateStatement contains addition where clause we need pass additional bind values and its sql Types



{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)