You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "zhangzh (Jira)" <ji...@apache.org> on 2021/12/29 05:16:00 UTC
[jira] [Created] (FLINK-25471) Wrong result when a Table is converted with toDataStream and then keyBy/sum is applied
zhangzh created FLINK-25471:
-------------------------------
Summary: Wrong result when a Table is converted with toDataStream and then keyBy/sum is applied
Key: FLINK-25471
URL: https://issues.apache.org/jira/browse/FLINK-25471
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Affects Versions: 1.14.2
Reporter: zhangzh
I have 6 lines like this:
Row.of("Alice"),
Row.of("alice"),
Row.of("Bob"),
Row.of("lily"),
Row.of("lily"),
Row.of("lily")
Then I turn them into a table with a single column named "word":
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row
/**
 * Word-count job that round-trips its input through the Table API:
 * `DataStream[Row]` -> `Table` -> SQL `UPPER` projection -> `DataStream[String]`
 * -> `keyBy` / `sum`.
 *
 * The job runs in STREAMING runtime mode with parallelism 7; the BATCH variant is
 * kept as a commented-out line so the two modes can be compared.
 */
object TableToDataStreamBatchWordCount {

  /**
   * Builds and executes the job, printing the resulting `(word, count)` records.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Create the stream environment and the table environment bound to it.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    //env.setRuntimeMode(RuntimeExecutionMode.BATCH)
    env.setParallelism(7)
    val tableEnv = StreamTableEnvironment.create(env)

    // Input data: six single-field rows.
    val resultDS2 = env.fromElements(
      Row.of("Alice"),
      Row.of("alice"),
      Row.of("Bob"),
      Row.of("lily"),
      Row.of("lily"),
      Row.of("lily")
    )(Types.ROW(Types.STRING))

    // DataStream[Row] --> Table (single column renamed to "word") --> SQL upper-casing.
    val table = tableEnv.fromDataStream(resultDS2).as("word")
    tableEnv.createTemporaryView("tmp_table", table)
    val resultTable = tableEnv.sqlQuery(" select UPPER(word) as word from tmp_table ")

    // SQL result table --> DataStream[String]; read the column by name with a typed accessor.
    val resultDs = tableEnv.toDataStream(resultTable).map(row => row.getFieldAs[String]("word"))

    // Pair every word with 1, key by the word and sum tuple position 1 (the count).
    val counts: DataStream[(String, Int)] = resultDs
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)

    // Print the result and run the job.
    counts.print()
    env.execute("WordCount")
  }
}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)