You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "zhangzh (Jira)" <ji...@apache.org> on 2021/12/29 05:16:00 UTC

[jira] [Created] (FLINK-25471) wrong result if table toDataStream then keyBy and sum

zhangzh created FLINK-25471:
-------------------------------

             Summary: wrong result if table toDataStream then keyBy and sum
                 Key: FLINK-25471
                 URL: https://issues.apache.org/jira/browse/FLINK-25471
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.14.2
            Reporter: zhangzh


I have 6 lines like this:

Row.of("Alice"),
Row.of("alice"),
Row.of("Bob"),
Row.of("lily"),
Row.of("lily"),
Row.of("lily")

Then I turn them into a table with one column, "word".

 

 

 

 

 

 

 




import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

/**
 * Reproduction program attached to FLINK-25471.
 *
 * Pipeline: DataStream[Row] -> Table -> SQL `UPPER` projection -> DataStream[String]
 * -> keyed running count, printed to stdout.
 */
object TableToDataStreamBatchWordCount {

  /**
   * Builds the job graph described on the enclosing object and executes it.
   *
   * @param args command-line arguments; not read by this program
   */
  def main(args: Array[String]): Unit = {

    // Create the stream environment and the table environment layered on top of it.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    // Alternative runtime mode, left disabled by the reporter:
    //env.setRuntimeMode(RuntimeExecutionMode.BATCH)
    env.setParallelism(7)
    val tableEnv = StreamTableEnvironment.create(env)

    // Input data: six single-field rows. The row type is passed explicitly as the
    // TypeInformation argument of fromElements.
    val resultDS2 = env.fromElements(
      Row.of("Alice"),
      Row.of("alice"),
      Row.of("Bob"),
      Row.of("lily"),
      Row.of("lily"),
      Row.of("lily")
    )(Types.ROW(Types.STRING))

    // DataStream[Row] --> Table (single column renamed to "word") --> SQL upper-casing.
    val table = tableEnv.fromDataStream(resultDS2).as("word")
    tableEnv.createTemporaryView("tmp_table", table)
    val resultTable = tableEnv.sqlQuery(" select UPPER(word) as word from tmp_table ")

    // SQL-transformed table --> DataStream[String], reading the "word" field of each row.
    val resultDs = tableEnv.toDataStream(resultTable).map(row => {
      row.getField("word").asInstanceOf[String]
    })

    // Pair every word with 1, key by the word, and sum tuple position 1 (the count).
    val counts: DataStream[(String, Int)] = resultDs
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)

    // Print the result stream.
    counts.print()

    env.execute("WordCount")
  }
}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)