Posted to issues@flink.apache.org by "Matthew Barlocker (JIRA)" <ji...@apache.org> on 2016/09/13 22:10:20 UTC

[jira] [Updated] (FLINK-4617) Kafka & Flink duplicate messages on restart

     [ https://issues.apache.org/jira/browse/FLINK-4617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthew Barlocker updated FLINK-4617:
-------------------------------------
    Affects Version/s: 1.0.1
                       1.0.2
                       1.0.3

> Kafka & Flink duplicate messages on restart
> -------------------------------------------
>
>                 Key: FLINK-4617
>                 URL: https://issues.apache.org/jira/browse/FLINK-4617
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, State Backends, Checkpointing
>    Affects Versions: 1.1.0, 1.0.1, 1.0.2, 1.0.3, 1.1.1, 1.1.2
>         Environment: Ubuntu 16.04
> Flink 1.1.*
> Kafka 0.9.0.1
> Scala 2.11.7
> Java 1.8.0_91
>            Reporter: Matthew Barlocker
>            Priority: Critical
>
> [StackOverflow Link|http://stackoverflow.com/questions/39459315/kafka-flink-duplicate-messages-on-restart]
> Flink (the Kafka connector) reprocesses the last 3-9 messages it saw before it was shut down.
> *My code:*
> {code}
> import java.util.Properties
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.CheckpointingMode
> import org.apache.flink.streaming.connectors.kafka._
> import org.apache.flink.streaming.util.serialization._
> import org.apache.flink.runtime.state.filesystem._
> object Runner {
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     // Checkpoint every 500 ms and keep checkpoint state on the local filesystem.
>     env.enableCheckpointing(500)
>     env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
>     env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>     // Kafka consumer settings.
>     val properties = new Properties()
>     properties.setProperty("bootstrap.servers", "localhost:9092")
>     properties.setProperty("group.id", "testing")
>     val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new SimpleStringSchema(), properties)
>     val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", "testing-out", new SimpleStringSchema())
>     // Copy every record from testing-in to testing-out unchanged.
>     env.addSource(kafkaConsumer)
>       .addSink(kafkaProducer)
>     env.execute()
>   }
> }
> {code}
> *My sbt dependencies:*
> {code}
> libraryDependencies ++= Seq(
>     "org.apache.flink" %% "flink-scala" % "1.1.2",
>     "org.apache.flink" %% "flink-streaming-scala" % "1.1.2",
>     "org.apache.flink" %% "flink-clients" % "1.1.2",
>     "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2",
>     "org.apache.flink" %% "flink-connector-filesystem" % "1.1.2"
> )
> {code}
> *My process:*
> Using 3 terminals:
> {code}
> TERM-1 start sbt, run program
> TERM-2 create kafka topics testing-in and testing-out
> TERM-2 run kafka-console-producer on testing-in topic
> TERM-3 run kafka-console-consumer on testing-out topic
> TERM-2 send data to kafka producer.
> Wait for a couple of seconds (buffers need to flush)
> TERM-3 watch data appear in testing-out topic
> Wait for at least 500 milliseconds for checkpointing to happen
> TERM-1 stop sbt
> TERM-1 run sbt
> TERM-3 watch last few lines of data appear in testing-out topic
> {code}
> *My expectations:*
> When there are no errors in the system, I expect to be able to turn Flink on and off without reprocessing messages that successfully completed the stream in a prior run.
> *My attempts to fix:*
> I've added the call to setStateBackend, thinking that perhaps the default memory backend just didn't remember correctly. That didn't seem to help.
> I've removed the call to enableCheckpointing, hoping that perhaps there was a separate mechanism to track state in Flink vs. ZooKeeper. That didn't seem to help.
> I've tried different sinks (RollingFileSink, print()), hoping that maybe the bug was in Kafka. That didn't seem to help.
> I've rolled back to Flink (and all connectors) v1.1.0 and v1.1.1, hoping that maybe the bug was in the latest version. That didn't seem to help.
> I've added the zookeeper.connect config to the properties object, hoping that the comment about it only being useful in 0.8 was wrong. That didn't seem to help.
> I've explicitly set the checkpointing mode to EXACTLY_ONCE (good idea, drfloob). That didn't seem to help.
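
One way to make the duplicates described in the report measurable is to save the kafka-console-consumer output for testing-out to a file and count repeated lines. The following is only a minimal sketch, assuming every test message sent to testing-in is unique; DuplicateCheck and its duplicates method are hypothetical names introduced for illustration and are not part of Flink or Kafka.

```scala
// Hypothetical helper (not part of Flink or Kafka): reports messages that
// occur more than once in a captured copy of the output topic.
// Assumption: every message originally sent to the input topic was unique.
object DuplicateCheck {

  // Returns each repeated message with its occurrence count,
  // ordered by first appearance in the captured output.
  def duplicates(messages: Seq[String]): Seq[(String, Int)] = {
    val counts = messages.groupBy(identity).map {
      case (message, occurrences) => message -> occurrences.size
    }
    messages.distinct.filter(counts(_) > 1).map(m => m -> counts(m))
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 1) {
      println("usage: DuplicateCheck <file with one consumed message per line>")
    } else {
      val source = scala.io.Source.fromFile(args(0))
      try {
        duplicates(source.getLines().toList).foreach {
          case (message, n) => println(s"$message seen $n times")
        }
      } finally {
        source.close()
      }
    }
  }
}
```

Running it against the output captured across one stop and restart should list the reprocessed messages, if any, with a count of 2 each.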



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)