Posted to issues@spark.apache.org by "Dongjoon Hyun (JIRA)" <ji...@apache.org> on 2019/06/09 21:17:00 UTC

[jira] [Commented] (SPARK-27456) Support commitSync for offsets in DirectKafkaInputDStream

    [ https://issues.apache.org/jira/browse/SPARK-27456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16859572#comment-16859572 ] 

Dongjoon Hyun commented on SPARK-27456:
---------------------------------------

Thank you for filing a JIRA. Since new features are not allowed to land on branch-2.4, I'll set the `Affected Version` to the new one, 3.0.0.

> Support commitSync for offsets in DirectKafkaInputDStream
> ---------------------------------------------------------
>
>                 Key: SPARK-27456
>                 URL: https://issues.apache.org/jira/browse/SPARK-27456
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Jackson Westeen
>            Priority: Major
>
> Hello! I left a comment under SPARK-22486 but wasn't sure if it would get noticed as that one got closed; x-posting here.
> ----
> I'm trying to achieve "effectively once" semantics with Spark Streaming for batch writes to S3. The only way to do this is to partitionBy(startOffsets) in some way, so that re-writes on failure/retry are idempotent: they overwrite the previous batch if a failure occurred before commitAsync succeeded.
>  
> Here's my example:
> {code:java}
> import org.apache.kafka.clients.consumer.ConsumerRecord
> import org.apache.spark.rdd.RDD
> import org.apache.spark.sql.SaveMode
> import org.apache.spark.sql.functions.{from_json, from_unixtime, lit}
> import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}
> import spark.implicits._
>
> stream.foreachRDD((rdd: RDD[ConsumerRecord[String, Array[Byte]]]) => {
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   // make a dataset, with this batch's start offsets included as a partition column
>   spark
>     .createDataset(rdd.map(record => new String(record.value)))             // just for example
>     .select(from_json($"value", schema).as("json"))                         // schema: the value's JSON schema (application-defined)
>     .withColumn("dateKey", from_unixtime($"json.timestamp", "yyyyMMdd"))
>     .withColumn("startOffsets", lit(offsetRanges.sortBy(_.partition).map(_.fromOffset).mkString("_")))
>     .write
>     .mode(SaveMode.Overwrite)
>     .option("partitionOverwriteMode", "dynamic")
>     .partitionBy("dateKey", "startOffsets")
>     .parquet("s3://mybucket/kafka-parquet")
>
>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> })
> {code}
> This almost works. The only issue is that I can still end up with duplicate/overlapping data if:
>  # an initial write to S3 succeeds (batch A)
>  # commitAsync takes a long time and eventually fails, *but the job carries on and successfully writes another batch in the meantime (batch B)*
>  # the job then fails for any reason; we restart from the last committed offsets, but now with more data in Kafka to process than before (batch A', which includes A, B, ...)
>  # we successfully overwrite the initial batch, keyed by startOffsets, with batch A' and proceed as normal. No data is lost, but batch B is left over in S3 and contains partially duplicate data.
> It would be very nice to have an atomic operation for write and commitOffsets, or to be able to simulate one with commitSync in Spark Streaming :)
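> To make the request concrete, here is a rough sketch of how the batch loop might look if a commitSync were available on CanCommitOffsets. This is only an illustration of the proposed API, not existing Spark behaviour, and writeBatchToS3 is a hypothetical helper standing in for the idempotent partitioned overwrite shown above:
> {code:java}
> stream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>
>   // Idempotent, partitioned overwrite keyed by startOffsets (as in the example above).
>   writeBatchToS3(rdd, offsetRanges)
>
>   // Proposed API (does not exist yet): block until the offsets are actually
>   // committed to Kafka, so the stream cannot advance past an uncommitted write.
>   stream.asInstanceOf[CanCommitOffsets].commitSync(offsetRanges)
> }
> {code}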
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org