Posted to issues@spark.apache.org by "Michael Armbrust (JIRA)" <ji...@apache.org> on 2017/03/29 22:50:41 UTC

[jira] [Updated] (SPARK-20103) Spark structured streaming from Kafka - last message processed again after resume from checkpoint

     [ https://issues.apache.org/jira/browse/SPARK-20103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Armbrust updated SPARK-20103:
-------------------------------------
    Description: 
When the application restarts after a failure or a graceful shutdown, it consistently reprocesses the last message of the previous batch, even though that message had already been processed successfully.
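
For reference, the ForeachWriter contract passes a partitionId and a batch version to open() precisely so that a writer can detect data it has already committed and return false to skip it; the foreach sink itself only guarantees at-least-once delivery. Below is a minimal sketch of that check; lookupCommitted is a hypothetical helper that would have to consult durable storage (for example a commit-tracking table keyed on (version, partitionId)) to survive a restart.

{code}
import java.sql.DriverManager
import org.apache.spark.sql.ForeachWriter

// Sketch only: skips partitions whose (version, partitionId) was already committed.
class DedupingWriter(dbUrl: String, table: String) extends ForeachWriter[String] {

  def open(partitionId: Long, version: Long): Boolean = {
    // Spark passes the same (version, partitionId) again when a batch is replayed,
    // so returning false here makes the replay a no-op for this partition.
    !lookupCommitted(version, partitionId)
  }

  def process(value: String): Unit = {
    val conn = DriverManager.getConnection(dbUrl)
    try {
      conn.createStatement().executeUpdate("INSERT INTO " + table + " VALUES ('" + value + "')")
    } finally {
      conn.close()
    }
  }

  def close(errorOrNull: Throwable): Unit = {}

  // Hypothetical placeholder; a real implementation would query the commit-tracking table.
  private def lookupCommitted(version: Long, partitionId: Long): Boolean = false
}
{code}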

We are making sure database writes are idempotent using a Postgres 9.6 feature. Is this the default behavior of Spark? I added a code snippet with two streaming queries. One of the queries is idempotent; since query2 is not idempotent, we are seeing duplicate entries in its table. (A sketch of the idempotent insert follows the snippet.)

{code}
import java.sql.DriverManager
import scala.concurrent.duration._
import org.apache.spark.sql.{ForeachWriter, SparkSession}
import org.apache.spark.sql.streaming.ProcessingTime

object StructuredStreaming {
  def main(args: Array[String]): Unit = {
    val db_url = "jdbc:postgresql://dynamic-milestone-dev.crv1otzbekk9.us-east-1.rds.amazonaws.com:5432/DYNAMICPOSTGRES?user=dsdbadmin&password=password"
    val spark = SparkSession
      .builder
      .appName("StructuredKafkaReader")
      .master("local[*]")
      .getOrCreate()
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/checkpoint_research/")
    import spark.implicits._

    val server = "10.205.82.113:9092"
    val topic = "checkpoint"
    val subscribeType = "subscribe"

    // Kafka source: cast each record's value to a String.
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", server)
      .option(subscribeType, topic)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]
    lines.printSchema()

    // Writer 1: inserts every message into PUBLIC.checkpoint1.
    val writer = new ForeachWriter[String] {
      def open(partitionId: Long, version: Long): Boolean = {
        println("After db props"); true
      }
      def process(value: String): Unit = {
        val conn = DriverManager.getConnection(db_url)
        try {
          conn.createStatement().executeUpdate("INSERT INTO PUBLIC.checkpoint1 VALUES ('" + value + "')")
        } finally {
          conn.close()
        }
      }
      def close(errorOrNull: Throwable): Unit = {}
    }

    val query1 = lines.writeStream
      .outputMode("append")
      .queryName("checkpoint1")
      .trigger(ProcessingTime(30.seconds))
      .foreach(writer)
      .start()

    // Writer 2: identical, but inserts into PUBLIC.checkpoint2.
    val writer2 = new ForeachWriter[String] {
      def open(partitionId: Long, version: Long): Boolean = {
        println("After db props"); true
      }
      def process(value: String): Unit = {
        val conn = DriverManager.getConnection(db_url)
        try {
          conn.createStatement().executeUpdate("INSERT INTO PUBLIC.checkpoint2 VALUES ('" + value + "')")
        } finally {
          conn.close()
        }
      }
      def close(errorOrNull: Throwable): Unit = {}
    }

    val query2 = lines.writeStream
      .outputMode("append")
      .queryName("checkpoint2")
      .trigger(ProcessingTime(30.seconds))
      .foreach(writer2)
      .start()

    query2.awaitTermination()
    query1.awaitTermination()
  }
}
{code}
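
For the idempotent variant referenced above: assuming the Postgres 9.6 feature in question is INSERT ... ON CONFLICT (available since Postgres 9.5) and that PUBLIC.checkpoint1 has a unique constraint for the arbiter to key on, re-inserting the same value from a replayed batch becomes a no-op. A minimal sketch:

{code}
import java.sql.DriverManager
import org.apache.spark.sql.ForeachWriter

// Sketch: same shape as the writers above, but the insert is idempotent.
// Assumes PUBLIC.checkpoint1 carries a unique constraint for ON CONFLICT to key on.
class IdempotentWriter(dbUrl: String) extends ForeachWriter[String] {

  def open(partitionId: Long, version: Long): Boolean = true

  def process(value: String): Unit = {
    val conn = DriverManager.getConnection(dbUrl)
    try {
      // A replayed batch re-inserts the same value; DO NOTHING makes that a no-op.
      conn.createStatement().executeUpdate(
        "INSERT INTO PUBLIC.checkpoint1 VALUES ('" + value + "') ON CONFLICT DO NOTHING")
    } finally {
      conn.close()
    }
  }

  def close(errorOrNull: Throwable): Unit = {}
}
{code}

It would be used in place of writer above, e.g. .foreach(new IdempotentWriter(db_url)).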



> Spark structured streaming from Kafka - last message processed again after resume from checkpoint
> -------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-20103
>                 URL: https://issues.apache.org/jira/browse/SPARK-20103
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.1.0
>         Environment: Linux, Spark 2.1.0
>            Reporter: Rajesh Mutha
>              Labels: spark, streaming
>



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org