You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Asahi Lee <97...@qq.com> on 2021/03/09 12:01:48 UTC
flink 1.12.2版本,批环境和流批一体模式下的批模式的数据处理方式
你好!在 Flink 1.12 版本之后,Flink 实现了真正的流批一体,官方推荐使用 DataStream API,并通过将运行模式设置为 RuntimeExecutionMode.BATCH 来执行批处理。
如下面的示例程序所示,这两种批模式的运行方式和性能并不相同:使用批环境时直接输出最终聚合结果,而使用流 API 的批模式时输出的是带 op 列的变更日志(+I/-U/+U)。请问在流批一体的模式下,如何做到真正的批处理?
package com.meritdata.cloud.tempo.dw.flink.test.bug;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * Reproduction case that runs the same {@code GROUP BY} query against a JDBC-backed
 * table through two differently created table environments, so their printed
 * results can be compared.
 *
 * <p>Both variants share one DDL statement and one query; they differ only in how
 * the table environment is built (see {@link #test()} and {@link #test1()}).
 */
public class JDBCTest {

    /** DDL that registers table {@code ab} via the JDBC connector. */
    private static final String CREATE_TABLE_AB = "CREATE TABLE ab (" +
            " a STRING, " +
            " b INT " +
            ") WITH (" +
            " 'connector' = 'jdbc'," +
            " 'url' = 'jdbc:mysql://localhost:3306/a?serverTimezone=UTC'," +
            " 'username' = 'root'," +
            " 'password' = 'root'," +
            " 'table-name' = 'ab'" +
            " )";

    /** Aggregate query executed by both variants. */
    private static final String COUNT_BY_A = "select a, count(b) from ab group by a";

    /**
     * Entry point. Runs the batch-environment variant; swap the call for the
     * commented-out one to run the DataStream-based variant instead.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        test();
        // test1();
    }

    /**
     * Variant 1: a {@link TableEnvironment} created from Blink-planner settings in
     * batch mode.
     *
     * <p>Output recorded by the original author for this variant:
     * <pre>
     * +---+--------+
     * | a | EXPR$1 |
     * +---+--------+
     * | 2 |      1 |
     * | 3 |      2 |
     * | 1 |      2 |
     * | 4 |      1 |
     * +---+--------+
     * </pre>
     */
    public static void test() {
        EnvironmentSettings batchSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        runCountByA(TableEnvironment.create(batchSettings));
    }

    /**
     * Variant 2: a {@link StreamTableEnvironment} created on top of a
     * {@link StreamExecutionEnvironment} whose runtime mode is set to
     * {@link RuntimeExecutionMode#BATCH}.
     *
     * <p>Output recorded by the original author for this variant:
     * <pre>
     * +----+---+--------+
     * | op | a | EXPR$1 |
     * +----+---+--------+
     * | +I | 2 |      1 |
     * | +I | 1 |      1 |
     * | +I | 4 |      1 |
     * | -U | 1 |      1 |
     * | +U | 1 |      2 |
     * | +I | 3 |      1 |
     * | -U | 3 |      1 |
     * | +U | 3 |      2 |
     * +----+---+--------+
     * </pre>
     *
     * <p>NOTE(review): the recorded output is a changelog (an {@code op} column with
     * +I/-U/+U rows) rather than a final result. This suggests the SQL job was still
     * planned in streaming mode despite the runtime-mode setting; confirm against the
     * Flink documentation for the version in use.
     */
    public static void test1() {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
        runCountByA(StreamTableEnvironment.create(streamEnv));
    }

    /**
     * Registers table {@code ab} in the given environment, runs the aggregate query
     * and prints the result. Shared by both variants so that the DDL and the query
     * cannot drift apart between them.
     *
     * @param tableEnv environment in which to register the table and run the query
     */
    private static void runCountByA(TableEnvironment tableEnv) {
        tableEnv.executeSql(CREATE_TABLE_AB);
        tableEnv.sqlQuery(COUNT_BY_A).execute().print();
    }
}