Posted to dev@flink.apache.org by "Spongebob (Jira)" <ji...@apache.org> on 2021/04/11 03:19:00 UTC
[jira] [Created] (FLINK-22190) no guarantee on Flink exactly_once sink to Kafka
Spongebob created FLINK-22190:
---------------------------------
Summary: no guarantee on Flink exactly_once sink to Kafka
Key: FLINK-22190
URL: https://issues.apache.org/jira/browse/FLINK-22190
Project: Flink
Issue Type: Bug
Components: API / DataStream
Affects Versions: 1.12.2
Environment: *flink: 1.12.2*
*kafka: 2.7.0*
Reporter: Spongebob
When I tested Flink's exactly-once sink to Kafka, I found that it does not behave as expected. Here is the pipeline of the Flink applications: raw data (flink app0) -> kafka topic1 -> flink app1 -> kafka topic2 -> flink app2. The map function in flink app1 divides by Random.nextInt(5), so its tasks randomly fail with java.lang.ArithmeticException: / by zero. The code is below:
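{code:scala}
// Imports shared by the three applications below (Flink 1.12 Scala DataStream API)
import java.util.Properties

import scala.util.Random

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic

// flink app0: generates raw data and writes it to topic1 ("single-partition-topic-2")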
class SimpleSource1 extends SourceFunction[String] {
  // @volatile because cancel() is called from a different thread than run()
  @volatile var switch = true
  val students: Array[String] = Array("Tom", "Jerry", "Gory")

  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    var i = 0
    while (switch) {
      // emits "<name>,<counter>", e.g. "Tom,0", every 5 seconds
      sourceContext.collect(s"${students(Random.nextInt(students.length))},$i")
      i += 1
      Thread.sleep(5000)
    }
  }

  override def cancel(): Unit = switch = false
}
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream = streamEnv.addSource(new SimpleSource1)
dataStream.addSink(new FlinkKafkaProducer[String]("xfy:9092", "single-partition-topic-2", new SimpleStringSchema()))
streamEnv.execute("sink kafka")
// flink app1: reads topic1, writes topic2 ("single-partition-topic") with Semantic.EXACTLY_ONCE
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE)
val prop = new Properties()
prop.setProperty("bootstrap.servers", "xfy:9092")
prop.setProperty("group.id", "test")
val dataStream = streamEnv.addSource(new FlinkKafkaConsumer[String](
  "single-partition-topic-2",
  new SimpleStringSchema,
  prop
))
val resultStream = dataStream.map(x => {
  val data = x.split(",")
  // Random.nextInt(5) can return 0, so this map deliberately fails at random
  // with java.lang.ArithmeticException: / by zero
  (data(0), data(1), data(1).toInt / Random.nextInt(5)).toString()
})
resultStream.print().setParallelism(1)
val propProducer = new Properties()
propProducer.setProperty("bootstrap.servers", "xfy:9092")
propProducer.setProperty("transaction.timeout.ms", s"${1000 * 60 * 5}")
// MyKafkaSerializationSchema is a custom serialization schema (definition not shown)
resultStream.addSink(new FlinkKafkaProducer[String](
  "single-partition-topic",
  new MyKafkaSerializationSchema("single-partition-topic"),
  propProducer,
  Semantic.EXACTLY_ONCE))
streamEnv.execute("sink kafka")
// flink app2: reads topic2 ("single-partition-topic") and prints the records
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val prop = new Properties()
prop.setProperty("bootstrap.servers", "xfy:9092")
prop.setProperty("group.id", "test")
prop.setProperty("isolation_level", "read_committed")
val dataStream = streamEnv.addSource(new FlinkKafkaConsumer[String](
  "single-partition-topic",
  new SimpleStringSchema,
  prop
))
dataStream.print().setParallelism(1)
streamEnv.execute("consumer kafka")
{code}
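For reference, Kafka's consumer reads its isolation level from the property key `isolation.level` (the constant `ConsumerConfig.ISOLATION_LEVEL_CONFIG`). A key spelled `isolation_level`, as in flink app2, is not a known consumer config, so such a consumer would keep the default `read_uncommitted` and also return records from aborted transactions. The stdlib-only Scala sketch below builds the consumer and producer `Properties` with the Kafka key names. The object and helper names (`KafkaPropsSketch`, `consumerProps`, `producerProps`) are hypothetical, and the timeout check assumes the broker runs with the default `transaction.max.timeout.ms` of 15 minutes (Flink's `FlinkKafkaProducer` defaults to a one-hour transaction timeout, which is why flink app1 lowers it).

```scala
import java.util.Properties

// Hypothetical helpers; only the property keys and values are Kafka's own.
object KafkaPropsSketch {
  // Kafka consumer key for the isolation level (ConsumerConfig.ISOLATION_LEVEL_CONFIG).
  val IsolationLevelKey: String = "isolation.level"

  // Assumption: broker uses the default transaction.max.timeout.ms (15 minutes).
  val BrokerDefaultMaxTxTimeoutMs: Long = 15L * 60 * 1000

  // Consumer properties that only return records of committed transactions.
  def consumerProps(bootstrapServers: String, groupId: String): Properties = {
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", bootstrapServers)
    prop.setProperty("group.id", groupId)
    prop.setProperty(IsolationLevelKey, "read_committed")
    prop
  }

  // Producer properties for a transactional (exactly-once) sink.
  // The producer's transaction.timeout.ms must not exceed the broker's
  // transaction.max.timeout.ms, otherwise initializing transactions fails.
  def producerProps(bootstrapServers: String, transactionTimeoutMs: Long): Properties = {
    require(
      transactionTimeoutMs <= BrokerDefaultMaxTxTimeoutMs,
      s"transaction.timeout.ms=$transactionTimeoutMs exceeds broker max $BrokerDefaultMaxTxTimeoutMs"
    )
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", bootstrapServers)
    prop.setProperty("transaction.timeout.ms", transactionTimeoutMs.toString)
    prop
  }
}
```

The resulting `Properties` would be passed to `FlinkKafkaConsumer` and `FlinkKafkaProducer` in the same way as `prop` and `propProducer` above; `producerProps("xfy:9092", 1000L * 60 * 5)` reproduces flink app1's five-minute `transaction.timeout.ms`.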
flink app1 prints some duplicate numbers. I expected flink app2 to deduplicate them, but it does not.
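flink app1's `print()` sink is not transactional, so records replayed after a restart from the last checkpoint show up there again. Whether a downstream consumer sees those replays depends on its isolation level. The toy model below illustrates this in plain Scala; it is a simplification, not Kafka's implementation. The topic is modeled as a list of data records tagged with a transaction id plus commit/abort markers, and the run it encodes (transaction t1 committed on a checkpoint, t2 aborted after the `/ by zero` failure, records 3 and 4 replayed in t3) is a hypothetical example, not an observed log. All names (`IsolationToyModel`, `readCommitted`, `readUncommitted`, `exampleLog`) are illustrative.

```scala
// Toy model (not Kafka's implementation) of how a consumer's isolation level
// treats records written by a transactional producer.
object IsolationToyModel {
  sealed trait Entry
  final case class Data(txId: String, value: Int) extends Entry
  final case class Commit(txId: String) extends Entry
  final case class Abort(txId: String) extends Entry

  // read_uncommitted: every data record is returned, including records of
  // aborted transactions.
  def readUncommitted(log: Seq[Entry]): Seq[Int] =
    log.collect { case Data(_, v) => v }

  // read_committed: only records whose transaction has a commit marker are
  // returned; records of aborted or still-open transactions are skipped.
  def readCommitted(log: Seq[Entry]): Seq[Int] = {
    val committed: Set[String] = log.collect { case Commit(tx) => tx }.toSet
    log.collect { case Data(tx, v) if committed(tx) => v }
  }

  // Hypothetical run of flink app1: t1 commits on a checkpoint, t2 is aborted
  // after the failure, and records 3 and 4 are replayed in t3.
  val exampleLog: Seq[Entry] = Seq(
    Data("t1", 0), Data("t1", 1), Data("t1", 2), Commit("t1"),
    Data("t2", 3), Data("t2", 4), Abort("t2"),
    Data("t3", 3), Data("t3", 4), Commit("t3")
  )
}
```

With this log, `readUncommitted(exampleLog)` returns `List(0, 1, 2, 3, 4, 3, 4)` and `readCommitted(exampleLog)` returns `List(0, 1, 2, 3, 4)`: the replayed records 3 and 4 appear twice only when records of the aborted transaction are not filtered out. In real Kafka, a `read_committed` consumer also holds back records of transactions that are still open until they complete.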
--
This message was sent by Atlassian Jira
(v8.3.4#803005)