Posted to user-zh@flink.apache.org by 花乞丐 <hu...@163.com> on 2021/01/19 04:01:07 UTC

flink sql job submission: only one of the SQL statements ever executes

I am using Flink CDC to read MySQL's binlog and push it to Kafka, then use Flink to read the Kafka messages and finally write them into Hive. But when I submit the code to YARN, two jobs are submitted, and both of them run insert into kafka.order_info; insert into ods.order_info never executes. The program reports no errors at all. The code is below. Am I submitting the job the wrong way, or is something else the problem? Submit command:

flink run -m yarn-client -ynm mysql-cdc-2-hive -ys 3 -yjm 4g -ytm 8g -c com.zallsteel.flink.app.log.MySQLCDC2HiveApp -d /opt/tools/flink-1.12.0/zallsteel-realtime-etl-1.0-SNAPSHOT.jar

package com.zallsteel.flink.app.log;


import com.google.gson.Gson;
import com.zallsteel.flink.entity.ChangelogVO;
import com.zallsteel.flink.utils.ConfigUtils;

import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import java.text.ParseException;
import java.time.Duration;
import java.util.Properties;

/**
 * flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
 */
public class MySQLCDC2HiveApp {
    public static void main(String[] args) {
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set parallelism
        env.setParallelism(6);
        // Enable checkpointing
        env.enableCheckpointing(60000);

        env.getConfig().setAutoWatermarkInterval(200);
        // Set up the Flink SQL environment
        EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        // Create the table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, tableEnvSettings);
        // Set the checkpointing mode
        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
                CheckpointingMode.EXACTLY_ONCE);
        // Set the checkpointing interval
        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
                Duration.ofMinutes(1));
        // Catalog name
        String catalogName = "devHive";
        // Create the HiveCatalog
        HiveCatalog hiveCatalog = new HiveCatalog(
                catalogName,
                "default",
                "/etc/hive/conf"
        );
        // Register the Hive catalog
        tableEnv.registerCatalog(catalogName,hiveCatalog);
        // Use the Hive catalog
        tableEnv.useCatalog(catalogName);
        // Create the database for the MySQL CDC source
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
        // Create the MySQL CDC table
        tableEnv.executeSql("DROP TABLE IF EXISTS cdc.order_info");
        tableEnv.executeSql("CREATE TABLE cdc.order_info(\n" +
                "    id BIGINT,\n" +
                "    user_id BIGINT,\n" +
                "    create_time TIMESTAMP,\n" +
                "    operate_time TIMESTAMP,\n" +
                "    province_id INT,\n" +
                "    order_status STRING,\n" +
                "    total_amount DECIMAL(10, 5)\n" +
                "  ) WITH (\n" +
                "    'connector' = 'mysql-cdc',\n" +
                "    'hostname' = 'hdp-xxx-dev-node01',\n" +
                "    'port' = '3306',\n" +
                "    'username' = 'root',\n" +
                "    'password' = 'phkC4DE4dM28$PUD',\n" +
                "    'database-name' = 'cdc_test',\n" +
                "    'table-name' = 'order_info'\n" +
                ")");
        // Create the Kafka source table
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
        tableEnv.executeSql("DROP TABLE IF EXISTS kafka.order_info");
        tableEnv.executeSql("CREATE TABLE kafka.order_info (\n" +
                "id BIGINT,\n" +
                "user_id BIGINT,\n" +
                "create_time TIMESTAMP,\n" +
                "operate_time TIMESTAMP,\n" +
                "province_id INT,\n" +
                "order_status STRING,\n" +
                "total_amount DECIMAL(10, 5)\n" +
                ") WITH (\n" +
                "'connector' = 'kafka',\n" +
                "'topic' = 'order_info',\n" +
                "'scan.startup.mode' = 'earliest-offset',\n" +
                "'properties.bootstrap.servers' =
'hdp-xxx-dev-node03:9092',\n" +
                "'format' = 'changelog-json'\n" +
                ")");
        // Insert data into the Kafka table
        tableEnv.executeSql("INSERT INTO kafka.order_info\n" +
                "SELECT id, user_id, create_time, operate_time,province_id,order_status,total_amount\n" +
                "FROM cdc.order_info");
        // Build a custom stream that carries the op field
        Properties kafkaConfig = ConfigUtils.getKafkaConfig();
        FlinkKafkaConsumerBase<String> consumer = new FlinkKafkaConsumer<>(
                "order_info",
                new SimpleStringSchema(),
                kafkaConfig
        ).setStartFromEarliest();
        DataStreamSource<String> streamSource = env.addSource(consumer);



        String[] fieldNames = {"id","user_id","create_time","operate_time","province_id","order_status","total_amount","op"};

        TypeInformation[] types = {Types.LONG,Types.LONG,Types.STRING,Types.STRING,Types.INT,Types.INT,Types.DOUBLE,Types.STRING};

        SingleOutputStreamOperator<Row> ds2 = streamSource.map(new MapFunction<String, Row>() {
            @Override
            public Row map(String value) throws Exception {
                Gson gson = new Gson();
                ChangelogVO changelogVO = gson.fromJson(value, ChangelogVO.class);
                String op = changelogVO.getOp();
                int arity = fieldNames.length;
                Row row = new Row(arity);
                row.setField(0, changelogVO.getData().getId());
                row.setField(1, changelogVO.getData().getUserId());
                row.setField(2, changelogVO.getData().getCreateTime());
                row.setField(3, changelogVO.getData().getOperateTime());
                row.setField(4, changelogVO.getData().getProviceId());
                row.setField(5, changelogVO.getData().getOrderStatus());
                row.setField(6, changelogVO.getData().getTotalAmount());
                String operation = getOperation(op);
                row.setField(7, operation);
                return row;
            }

            private String getOperation(String op) {
                String operation = "INSERT";
                for (RowKind rk : RowKind.values()) {
                    if (rk.shortString().equals(op)) {
                        switch (rk) {
                            case UPDATE_BEFORE:
                                operation = "UPDATE-BEFORE";
                                break;
                            case UPDATE_AFTER:
                                operation = "UPDATE-AFTER";
                                break;
                            case DELETE:
                                operation = "DELETE";
                                break;
                            case INSERT:
                            default:
                                operation = "INSERT";
                                break;
                        }
                        break;
                    }
                }
                return operation;
            }
        }, new RowTypeInfo(types, fieldNames));
        // Assign watermarks
        SingleOutputStreamOperator<Row> ds3 = ds2.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Row>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Row>() {
                            @Override
                            public long extractTimestamp(Row element, long recordTimestamp) {
                                String create_time = (String) element.getField(2);
                                FastDateFormat dateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");
                                try {
                                    long time = dateFormat.parse(create_time).getTime();
                                    return time;
                                } catch (ParseException e) {
                                    e.printStackTrace();
                                }
                                return 0;
                            }
                        })
        );
        tableEnv.createTemporaryView("merged_order_info", ds3);
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods");
        tableEnv.executeSql("DROP TABLE IF EXISTS ods.order_info");
        tableEnv.executeSql("CREATE TABLE ods.order_info (\n" +
                "  id BIGINT,\n" +
                "   user_id BIGINT,\n" +
                "   create_time STRING,\n" +
                "   operate_time STRING,\n" +
                "   province_id INT,\n" +
                "   order_status INT,\n" +
                "   total_amount DOUBLE,\n" +
                "   op STRING \n" +
                ") PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet
TBLPROPERTIES (\n" +
                "  'partition.time-extractor.timestamp-pattern'='$dt
$hr:00:00',\n" +
                "  'sink.partition-commit.trigger'='partition-time',\n" +
                "  'sink.partition-commit.delay'='1 min',\n" +
                " 
'sink.partition-commit.policy.kind'='metastore,success-file'\n" +
                ")");
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnv.executeSql("INSERT INTO ods.order_info\n" +
                "SELECT \n" +
                "id,\n" +
                "user_id,\n" +
                "create_time,\n" +
                "operate_time,\n" +
                "province_id,\n" +
                "order_status,\n" +
                "total_amount,\n" +
                "op,\n" +
                "DATE_FORMAT(TO_TIMESTAMP(create_time,'yyyy-MM-dd
HH:mm:ss'),'yyyy-MM-dd') as dt,\n" +
                "DATE_FORMAT(TO_TIMESTAMP(create_time,'yyyy-MM-dd
HH:mm:ss'),'HH') as hr\n" +
                "FROM merged_order_info"
        );
    }
}




Re: flink sql job submission: only one of the SQL statements ever executes

Posted by 花乞丐 <hu...@163.com>.
As far as I know, what triggers job submission in Flink SQL is an INSERT INTO run through executeSql(), and StatementSet.execute(). I originally submitted the two INSERT INTO statements as two separate executeSql() calls, which produced the behavior described above; after switching to StatementSet.execute() I got the result I wanted. I am not quite sure what the difference between the two is, so I am noting it down for now and will analyze it later. There really are a lot of pitfalls for a beginner learning Flink.
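
To make the difference concrete, here is a minimal sketch of the two submission patterns (sink_a, sink_b and source_table are placeholder names made up for illustration; they are not from the code below): every executeSql() that carries an INSERT submits its own job the moment it is called, while a StatementSet collects several INSERTs and submits them as one combined job only when execute() is called.

import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SubmitPatternsSketch {
    // Pattern 1: each executeSql() carrying an INSERT submits a separate job immediately.
    static void twoSeparateJobs(StreamTableEnvironment tableEnv) {
        tableEnv.executeSql("INSERT INTO sink_a SELECT * FROM source_table"); // job #1
        tableEnv.executeSql("INSERT INTO sink_b SELECT * FROM source_table"); // job #2
    }

    // Pattern 2: a StatementSet buffers the INSERTs and submits them as one job on execute().
    static void oneCombinedJob(StreamTableEnvironment tableEnv) {
        StatementSet statementSet = tableEnv.createStatementSet();
        statementSet.addInsertSql("INSERT INTO sink_a SELECT * FROM source_table");
        statementSet.addInsertSql("INSERT INTO sink_b SELECT * FROM source_table");
        statementSet.execute(); // single job containing both INSERT pipelines
    }
}

This matches the change in the code below: the two INSERT statements that were previously issued through separate executeSql() calls are now added to a single StatementSet and submitted together with execute().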

package com.zallsteel.flink.app.log;


import com.google.gson.Gson;
import com.zallsteel.flink.entity.ChangelogVO;
import com.zallsteel.flink.utils.ConfigUtils;

import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;


import java.text.ParseException;

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;

/**
 * @author Jackie Zhu
 * @time 2021-01-13 16:50:18
 * @desc Test MySQL CDC to Hive
flink run \
-m yarn-cluster \
-ys 2 \
-yjm 2g \
-ytm 4g \
-c com.zallsteel.flink.app.log.MySQLCDC2HiveApp \
-d \
/opt/tools/flink-1.12.0/zallsteel-realtime-etl-1.0-SNAPSHOT.jar
 */
public class MySQLCDC2HiveApp {
    public static void main(String[] args) {
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set parallelism
        env.setParallelism(6);
        // Enable checkpointing
        env.enableCheckpointing(60000);

        env.getConfig().setAutoWatermarkInterval(200);
        // Set up the Flink SQL environment
        EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        // Create the table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, tableEnvSettings);
        // Set the checkpointing mode
        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
                CheckpointingMode.EXACTLY_ONCE);
        // Set the checkpointing interval
        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
                Duration.ofMinutes(1));
        // Catalog name
        String catalogName = "devHive";
        // Create the HiveCatalog
        HiveCatalog hiveCatalog = new HiveCatalog(catalogName,
                "default",
                "/etc/hive/conf",
                "/etc/hadoop/conf",
                "3.1.2"
        );
        // Register the Hive catalog
        tableEnv.registerCatalog(catalogName,hiveCatalog);
        // Use the Hive catalog
        tableEnv.useCatalog(catalogName);
        // Create the database for the MySQL CDC source
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
        // Create the MySQL CDC table
        tableEnv.executeSql("DROP TABLE IF EXISTS cdc.order_info");
        tableEnv.executeSql("CREATE TABLE cdc.order_info(\n" +
                "    id BIGINT,\n" +
                "    user_id BIGINT,\n" +
                "    create_time TIMESTAMP,\n" +
                "    operate_time TIMESTAMP,\n" +
                "    province_id INT,\n" +
                "    order_status STRING,\n" +
                "    total_amount DECIMAL(10, 5)\n" +
                "  ) WITH (\n" +
                "    'connector' = 'mysql-cdc',\n" +
                "    'hostname' = 'hdp-xxx-dev-node01',\n" +
                "    'port' = '3306',\n" +
                "    'username' = 'xxx',\n" +
                "    'password' = 'xxxx',\n" +
                "    'database-name' = 'cdc_test',\n" +
                "    'table-name' = 'order_info'\n" +
                ")");
        // Create the Kafka source table
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
        tableEnv.executeSql("DROP TABLE IF EXISTS kafka.order_info");
        tableEnv.executeSql("CREATE TABLE kafka.order_info (\n" +
                "id BIGINT,\n" +
                "user_id BIGINT,\n" +
                "create_time TIMESTAMP,\n" +
                "operate_time TIMESTAMP,\n" +
                "province_id INT,\n" +
                "order_status STRING,\n" +
                "total_amount DECIMAL(10, 5)\n" +
                ") WITH (\n" +
                "'connector' = 'kafka',\n" +
                "'topic' = 'order_info',\n" +
                "'scan.startup.mode' = 'earliest-offset',\n" +
                "'properties.bootstrap.servers' =
'hdp-xxx-dev-node03:9092,hdp-xxx-dev-node04:9092,hdp-xxx-dev-node05:9092',\n"
+
                "'format' = 'changelog-json'\n" +
                ")");
        // Insert data into the Kafka table
//        tableEnv.executeSql("INSERT INTO kafka.order_info\n" +
//                "SELECT id, user_id, create_time, operate_time,province_id,order_status,total_amount\n" +
//                "FROM cdc.order_info");
        // Build a custom stream that carries the op field
        Properties kafkaConfig = ConfigUtils.getKafkaConfig();
        FlinkKafkaConsumerBase<String> consumer = new FlinkKafkaConsumer<>(
                "order_info",
                new SimpleStringSchema(),
                kafkaConfig
        ).setStartFromEarliest();

        DataStreamSource<String> streamSource = env.addSource(consumer);

        String[] fieldNames = {"id","user_id","create_time","operate_time","province_id","order_status","total_amount","op"};

        TypeInformation[] types = {Types.LONG,Types.LONG,Types.STRING,Types.STRING,Types.INT,Types.INT,Types.DOUBLE,Types.STRING};

        SingleOutputStreamOperator<Row> ds2 = streamSource.map(new MapFunction<String, Row>() {
            @Override
            public Row map(String value) throws Exception {
                Gson gson = new Gson();
                ChangelogVO changelogVO = gson.fromJson(value, ChangelogVO.class);
                String op = changelogVO.getOp();
                int arity = fieldNames.length;
                Row row = new Row(arity);
                row.setField(0, changelogVO.getData().getId());
                row.setField(1, changelogVO.getData().getUserId());
                row.setField(2, changelogVO.getData().getCreateTime());
                row.setField(3, changelogVO.getData().getOperateTime());
                row.setField(4, changelogVO.getData().getProviceId());
                row.setField(5, changelogVO.getData().getOrderStatus());
                row.setField(6, changelogVO.getData().getTotalAmount());
                String operation = getOperation(op);
                row.setField(7, operation);
                return row;
            }

            private String getOperation(String op) {
                String operation = "INSERT";
                for (RowKind rk : RowKind.values()) {
                    if (rk.shortString().equals(op)) {
                        switch (rk) {
                            case UPDATE_BEFORE:
                                operation = "UPDATE-BEFORE";
                                break;
                            case UPDATE_AFTER:
                                operation = "UPDATE-AFTER";
                                break;
                            case DELETE:
                                operation = "DELETE";
                                break;
                            case INSERT:
                            default:
                                operation = "INSERT";
                                break;
                        }
                        break;
                    }
                }
                return operation;
            }
        }, new RowTypeInfo(types, fieldNames));

        // Assign watermarks
        SingleOutputStreamOperator<Row> ds3 = ds2.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Row>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Row>() {
                            @Override
                            public long extractTimestamp(Row element, long recordTimestamp) {
                                String create_time = (String) element.getField(2);
                                FastDateFormat dateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");
                                try {
                                    long time = dateFormat.parse(create_time).getTime();
                                    return time;
                                } catch (ParseException e) {
                                    e.printStackTrace();
                                }
                                return 0;
                            }
                        })
        );
        tableEnv.createTemporaryView("merged_order_info", ds3);
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods");
        tableEnv.executeSql("DROP TABLE IF EXISTS ods.order_info");
        tableEnv.executeSql("CREATE TABLE ods.order_info (\n" +
                "  id BIGINT,\n" +
                "   user_id BIGINT,\n" +
                "   create_time STRING,\n" +
                "   operate_time STRING,\n" +
                "   province_id INT,\n" +
                "   order_status INT,\n" +
                "   total_amount DOUBLE,\n" +
                "   op STRING \n" +
                ") PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet
TBLPROPERTIES (\n" +
                "  'partition.time-extractor.timestamp-pattern'='$dt
$hr:00:00',\n" +
                "  'sink.partition-commit.trigger'='partition-time',\n" +
                "  'sink.partition-commit.delay'='1 min',\n" +
                " 
'sink.partition-commit.policy.kind'='metastore,success-file'\n" +
                ")");
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        StatementSet statementSet = tableEnv.createStatementSet();
        statementSet.addInsertSql("INSERT INTO kafka.order_info\n" +
                "SELECT id, user_id, create_time, operate_time,province_id,order_status,total_amount\n" +
                "FROM cdc.order_info")
                .addInsertSql("INSERT INTO ods.order_info\n" +
                        "SELECT " +
                        "id," +
                        "user_id," +
                        "create_time," +
                        "operate_time," +
                        "province_id," +
                        "order_status," +
                        "total_amount," +
                        "op," +
                        "DATE_FORMAT(TO_TIMESTAMP(create_time,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd') as dt," +
                        "DATE_FORMAT(TO_TIMESTAMP(create_time,'yyyy-MM-dd HH:mm:ss'),'HH') as hr " +
                        "FROM merged_order_info");
        CompletableFuture<JobStatus> jobStatus = statementSet.execute().getJobClient().get().getJobStatus();
        System.out.println(jobStatus);
//        TableResult tableResult = tableEnv.executeSql(
//        );
    }
}



