Posted to issues@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2020/08/27 07:58:00 UTC
[jira] [Comment Edited] (FLINK-18769) MiniBatch doesn't work with FLIP-95 source
[ https://issues.apache.org/jira/browse/FLINK-18769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171259#comment-17171259 ]
Jark Wu edited comment on FLINK-18769 at 8/27/20, 7:57 AM:
-----------------------------------------------------------
Fixed in
- master (1.12.0): 94b23885ca34927e37334fce51b930933cfd79dd
- 1.11.2: 770a43580de3125aeac6bf81769e438753697fee
was (Author: jark):
Fixed in
- master (1.12.0): 94b23885ca34927e37334fce51b930933cfd79dd
- 1.11.2: ec5a4d3b54de535f97bb67706032eac68d0eb214
> MiniBatch doesn't work with FLIP-95 source
> ------------------------------------------
>
> Key: FLINK-18769
> URL: https://issues.apache.org/jira/browse/FLINK-18769
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.11.1
> Reporter: Nico Kruber
> Assignee: Jark Wu
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.12.0, 1.11.2
>
>
> The following Table API streaming job gets stuck when mini-batching is enabled:
> {code}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings settings =
>     EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
> // Mini-batching is enabled here, which is what makes the job get stuck.
> // Set "table.exec.mini-batch.enabled" to "false" to get a result.
> Configuration tableConf = tableEnv.getConfig().getConfiguration();
> tableConf.setString("table.exec.mini-batch.enabled", "true");
> tableConf.setString("table.exec.mini-batch.allow-latency", "5 s");
> tableConf.setString("table.exec.mini-batch.size", "5000");
> tableConf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
> tableEnv.executeSql(
>     "CREATE TABLE input_table ("
>         + "location STRING, "
>         + "population INT"
>         + ") WITH ("
>         + "'connector' = 'kafka', "
>         + "'topic' = 'kafka_batching_input', "
>         + "'properties.bootstrap.servers' = 'localhost:9092', "
>         + "'format' = 'csv', "
>         + "'scan.startup.mode' = 'earliest-offset'"
>         + ")");
> tableEnv.executeSql(
>     "CREATE TABLE result_table WITH ('connector' = 'print') LIKE input_table (EXCLUDING OPTIONS)");
> tableEnv
>     .from("input_table")
>     .groupBy($("location"))
>     .select($("location").cast(DataTypes.CHAR(2)).as("location"), $("population").sum().as("population"))
>     .executeInsert("result_table");
> {code}
> I am using a pre-populated Kafka topic called {{kafka_batching_input}} with these elements:
> {code}
> "Berlin",1
> "Berlin",2
> {code}
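
For readers unfamiliar with the options in the quoted description, here is a minimal sketch in plain Java of how a mini-batched aggregation is expected to behave. It is not Flink code, and the class MiniBatchSketch and its methods are hypothetical. Records are buffered, and results are emitted when the buffer reaches table.exec.mini-batch.size or when a trigger derived from table.exec.mini-batch.allow-latency fires. With two input records and a size of 5000, only the latency trigger can flush the buffer, so a job in which that trigger never reaches the aggregation would print nothing. Whether that is the actual cause of this issue is an assumption; the message above does not state the root cause.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a mini-batched SUM aggregation. Not Flink code.
public class MiniBatchSketch {
    // Models table.exec.mini-batch.size.
    private final int maxSize;
    // Partial sums per key, held back until the next flush.
    private final Map<String, Integer> buffer = new LinkedHashMap<>();
    // Results that have actually been emitted downstream.
    private final List<String> emitted = new ArrayList<>();
    private int buffered = 0;

    public MiniBatchSketch(int maxSize) {
        this.maxSize = maxSize;
    }

    // Buffer one record; flush only once the size threshold is reached.
    public void onRecord(String location, int population) {
        buffer.merge(location, population, Integer::sum);
        buffered++;
        if (buffered >= maxSize) {
            flush();
        }
    }

    // Models the periodic trigger derived from table.exec.mini-batch.allow-latency.
    public void onLatencyTrigger() {
        flush();
    }

    private void flush() {
        buffer.forEach((key, sum) -> emitted.add(key + "=" + sum));
        buffer.clear();
        buffered = 0;
    }

    public List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        MiniBatchSketch agg = new MiniBatchSketch(5000);
        agg.onRecord("Berlin", 1);
        agg.onRecord("Berlin", 2);
        // Two records are far below the size threshold, so nothing is emitted yet.
        System.out.println(agg.emitted()); // prints []
        // Only the latency trigger releases the buffered partial sum.
        agg.onLatencyTrigger();
        System.out.println(agg.emitted()); // prints [Berlin=3]
    }
}
```

The sketch leaves out the CHAR(2) cast and the two-phase aggregation from the quoted job; it only illustrates why the output depends on the latency trigger when the input is smaller than the configured batch size.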
--
This message was sent by Atlassian Jira
(v8.3.4#803005)