Posted to user-zh@flink.apache.org by 花乞丐 <hu...@163.com> on 2021/01/15 07:19:30 UTC
flink sql consuming Kafka messages and writing to Hive does not commit partitions
I am consuming messages from Kafka and writing them into Hive, and I've found that partitions are not being committed. The reason is that the watermark is negative, and I don't understand where this negative watermark comes from.
<http://apache-flink.147419.n8.nabble.com/file/t1257/E4142DB1-E410-43e8-8653-2B90D0A998EA.png>
My code does specify a watermark, but it doesn't seem to take effect when I debug.
Re: flink sql consuming Kafka messages and writing to Hive does not commit partitions
Posted by 花乞丐 <hu...@163.com>.
Thanks everyone for the patient replies. I've found the problem: it was an issue with how I was using the watermark, my own mistake. Sorry about that.
<http://apache-flink.147419.n8.nabble.com/file/t1257/%E4%BD%BF%E7%94%A8%E5%A7%BF%E5%8A%BF.jpg>
After fixing that, partition data still wasn't being committed. Debugging showed that the watermark value is now fine, but Flink's toMills method uses UTC time, so the value extracted from the partition ends up 8 hours larger than the original value; as a result, the watermark stays smaller than partition_time + commitDelay. I'll handle that accordingly next.
<http://apache-flink.147419.n8.nabble.com/file/t1257/%E6%97%B6%E5%8C%BA%E9%97%AE%E9%A2%98.jpg>
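For reference, one way to compensate for the 8-hour offset on Flink 1.12 is a custom partition-time extractor that shifts the extracted time back to UTC before it is compared with the watermark. The sketch below is only an illustration of that idea; it assumes the org.apache.flink.table.filesystem.PartitionTimeExtractor interface and the dt/hr/sec partition layout used in this thread, and the class name and the hard-coded UTC+8 offset are illustrative, not taken from the original job.

// Hypothetical extractor: parses the dt/hr/sec partition values and subtracts
// the local UTC+8 offset so that toMills() yields the true epoch millis.
// It would be registered on the Hive table with:
//   'partition.time-extractor.kind' = 'custom',
//   'partition.time-extractor.class' = 'com.zallsteel.flink.utils.OffsetPartitionTimeExtractor'
package com.zallsteel.flink.utils;

import org.apache.flink.table.filesystem.PartitionTimeExtractor;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

public class OffsetPartitionTimeExtractor implements PartitionTimeExtractor {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Override
    public LocalDateTime extract(List<String> partitionKeys, List<String> partitionValues) {
        // Partition layout: dt=yyyy-MM-dd, hr=HH, sec=mm (as in the ods.order_info DDL)
        String dt = partitionValues.get(0);
        String hr = partitionValues.get(1);
        String min = partitionValues.get(2);
        LocalDateTime localTime = LocalDateTime.parse(dt + " " + hr + ":" + min + ":00", FORMAT);
        // Shift from local time (UTC+8) back to UTC so the commit trigger no longer
        // sees the partition time 8 hours ahead of the watermark.
        return localTime.minusHours(8);
    }
}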
Re: Reply: flink sql consuming Kafka messages and writing to Hive does not commit partitions
Posted by Shengkai Fang <fs...@gmail.com>.
This usually happens when one of the Kafka partitions has no data, so the overall watermark does not advance. In that case you generally need to configure an idle source manually [1]. Note that the community's watermark push-down currently has some issues [2], which are being fixed.
[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-exec-source-idle-timeout
[2]
https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel
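For completeness, a minimal sketch of setting the option from [1] on the TableConfig (the 30 s value is only an example):

// Declare a source partition idle after 30 s without data, so the overall
// watermark can keep advancing even if one Kafka partition stays empty.
tableEnv.getConfig().getConfiguration()
        .setString("table.exec.source.idle-timeout", "30 s");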
花乞丐 <hu...@163.com> wrote on Mon, Jan 18, 2021 at 11:42 AM:
>
> The code is attached. Data is already being written to HDFS and files are being produced, but the watermark I added has no effect, so the metastore is never updated and never receives any partition information. I first have to run this in the hive shell: hive (ods)> msck repair table order_info. Only after that can the data be queried. Debugging shows that at partition-commit time the watermark must be greater than the value extracted from the partition plus the delay before the partition is committed, but right now the watermark is always Long.MIN_VALUE, so partitions are never committed. I did set a watermark in the code; is my watermark setup wrong? Any advice would be appreciated!
Re: Reply: flink sql consuming Kafka messages and writing to Hive does not commit partitions
Posted by 花乞丐 <hu...@163.com>.
The code is attached. Data is already being written to HDFS and files are being produced, but the watermark I added has no effect, so the metastore is never updated and never receives any partition information. I first have to run this in the hive shell: hive (ods)> msck repair table order_info. Only after that can the data be queried. Debugging shows that at partition-commit time the watermark must be greater than the value extracted from the partition plus the delay before the partition is committed, but right now the watermark is always Long.MIN_VALUE, so partitions are never committed. I did set a watermark in the code; is my watermark setup wrong? Any advice would be appreciated!
package com.zallsteel.flink.app.log;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.zallsteel.flink.entity.ChangelogVO;
import com.zallsteel.flink.entity.OrderInfo;
import com.zallsteel.flink.utils.ConfigUtils;
import lombok.SneakyThrows;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import java.text.ParseException;
import java.time.Duration;
import java.util.Date;
import java.util.Properties;

/**
 * @author Jackie Zhu
 * @time 2021-01-13 16:50:18
 * @desc Test MySQL CDC to Hive
 */
public class MySQLCDC2HiveApp {
    public static void main(String[] args) {
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(6);
        // Enable checkpointing
        env.enableCheckpointing(60000);
        env.getConfig().setAutoWatermarkInterval(200);
        // Set up the Flink SQL environment
        EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        // Create the table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, tableEnvSettings);
        // Set the checkpointing mode
        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
                CheckpointingMode.EXACTLY_ONCE);
        // Set the checkpoint interval
        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
                Duration.ofMinutes(1));
        // Specify the catalog name
        String catalogName = "devHive";
        // Create the HiveCatalog
        HiveCatalog hiveCatalog = new HiveCatalog(catalogName,
                "default",
                "/home/beggar/tools/apache-hive-3.1.2-bin/conf",
                "/home/beggar/tools/hadoop-3.1.1/etc/hadoop",
                "3.1.2"
        );
        // Register the Hive catalog
        tableEnv.registerCatalog(catalogName, hiveCatalog);
        // Use the Hive catalog
        tableEnv.useCatalog(catalogName);
        // Create the database for the MySQL CDC source
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
        // Create the MySQL CDC table
        tableEnv.executeSql("DROP TABLE IF EXISTS cdc.order_info");
        tableEnv.executeSql("CREATE TABLE cdc.order_info(\n" +
                "    id BIGINT,\n" +
                "    user_id BIGINT,\n" +
                "    create_time TIMESTAMP,\n" +
                "    operate_time TIMESTAMP,\n" +
                "    province_id INT,\n" +
                "    order_status STRING,\n" +
                "    total_amount DECIMAL(10, 5)\n" +
                ") WITH (\n" +
                "    'connector' = 'mysql-cdc',\n" +
                "    'hostname' = 'beggar',\n" +
                "    'port' = '3306',\n" +
                "    'username' = 'root',\n" +
                "    'password' = '123456',\n" +
                "    'database-name' = 'cdc',\n" +
                "    'table-name' = 'order_info'\n" +
                ")");
        // Create the Kafka source
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
        tableEnv.executeSql("DROP TABLE IF EXISTS kafka.order_info");
        tableEnv.executeSql("CREATE TABLE kafka.order_info (\n" +
                "    id BIGINT,\n" +
                "    user_id BIGINT,\n" +
                "    create_time TIMESTAMP,\n" +
                "    operate_time TIMESTAMP,\n" +
                "    province_id INT,\n" +
                "    order_status STRING,\n" +
                "    total_amount DECIMAL(10, 5)\n" +
                ") WITH (\n" +
                "    'connector' = 'kafka',\n" +
                "    'topic' = 'order_info',\n" +
                "    'scan.startup.mode' = 'earliest-offset',\n" +
                "    'properties.bootstrap.servers' = 'beggar.dev:9092',\n" +
                "    'format' = 'changelog-json'\n" +
                ")");
        // Insert data into the Kafka table
        tableEnv.executeSql("INSERT INTO kafka.order_info\n" +
                "SELECT id, user_id, create_time, operate_time, province_id, order_status, total_amount\n" +
                "FROM cdc.order_info");
        // Build a custom stream that carries an op field
        Properties kafkaConfig = ConfigUtils.getKafkaConfig();
        FlinkKafkaConsumerBase<String> consumer = new FlinkKafkaConsumer<>(
                "order_info",
                new SimpleStringSchema(),
                kafkaConfig
        ).setStartFromEarliest();
        DataStreamSource<String> streamSource = env.addSource(consumer);

        String[] fieldNames = {"id", "user_id", "create_time", "operate_time", "province_id", "order_status", "total_amount", "op"};
        TypeInformation[] types = {Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.INT, Types.INT, Types.DOUBLE, Types.STRING};

        SingleOutputStreamOperator<Row> ds2 = streamSource.map(new MapFunction<String, Row>() {
            @Override
            public Row map(String value) throws Exception {
                Gson gson = new Gson();
                ChangelogVO changelogVO = gson.fromJson(value, ChangelogVO.class);
                String op = changelogVO.getOp();
                int arity = fieldNames.length;
                Row row = new Row(arity);
                row.setField(0, changelogVO.getData().getId());
                row.setField(1, changelogVO.getData().getUserId());
                row.setField(2, changelogVO.getData().getCreateTime());
                row.setField(3, changelogVO.getData().getOperateTime());
                row.setField(4, changelogVO.getData().getProviceId());
                row.setField(5, changelogVO.getData().getOrderStatus());
                row.setField(6, changelogVO.getData().getTotalAmount());
                String operation = getOperation(op);
                row.setField(7, operation);
                return row;
            }

            private String getOperation(String op) {
                String operation = "INSERT";
                for (RowKind rk : RowKind.values()) {
                    if (rk.shortString().equals(op)) {
                        switch (rk) {
                            case UPDATE_BEFORE:
                                operation = "UPDATE-BEFORE";
                                break;
                            case UPDATE_AFTER:
                                operation = "UPDATE-AFTER";
                                break;
                            case DELETE:
                                operation = "DELETE";
                                break;
                            case INSERT:
                            default:
                                operation = "INSERT";
                                break;
                        }
                        break;
                    }
                }
                return operation;
            }
        }, new RowTypeInfo(types, fieldNames));
        // Assign the watermark
        ds2.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2)));
        tableEnv.createTemporaryView("merged_order_info", ds2);
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods");
        tableEnv.executeSql("DROP TABLE IF EXISTS ods.order_info");
        tableEnv.executeSql("CREATE TABLE ods.order_info (\n" +
                "    id BIGINT,\n" +
                "    user_id BIGINT,\n" +
                "    create_time STRING,\n" +
                "    operate_time STRING,\n" +
                "    province_id INT,\n" +
                "    order_status INT,\n" +
                "    total_amount DOUBLE,\n" +
                "    op STRING\n" +
                ") PARTITIONED BY (dt STRING, hr STRING, sec STRING) STORED AS parquet TBLPROPERTIES (\n" +
                "    'partition.time-extractor.timestamp-pattern'='$dt $hr:$sec:00',\n" +
                "    'sink.partition-commit.trigger'='partition-time',\n" +
                "    'sink.partition-commit.delay'='1 min',\n" +
                "    'sink.partition-commit.policy.kind'='metastore,success-file'\n" +
                ")");
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        TableResult tableResult = tableEnv.executeSql("INSERT INTO ods.order_info\n" +
                "SELECT\n" +
                "    id,\n" +
                "    user_id,\n" +
                "    create_time,\n" +
                "    operate_time,\n" +
                "    province_id,\n" +
                "    order_status,\n" +
                "    total_amount,\n" +
                "    op,\n" +
                "    DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd') AS dt,\n" +
                "    DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'HH') AS hr,\n" +
                "    DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'mm') AS sec\n" +
                "FROM merged_order_info"
        );
        try {
            tableEnv.execute("mysqlcdc to hive");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
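One detail that may explain the Long.MIN_VALUE watermark (just a guess from reading the code, not a confirmed diagnosis): assignTimestampsAndWatermarks returns a new stream, and the temporary view is registered on the original ds2, so the strategy never takes effect downstream; on a Row stream the bounded-out-of-orderness strategy also needs an explicit timestamp assigner. A rough sketch of both changes, assuming create_time arrives as a 'yyyy-MM-dd HH:mm:ss' string as elsewhere in the job:

// Sketch only: extract event-time timestamps from the create_time field and
// register the stream returned by assignTimestampsAndWatermarks, so the
// watermark actually reaches the Hive sink.
SingleOutputStreamOperator<Row> withWatermarks = ds2.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<Row>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner((row, recordTs) -> {
                    try {
                        return FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
                                .parse((String) row.getField(2))
                                .getTime();
                    } catch (ParseException e) {
                        return recordTs; // fall back to the Kafka record timestamp
                    }
                }));
tableEnv.createTemporaryView("merged_order_info", withWatermarks);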
Reply: flink sql consuming Kafka messages and writing to Hive does not commit partitions
Posted by 刘小红 <18...@163.com>.
Please paste your code so we can see how you are using it.
刘小红
18500348251@163.com
On Jan 15, 2021 at 17:40, xufengfeng<50...@qq.com> wrote:
On Jan 15, 2021 at 3:19 PM, 花乞丐 <hu...@163.com> wrote:
I am consuming messages from Kafka and writing them into Hive, and I've found that partitions are not being committed. The reason is that the watermark is negative, and I don't understand where this negative watermark comes from.
<http://apache-flink.147419.n8.nabble.com/file/t1257/E4142DB1-E410-43e8-8653-2B90D0A998EA.png>
My code does specify a watermark, but it doesn't seem to take effect when I debug.
Could there be future timestamps in the data? I ran into that once before.
Re: flink sql consuming Kafka messages and writing to Hive does not commit partitions
Posted by xufengfeng <50...@qq.com>.
> On Jan 15, 2021 at 3:19 PM, 花乞丐 <hu...@163.com> wrote:
>
> I am consuming messages from Kafka and writing them into Hive, and I've found that partitions are not being committed. The reason is that the watermark is negative, and I don't understand where this negative watermark comes from.
> <http://apache-flink.147419.n8.nabble.com/file/t1257/E4142DB1-E410-43e8-8653-2B90D0A998EA.png>
> My code does specify a watermark, but it doesn't seem to take effect when I debug.
>
>
>
Could there be future timestamps in the data? I ran into that once before.
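If you want to rule that out quickly, a rough probe over the kafka.order_info table defined earlier in the thread (sketch only):

// Print any rows whose event time lies in the future relative to the current
// wall clock; if this returns anything, the event times in the data are suspect.
tableEnv.executeSql(
        "SELECT id, create_time FROM kafka.order_info " +
        "WHERE create_time > CURRENT_TIMESTAMP").print();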