Posted to issues@flink.apache.org by "Matthew Barlocker (JIRA)" <ji...@apache.org> on 2016/09/13 20:08:20 UTC

[jira] [Created] (FLINK-4617) Kafka & Flink duplicate messages on restart

Matthew Barlocker created FLINK-4617:
----------------------------------------

             Summary: Kafka & Flink duplicate messages on restart
                 Key: FLINK-4617
                 URL: https://issues.apache.org/jira/browse/FLINK-4617
             Project: Flink
          Issue Type: Bug
          Components: Kafka Connector, State Backends, Checkpointing
    Affects Versions: 1.1.2, 1.1.1, 1.1.0
         Environment: Ubuntu 16.04
Flink 1.1.*
Kafka 0.9.0.1
Scala 2.11.7
Java 1.8.0_91
            Reporter: Matthew Barlocker
            Priority: Critical


[StackOverflow Link|http://stackoverflow.com/questions/39459315/kafka-flink-duplicate-messages-on-restart]

On restart, Flink (specifically the Kafka connector) reprocesses the last 3-9 messages it saw before it was shut down.

*My code:*
{code}
import java.util.Properties
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization._
import org.apache.flink.runtime.state.filesystem._

object Runner {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpoint every 500 ms and keep checkpoint data on the local filesystem
    env.enableCheckpointing(500)
    env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // Kafka client settings shared by the consumer
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "testing")

    // Pass every message from testing-in straight through to testing-out
    val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new SimpleStringSchema(), properties)
    val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", "testing-out", new SimpleStringSchema())
    env.addSource(kafkaConsumer)
      .addSink(kafkaProducer)

    env.execute()
  }
}
{code}

*My sbt dependencies:*
{code}
libraryDependencies ++= Seq(
    "org.apache.flink" %% "flink-scala" % "1.1.2",
    "org.apache.flink" %% "flink-streaming-scala" % "1.1.2",
    "org.apache.flink" %% "flink-clients" % "1.1.2",
    "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2",
    "org.apache.flink" %% "flink-connector-filesystem" % "1.1.2"
)
{code}

*My process:*
Using three terminals:
{code}
TERM-1 start sbt, run program
TERM-2 create Kafka topics testing-in and testing-out
TERM-2 run kafka-console-producer on testing-in topic
TERM-3 run kafka-console-consumer on testing-out topic
TERM-2 send data to the Kafka console producer
Wait a couple of seconds (buffers need to flush)
TERM-3 watch data appear in testing-out topic
Wait at least 500 milliseconds for checkpointing to happen
TERM-1 stop sbt
TERM-1 start sbt again, run program
TERM-3 watch the last few lines of data appear again in testing-out topic
{code}
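
To make the last step less dependent on eyeballing the console, a minimal sketch of a duplicate check might look like the following. It assumes every message sent to testing-in is unique and that the output of the testing-out consumer has been captured to a text file, one message per line. The object name DuplicateCheck and the duplicates helper are hypothetical and are not part of the original reproduction.

```scala
import scala.io.Source

// Hypothetical helper, not part of the original report.
object DuplicateCheck {
  // Counts how often each message appears and keeps only those seen more than once.
  // Assumes the messages sent to testing-in were unique to begin with.
  def duplicates(messages: Seq[String]): Map[String, Int] =
    messages
      .groupBy(identity)
      .map { case (message, occurrences) => message -> occurrences.size }
      .filter { case (_, count) => count > 1 }

  def main(args: Array[String]): Unit = {
    // Read captured console-consumer output from standard input, one message per line
    val captured = Source.stdin.getLines().toList
    duplicates(captured).foreach { case (message, count) =>
      println(s"'$message' appeared $count times")
    }
  }
}
```

Feeding the captured testing-out output to this program on standard input should list exactly the messages that were replayed after the restart, provided the uniqueness assumption holds.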

*My expectations:*

When there are no errors in the system, I expect to be able to stop and restart Flink without reprocessing messages that successfully completed the stream in a prior run.

*My attempts to fix:*

I've added the call to setStateBackend, thinking that perhaps the default in-memory backend was not retaining state correctly. That didn't seem to help.

I've removed the call to enableCheckpointing, hoping that perhaps there was a separate mechanism to track state in Flink vs. ZooKeeper. That didn't seem to help.

I've tried different sinks (RollingFileSink, print()), hoping that maybe the bug was in Kafka. That didn't seem to help.

I've rolled back Flink (and all connectors) to v1.1.0 and v1.1.1, hoping that maybe the bug was only in the latest version. That didn't seem to help.

I've added the zookeeper.connect config to the properties object, hoping that the comment about it only being useful in Kafka 0.8 was wrong. That didn't seem to help.

I've explicitly set the checkpointing mode to EXACTLY_ONCE (good idea, drfloob). That didn't seem to help.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)