You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 花乞丐 <hu...@163.com> on 2021/01/28 07:18:44 UTC
关于Flink程序既有流又有SQL,提交任务到yarn之后没有水印生成
目前我在本地执行Flink程序是可以将kafka中的数据消费到,而且可以成功写入到hive中,但是,当我提交任务到yarn之后,从Flink Web
UI看所有的sink都是 no
watermark的状态,但是去查看hdfs的文件,是成功写入数据的,但是没有提交分区到metastore和提交success文件,所以也就是水印没有作用,但是在本地可以的,怎么在yarn反而不行了!
<http://apache-flink.147419.n8.nabble.com/file/t1257/%E5%BE%AE%E4%BF%A1%E5%9B%BE%E7%89%87_20210128151439.png>
代码如下所示,第一次使用Flink,是我使用的姿势不对吗:
package com.xxxxx.flink.app.incr;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.xxxxx.flink.contranst.TopicPattern;
import com.xxxxx.flink.executions.TradeOrderExecutions;
import com.xxxxx.flink.function.RowsFlatMapFunction;
import com.xxxxx.flink.schema.FlatMessageSchema;
import com.xxxxx.flink.utils.ConfigUtils;
import com.xxxxx.flink.utils.TableResolveUtils;
import com.xxxxx.flink.watermark.RowWatermarkAssigner;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
import java.time.Duration;
/**
 * Consumes canal binlog {@code FlatMessage}s from Kafka, splits them into per-table
 * {@link Row} streams, registers them as temporary views, and writes them into
 * partitioned Hive tables through a Flink SQL {@link StatementSet}.
 *
 * <p>Submit with:
 * <pre>
 * flink run \
 *   -m yarn-cluster \
 *   -ys 2 \
 *   -yjm 2g \
 *   -ytm 4g \
 *   -c com.xxxxx.flink.app.incr.TradeOrderBinlogResolveApp \
 *   -d \
 *   /opt/tools/flink-1.12.0/xxxxx-realtime-etl-1.0-SNAPSHOT.jar
 * </pre>
 */
public class TradeOrderBinlogResolveApp {
    public static void main(String[] args) {
        // Streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // NOTE(review): parallelism 8 — if the matched Kafka topics have fewer than 8
        // partitions, some source subtasks get no partition and stay idle forever.
        env.setParallelism(8);
        // Checkpoint every 60s (required for Hive streaming partition commit).
        env.enableCheckpointing(60000);
        // Emit watermarks downstream every 200ms.
        env.getConfig().setAutoWatermarkInterval(200);

        // Table environment: Blink planner, streaming mode.
        EnvironmentSettings tableEnvSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, tableEnvSettings);
        // Checkpointing mode/interval for the SQL pipeline.
        tableEnv.getConfig().getConfiguration().set(
                ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
        tableEnv.getConfig().getConfiguration().set(
                ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(1));

        // Register and activate the Hive catalog.
        String catalogName = "devHive";
        HiveCatalog hiveCatalog = new HiveCatalog(
                catalogName,
                "default",
                ConfigUtils.HIVE_CONF_DIR,
                ConfigUtils.HADOOP_CONF_DIR,
                ConfigUtils.HIVE_VERSION);
        tableEnv.registerCatalog(catalogName, hiveCatalog);
        tableEnv.useCatalog(catalogName);

        // Row schemas of the Hive target tables, used to type the flat-mapped streams.
        RowTypeInfo tradeOrderTypes =
                TableResolveUtils.getRowTypeinformations("ods.trade_order_incr", tableEnv);
        RowTypeInfo tradeOrderItemTypes =
                TableResolveUtils.getRowTypeinformations("ods.trade_order_item_incr", tableEnv);
        RowTypeInfo tradeRealDeliveryTypes =
                TableResolveUtils.getRowTypeinformations("ods.trade_real_delivery_incr", tableEnv);
        RowTypeInfo tradeSteelItemTypes =
                TableResolveUtils.getRowTypeinformations("ods.trade_steel_item_incr", tableEnv);

        // Kafka consumer over the non-finance trade topics, starting from the earliest offset.
        FlinkKafkaConsumerBase<FlatMessage> messages = new FlinkKafkaConsumer<>(
                TopicPattern.TRADE_PATTERN,
                new FlatMessageSchema(),
                ConfigUtils.getKafkaConfig())
                .setStartFromEarliest();

        // FIX: source subtasks that own no Kafka partition (parallelism > partition count)
        // never emit a watermark; the downstream watermark is the MIN over all inputs, so it
        // never advances and Hive partitions are never committed — exactly the "no watermark"
        // symptom seen on YARN. withIdleness() marks such subtasks idle so the global
        // watermark can still progress.
        FlinkKafkaConsumerBase<FlatMessage> messagesWaters = messages.assignTimestampsAndWatermarks(
                WatermarkStrategy.<FlatMessage>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner(
                                (SerializableTimestampAssigner<FlatMessage>)
                                        (element, recordTimestamp) -> element.getEs())
                        .withIdleness(Duration.ofMinutes(1)));

        // Add the source, then split the message stream by binlog table name into typed Rows.
        DataStreamSource<FlatMessage> messageSources = env.addSource(messagesWaters);
        SingleOutputStreamOperator<Row> tradeOrder = messageSources
                .filter(x -> "trade_order".equalsIgnoreCase(x.getTable()))
                .flatMap(new RowsFlatMapFunction(), tradeOrderTypes);
        SingleOutputStreamOperator<Row> tradeOrderItem = messageSources
                .filter(x -> "trade_order_item".equalsIgnoreCase(x.getTable()))
                .flatMap(new RowsFlatMapFunction(), tradeOrderItemTypes);
        SingleOutputStreamOperator<Row> tradeRealDelivery = messageSources
                .filter(x -> "trade_real_delivery".equalsIgnoreCase(x.getTable()))
                .flatMap(new RowsFlatMapFunction(), tradeRealDeliveryTypes);
        SingleOutputStreamOperator<Row> tradeSteelItem = messageSources
                .filter(x -> "trade_steel_item".equalsIgnoreCase(x.getTable()))
                .flatMap(new RowsFlatMapFunction(), tradeSteelItemTypes);

        // Expose the streams to SQL as temporary views.
        tableEnv.createTemporaryView("trade_order_tmp", tradeOrder);
        tableEnv.createTemporaryView("trade_order_item_tmp", tradeOrderItem);
        tableEnv.createTemporaryView("trade_real_delivery_tmp", tradeRealDelivery);
        tableEnv.createTemporaryView("trade_steel_item_tmp", tradeSteelItem);

        // Batch all INSERT INTO statements and submit them as one job.
        StatementSet statementSql = TradeOrderExecutions.getStatementSql(tableEnv);

        // FIX: StatementSet.execute() already submits the job (all DataStream operators above
        // feed the table pipeline). The previous extra env.execute() had no sinks of its own
        // and threw an exception that was silently printed — do not call it.
        TableResult execute = statementSql.execute();
    }
}
--
Sent from: http://apache-flink.147419.n8.nabble.com/