Posted to issues@spark.apache.org by "Sasaki Toru (JIRA)" <ji...@apache.org> on 2017/03/21 23:16:41 UTC

[jira] [Updated] (SPARK-20050) Kafka 0.10 DirectStream doesn't commit the last processed batch's offsets on graceful shutdown

     [ https://issues.apache.org/jira/browse/SPARK-20050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sasaki Toru updated SPARK-20050:
--------------------------------
    Description: 
I use the Kafka 0.10 DirectStream with the property 'enable.auto.commit=false' and call 'DirectKafkaInputDStream#commitAsync' at the end of each batch, as shown below:

{code}
val kafkaStream = KafkaUtils.createDirectStream[String, String](...)

// Print each record's key, value and offset.
kafkaStream.map { input =>
  "key: " + input.key + " value: " + input.value + " offset: " + input.offset
}.foreachRDD { rdd =>
  rdd.foreach { input =>
    println(input)
  }
}

// Commit the offsets of the batch that has just been processed.
kafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
{code}

Some records that were processed in the last batch before a graceful shutdown of Spark Streaming are processed again in the first batch after Spark Streaming restarts (with auto-commit disabled and no checkpointing, a restarted job resumes from the consumer group's last committed offsets).

The cause appears to be that the offsets passed to commitAsync are only committed to Kafka at the head of the next batch.
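
As far as I can tell from the spark-streaming-kafka-0-10 source, commitAsync only enqueues the given offset ranges; the actual Kafka commit happens when the queue is drained inside compute() as the next batch is generated, so a graceful shutdown leaves the last batch's ranges sitting in the queue. Below is a minimal, self-contained sketch of that deferred-commit behavior (the OffsetRange case class and the method bodies are simplified stand-ins, not Spark's real implementation):

{code}
import java.util.concurrent.ConcurrentLinkedQueue

// Toy model of the deferred commit in DirectKafkaInputDStream (my reading
// of the Spark 2.x source; simplified, not the actual implementation).
object DeferredCommitSketch {

  // Simplified stand-in for org.apache.spark.streaming.kafka010.OffsetRange.
  case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

  private val commitQueue = new ConcurrentLinkedQueue[OffsetRange]

  // What commitAsync does: enqueue the ranges, no call to Kafka yet.
  def commitAsync(offsetRanges: Seq[OffsetRange]): Unit =
    offsetRanges.foreach(r => commitQueue.add(r))

  // What compute() does at the head of the next batch: drain the queue
  // and perform the real commit.
  def commitAll(): Unit = {
    var range = commitQueue.poll()
    while (range != null) {
      println(s"committing $range to Kafka")
      range = commitQueue.poll()
    }
  }

  def main(args: Array[String]): Unit = {
    commitAsync(Seq(OffsetRange("mytopic", 0, 0L, 100L)))
    // On a graceful shutdown there is no next batch, so this point is never
    // reached and offsets 0-100 of the last batch are never committed.
    commitAll()
  }
}
{code}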

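Until this is fixed, a possible workaround is to track offsets yourself instead of relying on commitAsync: persist each batch's offset ranges to your own store once the output is written, and seed the stream from that store on restart. A sketch reusing kafkaStream from the snippet above, where saveOffsets is a hypothetical helper backed by whatever store you use:

{code}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

// Hypothetical helper: persist the processed ranges (to a database,
// ZooKeeper, HDFS, ...) once the batch's output has been written.
def saveOffsets(ranges: Array[OffsetRange]): Unit =
  ranges.foreach(r => println(s"saving ${r.topic}-${r.partition} -> ${r.untilOffset}"))

kafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... write the batch's output here ...
  saveOffsets(offsetRanges)  // recorded now, not deferred to the next batch
}
{code}

On restart, the saved offsets can be handed to ConsumerStrategies.Subscribe (the overload that takes a Map[TopicPartition, Long]) so the job resumes exactly where it left off.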

  was:
I use the Kafka 0.10 DirectStream with the property 'enable.auto.commit=false' and call 'DirectKafkaInputDStream#commitAsync' at the end of each batch, as shown below:

{code}
val kafkaStream = KafkaUtils.createDirectStream[String, String](...)

kafkaStream.map { input =>
  "key: " + input.key + " value: " + input.value + " offset: " + input.offset
}.foreachRDD { rdd =>
  rdd.foreach { input =>
    println(input)
  }
}

kafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
{code}

Some records that were processed in the last batch before a graceful shutdown of Spark Streaming are processed again in the first batch after Spark Streaming restarts.

The cause appears to be that the offsets passed to commitAsync are only committed to Kafka at the head of the next batch.


     Issue Type: Bug  (was: Improvement)

> Kafka 0.10 DirectStream doesn't commit the last processed batch's offsets on graceful shutdown
> ----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-20050
>                 URL: https://issues.apache.org/jira/browse/SPARK-20050
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.2.0
>            Reporter: Sasaki Toru
>
> I use the Kafka 0.10 DirectStream with the property 'enable.auto.commit=false' and call 'DirectKafkaInputDStream#commitAsync' at the end of each batch, as shown below:
> {code}
> val kafkaStream = KafkaUtils.createDirectStream[String, String](...)
> // Print each record's key, value and offset.
> kafkaStream.map { input =>
>   "key: " + input.key + " value: " + input.value + " offset: " + input.offset
> }.foreachRDD { rdd =>
>   rdd.foreach { input =>
>     println(input)
>   }
> }
> // Commit the offsets of the batch that has just been processed.
> kafkaStream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
> {code}
> Some records that were processed in the last batch before a graceful shutdown of Spark Streaming are processed again in the first batch after Spark Streaming restarts.
> The cause appears to be that the offsets passed to commitAsync are only committed to Kafka at the head of the next batch.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org