Posted to user-zh@flink.apache.org by 花乞丐 <hu...@163.com> on 2021/01/28 07:18:44 UTC

A Flink job mixing DataStream and SQL generates no watermarks after being submitted to YARN

When I run the Flink job locally, it consumes the data from Kafka and writes it to Hive successfully. But once I submit the job to YARN, the Flink Web UI shows every sink in the "No Watermark" state. Looking at the files on HDFS, the data is written successfully, yet the partitions are never committed to the metastore and no success file is written, so the watermark is clearly not taking effect. It works locally; why does it fail on YARN?
[Screenshot of the Flink Web UI: <http://apache-flink.147419.n8.nabble.com/file/t1257/%E5%BE%AE%E4%BF%A1%E5%9B%BE%E7%89%87_20210128151439.png>]
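For reference, the Hive tables are defined with partition-commit properties along these lines (a rough sketch rather than the exact DDL; the columns and partition keys below are placeholders, and tableEnv is the table environment from the program below). With the 'partition-time' trigger, the sink only commits a partition to the metastore and writes the success file once the watermark passes the partition time plus the delay, so a stuck watermark would explain the missing commits:

// Sketch only: placeholder columns/partition keys, real DDL not included here.
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql(
        "CREATE TABLE IF NOT EXISTS ods.trade_order_incr (" +
        "  order_id BIGINT," +
        "  amount DECIMAL(10, 2)" +
        ") PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (" +
        "  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00'," +
        // commit a partition once the watermark passes partition time + delay
        "  'sink.partition-commit.trigger' = 'partition-time'," +
        "  'sink.partition-commit.delay' = '1 h'," +
        // add the partition to the metastore and write a _SUCCESS file
        "  'sink.partition-commit.policy.kind' = 'metastore,success-file'" +
        ")");
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);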
The code is shown below. This is my first time using Flink; am I using it the wrong way?
package com.xxxxx.flink.app.incr;

import com.alibaba.otter.canal.protocol.FlatMessage;
import com.xxxxx.flink.contranst.TopicPattern;
import com.xxxxx.flink.executions.TradeOrderExecutions;
import com.xxxxx.flink.function.RowsFlatMapFunction;
import com.xxxxx.flink.schema.FlatMessageSchema;
import com.xxxxx.flink.utils.ConfigUtils;
import com.xxxxx.flink.utils.TableResolveUtils;
import com.xxxxx.flink.watermark.RowWatermarkAssigner;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;

import java.time.Duration;

/**

flink run \
-m yarn-cluster \
-ys 2 \
-yjm 2g \
-ytm 4g \
-c com.xxxxx.flink.app.incr.TradeOrderBinlogResolveApp \
-d \
/opt/tools/flink-1.12.0/xxxxx-realtime-etl-1.0-SNAPSHOT.jar

 */
public class TradeOrderBinlogResolveApp {
    public static void main(String[] args) {
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(8);
        // Enable checkpointing
        env.enableCheckpointing(60000);
        // Set the watermark generation interval
        env.getConfig().setAutoWatermarkInterval(200);
        // Set up the Flink SQL environment
        EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        // Create the table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, tableEnvSettings);
        // Set the checkpointing mode
        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
                CheckpointingMode.EXACTLY_ONCE);
        // Set the checkpoint interval
        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
                Duration.ofMinutes(1));
        // Catalog name
        String catalogName = "devHive";
        // Create the HiveCatalog
        HiveCatalog hiveCatalog = new HiveCatalog(catalogName,
                "default",
                ConfigUtils.HIVE_CONF_DIR,
                ConfigUtils.HADOOP_CONF_DIR,
                ConfigUtils.HIVE_VERSION
        );
        // Register the Hive catalog
        tableEnv.registerCatalog(catalogName, hiveCatalog);
        // Use the Hive catalog
        tableEnv.useCatalog(catalogName);

        // Fetch the schema information for each table
        RowTypeInfo tradeOrderTypes =
                TableResolveUtils.getRowTypeinformations("ods.trade_order_incr", tableEnv);
        RowTypeInfo tradeOrderItemTypes =
                TableResolveUtils.getRowTypeinformations("ods.trade_order_item_incr", tableEnv);
        RowTypeInfo tradeRealDeliveryTypes =
                TableResolveUtils.getRowTypeinformations("ods.trade_real_delivery_incr", tableEnv);
        RowTypeInfo tradeSteelItemTypes =
                TableResolveUtils.getRowTypeinformations("ods.trade_steel_item_incr", tableEnv);

        // Build the Kafka consumer for the non-finance business topics
        FlinkKafkaConsumerBase<FlatMessage> messages =
                new FlinkKafkaConsumer<>(TopicPattern.TRADE_PATTERN,
                        new FlatMessageSchema(),
                        ConfigUtils.getKafkaConfig())
                        .setStartFromEarliest();
        // Assign a timestamp and watermark to every record
        FlinkKafkaConsumerBase<FlatMessage> messagesWaters = messages.assignTimestampsAndWatermarks(
                WatermarkStrategy.<FlatMessage>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner(new SerializableTimestampAssigner<FlatMessage>() {
                            @Override
                            public long extractTimestamp(FlatMessage element, long recordTimestamp) {
                                return element.getEs();
                            }
                        })
        );
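        // Note: watermarks are generated per Kafka partition here; a partition that
        // receives no data holds back the overall watermark unless the strategy also
        // configures WatermarkStrategy#withIdleness(...).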


        // Add the source to the environment
        DataStreamSource<FlatMessage> messageSources = env.addSource(messagesWaters);
        // Convert the Kafka messages for trade_order into Row objects of the specified type
        SingleOutputStreamOperator<Row> tradeOrder = messageSources
                .filter(x -> "trade_order".equalsIgnoreCase(x.getTable()))
                .flatMap(new RowsFlatMapFunction(), tradeOrderTypes);
        // trade_order_item
        SingleOutputStreamOperator<Row> tradeOrderItem = messageSources
                .filter(x -> "trade_order_item".equalsIgnoreCase(x.getTable()))
                .flatMap(new RowsFlatMapFunction(), tradeOrderItemTypes);
        // trade_real_delivery
        SingleOutputStreamOperator<Row> tradeRealDelivery = messageSources
                .filter(x -> "trade_real_delivery".equalsIgnoreCase(x.getTable()))
                .flatMap(new RowsFlatMapFunction(), tradeRealDeliveryTypes);
        // trade_steel_item
        SingleOutputStreamOperator<Row> tradeSteelItem = messageSources
                .filter(x -> "trade_steel_item".equalsIgnoreCase(x.getTable()))
                .flatMap(new RowsFlatMapFunction(), tradeSteelItemTypes);

        // Register the streams as temporary views
        tableEnv.createTemporaryView("trade_order_tmp", tradeOrder);
        tableEnv.createTemporaryView("trade_order_item_tmp", tradeOrderItem);
        tableEnv.createTemporaryView("trade_real_delivery_tmp", tradeRealDelivery);
        tableEnv.createTemporaryView("trade_steel_item_tmp", tradeSteelItem);
        // Write the data from the temporary views into Hive: multiple INSERT INTO
        // statements are bundled into one StatementSet and executed together
        StatementSet statementSql = TradeOrderExecutions.getStatementSql(tableEnv);
        // Execute the SQL
        TableResult execute = statementSql.execute();
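        // Note: StatementSet#execute() immediately submits its own job for the INSERT
        // statements; env.execute() below only launches whatever DataStream
        // transformations remain outside that pipeline.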

        try {
            // Launch the streaming application
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}
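In case it is relevant: the temporary views above are registered without a rowtime attribute. If the INSERT statements in TradeOrderExecutions needed an event-time column, my understanding is that it would have to be declared explicitly when the view is registered, roughly like this (a sketch only; the field names are placeholders, and it assumes a static import of org.apache.flink.table.api.Expressions.$):

tableEnv.createTemporaryView("trade_order_tmp", tradeOrder,
        $("order_id"),
        $("amount"),
        // replaces the assumed "es" field with an event-time attribute backed by
        // the timestamps assigned on the DataStream
        $("es").rowtime());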



