Posted to issues@spark.apache.org by "Sean Glover (JIRA)" <ji...@apache.org> on 2019/04/25 15:42:00 UTC

[jira] [Comment Edited] (SPARK-27549) Commit Kafka Source offsets to facilitate external tooling

    [ https://issues.apache.org/jira/browse/SPARK-27549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16826180#comment-16826180 ] 

Sean Glover edited comment on SPARK-27549 at 4/25/19 3:41 PM:
--------------------------------------------------------------

Yes, committing offsets would only be for the benefit of monitoring, so that the ecosystem of Kafka consumer group offset monitoring software can be used by clients with deployed Spark apps.  Flink manages offsets itself too, and only commits them to Kafka for this purpose.

Below is an excerpt of the [Flink docs applicable to this feature|https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration] which I think may be a reasonable design for Spark as well.
{quote}The Flink Kafka Consumer allows configuring the behaviour of how offsets are committed back to Kafka brokers (or Zookeeper in 0.8). Note that the Flink Kafka Consumer does not rely on the committed offsets for fault tolerance guarantees. The committed offsets are only a means to expose the consumer’s progress for monitoring purposes.

The way to configure offset commit behaviour is different, depending on whether or not checkpointing is enabled for the job.
 * _Checkpointing disabled:_ if checkpointing is disabled, the Flink Kafka Consumer relies on the automatic periodic offset committing capability of the internally used Kafka clients. Therefore, to disable or enable offset committing, simply set the {{enable.auto.commit}} (or {{auto.commit.enable}} for Kafka 0.8) / {{auto.commit.interval.ms}} keys to appropriate values in the provided {{Properties}} configuration.

 * _Checkpointing enabled:_ if checkpointing is enabled, the Flink Kafka Consumer will commit the offsets stored in the checkpointed states when the checkpoints are completed. This ensures that the committed offsets in Kafka brokers is consistent with the offsets in the checkpointed states. Users can choose to disable or enable offset committing by calling the {{setCommitOffsetsOnCheckpoints(boolean)}} method on the consumer (by default, the behaviour is {{true}}). Note that in this scenario, the automatic periodic offset committing settings in {{Properties}} is completely ignored.{quote}
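
To make the two cases in the excerpt concrete, here is a minimal sketch of the decision rule it describes. This is only a model of the quoted text, not Flink code: the function name and the returned labels are hypothetical, and treating a missing {{enable.auto.commit}} key as "true" is an assumption based on the Kafka client default.

```python
def effective_commit_mode(checkpointing_enabled, commit_on_checkpoints=True, properties=None):
    """Return which offset-commit mechanism applies under the quoted rules.

    Hypothetical labels: "on_checkpoint", "periodic", or "disabled".
    """
    properties = properties or {}
    if checkpointing_enabled:
        # With checkpointing on, the auto-commit settings in the properties
        # are ignored; only the commit-on-checkpoint switch matters.
        return "on_checkpoint" if commit_on_checkpoints else "disabled"
    # With checkpointing off, the Kafka client's periodic auto-commit decides.
    # Assumption: a missing key falls back to the Kafka client default of "true".
    auto_commit = str(properties.get("enable.auto.commit", "true")).strip().lower()
    return "periodic" if auto_commit == "true" else "disabled"


# Example: checkpointing on, so the properties value is ignored.
mode = effective_commit_mode(True, properties={"enable.auto.commit": "false"})
print(mode)
```

A Spark equivalent would probably only need the first branch, since Structured Streaming always tracks offsets in its own checkpoint.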
 



> Commit Kafka Source offsets to facilitate external tooling
> ----------------------------------------------------------
>
>                 Key: SPARK-27549
>                 URL: https://issues.apache.org/jira/browse/SPARK-27549
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 3.0.0
>            Reporter: Stavros Kontopoulos
>            Priority: Major
>
> Tools monitoring consumer lag could benefit from having the option of saving the source offsets. Sources implement org.apache.spark.sql.sources.v2.reader.streaming.SparkDataStream. KafkaMicroBatchStream currently [does not commit|https://github.com/apache/spark/blob/5bf5d9d854db53541956dedb03e2de8eecf65b81/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala#L170] anything, as expected, so we could expand that.
> Other streaming engines like [Flink|https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration] allow you to enable `auto.commit` at the expense of not having checkpointing.
> The proposal here is to allow committing the source offsets when progress has been made.
> I am also aware that another option would be to use a StreamingQueryListener to intercept batch completion and then write the offsets wherever you need them, but it would be great if the Kafka integration with Structured Streaming could do some of this work itself.
> [~cody@koeninger.org]  [~marmbrus] what do you think?
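
The StreamingQueryListener alternative mentioned in the description could be sketched roughly as follows. It assumes each Kafka source reports its end offset as a JSON string of the form {"topic": {"partition": offset}} and that this value is the next offset to read, which is also what a Kafka committed offset means. The function names are hypothetical, and the actual commit through a Kafka client is left out because it depends on the client library.

```python
import json


def offsets_from_progress(end_offset_json):
    """Flatten a Kafka source offset JSON string into {(topic, partition): offset}."""
    parsed = json.loads(end_offset_json)
    flat = {}
    for topic, partitions in parsed.items():
        for partition, offset in partitions.items():
            # Partition ids arrive as JSON object keys, so they are strings.
            flat[(topic, int(partition))] = int(offset)
    return flat


def consumer_lag(committed, latest):
    """Per-partition lag: latest log end offset minus committed offset, floored at 0."""
    return {
        tp: max(latest.get(tp, offset) - offset, 0)
        for tp, offset in committed.items()
    }


# Hypothetical end offset string, as a listener might see it after a batch.
committed = offsets_from_progress('{"events": {"0": 120, "1": 95}}')
lag = consumer_lag(committed, {("events", 0): 150, ("events", 1): 95})
print(committed, lag)
```

In a listener, the input string would come from each source's endOffset in the progress event, and the flattened map is what would be handed to a Kafka consumer commit for the chosen group id. External lag monitors would then compute something like consumer_lag on their own.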


