You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Asahi Lee <97...@qq.com> on 2021/03/09 12:01:48 UTC

flink 1.12.2版本,批环境和流批一体模式下的批模式的数据处理方式

你好! 在flink 1.12版本之后,flink实现真正的流批一体模式,推荐使用DataStream API,然后通过设置运行模式为RuntimeExecutionMode.BATCH,用于执行批处理。
     如下程序示例,这两种批模式的运行有着不同的方式和性能,请问在流批一体的模式下,如何做到真正的批处理?
package com.meritdata.cloud.tempo.dw.flink.test.bug;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class JDBCTest {

    /** DDL that registers the table {@code ab} on top of the JDBC connector. */
    private static final String CREATE_TABLE_AB =
            "CREATE TABLE ab (" +
            "  a STRING, " +
            "  b INT " +
            ") WITH (" +
            "   'connector' = 'jdbc'," +
            "   'url' = 'jdbc:mysql://localhost:3306/a?serverTimezone=UTC'," +
            "   'username' = 'root'," +
            "   'password' = 'root'," +
            "   'table-name' = 'ab'" +
            " )";

    /** Grouped count executed by both variants so that their printed results can be compared. */
    private static final String COUNT_PER_KEY = "select a, count(b) from ab group by a";

    /**
     * Entry point of the reproduction.
     *
     * <p>Only {@link #test()} is executed. To compare against the second variant,
     * call {@link #test1()} here instead.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        test();
        // Swap in the call below to run the StreamExecutionEnvironment variant instead.
        // test1();
    }

    /**
     * Variant 1: a pure batch {@code TableEnvironment} built from
     * {@code EnvironmentSettings} with the Blink planner in batch mode.
     *
     * <p>Output recorded by the original author for this variant:
     * <pre>
     * +--------------------------------+----------------------+
     * |                              a |               EXPR$1 |
     * +--------------------------------+----------------------+
     * |                              2 |                    1 |
     * |                              3 |                    2 |
     * |                              1 |                    2 |
     * |                              4 |                    1 |
     * +--------------------------------+----------------------+
     * </pre>
     */
    public static void test() {
        EnvironmentSettings batchSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        registerTableAndPrintCounts(TableEnvironment.create(batchSettings));
    }

    /**
     * Variant 2: a {@code StreamTableEnvironment} created on top of a
     * {@code StreamExecutionEnvironment} whose runtime mode is set to
     * {@code RuntimeExecutionMode.BATCH}.
     *
     * <p>Output recorded by the original author for this variant (note the
     * additional {@code op} column):
     * <pre>
     * +----+--------------------------------+----------------------+
     * | op |                              a |               EXPR$1 |
     * +----+--------------------------------+----------------------+
     * | +I |                              2 |                    1 |
     * | +I |                              1 |                    1 |
     * | +I |                              4 |                    1 |
     * | -U |                              1 |                    1 |
     * | +U |                              1 |                    2 |
     * | +I |                              3 |                    1 |
     * | -U |                              3 |                    1 |
     * | +U |                              3 |                    2 |
     * +----+--------------------------------+----------------------+
     * </pre>
     */
    public static void test1() {
        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        execEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
        registerTableAndPrintCounts(StreamTableEnvironment.create(execEnv));
    }

    /**
     * Registers table {@code ab} in the given environment, runs the grouped
     * count query against it and prints the result.
     *
     * @param env table environment to run the DDL and the query in
     */
    private static void registerTableAndPrintCounts(TableEnvironment env) {
        env.executeSql(CREATE_TABLE_AB);
        env.sqlQuery(COUNT_PER_KEY).execute().print();
    }

}