You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 卢伟楠 <gl...@gmail.com> on 2019/12/25 04:17:41 UTC
实现一个两阶段提交的ETL,数据从kafka到mysql,遇到的问题
项目简述:从kafka取数据,每10秒一批,sink到mysql中的ETL
环境相关信息
flink运行模式:local
mysql的global variables中wait_timeout=28800
mysql客户端mysql-connector-java版本5.1.42
报错
org.apache.flink.streaming.runtime.tasks.TimerException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_191]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_191]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_191]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_191]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_191]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_191]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_191]
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at com.feiyu.help.AWF.apply(AWF.java:23) ~[classes/:na]
at com.feiyu.help.AWF.apply(AWF.java:14) ~[classes/:na]
at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction.process(InternalIterableAllWindowFunction.java:44) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction.process(InternalIterableAllWindowFunction.java:32) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
... 7 common frames omitted
Caused by: java.sql.SQLException: Could not retrieve transaction read-only status from server
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:964) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:897) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:886) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:860) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:877) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:873) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3536) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3505) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1230) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:970) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.feiyu.help.MySQLTwoPhaseCommitSink.invoke(MySQLTwoPhaseCommitSink.java:78) ~[classes/:na]
at com.feiyu.help.MySQLTwoPhaseCommitSink.invoke(MySQLTwoPhaseCommitSink.java:23) ~[classes/:na]
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
... 20 common frames omitted
Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet successfully received from the server was 3,747 milliseconds ago. The last packet sent successfully to the server was 3,748 milliseconds ago.
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_191]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_191]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_191]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_191]
at com.mysql.jdbc.Util.handleNewInstance(Util.java:425) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:989) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.MysqlIO.clearInputStream(MysqlIO.java:905) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2474) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2680) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2486) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2444) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.StatementImpl.executeQuery(StatementImpl.java:1381) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3530) ~[mysql-connector-java-5.1.42.jar:5.1.42]
... 28 common frames omitted
Caused by: java.net.SocketException: Bad file descriptor (ioctl FIONREAD failed)
at java.net.PlainSocketImpl.socketAvailable(Native Method) ~[na:1.8.0_191]
at java.net.AbstractPlainSocketImpl.available(AbstractPlainSocketImpl.java:490) ~[na:1.8.0_191]
at java.net.SocketInputStream.available(SocketInputStream.java:259) ~[na:1.8.0_191]
at com.mysql.jdbc.util.ReadAheadInputStream.available(ReadAheadInputStream.java:219) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.MysqlIO.clearInputStream(MysqlIO.java:901) ~[mysql-connector-java-5.1.42.jar:5.1.42]
... 34 common frames omitted
11:59:30.051 [flink-akka.actor.default-dispatcher-8] INFO o.a.f.r.e.ExecutionGraph - Job flink kafka to Mysql (822054e1e70a7ac7616ffef1e667a20e) switched from state RUNNING to FAILING.
org.apache.flink.streaming.runtime.tasks.TimerException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_191]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_191]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_191]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_191]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_191]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_191]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_191]
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at com.feiyu.help.AWF.apply(AWF.java:23) ~[classes/:na]
at com.feiyu.help.AWF.apply(AWF.java:14) ~[classes/:na]
at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction.process(InternalIterableAllWindowFunction.java:44) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction.process(InternalIterableAllWindowFunction.java:32) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
... 7 common frames omitted
Caused by: java.sql.SQLException: Could not retrieve transaction read-only status from server
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:964) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:897) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:886) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:860) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:877) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:873) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3536) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3505) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1230) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:970) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.feiyu.help.MySQLTwoPhaseCommitSink.invoke(MySQLTwoPhaseCommitSink.java:78) ~[classes/:na]
at com.feiyu.help.MySQLTwoPhaseCommitSink.invoke(MySQLTwoPhaseCommitSink.java:23) ~[classes/:na]
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
... 20 common frames omitted
Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet successfully received from the server was 3,747 milliseconds ago. The last packet sent successfully to the server was 3,748 milliseconds ago.
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_191]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_191]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_191]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_191]
at com.mysql.jdbc.Util.handleNewInstance(Util.java:425) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:989) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.MysqlIO.clearInputStream(MysqlIO.java:905) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2474) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2680) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2486) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2444) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.StatementImpl.executeQuery(StatementImpl.java:1381) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3530) ~[mysql-connector-java-5.1.42.jar:5.1.42]
... 28 common frames omitted
Caused by: java.net.SocketException: Bad file descriptor (ioctl FIONREAD failed)
at java.net.PlainSocketImpl.socketAvailable(Native Method) ~[na:1.8.0_191]
at java.net.AbstractPlainSocketImpl.available(AbstractPlainSocketImpl.java:490) ~[na:1.8.0_191]
at java.net.SocketInputStream.available(SocketInputStream.java:259) ~[na:1.8.0_191]
at com.mysql.jdbc.util.ReadAheadInputStream.available(ReadAheadInputStream.java:219) ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.MysqlIO.clearInputStream(MysqlIO.java:901) ~[mysql-connector-java-5.1.42.jar:5.1.42]
... 34 common frames omitted
代码
主程序:
package com.feiyu.help;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import java.sql.Types;
import java.util.Properties;
/**
 * Job entry point: consumes JSON records from Kafka, gathers them in
 * 5-second processing-time windows, and writes each batch to MySQL through a
 * two-phase-commit sink so that delivery is aligned with Flink checkpoints.
 */
public class KafkaSinkMysql {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpointing drives the 2PC sink: one checkpoint every 10 s,
        // exactly-once mode, at most one concurrent checkpoint, 100 s timeout.
        env.enableCheckpointing(10000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(100000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // Kafka consumer settings.
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "10.250.0.101:9092,10.250.0.102:9092,10.250.0.103:9092");
        consumerProps.put("group.id", "test");
        consumerProps.put("auto.offset.reset", "latest");//earliest

        // Sink configuration; the URL deliberately kept as-is
        // (failOverReadOnly=false was considered but is not set).
        MySQLTwoPhaseCommitSink mysqlSink = new MySQLTwoPhaseCommitSink(
                "jdbc:mysql://10.250.0.38:3306/xy_data_20027?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true&rewriteBatchedStatements=true",
                "com.mysql.jdbc.Driver",
                "public",
                "public",
                "insert into employee_lwn (id, name, password, age, salary, department) values (?, ?, ?, ?, ?, ?)",
                new int[]{Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.INTEGER, Types.VARCHAR});

        env.addSource(new FlinkKafkaConsumer011<>(
                        "zzz", // must match the topic the test producer writes to
                        new JSONKeyValueDeserializationSchema(true),
                        consumerProps))
                .setParallelism(1)
                .timeWindowAll(Time.seconds(5))
                .apply(new AWF()).setParallelism(1)
                .addSink(mysqlSink)
                .setParallelism(1);

        env.execute("flink kafka to Mysql");
    }
}
窗口处理:
package com.feiyu.help;
import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
/**
 * Window function that collects every element of one processing-time window
 * into a single {@code List} and emits that list downstream as one record,
 * so the sink can write the whole window as a single JDBC batch.
 */
public class AWF implements AllWindowFunction<ObjectNode, List<ObjectNode>, TimeWindow> {

    private static final Logger log = LoggerFactory.getLogger(AWF.class);

    /**
     * Materializes {@code values} and emits the list when it is non-empty;
     * empty windows produce no output record.
     */
    @Override
    public void apply(TimeWindow window, Iterable<ObjectNode> values, Collector<List<ObjectNode>> out) throws Exception {
        ArrayList<ObjectNode> model = Lists.newArrayList(values);
        if (!model.isEmpty()) {
            // Parameterized SLF4J logging instead of string concatenation; the
            // message no longer hard-codes "10 秒" because the actual window
            // length is whatever the caller configures (5 s in KafkaSinkMysql).
            log.info("窗口内收集到 employee 的数据条数是:{}", model.size());
            out.collect(model);
            log.info("collect 执行完毕");
        }
    }
}
sink:
package com.feiyu.help;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
/**
* 实现两阶段提交MySQL
*/
/**
 * MySQL sink implementing Flink's two-phase-commit protocol for exactly-once
 * delivery: {@link #beginTransaction()} opens a JDBC connection with
 * auto-commit disabled, {@link #invoke} batches the window's rows onto it, and
 * {@link #commit}/{@link #abort} finish the transaction at checkpoint time.
 *
 * <p>NOTE(review): the transaction handle (TXN type parameter) is a live
 * {@link Connection} serialized with Kryo. A JDBC connection is not
 * meaningfully serializable, so state restored after a failure cannot be
 * committed or aborted — see the no-op {@link #recoverAndCommit} and
 * {@link #recoverAndAbort}. A production implementation should carry a
 * replayable transaction descriptor (e.g. the buffered rows, or an XA
 * transaction id) instead of the connection itself; that change is not made
 * here because it would alter the class's generic signature.
 */
public class MySQLTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<List<ObjectNode>, Connection, Void> implements Serializable {

    private static final long serialVersionUID = 1L;
    private static final Logger log = LoggerFactory.getLogger(MySQLTwoPhaseCommitSink.class);

    private String drivername;   // fully-qualified JDBC driver class name
    private String url;          // JDBC connection URL
    private String username;     // null means "connect without credentials"
    private String password;
    private String sql;          // parameterized INSERT executed once per element
    private int[] types;         // declared JDBC types of the placeholders (currently unused)

    /**
     * @param url        JDBC URL of the target database
     * @param drivername JDBC driver class to load
     * @param username   user name, or {@code null} for credential-less connect
     * @param password   password (ignored when {@code username} is null)
     * @param sql        parameterized INSERT with 6 placeholders
     * @param types      JDBC {@link java.sql.Types} of the placeholders
     */
    public MySQLTwoPhaseCommitSink(String url, String drivername, String username, String password, String sql, int[] types) {
        super(new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
        this.drivername = drivername;
        this.url = url;
        this.username = username;
        this.password = password;
        this.sql = sql;
        this.types = types;
    }

    /**
     * Writes one window's worth of records into the open transaction as a
     * single JDBC batch. Any failure propagates so the framework aborts the
     * transaction instead of silently losing rows.
     */
    @Override
    protected void invoke(Connection connection, List<ObjectNode> data, Context context) throws Exception {
        log.info("start invoke...{},线程id: {}", connection, Thread.currentThread().getId());
        log.info("使用连接:{} 创建游标...,线程id: {}", connection, Thread.currentThread().getId());
        // try-with-resources guarantees the statement is closed even when the
        // batch fails (the original leaked it on any exception).
        try (PreparedStatement prepareStatement = connection.prepareStatement(this.sql)) {
            log.info("创建 ps:{} 成功...,线程id: {}", prepareStatement.toString(), Thread.currentThread().getId());
            // Plain for-loop instead of forEach with a swallowed SQLException:
            // a binding failure must propagate and trigger abort(); printing the
            // stack trace and continuing would break the exactly-once guarantee.
            for (ObjectNode objectNode : data) {
                String value = objectNode.get("value").toString();
                JSONObject valueJson = JSONObject.parseObject(value);
                prepareStatement.setObject(1, valueJson.get("id"));
                prepareStatement.setObject(2, valueJson.get("name"));
                prepareStatement.setObject(3, valueJson.get("password"));
                prepareStatement.setObject(4, valueJson.get("age"));
                prepareStatement.setObject(5, valueJson.get("salary"));
                prepareStatement.setObject(6, valueJson.get("department"));
                prepareStatement.addBatch();
            }
            log.info("start executeBatch, 使用ps:{}...,线程id: {}", prepareStatement.toString(), Thread.currentThread().getId());
            prepareStatement.executeBatch();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        log.info("start snapshotState...,线程id: {}", Thread.currentThread().getId());
        super.snapshotState(context);
    }

    /**
     * Opens a fresh JDBC connection with auto-commit disabled; the connection
     * itself is the transaction handle for this checkpoint interval.
     *
     * <p>NOTE(review): the connection stays idle until the next checkpoint
     * commits it. If the server drops idle connections faster than the
     * checkpoint interval (or the link fails), invoke()/commit() will hit a
     * "Communications link failure" — keep wait_timeout well above the
     * checkpoint interval and avoid relying on autoReconnect inside a
     * transaction. TODO confirm against the deployment's MySQL settings.
     */
    @Override
    protected Connection beginTransaction() throws Exception {
        log.info("start beginTransaction.......,线程id: {}", Thread.currentThread().getId());
        Connection connection;
        try {
            log.info("create connection.......");
            connection = this.establishConnection();
            log.info("建立连接:{} 成功...,线程id: {}", connection, Thread.currentThread().getId());
        } catch (SQLException e) {
            throw new IllegalArgumentException("open() failed.", e);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("JDBC driver class not found.", e);
        }
        // Manual commit: the transaction is finished by commit()/abort().
        connection.setAutoCommit(false);
        return connection;
    }

    /** Loads the driver and connects, with or without credentials. */
    private Connection establishConnection() throws SQLException, ClassNotFoundException {
        Class.forName(this.drivername);
        if (this.username == null) {
            return DriverManager.getConnection(this.url);
        } else {
            return DriverManager.getConnection(this.url, this.username, this.password);
        }
    }

    /**
     * Pre-commit hook; all writes already happened in {@link #invoke}, so
     * nothing is flushed here.
     */
    @Override
    protected void preCommit(Connection connection) throws Exception {
        log.info("start preCommit...{} ...,线程id: {}", connection, Thread.currentThread().getId());
    }

    /**
     * Commits the checkpointed transaction. The connection is always closed
     * afterwards — including on failure (the original leaked it when
     * {@code commit()} threw).
     */
    @Override
    protected void commit(Connection connection) {
        log.info("start commit..{},线程id: {}", connection, Thread.currentThread().getId());
        if (connection != null) {
            try {
                log.info("准备提交事务,使用连接:{} ...,线程id: {}", connection, Thread.currentThread().getId());
                connection.commit();
            } catch (SQLException e) {
                // Cause included so the failure is diagnosable from the log.
                log.error("提交事务失败,Connection: {},线程id: {}", connection, Thread.currentThread().getId(), e);
            } finally {
                close(connection);
            }
        }
    }

    /**
     * Called for transactions restored from state after a failure.
     * NOTE(review): intentionally left as a no-op by the original author; a
     * restored (deserialized) Connection is not usable, so recovered
     * transactions are effectively dropped — see the class-level note.
     */
    @Override
    protected void recoverAndCommit(Connection connection) {
        log.info("start recoverAndCommit {}.......,线程id: {}", connection, Thread.currentThread().getId());
    }

    /**
     * Abort counterpart of {@link #recoverAndCommit}; same no-op caveat.
     */
    @Override
    protected void recoverAndAbort(Connection connection) {
        log.info("start abort recoverAndAbort {}.......,线程id: {}", connection, Thread.currentThread().getId());
    }

    /**
     * Rolls the transaction back when invoke() failed; the next checkpoint
     * will not commit this data. The connection is closed afterwards in all
     * cases (the original left it open, leaking one connection per abort).
     */
    @Override
    protected void abort(Connection connection) {
        log.info("start abort rollback... {} ,线程id: {}", connection, Thread.currentThread().getId());
        if (connection != null) {
            try {
                log.error("事务发生回滚,Connection: {} ,线程id: {}", connection, Thread.currentThread().getId());
                connection.rollback();
            } catch (SQLException e) {
                log.error("事物回滚失败,Connection: {} ,线程id: {}", connection, Thread.currentThread().getId(), e);
            } finally {
                close(connection);
            }
        }
    }

    /**
     * Closes a JDBC connection, logging (but not propagating) failures.
     */
    public void close(Connection connection) {
        if (connection != null) {
            try {
                log.info("准备关闭连接:{} ...,线程id: {}", connection, Thread.currentThread().getId());
                connection.close();
            } catch (SQLException e) {
                log.info("JDBC connection could not be closed: " + e.getMessage());
            }
        }
    }
}
POJO:
package com.feiyu.help;
import java.io.Serializable;
/**
 * Serializable employee record: the payload produced to Kafka and persisted
 * into the {@code employee_lwn} table.
 *
 * <p>Fields are intentionally public (preserved from the original contract);
 * the bean-style accessors are kept for frameworks that rely on them.
 */
public class Employee implements Serializable {

    private static final long serialVersionUID = 2L;

    public int id;
    public String name;
    public String password;
    public int age;
    public Integer salary;
    public String department;

    /** No-arg constructor for reflective/deserializing instantiation. */
    public Employee() {
    }

    /** Creates a fully populated employee. */
    public Employee(int id, String name, String password, int age, Integer salary, String department) {
        this.id = id;
        this.name = name;
        this.password = password;
        this.age = age;
        this.salary = salary;
        this.department = department;
    }

    public int getId() {
        return this.id;
    }

    public void setId(int value) {
        this.id = value;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String value) {
        this.name = value;
    }

    public String getPassword() {
        return this.password;
    }

    public void setPassword(String value) {
        this.password = value;
    }

    public int getAge() {
        return this.age;
    }

    public void setAge(int value) {
        this.age = value;
    }

    public Integer getSalary() {
        return this.salary;
    }

    public void setSalary(Integer value) {
        this.salary = value;
    }

    public String getDepartment() {
        return this.department;
    }

    public void setDepartment(String value) {
        this.department = value;
    }
}
kafka生产者:(用于生产测试数据)
package com.feiyu.help;
import com.alibaba.fastjson.JSON;
import com.feiyu.gflow.test2.test.Employee;
import com.google.gson.Gson;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.Random;
/**
 * Test data generator: produces 10,000 JSON-encoded {@code Employee} records
 * to the {@code zzz} topic, one every ~10 ms, for exercising the Flink job.
 */
public class KafkaProducerTestCoutinue {

    public static void main(String[] args) {
        Producer();
    }

    /** Runs the production loop until all records are sent or the thread is interrupted. */
    public static void Producer() {
        String broker = "10.250.0.101:9092,10.250.0.102:9092,10.250.0.103:9092";
        String topic = "zzz";
        Properties props = new Properties();
        props.put("bootstrap.servers", broker);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        String[] depLists = new String[5];
        depLists[0] = "行政部";
        depLists[1] = "账务部";
        depLists[2] = "市场部";
        depLists[3] = "技术部";
        depLists[4] = "销售部";
        // Fixed seed so repeated runs generate the same test data.
        Random rand = new Random(300);

        // try-with-resources closes the producer (flushing buffered records)
        // even if send() throws; the original only flushed and never closed.
        // Generics added: the raw KafkaProducer/ProducerRecord were unchecked.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 1; i <= 10000; i++) {
                String temp = JSON.toJSONString(
                        new Employee(i, "user" + i, "password" + i, rand.nextInt(40) + 20, (rand.nextInt(300) + 1) * 100, depLists[rand.nextInt(5)])
                );
                producer.send(new ProducerRecord<>(topic, null, null, temp));
                System.out.println("发送数据: " + temp);
                try {
                    Thread.sleep(10); //发送一条数据 sleep
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop producing instead of
                    // swallowing the interruption and spinning on.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            System.out.println("发送数据完成");
            producer.flush();
        }
    }
}
mysql数据库表信息:
-- Target table for the Flink two-phase-commit sink; column order matches the
-- INSERT in KafkaSinkMysql.
-- NOTE(review): there is no primary/unique key, so a replayed batch after a
-- failure would insert duplicate rows; consider adding PRIMARY KEY (`id`).
CREATE TABLE `employee_lwn` (
`id` bigint(20) DEFAULT NULL,
`name` varchar(50) DEFAULT NULL,
`password` varchar(50) DEFAULT NULL,
`age` int(11) DEFAULT NULL,
`salary` int(11) DEFAULT NULL,
`department` varchar(20) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;