You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 卢伟楠 <gl...@gmail.com> on 2019/12/25 04:17:41 UTC

实现一个两阶段提交的ETL,数据从kafka到mysql,遇到的问题

项目简述:一个 ETL 作业,从 Kafka 读取数据,每 10 秒一批,sink 到 MySQL。


环境相关信息
flink运行模式:local
mysql的global variables中wait_timeout=28800
mysql客户端mysql-connector-java版本5.1.42



报错
org.apache.flink.streaming.runtime.tasks.TimerException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_191]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_191]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_191]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_191]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_191]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_191]
	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_191]
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	at com.feiyu.help.AWF.apply(AWF.java:23) ~[classes/:na]
	at com.feiyu.help.AWF.apply(AWF.java:14) ~[classes/:na]
	at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction.process(InternalIterableAllWindowFunction.java:44) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction.process(InternalIterableAllWindowFunction.java:32) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	... 7 common frames omitted
Caused by: java.sql.SQLException: Could not retrieve transaction read-only status from server
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:964) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:897) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:886) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:860) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:877) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:873) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3536) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3505) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1230) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:970) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.feiyu.help.MySQLTwoPhaseCommitSink.invoke(MySQLTwoPhaseCommitSink.java:78) ~[classes/:na]
	at com.feiyu.help.MySQLTwoPhaseCommitSink.invoke(MySQLTwoPhaseCommitSink.java:23) ~[classes/:na]
	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	... 20 common frames omitted
Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet successfully received from the server was 3,747 milliseconds ago.  The last packet sent successfully to the server was 3,748 milliseconds ago.
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_191]
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_191]
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_191]
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_191]
	at com.mysql.jdbc.Util.handleNewInstance(Util.java:425) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:989) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.MysqlIO.clearInputStream(MysqlIO.java:905) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2474) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2680) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2486) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2444) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.StatementImpl.executeQuery(StatementImpl.java:1381) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3530) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	... 28 common frames omitted
Caused by: java.net.SocketException: Bad file descriptor (ioctl FIONREAD failed)
	at java.net.PlainSocketImpl.socketAvailable(Native Method) ~[na:1.8.0_191]
	at java.net.AbstractPlainSocketImpl.available(AbstractPlainSocketImpl.java:490) ~[na:1.8.0_191]
	at java.net.SocketInputStream.available(SocketInputStream.java:259) ~[na:1.8.0_191]
	at com.mysql.jdbc.util.ReadAheadInputStream.available(ReadAheadInputStream.java:219) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.MysqlIO.clearInputStream(MysqlIO.java:901) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	... 34 common frames omitted
11:59:30.051 [flink-akka.actor.default-dispatcher-8] INFO  o.a.f.r.e.ExecutionGraph - Job flink kafka to Mysql (822054e1e70a7ac7616ffef1e667a20e) switched from state RUNNING to FAILING.
org.apache.flink.streaming.runtime.tasks.TimerException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_191]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_191]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_191]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_191]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_191]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_191]
	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_191]
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	at com.feiyu.help.AWF.apply(AWF.java:23) ~[classes/:na]
	at com.feiyu.help.AWF.apply(AWF.java:14) ~[classes/:na]
	at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction.process(InternalIterableAllWindowFunction.java:44) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction.process(InternalIterableAllWindowFunction.java:32) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	... 7 common frames omitted
Caused by: java.sql.SQLException: Could not retrieve transaction read-only status from server
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:964) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:897) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:886) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:860) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:877) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:873) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3536) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3505) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1230) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:970) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.feiyu.help.MySQLTwoPhaseCommitSink.invoke(MySQLTwoPhaseCommitSink.java:78) ~[classes/:na]
	at com.feiyu.help.MySQLTwoPhaseCommitSink.invoke(MySQLTwoPhaseCommitSink.java:23) ~[classes/:na]
	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
	... 20 common frames omitted
Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet successfully received from the server was 3,747 milliseconds ago.  The last packet sent successfully to the server was 3,748 milliseconds ago.
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_191]
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_191]
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_191]
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_191]
	at com.mysql.jdbc.Util.handleNewInstance(Util.java:425) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:989) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.MysqlIO.clearInputStream(MysqlIO.java:905) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2474) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2680) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2486) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2444) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.StatementImpl.executeQuery(StatementImpl.java:1381) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3530) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	... 28 common frames omitted
Caused by: java.net.SocketException: Bad file descriptor (ioctl FIONREAD failed)
	at java.net.PlainSocketImpl.socketAvailable(Native Method) ~[na:1.8.0_191]
	at java.net.AbstractPlainSocketImpl.available(AbstractPlainSocketImpl.java:490) ~[na:1.8.0_191]
	at java.net.SocketInputStream.available(SocketInputStream.java:259) ~[na:1.8.0_191]
	at com.mysql.jdbc.util.ReadAheadInputStream.available(ReadAheadInputStream.java:219) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	at com.mysql.jdbc.MysqlIO.clearInputStream(MysqlIO.java:901) ~[mysql-connector-java-5.1.42.jar:5.1.42]
	... 34 common frames omitted


代码
主程序:
package com.feiyu.help;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

import java.sql.Types;
import java.util.Properties;

/**
 * Entry point of the "flink kafka to Mysql" job.
 *
 * <p>The pipeline reads JSON records from a Kafka topic, groups them with a
 * 5-second all-window, turns each window into one list via {@link AWF} and
 * hands that list to {@link MySQLTwoPhaseCommitSink}. Every operator runs with
 * parallelism 1.
 */
public class KafkaSinkMysql {

    /** Kafka topic to consume; must match the topic the producer writes to. */
    private static final String TOPIC = "zzz";

    /** Kafka bootstrap servers. */
    private static final String BROKERS = "10.250.0.101:9092,10.250.0.102:9092,10.250.0.103:9092";

    /** JDBC URL of the target database (failOverReadOnly=false is intentionally not set). */
    private static final String JDBC_URL =
            "jdbc:mysql://10.250.0.38:3306/xy_data_20027?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true&rewriteBatchedStatements=true";

    /** Parameterized insert statement executed by the sink. */
    private static final String INSERT_SQL =
            "insert into employee_lwn (id, name, password, age, salary, department) values (?, ?, ?, ?, ?, ?)";

    /**
     * Builds the streaming topology and runs it.
     *
     * @param args command-line arguments (not read)
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        configureCheckpointing(env);

        env.addSource(new FlinkKafkaConsumer011<>(
                TOPIC,
                new JSONKeyValueDeserializationSchema(true),
                kafkaProperties()))
                .setParallelism(1)
                .timeWindowAll(Time.seconds(5))
                .apply(new AWF())
                .setParallelism(1)
                .addSink(mysqlSink())
                .setParallelism(1);

        env.execute("flink kafka to Mysql");
    }

    /**
     * Enables exactly-once checkpointing: one checkpoint every 10000 ms, a
     * timeout of 100000 ms and at most one checkpoint in flight.
     */
    private static void configureCheckpointing(StreamExecutionEnvironment env) {
        env.enableCheckpointing(10000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(100000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    }

    /** Creates the Kafka consumer configuration. */
    private static Properties kafkaProperties() {
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", BROKERS);
        kafkaProps.put("group.id", "test");
        // Alternative value: earliest
        kafkaProps.put("auto.offset.reset", "latest");
        return kafkaProps;
    }

    /** Creates the MySQL sink together with the JDBC types of the six insert parameters. */
    private static MySQLTwoPhaseCommitSink mysqlSink() {
        int[] columnTypes = new int[]{
                Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.INTEGER, Types.VARCHAR};
        return new MySQLTwoPhaseCommitSink(
                JDBC_URL,
                "com.mysql.jdbc.Driver",
                "public",
                "public",
                INSERT_SQL,
                columnTypes);
    }

}
窗口处理:
package com.feiyu.help;

import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

/**
 * All-window function that copies every element of a window into a single
 * list and emits that list downstream. Empty windows emit nothing.
 */
public class AWF implements AllWindowFunction<ObjectNode, List<ObjectNode>, TimeWindow> {

    private static final Logger log = LoggerFactory.getLogger(AWF.class);

    /**
     * Collects the window contents into one list and forwards it.
     *
     * @param window the window being evaluated (not read here)
     * @param values the elements assigned to the window
     * @param out    collector receiving the list, only when it is non-empty
     * @throws Exception declared by the interface; anything thrown by the collector propagates
     */
    @Override
    public void apply(TimeWindow window, Iterable<ObjectNode> values, Collector<List<ObjectNode>> out) throws Exception {
        final ArrayList<ObjectNode> batch = Lists.newArrayList(values);
        if (batch.isEmpty()) {
            // Nothing arrived in this window, so there is nothing to emit.
            return;
        }

        // NOTE(review): the message text says "10 seconds"; the real window
        // length is chosen by whoever builds the pipeline - confirm they agree.
        final int count = batch.size();
        log.info("10 秒内收集到 employee 的数据条数是:" + count);
        out.collect(batch);
        log.info("collect 执行完毕");
    }
}
sink:
package com.feiyu.help;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

/**
 * Two-phase-commit sink that writes batches of Kafka records to MySQL.
 *
 * <p>The transaction handle is a JDBC {@link Connection} with auto-commit
 * switched off: {@link #beginTransaction()} opens it, {@code invoke} executes
 * the inserts on it, and {@link #commit(Connection)} or
 * {@link #abort(Connection)} finishes it and closes the connection.
 *
 * <p>NOTE(review): the transaction state is registered with a
 * {@code KryoSerializer<Connection>}. A JDBC connection wraps a live socket,
 * so a serialized or copied instance is presumably not an independent, usable
 * connection - confirm how checkpoints and recovery treat this state.
 */
public class MySQLTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<List<ObjectNode>, Connection, Void>  implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger log = LoggerFactory.getLogger(MySQLTwoPhaseCommitSink.class);

    /** JSON field names bound, in this order, to statement parameters 1..6. */
    private static final String[] COLUMNS = {"id", "name", "password", "age", "salary", "department"};

    private String drivername;
    private String url;
    private String username;
    private String password;
    private String sql;
    // Stored for callers that pass the JDBC types; not read anywhere in this class.
    private int[] types;

    /**
     * @param url        JDBC URL of the target database
     * @param drivername fully qualified JDBC driver class name
     * @param username   database user, or {@code null} to connect with the URL only
     * @param password   database password
     * @param sql        parameterized insert statement with six placeholders
     * @param types      JDBC types of the placeholders (kept, currently unused)
     */
    public MySQLTwoPhaseCommitSink(String url, String drivername, String username, String password, String sql, int[] types) {
        super(new KryoSerializer<>(Connection.class,new ExecutionConfig()), VoidSerializer.INSTANCE);
        this.drivername = drivername;
        this.url = url;
        this.username = username;
        this.password = password;
        this.sql = sql;
        this.types = types;
    }

    /**
     * Writes one incoming batch to the database inside the current transaction.
     *
     * <p>Each element's {@code "value"} node is parsed as JSON and its fields
     * are bound to the insert statement; all rows are sent with a single
     * {@code executeBatch}. Any failure propagates to the caller instead of
     * being swallowed, so a row that cannot be bound is never silently dropped
     * from the transaction.
     *
     * @param connection the transaction's connection
     * @param data       the records to insert; a {@code null} or empty list is a no-op
     * @param context    sink context (not read here)
     * @throws Exception if preparing, binding or executing the batch fails
     */
    @Override
    protected void invoke(Connection connection, List<ObjectNode> data, Context context) throws Exception {
        log.info("start invoke...{},线程id: {}",connection, Thread.currentThread().getId());
        if (data == null || data.isEmpty()) {
            return;
        }

        log.info("使用连接:{} 创建游标...,线程id: {}",connection, Thread.currentThread().getId());
        // try-with-resources closes the statement even when the batch fails.
        try (PreparedStatement prepareStatement = connection.prepareStatement(this.sql)) {
            log.info("创建 ps:{} 成功...,线程id: {}",prepareStatement.toString(), Thread.currentThread().getId());

            for (ObjectNode objectNode : data) {
                String value = objectNode.get("value").toString();
                JSONObject valueJson = JSONObject.parseObject(value);
                for (int i = 0; i < COLUMNS.length; i++) {
                    prepareStatement.setObject(i + 1, valueJson.get(COLUMNS[i]));
                }
                prepareStatement.addBatch();
            }

            log.info("start executeBatch, 使用ps:{}...,线程id: {}",prepareStatement.toString(), Thread.currentThread().getId());
            prepareStatement.executeBatch();

            log.info("准备关闭ps:{} ...,线程id: {}",prepareStatement.toString(), Thread.currentThread().getId());
        }
    }

    /**
     * Logs the call and delegates to the base class snapshot logic.
     *
     * @param context snapshot context passed through to the base class
     * @throws Exception if the base class snapshot fails
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        log.info("start snapshotState...,线程id: {}", Thread.currentThread().getId());
        super.snapshotState(context);
    }

    /**
     * Opens a new connection and switches it to manual commit.
     *
     * @return a connection with auto-commit disabled
     * @throws IllegalArgumentException if the connection cannot be opened or the driver class is missing
     * @throws Exception                if auto-commit cannot be disabled (the connection is closed first)
     */
    @Override
    protected Connection beginTransaction() throws Exception {
        log.info("start beginTransaction.......,线程id: {}", Thread.currentThread().getId());
        Connection connection;

        try {
            log.info("create connection.......");
            connection = this.establishConnection();
            log.info("建立连接:{} 成功...,线程id: {}",connection, Thread.currentThread().getId());
        } catch (SQLException e) {
            throw new IllegalArgumentException("open() failed.", e);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("JDBC driver class not found.", e);
        }

        try {
            // Manual commit: nothing becomes visible until commit() is called.
            connection.setAutoCommit(false);
        } catch (SQLException e) {
            // Do not leak a connection that cannot be used as a transaction.
            close(connection);
            throw e;
        }

        return connection;
    }

    /** Loads the driver class and opens a connection, with credentials when a user name is set. */
    private Connection establishConnection() throws SQLException, ClassNotFoundException {
        Class.forName(this.drivername);
        if (this.username == null) {
            return DriverManager.getConnection(this.url);
        }
        return DriverManager.getConnection(this.url, this.username, this.password);
    }

    /**
     * Pre-commit hook. The inserts were already executed in {@code invoke}, so
     * this only logs.
     *
     * @param connection the transaction's connection
     * @throws Exception never thrown by this implementation
     */
    @Override
    protected void preCommit(Connection connection) throws Exception {
        log.info("start preCommit...{} ...,线程id: {}",connection, Thread.currentThread().getId());
    }

    /**
     * Commits the transaction and always closes its connection.
     *
     * <p>A failed commit is rethrown rather than swallowed: the rows of this
     * transaction were not persisted, and hiding that would let the job
     * continue as if they had been.
     *
     * @param connection the transaction's connection; {@code null} is ignored
     * @throws IllegalStateException if the database rejects the commit
     */
    @Override
    protected void commit(Connection connection) {
        log.info("start commit..{},线程id: {}",connection, Thread.currentThread().getId());
        if (connection == null) {
            return;
        }
        try {
            log.info("准备提交事务,使用连接:{} ...,线程id: {}",connection, Thread.currentThread().getId());
            connection.commit();
        } catch (SQLException e) {
            log.error("提交事务失败,Connection: {},线程id: {}",connection, Thread.currentThread().getId(), e);
            throw new IllegalStateException("Failed to commit MySQL transaction", e);
        } finally {
            // Release the connection whether or not the commit succeeded.
            close(connection);
        }
    }

    /**
     * Recovery hook for a restored, pre-committed transaction; only logs.
     *
     * @param connection the restored transaction handle
     */
    @Override
    protected void recoverAndCommit(Connection connection) {
        log.info("start recoverAndCommit {}.......,线程id: {}",connection, Thread.currentThread().getId());
    }

    /**
     * Recovery hook for a restored, uncommitted transaction; only logs.
     *
     * @param connection the restored transaction handle
     */
    @Override
    protected void recoverAndAbort(Connection connection) {
        log.info("start abort recoverAndAbort {}.......,线程id: {}",connection, Thread.currentThread().getId());
    }

    /**
     * Rolls the transaction back and always closes its connection.
     *
     * <p>A rollback failure is logged and not rethrown; the connection is
     * closed either way so an aborted transaction does not leak it.
     *
     * @param connection the transaction's connection; {@code null} is ignored
     */
    @Override
    protected void abort(Connection connection) {
        log.info("start abort rollback... {} ,线程id: {}",connection, Thread.currentThread().getId());
        if (connection == null) {
            return;
        }
        try {
            log.error("事务发生回滚,Connection: {} ,线程id: {}",connection, Thread.currentThread().getId());
            connection.rollback();
        } catch (SQLException e) {
            log.error("事物回滚失败,Connection: {} ,线程id: {}",connection, Thread.currentThread().getId(), e);
        } finally {
            close(connection);
        }
    }

    /**
     * Closes the given connection, logging instead of throwing on failure.
     *
     * @param connection the connection to close; {@code null} is ignored
     */
    public void close(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            log.info("准备关闭连接:{} ...,线程id: {}",connection, Thread.currentThread().getId());
            connection.close();
        } catch (SQLException e) {
            log.warn("JDBC connection could not be closed: {}", e.getMessage(), e);
        }
    }

}
POJO:
package com.feiyu.help;

import java.io.Serializable;

/**
 * Serializable value holder for one employee record. Fields are public and
 * mutable; matching getters and setters are provided for each of them.
 */
public class Employee implements Serializable {// implements Model

    private static final long serialVersionUID = 2L;

    public int id;
    public String name;
    public String password;
    public int age;
    public Integer salary;
    public String department;

    /** Creates an employee with every field left at its Java default value. */
    public Employee() {
    }

    /**
     * Creates a fully populated employee.
     *
     * @param id         employee id
     * @param name       employee name
     * @param password   employee password
     * @param age        employee age
     * @param salary     employee salary, may be {@code null}
     * @param department employee department
     */
    public Employee(int id, String name, String password, int age, Integer salary, String department) {
        this.id = id;
        this.name = name;
        this.password = password;
        this.age = age;
        this.salary = salary;
        this.department = department;
    }

    /** @return the employee id */
    public int getId() {
        return this.id;
    }

    /** @param id the employee id */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the employee name */
    public String getName() {
        return this.name;
    }

    /** @param name the employee name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the employee password */
    public String getPassword() {
        return this.password;
    }

    /** @param password the employee password */
    public void setPassword(String password) {
        this.password = password;
    }

    /** @return the employee age */
    public int getAge() {
        return this.age;
    }

    /** @param age the employee age */
    public void setAge(int age) {
        this.age = age;
    }

    /** @return the employee salary, possibly {@code null} */
    public Integer getSalary() {
        return this.salary;
    }

    /** @param salary the employee salary */
    public void setSalary(Integer salary) {
        this.salary = salary;
    }

    /** @return the employee department */
    public String getDepartment() {
        return this.department;
    }

    /** @param department the employee department */
    public void setDepartment(String department) {
        this.department = department;
    }

}
kafka生产者:(用于生产测试数据)
package com.feiyu.help;

import com.alibaba.fastjson.JSON;
import com.feiyu.gflow.test2.test.Employee;
import com.google.gson.Gson;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;

/**
 * Test-data generator: publishes 10000 JSON-encoded {@code Employee} records
 * to the Kafka topic read by the streaming job.
 */
public class KafkaProducerTestCoutinue {

    /**
     * Runs the generator.
     *
     * @param args command-line arguments (not read)
     */
    public static void main(String[] args) {
        Producer();
    }

    /**
     * Sends employees with ids 1..10000 to the topic, pausing 10 ms after each
     * record, then flushes and closes the producer.
     *
     * <p>Ages, salaries and departments come from a {@link Random} with a fixed
     * seed, so every run produces the same sequence. If the thread is
     * interrupted while pausing, sending stops early, the records sent so far
     * are flushed, and the interrupt flag is restored once the producer has
     * been closed.
     */
    public static void Producer() {
        String broker = "10.250.0.101:9092,10.250.0.102:9092,10.250.0.103:9092";
        String topic = "zzz";
        Properties props = new Properties();
        props.put("bootstrap.servers", broker);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        String[] depLists = {"行政部", "账务部", "市场部", "技术部", "销售部"};

        Random rand = new Random(300);
        boolean interrupted = false;

        // try-with-resources closes the producer (and its network threads) on every exit path.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 1; i <= 10000; i++) {
                String temp = JSON.toJSONString(
                        new Employee(i, "user" + i, "password" + i, rand.nextInt(40) + 20, (rand.nextInt(300) + 1) * 100, depLists[rand.nextInt(depLists.length)])
                );
                // No partition and no key: the producer chooses the partition.
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, temp);
                producer.send(record);
                System.out.println("发送数据: " + temp);
                try {
                    Thread.sleep(10); // pause after each record
                } catch (InterruptedException e) {
                    // Stop sending; the flag is restored after the producer is closed
                    // so that flushing and closing are not themselves interrupted.
                    interrupted = true;
                    break;
                }
            }
            if (!interrupted) {
                System.out.println("发送数据完成");
            }
            producer.flush();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

}
mysql数据库表信息:

-- Target table of the job's insert statement
-- (insert into employee_lwn (id, name, password, age, salary, department) ...).
-- Every column is nullable and the table declares no primary key or index.
CREATE TABLE `employee_lwn` (
  `id` bigint(20) DEFAULT NULL,
  `name` varchar(50) DEFAULT NULL,
  `password` varchar(50) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  `salary` int(11) DEFAULT NULL,
  `department` varchar(20) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;