Posted to dev@flink.apache.org by "Spongebob (Jira)" <ji...@apache.org> on 2021/04/11 03:19:00 UTC

[jira] [Created] (FLINK-22190) no guarantee on Flink exactly_once sink to Kafka

Spongebob created FLINK-22190:
---------------------------------

             Summary: no guarantee on Flink exactly_once sink to Kafka 
                 Key: FLINK-22190
                 URL: https://issues.apache.org/jira/browse/FLINK-22190
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.12.2
         Environment: *flink: 1.12.2*

*kafka: 2.7.0*
            Reporter: Spongebob


While testing Flink's exactly-once sink to Kafka, I found that it does not behave as expected. The pipeline of the Flink applications is: raw data (flink app0) -> kafka topic1 -> flink app1 -> kafka topic2 -> flink app2. Flink tasks may randomly fail with a "/ by zero" ArithmeticException. The code is shown below:
{code:java}
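// Imports shared by the three applications (Flink 1.12 Scala API):
import java.util.Properties
import scala.util.Random
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic

// raw data, flink app0: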
class SimpleSource1 extends SourceFunction[String] {
  var switch = true
  val students: Array[String] = Array("Tom", "Jerry", "Gory")

  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    var i = 0
    while (switch) {
      sourceContext.collect(s"${students(Random.nextInt(students.length))},$i")
      i += 1
      Thread.sleep(5000)
    }
  }

  override def cancel(): Unit = switch = false
}
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream = streamEnv.addSource(new SimpleSource1)

dataStream.addSink(new FlinkKafkaProducer[String]("xfy:9092", "single-partition-topic-2", new SimpleStringSchema()))

streamEnv.execute("sink kafka")
 
// flink-app1:
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE)
val prop = new Properties()
prop.setProperty("bootstrap.servers", "xfy:9092")
prop.setProperty("group.id", "test")
val dataStream = streamEnv.addSource(new FlinkKafkaConsumer[String](
  "single-partition-topic-2",
  new SimpleStringSchema,
  prop
))
val resultStream = dataStream.map(x => {
  val data = x.split(",")
  // Random.nextInt(5) returns 0 about one time in five, so this division
  // randomly throws "/ by zero" and fails the task
  (data(0), data(1), data(1).toInt / Random.nextInt(5)).toString()
})
resultStream.print().setParallelism(1)
val propProducer = new Properties()
propProducer.setProperty("bootstrap.servers", "xfy:9092")
propProducer.setProperty("transaction.timeout.ms", s"${1000 * 60 * 5}")
// MyKafkaSerializationSchema is a user-defined KafkaSerializationSchema[String] (not shown)
resultStream.addSink(new FlinkKafkaProducer[String](
  "single-partition-topic",
  new MyKafkaSerializationSchema("single-partition-topic"),
  propProducer,
  Semantic.EXACTLY_ONCE))
streamEnv.execute("sink kafka")
 
// flink-app2:
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val prop = new Properties()
prop.setProperty("bootstrap.servers", "xfy:9092")
prop.setProperty("group.id", "test")
prop.setProperty("isolation_level", "read_committed")
val dataStream = streamEnv.addSource(new FlinkKafkaConsumer[String](
  "single-partition-topic",
  new SimpleStringSchema,
  prop
))
dataStream.print().setParallelism(1)
streamEnv.execute("consumer kafka")
{code}
 

Flink app1 prints some duplicate numbers. I expected flink app2 not to receive those duplicates, but in fact it does.
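
One thing that may be worth checking in a setup like this is the consumer's isolation setting. Kafka's consumer configuration key is isolation.level (with a dot), and its default value is read_uncommitted. A key spelled isolation_level, as in the flink-app2 listing, is not a recognized consumer setting, so the consumer would most likely keep the default and also return records from aborted transactions. The sketch below uses only java.util.Properties and shows the flink-app2 properties with the dotted key. The object name ConsumerProps and the helper readCommittedProps are hypothetical and exist only for illustration; this is not a confirmed diagnosis of the issue.
{code:scala}
import java.util.Properties

object ConsumerProps {
  // Hypothetical helper: builds the consumer properties for flink-app2.
  def readCommittedProps(bootstrapServers: String, groupId: String): Properties = {
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", bootstrapServers)
    prop.setProperty("group.id", groupId)
    // Kafka's key is "isolation.level"; with "read_committed" the consumer
    // only returns records that belong to committed transactions.
    prop.setProperty("isolation.level", "read_committed")
    prop
  }

  def main(args: Array[String]): Unit = {
    val prop = readCommittedProps("xfy:9092", "test")
    // Print the effective setting so it can be checked before starting the job.
    println(prop.getProperty("isolation.level"))
  }
}
{code}
The resulting Properties object can be passed to the FlinkKafkaConsumer constructor in place of prop. With read_committed in effect, records written by flink-app1 become visible to flink-app2 only after the checkpoint that commits their transaction completes.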



--
This message was sent by Atlassian Jira
(v8.3.4#803005)