Posted to user-zh@flink.apache.org by 1101300123 <hd...@163.com> on 2020/04/22 13:58:22 UTC

Writing a RetractStream to MySQL fails with java.sql.SQLException: No value specified for parameter 1


After a SQL join I write the result to MySQL and get a "No value specified for parameter 1" error.
My Flink version is 1.10.0; the code is as follows:
JDBCUpsertTableSink build = JDBCUpsertTableSink.builder()
        .setTableSchema(results.getSchema())
        .setOptions(JDBCOptions.builder()
                .setDBUrl("。。。。MultiQueries=true&useUnicode=true&characterEncoding=UTF-8")
                .setDriverName("com.mysql.jdbc.Driver")
                .setUsername("jczx_cjch")
                .setPassword("jczx_cjch2")
                .setTableName("xkf_join_result")
                .build())
        .setFlushIntervalMills(1000)
        .setFlushMaxSize(100)
        .setMaxRetryTimes(3)
        .build();


DataStream<Tuple2<Boolean, Row>> retract = bsTableEnv.toRetractStream(results, Row.class);
retract.print();
build.emitDataStream(retract);




The following error then occurs:
java.sql.SQLException: No value specified for parameter 1
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861)
at com.mysql.jdbc.PreparedStatement.checkAllParametersSet(PreparedStatement.java:2211)
at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2191)
at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2121)
at com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1162)
at org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:118)
at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
at org.apache.flink.api.java.io.jdbc.JDBCUpsertSinkFunction.snapshotState(JDBCUpsertSinkFunction.java:56)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354)




My output record looks like this: (true,2020-04-22 21:34:00,2020-04-22 21:34:15,20200422213541465568468).
Looking at the source code, I found the following.
First, JDBCUpsertOutputFormat.writeRecord is called, which adds the element to the member map of UpsertWriter:
@Override
 public synchronized void writeRecord(Tuple2<Boolean, Row> tuple2) throws IOException {
  checkFlushException();
  try {
   jdbcWriter.addRecord(tuple2);
   batchCount++;
   if (batchCount >= flushMaxSize) {
    flush();
   }
  } catch (Exception e) {
   throw new RuntimeException("Writing records to JDBC failed.", e);
  }
 }
flush() is then called, which invokes executeBatch on the UpsertWriter:
public synchronized void flush() throws Exception {
  checkFlushException();
  for (int i = 1; i <= maxRetryTimes; i++) {
   try {
    jdbcWriter.executeBatch();
    batchCount = 0;
    break;
   } catch (SQLException e) {
    LOG.error("JDBC executeBatch error, retry times = {}", i, e);
    if (i >= maxRetryTimes) {
     throw e;
    }
    Thread.sleep(1000 * i);
   }
  }
 }


UpsertWriter (the JDBCWriter implementation) then runs executeBatch: it first checks that the map is non-empty and iterates over it; for each entry, if the first field of the Tuple2 is true it lets the inner class process the row, otherwise it fills the delete statement. In my case every batch holds exactly one record (the map size is 1) and the tuple's first field is true, so after the loop the call to deleteStatement.executeBatch() fails, because the delete statement's placeholders were never filled.

@Override
 public void executeBatch() throws SQLException {
  if (keyToRows.size() > 0) {
   for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) {
    Row pk = entry.getKey();
    Tuple2<Boolean, Row> tuple = entry.getValue();
    if (tuple.f0) {
     processOneRowInBatch(pk, tuple.f1);
    } else {
     setRecordToStatement(deleteStatement, pkTypes, pk);
     deleteStatement.addBatch();
    }
   }
   internalExecuteBatch();
   deleteStatement.executeBatch();
   keyToRows.clear();
  }
 }

Re: Writing a RetractStream to MySQL fails with java.sql.SQLException: No value specified for parameter 1

Posted by 1101300123 <hd...@163.com>.
OK, I will switch it over and try first, and file a JIRA afterwards.


On 2020-04-22 22:38, Jingsong Li<ji...@gmail.com> wrote:
Hi,

- JDBC is an upsert sink, so you need an upsert stream (toUpsertStream) rather than toRetractStream; I would suggest writing into the MySQL table with a full DDL instead.
- This exception looks like a bug in the JDBC connector; could you file a JIRA to track it?

Best,
Jingsong Lee


Re: Writing a RetractStream to MySQL fails with java.sql.SQLException: No value specified for parameter 1

Posted by Jingsong Li <ji...@gmail.com>.
Hi,

- JDBC is an upsert sink, so you need an upsert stream (toUpsertStream) rather than toRetractStream; I would suggest writing into the MySQL table with a full DDL instead.
- This exception looks like a bug in the JDBC connector; could you file a JIRA to track it?
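
For illustration, a minimal sketch of what this could look like with the 1.10 API, reusing the build sink, the results table and bsTableEnv from the original snippet (the sink name below is only a placeholder, and this is a sketch rather than a verified fix):

// Let the planner drive the upsert sink instead of calling
// toRetractStream + emitDataStream by hand.
bsTableEnv.registerTableSink("mysql_upsert_sink", build);
results.insertInto("mysql_upsert_sink");
bsTableEnv.execute("write join result to mysql");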

Best,
Jingsong Lee


Re: Writing a RetractStream to MySQL fails with java.sql.SQLException: No value specified for parameter 1

Posted by Leonard Xu <xb...@gmail.com>.
Hi,

This looks like a small bug introduced when you modified the code earlier, since neither the code path nor our tests can reproduce the problem.
Also, if you have modified the source code, please mention it in the email; otherwise it is really hard to figure out...

Best,
Leonard Xu


Re: Writing a RetractStream to MySQL fails with java.sql.SQLException: No value specified for parameter 1

Posted by 1101300123 <hd...@163.com>.
After I added some log statements to the source and recompiled, the earlier problem disappeared; I tried many times and could not reproduce it. I had previously modified the delete code in the source while integrating ClickHouse, and I am not sure whether that caused it.

Re: Writing a RetractStream to MySQL fails with java.sql.SQLException: No value specified for parameter 1

Posted by Leonard Xu <xb...@gmail.com>.
Hi,
I tried to reproduce this locally: with 1.10.0 your SQL runs fine and the results match expectations ☺️, see [1] below.
I see you have created a JIRA; let's follow up in the issue.

Best,
Leonard Xu

[1]
mysql> select * from order_state_cnt;
+------------+--------------+------+
| order_date | product_code | cnt  |
+------------+--------------+------+
| 2020-04-01 | product1     |    3 |
| 2020-04-01 | product2     |    5 |
| 2020-04-01 | product1     |    5 |
| 2020-04-01 | product2     |    9 |
+------------+--------------+------+
4 rows in set (0.00 sec)

mysql> select * from order_state_cnt;
+------------+--------------+------+
| order_date | product_code | cnt  |
+------------+--------------+------+
| 2020-04-01 | product1     |    3 |
| 2020-04-01 | product2     |    5 |
| 2020-04-01 | product1     |    5 |
| 2020-04-01 | product2     |    9 |
| 2020-04-01 | product1     |    2 |
| 2020-04-01 | product2     |    4 |
+------------+--------------+------+
6 rows in set (0.00 sec)





Re: Writing a RetractStream to MySQL fails with java.sql.SQLException: No value specified for parameter 1

Posted by 1101300123 <hd...@163.com>.

Let me give you some data and code; the error is the same as in my real scenario.
Order main table: orders
Two records at 13:00; order_state is the order status, 0 = cancelled, 1 = awaiting payment:
{"order_no":"order1","order_state":1,"pay_time":"","create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"}
{"order_no":"order2","order_state":1,"pay_time":"","create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"}


At 13:15 a new record arrives that cancels order1:
{"order_no":"order1","order_state":0,"pay_time":"","create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:15:00"}


Order detail table: order_detail
Four records:
{"order_no":"order1","product_code":"product1","quantity":3,"create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"}
{"order_no":"order1","product_code":"product2","quantity":5,"create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"}
{"order_no":"order2","product_code":"product1","quantity":2,"create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"}
{"order_no":"order2","product_code":"product2","quantity":4,"create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"}


The requirement is: once an order is created we count the product quantities for that order, and when the order's state changes to cancelled we subtract those quantities again (with the sample data above, product1 should drop from 5 to 2 and product2 from 9 to 4 once order1 is cancelled).


Code:
package Learn.kafkasql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class SqlCount {
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env,settings);

tenv.sqlUpdate("CREATE TABLE orders " +
"            (" +
"               order_no     string," +
"               order_state  int," +
"               pay_time     string," +
"               create_time  string," +
"               update_time  string" +
"             ) " +
"       WITH (" +
"               'connector.type' = 'kafka',       " +
"               'connector.version' = 'universal', " +//--kafka版本
"               'connector.topic' = 'tp_orders'," +//--kafkatopic
"               'connector.properties.zookeeper.connect' = '192.168.179.120:2181', " +
"               'connector.properties.bootstrap.servers' = '192.168.179.120:9092'," +
"               'connector.properties.group.id' = 'testGroup'," +
"               'connector.startup-mode' = 'latest-offset'," +
"               'format.type' = 'json'  " +//--数据为json格式
"             )");
tenv.sqlUpdate("CREATE TABLE order_detail " +
"            (" +
"               order_no     string," +
"               product_code string," +
"               quantity     int," +
"               create_time  string," +
"               update_time  string" +
"             ) " +
"       WITH (" +
"               'connector.type' = 'kafka', " +
"               'connector.version' = 'universal',  " +//--kafka版本
"               'connector.topic' = 'tp_order_detail'," +//--kafkatopic
"               'connector.properties.zookeeper.connect' = '192.168.179.120:2181', " +
"               'connector.properties.bootstrap.servers' = '192.168.179.120:9092'," +
"               'connector.properties.group.id' = 'testGroup'," +
"               'connector.startup-mode' = 'latest-offset'," +
"               'format.type' = 'json'  " +//--数据为json格式
"             )");

tenv.sqlUpdate("CREATE TABLE product_sale" +
"             (" +
"              order_date string," +
"              product_code string," +
"              cnt int" +
"              ) " +
"         WITH (" +
"           'connector.type' = 'jdbc', " +
"           'connector.url' = 'jdbc:mysql://192.168.179.120:3306/flink?serverTimezone=UTC&useSSL=true', " +
"           'connector.table' = 'order_state_cnt', " +
"           'connector.driver' = 'com.mysql.jdbc.Driver', " +
"           'connector.username' = 'root'," +
"           'connector.password' = '123456'," +
"           'connector.write.flush.max-rows' = '1'," +//--默认每5000条数据写入一次,测试调小一点
"           'connector.write.flush.interval' = '2s'," +//--写入时间间隔
"           'connector.write.max-retries' = '3'" +
"         )");
tenv.sqlUpdate("insert into product_sale " +
"select create_date,product_code,sum(quantity)" +
"from (select t1.order_no," +
"             t1.create_date," +
"             t2.product_code," +
"             t2.quantity" +
"       from (select order_no," +
"                    order_state," +
"                    substring(create_time,1,10) create_date," +
"                    update_time ," +
"                    row_number() over(partition by order_no order by update_time desc) as rn" +
"              from orders" +
"              )t1" +
"       left join order_detail t2" +
"            on t1.order_no=t2.order_no" +
"      where t1.rn=1" +//--取最新的订单状态数据
"      and t1.order_state<>0" +//--不包含取消订单
"   )t3" +
" group by create_date,product_code");

Table table = tenv.sqlQuery("select create_date,product_code,sum(quantity)" +
"from (select t1.order_no," +
"             t1.create_date," +
"             t2.product_code," +
"             t2.quantity" +
"       from (select order_no," +
"                    order_state," +
"                    substring(create_time,1,10) create_date," +
"                    update_time ," +
"                    row_number() over(partition by order_no order by update_time desc) as rn" +
"              from orders" +
"              )t1" +
"       left join order_detail t2" +
"            on t1.order_no=t2.order_no" +
"      where t1.rn=1" +
"      and t1.order_state<>0" +
"   )t3" +
" group by create_date,product_code");
tenv.toRetractStream(table, Row.class).print();
tenv.execute("count");
}
}
MySQL table DDL:
CREATE TABLE `order_state_cnt` (
  `order_date` varchar(12) ,
  `product_code` varchar(12) ,
  `cnt` int 
) ENGINE=InnoDB DEFAULT CHARSET=utf8


I am sending the records one at a time from the Kafka command line (console producer).
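
For illustration only, the same sample records could also be pushed from Java with the Kafka producer client instead of the console producer; the broker address and topic name are taken from the DDLs above, everything else in this sketch is made up:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SendSampleData {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.179.120:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // One of the order records above; the detail records go to tp_order_detail the same way.
            producer.send(new ProducerRecord<>("tp_orders",
                    "{\"order_no\":\"order1\",\"order_state\":1,\"pay_time\":\"\"," +
                    "\"create_time\":\"2020-04-01 13:00:00\",\"update_time\":\"2020-04-01 13:00:00\"}"));
            producer.flush();
        }
    }
}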


The error is raised mainly at deleteStatement.executeBatch():
@Override
public void executeBatch() throws SQLException {
    if (keyToRows.size() > 0) {
        for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) {
            Row pk = entry.getKey();
            Tuple2<Boolean, Row> tuple = entry.getValue();
            if (tuple.f0) {
                // accumulate message: handled by the upsert statement
                processOneRowInBatch(pk, tuple.f1);
            } else {
                // retract message: batch a DELETE by key
                setRecordToStatement(deleteStatement, pkTypes, pk);
                deleteStatement.addBatch();
            }
        }
        internalExecuteBatch();
        deleteStatement.executeBatch();
        keyToRows.clear();
    }
}
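As a side check of that claim, here is a minimal standalone sketch (JDBC URL, user, password and table name are copied from the DDL above; everything else is an assumption) showing what the MySQL driver does when executeBatch() is called on a prepared DELETE to which no parameters were ever bound:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class EmptyDeleteBatchCheck {
    public static void main(String[] args) throws Exception {
        // Connection details taken from the table DDL above; adjust to your environment.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://192.168.179.120:3306/flink?serverTimezone=UTC&useSSL=true",
                "root", "123456");
             PreparedStatement delete = conn.prepareStatement(
                 "DELETE FROM order_state_cnt WHERE order_date = ? AND product_code = ?")) {
            // Deliberately no setObject()/addBatch() calls: the batch stays empty,
            // mirroring a flush in which only upsert rows were processed.
            int[] counts = delete.executeBatch();
            System.out.println("executeBatch() on an empty batch returned " + counts.length + " update counts");
        }
    }
}
```

Whether this throws the same "No value specified for parameter 1" error or simply returns an empty array is exactly the point under discussion here.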
On Apr 23, 2020, at 00:21, Leonard Xu <xb...@gmail.com> wrote:
Nice, thanks for the detailed analysis!

I could not reproduce the problem you describe, and the last step of the analysis looks slightly off. Looking at the MySQL JDBC driver,
com/mysql/jdbc/PreparedStatement.java#executeBatchInternal() (around line 1289)
checks the size of the batchedArgs list and returns immediately when it is empty, so the empty delete batch should never actually be executed. Could you debug further to confirm?
```
if (this.batchedArgs == null || this.batchedArgs.size() == 0) {
    return new long[0];
}
```

Best,
Leonard Xu


Re: Regarding the java.sql.SQLException: No value specified for parameter 1 issue when writing a RetractStream to MySQL

Posted by Leonard Xu <xb...@gmail.com>.
Nice, thanks for the detailed analysis!

I could not reproduce the problem you describe, and the last step of the analysis looks slightly off. Looking at the MySQL JDBC driver,
com/mysql/jdbc/PreparedStatement.java#executeBatchInternal() (around line 1289)
checks the size of the batchedArgs list and returns immediately when it is empty, so the empty delete batch should never actually be executed. Could you debug further to confirm?
```
if (this.batchedArgs == null || this.batchedArgs.size() == 0) {
    return new long[0];
}
```

Best,
Leonard Xu
