Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/05/15 09:18:10 UTC

skonto opened a new pull request #24613: [SPARK-27549][SS]Add support for committing kafka offsets for
URL: https://github.com/apache/spark/pull/24613
 
 
   ## What changes were proposed in this pull request?
   
    We introduce committing Kafka offsets back to Kafka once per micro-batch. Since we cannot commit on the executor side (a batch can be re-executed), we need to do this on the driver side. The design is inspired by Flink's approach, although Flink is closer to the continuous mode, which this PR does not cover. For more details check the JIRA ticket.
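    
    To make the commit path concrete, here is a minimal sketch of what a driver-side commit could look like against the plain kafka-clients API. This illustrates the approach only and is not the code in this PR; `endOffsets` stands in for the per-partition end offsets Spark tracks in its checkpoint, and the group id is a placeholder:
    
    ```
    import java.util.Properties
    import scala.collection.JavaConverters._
    import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
    import org.apache.kafka.common.TopicPartition
    
    // Per-partition end offsets of the batch that just completed (placeholder values).
    val endOffsets: Map[TopicPartition, Long] = Map(new TopicPartition("Topic4", 0) -> 10L)
    
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "spark-kafka-source-driver-0") // hypothetical group id
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("enable.auto.commit", "false")
    
    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    // A committed offset means "next record to read", so the batch end offset is used as-is.
    val toCommit = endOffsets.map { case (tp, off) => tp -> new OffsetAndMetadata(off) }.asJava
    consumer.commitSync(toCommit) // best-effort; the checkpoint remains the source of truth
    consumer.close()
    ```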
   
   ## How was this patch tested?
    Manually, using the Spark shell:
   ```
   import org.apache.spark.sql.streaming.Trigger
   import spark.implicits._
   
   case class GeneratedData(key: String, value: String)
     
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "Topic4")
      .option("startingOffsets", "latest")
      .option("maxOffsetsPerTrigger", 100)
      .option("setCommitOffsetsOnCheckpoints", "true") // new option introduced by this PR
      .load()
    
    val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[GeneratedData]
      .writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/k4") // checkpointLocation is a sink-side option
      .trigger(Trigger.ProcessingTime("5 seconds"))
    
    query.start()
   ```
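    
    Besides dumping `__consumer_offsets` as below, the committed offset can also be read back programmatically. A minimal sketch with plain kafka-clients, assuming the auto-generated group id from the run above:
    
    ```
    import java.util.Properties
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition
    
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "spark-kafka-source-b595f412-1ec6-408f-b804-6d44ae763340-348116533-driver-0")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    
    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    // committed() returns null when nothing has been committed for the partition yet.
    val committed = consumer.committed(new TopicPartition("Topic4", 0))
    println(if (committed == null) "no commit yet" else s"committed offset: ${committed.offset}")
    consumer.close()
    ```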
   
   ```
   ./kafka-console-consumer.sh  --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --bootstrap-server localhost:9092 --topic __consumer_offsets --from-beginning | grep spark
   
   [spark-kafka-source-b595f412-1ec6-408f-b804-6d44ae763340-348116533-driver-0,Topic4,0]::[OffsetMetadata[0,NO_METADATA],CommitTime 1557910170072,ExpirationTime 1557996570072]
   [spark-kafka-source-b595f412-1ec6-408f-b804-6d44ae763340-348116533-driver-0,Topic4,0]::[OffsetMetadata[1,NO_METADATA],CommitTime 1557910175019,ExpirationTime 1557996575019]
   [spark-kafka-source-b595f412-1ec6-408f-b804-6d44ae763340-348116533-driver-0,Topic4,0]::[OffsetMetadata[2,NO_METADATA],CommitTime 1557910180035,ExpirationTime 1557996580035]
   [spark-kafka-source-b595f412-1ec6-408f-b804-6d44ae763340-348116533-driver-0,Topic4,0]::[OffsetMetadata[6,NO_METADATA],CommitTime 1557910185014,ExpirationTime 1557996585014]
   [spark-kafka-source-b595f412-1ec6-408f-b804-6d44ae763340-348116533-driver-0,Topic4,0]::[OffsetMetadata[10,NO_METADATA],CommitTime 1557910190014,ExpirationTime 1557996590014]
   
   
   scala> -------------------------------------------
   Batch: 0
   -------------------------------------------
   +---+-----+
   |key|value|
   +---+-----+
   +---+-----+
   
   -------------------------------------------
   Batch: 1
   -------------------------------------------
   +----+------------+
   | key|       value|
   +----+------------+
   |null|Hello, World|
   +----+------------+
   
   -------------------------------------------
   Batch: 2
   -------------------------------------------
   +----+------------+
   | key|       value|
   +----+------------+
   |null|Hello, World|
   +----+------------+
   
   -------------------------------------------
   Batch: 3
   -------------------------------------------
   +----+------------+
   | key|       value|
   +----+------------+
   |null|Hello, World|
   |null|Hello, World|
   |null|Hello, World|
   |null|Hello, World|
   +----+------------+
   
   -------------------------------------------
   Batch: 4
   -------------------------------------------
   +----+------------+
   | key|       value|
   +----+------------+
   |null|Hello, World|
   |null|Hello, World|
   |null|Hello, World|
   |null|Hello, World|
   +----+------------+
   
   -------------------------------------------
   Batch: 5
   -------------------------------------------
   +----+------------+
   | key|       value|
   +----+------------+
   |null|Hello, World|
   |null|Hello, World|
   |null|Hello, World|
   +----+------------+
   
   ```
    The last commit reported 10 as the offset, while the maximum in this case is 13. The reason is that [MicroBatchExecution](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L416) only commits the offsets of the previously completed batch when a new batch is about to be processed.
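    
    Concretely, the batches above contain 0, 1, 1, 4, 4 and 3 rows, giving cumulative end offsets 0, 1, 2, 6, 10 and 13; the committed values seen in `__consumer_offsets` (0, 1, 2, 6, 10) trail by exactly one batch, and batch 5's end offset 13 will only be committed once batch 6 is constructed.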
   
