Posted to user-zh@flink.apache.org by 1101300123 <hd...@163.com> on 2020/04/22 15:10:27 UTC

Re: Writing a RetractStream to MySQL fails with java.sql.SQLException: No value specified for parameter 1

OK, I'll switch over and try that first, and file a JIRA afterwards.


On 2020-04-22 22:38, Jingsong Li<ji...@gmail.com> wrote:
Hi,

- The JDBC sink is an upsert
sink, so you need toUpsertStream rather than toRetractStream; I'd suggest using a complete DDL definition to insert into the MySQL table.
- This exception looks like a bug in the JDBC connector. Could you file a JIRA to track it?
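
For reference, a minimal sketch of the DDL route on 1.10 (the WITH
properties follow the legacy JDBC connector; the schema, URL, and
credentials below are placeholders, not your actual values):

// Hypothetical schema: adjust the field names/types to your query result.
bsTableEnv.sqlUpdate(
    "CREATE TABLE xkf_join_result (" +
    "  start_time STRING," +
    "  end_time STRING," +
    "  order_id STRING" +
    ") WITH (" +
    "  'connector.type' = 'jdbc'," +
    "  'connector.url' = 'jdbc:mysql://...'," +
    "  'connector.table' = 'xkf_join_result'," +
    "  'connector.username' = '...'," +
    "  'connector.password' = '...'," +
    "  'connector.write.flush.max-rows' = '100'," +
    "  'connector.write.flush.interval' = '1s'," +
    "  'connector.write.max-retries' = '3'" +
    ")");
// Insert directly through SQL; the planner drives the upsert sink itself,
// so no toRetractStream/emitDataStream is needed.
bsTableEnv.sqlUpdate("INSERT INTO xkf_join_result SELECT ... FROM ...");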

Best,
Jingsong Lee

On Wed, Apr 22, 2020 at 9:58 PM 1101300123 <hd...@163.com> wrote:



After joining in SQL, I write the result to MySQL and get a "No value specified for parameter 1" error.
My version is 1.10.0; the code is as follows:
JDBCUpsertTableSink build = JDBCUpsertTableSink.builder()
    .setTableSchema(results.getSchema())
    .setOptions(JDBCOptions.builder()
        .setDBUrl("....MultiQueries=true&useUnicode=true&characterEncoding=UTF-8")
        .setDriverName("com.mysql.jdbc.Driver")
        .setUsername("jczx_cjch")
        .setPassword("jczx_cjch2")
        .setTableName("xkf_join_result")
        .build())
    .setFlushIntervalMills(1000)
    .setFlushMaxSize(100)
    .setMaxRetryTimes(3)
    .build();


DataStream<Tuple2<Boolean, Row>> retract =
    bsTableEnv.toRetractStream(results, Row.class);
retract.print();
build.emitDataStream(retract);




The following error occurs:
java.sql.SQLException: No value specified for parameter 1
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861)
at com.mysql.jdbc.PreparedStatement.checkAllParametersSet(PreparedStatement.java:2211)
at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2191)
at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2121)
at com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1162)
at org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:118)
at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
at org.apache.flink.api.java.io.jdbc.JDBCUpsertSinkFunction.snapshotState(JDBCUpsertSinkFunction.java:56)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354)




My output record looks like this: (true, 2020-04-22 21:34:00, 2020-04-22
21:34:15, 20200422213541465568468).
Reading the source, I found that JDBCUpsertOutputFormat.writeRecord is
called first, which adds the record to the UpsertWriter's member map:
@Override
public synchronized void writeRecord(Tuple2<Boolean, Row> tuple2) throws IOException {
    checkFlushException();
    try {
        jdbcWriter.addRecord(tuple2);
        batchCount++;
        if (batchCount >= flushMaxSize) {
            flush();
        }
    } catch (Exception e) {
        throw new RuntimeException("Writing records to JDBC failed.", e);
    }
}
Then the flush() method is called, which in turn runs executeBatch on the UpsertWriter:
public synchronized void flush() throws Exception {
    checkFlushException();
    for (int i = 1; i <= maxRetryTimes; i++) {
        try {
            jdbcWriter.executeBatch();
            batchCount = 0;
            break;
        } catch (SQLException e) {
            LOG.error("JDBC executeBatch error, retry times = {}", i, e);
            if (i >= maxRetryTimes) {
                throw e;
            }
            Thread.sleep(1000 * i);
        }
    }
}


UpsertWriter (the JDBCWriter implementation) then runs executeBatch: it
first checks whether the map is empty and then iterates over it; when the
first element of the Tuple2 is true, an inner class processes the row,
otherwise a delete is batched. In my case each batch holds exactly one
record (the map has size 1) and the tuple's first element is true, so
after the loop the call to deleteStatement.executeBatch() fails, because
the delete statement's placeholders were never filled:


@Override
public void executeBatch() throws SQLException {
    if (keyToRows.size() > 0) {
        for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) {
            Row pk = entry.getKey();
            Tuple2<Boolean, Row> tuple = entry.getValue();
            if (tuple.f0) {
                processOneRowInBatch(pk, tuple.f1);
            } else {
                setRecordToStatement(deleteStatement, pkTypes, pk);
                deleteStatement.addBatch();
            }
        }
        internalExecuteBatch();
        deleteStatement.executeBatch();
        keyToRows.clear();
    }
}
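
For what it's worth, a minimal sketch of the kind of guard that would
avoid this (purely illustrative, not the actual fix committed upstream):
track whether any delete was batched and only then execute the delete
statement.

@Override
public void executeBatch() throws SQLException {
    if (keyToRows.size() > 0) {
        boolean hasDelete = false; // whether any delete was actually batched
        for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) {
            Row pk = entry.getKey();
            Tuple2<Boolean, Row> tuple = entry.getValue();
            if (tuple.f0) {
                processOneRowInBatch(pk, tuple.f1);
            } else {
                setRecordToStatement(deleteStatement, pkTypes, pk);
                deleteStatement.addBatch();
                hasDelete = true;
            }
        }
        internalExecuteBatch();
        if (hasDelete) {
            // never run executeBatch() on a statement whose parameters were never set
            deleteStatement.executeBatch();
        }
        keyToRows.clear();
    }
}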



--
Best, Jingsong Lee