Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/03/24 07:54:16 UTC

[GitHub] [spark] yijiacui-db opened a new pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics API and Add Kafka metrics to report delay.

yijiacui-db opened a new pull request #31944:
URL: https://github.com/apache/spark/pull/31944


   ### What changes were proposed in this pull request?
   This pull request proposes a new API for streaming sources to signal that they can report metrics, and adds a use case in which the Kafka micro-batch stream reports how many offsets the current offset falls behind the latest available offset.
   
   
   ### Why are the changes needed?
   The new API can expose arbitrary custom metrics for the "current" offset of a streaming source. One use case is knowing how far the current offset falls behind the latest available offset.
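   As an illustration, here is a minimal sketch of the shape of such a source-side implementation. It mirrors the `metrics(Optional[Offset]): ju.Map[String, String]` signature visible in the review diffs below; the local trait, the `offsetsBehindLatest` key, and the numeric-offset assumption are all illustrative, not the merged code:
   
   ```scala
   import java.{util => ju}
   import java.util.Optional
   import scala.collection.JavaConverters._
   import org.apache.spark.sql.connector.read.streaming.Offset
   
   // Local stand-in mirroring the PR's ReportsSourceMetrics signature.
   trait ReportsSourceMetricsSketch {
     def metrics(latestConsumedOffset: Optional[Offset]): ju.Map[String, String]
   }
   
   // A source that tracks the latest available offset internally can report how
   // far the latest consumed offset lags behind it.
   class MyStreamSketch(latestAvailable: Long) extends ReportsSourceMetricsSketch {
     // Assumes a simple numeric offset encoding for the sake of the sketch.
     private def position(o: Offset): Long = o.json.toLong
   
     override def metrics(latestConsumedOffset: Optional[Offset]): ju.Map[String, String] = {
       val lag =
         if (latestConsumedOffset.isPresent) {
           math.max(0L, latestAvailable - position(latestConsumedOffset.get))
         } else latestAvailable
       Map("offsetsBehindLatest" -> lag.toString).asJava
     }
   }
   ```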
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   
   ### How was this patch tested?
   Unit tests for the Kafka micro-batch source v2 are added to cover the Kafka use case.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] xuanyuanking commented on pull request #31944: [SPARK-34297][SQL] Expose source metrics API and Add Kafka metrics to report delay.

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-805517034


   Agree with @viirya on creating another JIRA. Let's link these two tickets together.
   IMO, we should have the metrics both in the UI and the progress reporter. We can do follow-ups to combine the code (e.g., the metrics collection part).
   
   Also cc @HeartSaVioR @gaborgsomogyi @Ngone51 @zsxwing 






[GitHub] [spark] yijiacui-db edited a comment on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db edited a comment on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-805966497


   > Yeah, I guess it's just a matter of parsing and calculation on the user end.
   > 
   > * Do you have a specific use case leveraging this information?
   > * Are you planning to integrate this information into the Spark UI or elsewhere?
   > * Could you please try out a recent version of Spark, check the information available on SourceProgress, and see whether it could serve the same use case despite some extra calculation?
   
   @viirya @HeartSaVioR I don't think this duplicates information in the source progress. What the source progress records now is the latest offset consumed by the stream, not the latest offset available in the source. Take Kafka as an example: with a read limit, each batch can consume only a certain number of offsets, while more data is available in Kafka. The same applies to all other streaming sources. Some users want to know, through the listener, whether they are falling behind, so they can adjust the cluster size accordingly.
   
   I don't think the current Spark progress can derive this information, because the latest available offset is internal to the source; there is no way to know it from the current source progress.
   
   I didn't have a plan for integrating this information with the Spark UI. That's something I can work on after @viirya's PR is merged; I can refactor and adjust accordingly to see whether this metrics information can be exposed through the Spark UI too.
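   For concreteness, a sketch of how a listener-based consumer might read the new `metrics` field on `SourceProgress` and react to lag; the `offsetsBehindLatest` key is a hypothetical example and the reaction is a placeholder, not anything this PR ships:
   
   ```scala
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.streaming.StreamingQueryListener
   import org.apache.spark.sql.streaming.StreamingQueryListener._
   
   val spark = SparkSession.builder().appName("lag-monitor").getOrCreate()
   
   spark.streams.addListener(new StreamingQueryListener {
     override def onQueryStarted(event: QueryStartedEvent): Unit = {}
     override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
     override def onQueryProgress(event: QueryProgressEvent): Unit = {
       event.progress.sources.foreach { source =>
         // metrics is the map-like field this PR adds; the key is hypothetical.
         Option(source.metrics.get("offsetsBehindLatest")).map(_.toLong).foreach { lag =>
           if (lag > 0) {
             // placeholder for a real action, e.g. signaling an external autoscaler
             println(s"${source.description} is $lag offsets behind the latest")
           }
         }
       }
     }
   })
   ```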




[GitHub] [spark] yijiacui-db edited a comment on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db edited a comment on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-828832285


   > > > I've tested it on a real cluster and it works fine.
   > > > Just a question: how is this intended to be used for dynamic allocation?
   > > 
   > > 
   > > Users can implement this interface in their customized SparkDataStream and learn how far it is falling behind through the progress listener. Maybe this can provide more useful information to guide/trigger auto scaling.
   > 
   > This is a valid use case. But my question is whether the current offsets in `SourceProgress` already provide the information the use case needs (consumed offset, available offset). The source progress should also be available on the customized SparkDataStream. Do you mean the metrics from the customized SparkDataStream are not offset related?
   
   Yes. The available offset is retrieved through reportLatestOffset, which Kafka already implements, so for Kafka this is duplicated: we can use the latest consumed offset together with the available offset to compute how far the stream is falling behind.
   But for other customized Spark data streams, it's possible that reportLatestOffset isn't implemented, so there is no way to learn the latest available offset from the source progress report to do the computation. Also, a customized metric, for example how far the application is falling behind the latest, can be represented in other ways (not only as a number of offsets), depending entirely on how the stream defines it.
   
   We want to introduce this metrics interface to let users implement it for their data streams and obtain the metrics they want from the source progress report. The Kafka stream is just an example of how users can implement it and retrieve that information; it happens to already have the latest available offset, which makes it look a little duplicated and harder to reason about.
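   To make the computation concrete, here is a sketch of the per-partition difference described above, following the doc comment on `KafkaMicroBatchStream.metrics` quoted later in this thread; the min/max/avg key names are illustrative assumptions, not necessarily the merged names:
   
   ```scala
   import java.{util => ju}
   import scala.collection.JavaConverters._
   import org.apache.kafka.common.TopicPartition
   
   // Difference between the latest available and latest consumed offsets per
   // partition, summarized into the String -> String metrics map.
   def lagMetrics(
       consumed: Map[TopicPartition, Long],
       available: Map[TopicPartition, Long]): ju.Map[String, String] = {
     val lags = available.map { case (tp, latest) =>
       // A partition that was never consumed counts its full range as lag.
       math.max(0L, latest - consumed.getOrElse(tp, 0L))
     }.toSeq
     if (lags.isEmpty) Map.empty[String, String].asJava
     else Map(
       "minOffsetsBehindLatest" -> lags.min.toString,
       "maxOffsetsBehindLatest" -> lags.max.toString,
       "avgOffsetsBehindLatest" -> (lags.sum.toDouble / lags.size).toString
     ).asJava
   }
   ```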




[GitHub] [spark] gaborgsomogyi commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-827810637


   retest this please




[GitHub] [spark] HeartSaVioR edited a comment on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR edited a comment on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-829611545


   Yes, that sounds like a good rationale for a real use case. Thanks!
   
   I looked into the changes on the API side and felt both #30988 and this can co-exist. #30988 covers the specific case where the latest offset, in Offset format, can be provided by the data source, while this covers the more general ("arbitrary" might fit better) case where the information a data source wants to provide is not limited to the latest offset.
   
   For sure, the actual behavioral change in #30988 could be implemented with the API being added here, but providing a general output across data sources would ideally be more useful, e.g. for plotting in the UI. (I acknowledge the technical gap in making it general: the format of "Offset" varies across data sources, so the consumer has to take care of it.)
   
   For the newly added Kafka metrics, it still makes sense when the target persona is human (convenient to check); otherwise I agree with @viirya that it sounds redundant. Although the code change is not huge, it would probably be good to split this into two PRs with two JIRA issues, 1) API changes and 2) Kafka metrics, and to finalize reviewing 1) first, as there seems to be no outstanding concern on the API changes. We can still go with this PR only, if @viirya is OK with adding redundant information.




[GitHub] [spark] yijiacui-db commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-832164165


   retest this, please




[GitHub] [spark] yijiacui-db commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-825434415


   > I see that the comments are resolved but no changes pushed. Are you still working on this, doing some tests, or just forgotten?
   
   I'm sorry for the late reply. I was working on it but forgot to push the commit. I also found a bug while working on it and pushed a commit with the fix.




[GitHub] [spark] gaborgsomogyi commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-812477256


   Dynamic allocation based on load is important to users, and I support the direction, so I'm fine with it.
   I need some time to test the PR on a cluster, though...




[GitHub] [spark] xuanyuanking commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-806420508


   Besides the flexibility, I can see the new API here helping with the extensibility of the progress reporter. If new customized metrics are needed in the future, we don't need to change the top level of the output JSON string; we just add new entries in the map-like field. It also gives us the possibility to customize metrics for different SparkDataSource implementations.
   
   ```
   It's definitely true that for Kafka source, this api isn't that necessary because of that reported latest offset.
   ```
   Yes. Maybe you can also move the metrics added in #30988 to your new implementation. Either way is OK for me. cc @viirya for more opinions.
   
   Agree with Gabor and Jungtaek that we need to provide more use cases for the new API. Besides the metrics added for the Kafka source in this PR, we could use this new API to expose more customized metrics for FileStreamSource, e.g., the counts of unread, seen, and new files that are currently exposed only in the trace log; see the sketch after this comment.
   
   Of course, end-users can implement this in their customized SparkDataStream. Maybe @yijiacui-db can provide more use cases.
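   As a sketch of what such FileStreamSource-style metrics could look like through the new map-like field (the key names and internal state here are hypothetical illustrations, not existing Spark metrics):
   
   ```scala
   import java.{util => ju}
   import java.util.Optional
   import scala.collection.JavaConverters._
   import org.apache.spark.sql.connector.read.streaming.Offset
   
   // Hypothetical file-backed source state: files discovered vs. files processed.
   class FileBacklogMetricsSketch(knownFiles: Set[String], seenFiles: Set[String]) {
     def metrics(latestConsumedOffset: Optional[Offset]): ju.Map[String, String] =
       Map(
         "seenFiles"   -> seenFiles.size.toString,
         "unreadFiles" -> knownFiles.diff(seenFiles).size.toString
       ).asJava
   }
   ```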




[GitHub] [spark] AmplabJenkins removed a comment on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-832456860


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138163/
   




[GitHub] [spark] yijiacui-db edited a comment on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db edited a comment on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-806074981


   > The latest offset in source progress is the latest offset available in the source, not the latest consumed offset by the stream.
   
   @viirya That's a good point. I was referring to the latest consumed offset used in the metrics method, without realizing that the latest available offset is reported by Kafka through reportLatestOffset. Implementing this metrics interface is still more general for sources that don't implement reportLatestOffset, since they can do some computation based on the consumed offset and report stats back.
   
   It's definitely true that for the Kafka source this API isn't that necessary, because of the reported latest offset. @zsxwing @tdas Do you think we should remove this API for the Kafka source because it's somewhat duplicated? And if so, do we still want to merge the metrics API only to apache/spark?




[GitHub] [spark] gaborgsomogyi commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r611520769



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
##########
@@ -188,7 +178,8 @@ class SourceProgress protected[sql](
   val latestOffset: String,
   val numInputRows: Long,
   val inputRowsPerSecond: Double,
-  val processedRowsPerSecond: Double) extends Serializable {
+  val processedRowsPerSecond: Double,
+  val metrics: ju.Map[String, String] = Map[String, String]().asJava) extends Serializable {

Review comment:
       I see neither an answer nor a change.






[GitHub] [spark] viirya commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-828714048


   > > I've tested it on a real cluster and it works fine.
   > > Just a question: how is this intended to be used for dynamic allocation?
   > 
   > Users can implement this interface in their customized SparkDataStream and learn how far it is falling behind through the progress listener. Maybe this can provide more useful information to guide/trigger auto scaling.
   
   This is a valid use case. But my question is whether the current offsets in `SourceProgress` already provide the information the use case needs (consumed offset, available offset). The source progress should also be available on the customized SparkDataStream. Do you mean the metrics from the customized SparkDataStream are not offset related?




[GitHub] [spark] gaborgsomogyi commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r607607358



##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -218,3 +226,36 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 }
+
+object KafkaMicroBatchStream extends Logging {
+
+  /**
+   * Compute the difference of offset per partition between latestAvailablePartitionOffsets
+   * and latestConsumedPartitionOffsets.
+   * Because of rate limit, latest consumed offset per partition can be smaller than
+   * the latest available offset per partition.
+   * @param latestConsumedOffset latest consumed offset
+   * @param latestAvailablePartitionOffsets latest available offset per partition
+   * @return the generated metrics map
+   */
+  def metrics(latestConsumedOffset: Optional[Offset],

Review comment:
       Even if it's a nit: I think 4 spaces are needed here instead of 8, right?






[GitHub] [spark] gaborgsomogyi commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r607611280



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
##########
@@ -175,14 +178,20 @@ trait ProgressReporter extends Logging {
 
     val sourceProgress = sources.distinct.map { source =>
       val numRecords = executionStats.inputRows.getOrElse(source, 0L)
+      val sourceMetrics = source match {
+        case withMetrics: ReportsSourceMetrics =>
+          withMetrics.metrics(Optional.ofNullable(latestStreamProgress.get(source).orNull))
+        case _ => Map[String, String]().asJava
+      }
       new SourceProgress(
         description = source.toString,
         startOffset = currentTriggerStartOffsets.get(source).orNull,
         endOffset = currentTriggerEndOffsets.get(source).orNull,
         latestOffset = currentTriggerLatestOffsets.get(source).orNull,
         numInputRows = numRecords,
         inputRowsPerSecond = numRecords / inputTimeSec,
-        processedRowsPerSecond = numRecords / processingTimeSec
+        processedRowsPerSecond = numRecords / processingTimeSec,

Review comment:
       I haven't tested it, but can `processingTimeSec` be zero?
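   As a side note on why this is unlikely to crash, though the value may be surprising: on the JVM, dividing a `Double` by zero yields `Infinity`/`NaN` rather than throwing, and the `safeDoubleToJValue` helper visible elsewhere in this PR drops such values from the JSON. A minimal sketch:
   
   ```scala
   import org.json4s.{JDouble, JNothing, JValue}
   
   // Double division by zero does not throw on the JVM:
   val inf = 5L / 0.0 // Double.PositiveInfinity
   val nan = 0L / 0.0 // Double.NaN
   
   // Copied from the local definition shown in the progress.scala diff:
   // non-finite values are dropped instead of being serialized.
   def safeDoubleToJValue(value: Double): JValue =
     if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
   
   assert(safeDoubleToJValue(inf) == JNothing)
   ```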






[GitHub] [spark] AmplabJenkins removed a comment on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-805122921


   Can one of the admins verify this patch?




[GitHub] [spark] yijiacui-db commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-832171967


   retest this please




[GitHub] [spark] yijiacui-db commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r612928297



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
##########
@@ -188,7 +178,8 @@ class SourceProgress protected[sql](
   val latestOffset: String,
   val numInputRows: Long,
   val inputRowsPerSecond: Double,
-  val processedRowsPerSecond: Double) extends Serializable {
+  val processedRowsPerSecond: Double,
+  val metrics: ju.Map[String, String] = Map[String, String]().asJava) extends Serializable {

Review comment:
       Sorry, I thought I had replied to this. We could have a type def here, but it's not strictly necessary. What's the standard here? If we usually create a type def in this case, then I can create one for this map.






[GitHub] [spark] HeartSaVioR commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-828804207


   > > > I've tested it on a real cluster and it works fine.
   > > > Just a question: how is this intended to be used for dynamic allocation?
   > > 
   > > Users can implement this interface in their customized SparkDataStream and learn how far it is falling behind through the progress listener. Maybe this can provide more useful information to guide/trigger auto scaling.
   
   > This is a valid use case. But my question is whether the current offsets in `SourceProgress` already provide the information the use case needs (consumed offset, available offset).
   
   That is what I understand as well: it is just a matter of "where" we want to put the calculation.
   
   I have mixed feelings about this:
   
   1) If the target persona is human, then I'd rather not make them calculate by themselves. It would be helpful to let Spark calculate and provide the information instead.
   
   2) If the target persona is a "process" (maybe the Spark driver or some external app?), then it should not be that hard for it to do the calculation itself.
   
   I'm not sure which is the actual use case for this PR.




[GitHub] [spark] yijiacui-db commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r621002779



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
##########
@@ -188,7 +178,8 @@ class SourceProgress protected[sql](
   val latestOffset: String,
   val numInputRows: Long,
   val inputRowsPerSecond: Double,
-  val processedRowsPerSecond: Double) extends Serializable {
+  val processedRowsPerSecond: Double,
+  val metrics: ju.Map[String, String] = Map[String, String]().asJava) extends Serializable {

Review comment:
       Thanks!






[GitHub] [spark] SparkQA commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-832328322


   **[Test build #138163 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138163/testReport)** for PR 31944 at commit [`2f13f28`](https://github.com/apache/spark/commit/2f13f28d7bee15bed657858c53dd8495ba8f1cbc).




[GitHub] [spark] HeartSaVioR commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-832511403


   Thanks all for thoughtful reviewing and thanks @yijiacui-db for the contribution! Merged to master.




[GitHub] [spark] yijiacui-db commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r615586097



##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       CheckNewAnswer(32, 33, 34, 35, 36)
     )
   }
+
+
+  test("test custom metrics - with rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option("maxOffsetsPerTrigger", 1)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {

Review comment:
       Removed.






[GitHub] [spark] HeartSaVioR commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-831032642


   retest this, please




[GitHub] [spark] HeartSaVioR commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-805610537


   Yeah, I guess it's just a matter of parsing and calculation on the user end.
   
   * Do you have a specific use case leveraging this information?
   * Are you planning to integrate this information into the Spark UI or elsewhere?
   * Could you please try out a recent version of Spark, check the information available on SourceProgress, and see whether it could serve the same use case despite some extra calculation?




[GitHub] [spark] viirya commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r622374321



##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -133,6 +137,10 @@ private[kafka010] class KafkaMicroBatchStream(
 
   override def toString(): String = s"KafkaV2[$kafkaOffsetReader]"
 
+  override def metrics(latestOffset: Optional[Offset]): ju.Map[String, String] = {
+    KafkaMicroBatchStream.metrics(latestOffset, latestPartitionOffsets)

Review comment:
       `latestOffset` matches `latestConsumedOffset` of `KafkaMicroBatchStream.metrics`, so it looks like the latest consumed offset.
   
   In the doc of `ReportsSourceMetrics`, when I read "Returns the metrics reported by the streaming source with respect to the latest offset", I thought the latest offset parameter was the latest available offset.
   
   Could you make it clear in the API doc that the passed-in offset is the latest consumed offset?
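   For reference, a sketch of the kind of clarified Scaladoc being requested here; the wording is illustrative, not the merged text:
   
   ```scala
   import java.{util => ju}
   import java.util.Optional
   import org.apache.spark.sql.connector.read.streaming.Offset
   
   trait ReportsSourceMetricsDocSketch {
     /**
      * Returns metrics for this source.
      *
      * @param latestConsumedOffset the latest offset consumed by the stream,
      *                             not the latest offset available in the
      *                             source; empty if nothing has been consumed.
      */
     def metrics(latestConsumedOffset: Optional[Offset]): ju.Map[String, String]
   }
   ```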






[GitHub] [spark] xuanyuanking edited a comment on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
xuanyuanking edited a comment on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-806420508


   Besides the flexibility, I can see the new API here helping with the extensibility of the progress reporter. If new customized metrics are needed in the future, we don't need to change the top level of the output JSON string; we just add new entries in the map-like field. It also gives us the possibility to customize metrics for different SparkDataSource implementations.
   
   ```
   It's definitely true that for Kafka source, this api isn't that necessary because of that reported latest offset.
   ```
   Yes. Maybe you can also move the metrics added in #30988 to your new implementation. Either way is OK for me. cc @viirya for more opinions.
   
   Agree with Gabor and Jungtaek that we need to provide more use cases for the new API. Besides the metrics added for the Kafka source in this PR, we could use this new API to expose more customized metrics for FileStreamSource, e.g., the number of files or bytes outstanding.
   
   Of course, end-users can implement this in their customized SparkDataStream. Maybe @yijiacui-db can provide more use cases.




[GitHub] [spark] AmplabJenkins commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-827861467


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42524/
   




[GitHub] [spark] yijiacui-db commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r615585901



##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       CheckNewAnswer(32, 33, 34, 35, 36)
     )
   }
+
+
+  test("test custom metrics - with rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option("maxOffsetsPerTrigger", 1)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        // The rate limit is 1, so there must be some delay in offsets per partition.
+        val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find {

Review comment:
       Done

##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       CheckNewAnswer(32, 33, 34, 35, 36)
     )
   }
+

Review comment:
       Removed.






[GitHub] [spark] viirya edited a comment on pull request #31944: [SPARK-34297][SQL] Expose source metrics API and Add Kafka metrics to report delay.

Posted by GitBox <gi...@apache.org>.
viirya edited a comment on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-805259607


   Hi @yijiacui-db, thanks for working on this. I'm still working on this and the related API. The community has an agreement (SPARK-34366, PR #31476) on the metric API for data sources. As a next step I will implement the API at the data source level and also add it to Kafka. If you are interested in this work, you're welcome to help review it later. Thanks.




[GitHub] [spark] SparkQA removed a comment on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-827826656


   **[Test build #138005 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138005/testReport)** for PR 31944 at commit [`b870302`](https://github.com/apache/spark/commit/b870302f4b4060d900468d2cff3fe67addaa68f9).




[GitHub] [spark] yijiacui-db commented on pull request #31944: [SPARK-34297][SQL] Expose source metrics API and Add Kafka metrics to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-805579056


   > Agree with @viirya on creating another JIRA. Let's link these two tickets together.
   > IMO, we should have the metrics both in the UI and the progress reporter. We can do follow-ups to combine the code (e.g., the metrics collection part).
   > 
   > Also cc @HeartSaVioR @gaborgsomogyi @bozhang2820 @Ngone51 @zsxwing
   
   @viirya @xuanyuanking Thanks. I opened SPARK-34854 to track this work.




[GitHub] [spark] viirya commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-806023954


   The latest offset in source progress is the latest offset available in the source, not the latest consumed offset by the stream.




[GitHub] [spark] gaborgsomogyi commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r613965855



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
##########
@@ -188,7 +178,8 @@ class SourceProgress protected[sql](
   val latestOffset: String,
   val numInputRows: Long,
   val inputRowsPerSecond: Double,
-  val processedRowsPerSecond: Double) extends Serializable {
+  val processedRowsPerSecond: Double,
+  val metrics: ju.Map[String, String] = Map[String, String]().asJava) extends Serializable {

Review comment:
       When the code can be simplified by removing things, that's the suggested way.






[GitHub] [spark] yijiacui-db commented on pull request #31944: [SPARK-34297][SQL] Expose source metrics API and Add Kafka metrics to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-805278090


   > hi @yijiacui-db, thanks for working on this. I'm still working on this and related API. The community has an agreement ([SPARK-34366](https://issues.apache.org/jira/browse/SPARK-34366), PR #31476) on metric API for data source. Next step I will implement the API at data source including kafka data source. If you are interested in this work, welcome to help review the work later. Thanks.
   
   Hi @viirya, this pull request exposes the backlog metrics to end users through the streaming query progress, not via the metrics in the Spark UI. I think it makes sense to combine both to expose this information at different levels. Sorry I used your JIRA number; I thought that was the number I should use. I can create another JIRA to track this work if you prefer.




[GitHub] [spark] xuanyuanking commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-810876034


   ```
   They want to dynamically adjust the size of the cluster based on how far they're falling behind the latest. With this PR, they can access these metrics through the source progress at the end of each batch.
   ```
   This seems a reasonable use case to me. cc @HeartSaVioR @gaborgsomogyi for their opinions.
   I'll take a close look at the code today.




[GitHub] [spark] yijiacui-db commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r615591627



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
##########
@@ -138,31 +138,21 @@ class StreamingQueryProgress private[sql](
   override def toString: String = prettyJson
 
   private[sql] def jsonValue: JValue = {
-    def safeDoubleToJValue(value: Double): JValue = {
-      if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
-    }
-
-    /** Convert map to JValue while handling empty maps. Also, this sorts the keys. */
-    def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = {
-      if (map.isEmpty) return JNothing
-      val keys = map.asScala.keySet.toSeq.sorted
-      keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_ ~ _)
-    }
-
     ("id" -> JString(id.toString)) ~
     ("runId" -> JString(runId.toString)) ~
     ("name" -> JString(name)) ~
     ("timestamp" -> JString(timestamp)) ~
     ("batchId" -> JInt(batchId)) ~
     ("numInputRows" -> JInt(numInputRows)) ~
-    ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
-    ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
-    ("durationMs" -> safeMapToJValue[JLong](durationMs, v => JInt(v.toLong))) ~
-    ("eventTime" -> safeMapToJValue[String](eventTime, s => JString(s))) ~
+    ("inputRowsPerSecond" -> SafeJsonSerializer.safeDoubleToJValue(inputRowsPerSecond)) ~

Review comment:
       Imported.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
##########
@@ -138,31 +138,21 @@ class StreamingQueryProgress private[sql](
   override def toString: String = prettyJson
 
   private[sql] def jsonValue: JValue = {
-    def safeDoubleToJValue(value: Double): JValue = {
-      if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
-    }
-
-    /** Convert map to JValue while handling empty maps. Also, this sorts the keys. */
-    def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = {
-      if (map.isEmpty) return JNothing
-      val keys = map.asScala.keySet.toSeq.sorted
-      keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_ ~ _)
-    }
-
     ("id" -> JString(id.toString)) ~
     ("runId" -> JString(runId.toString)) ~
     ("name" -> JString(name)) ~
     ("timestamp" -> JString(timestamp)) ~
     ("batchId" -> JInt(batchId)) ~
     ("numInputRows" -> JInt(numInputRows)) ~
-    ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
-    ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
-    ("durationMs" -> safeMapToJValue[JLong](durationMs, v => JInt(v.toLong))) ~
-    ("eventTime" -> safeMapToJValue[String](eventTime, s => JString(s))) ~
+    ("inputRowsPerSecond" -> SafeJsonSerializer.safeDoubleToJValue(inputRowsPerSecond)) ~
+    ("processedRowsPerSecond" -> SafeJsonSerializer.safeDoubleToJValue(processedRowsPerSecond)) ~
+    ("durationMs" -> SafeJsonSerializer.safeMapToJValue[JLong](durationMs, v => JInt(v.toLong))) ~
+    ("eventTime" -> SafeJsonSerializer.safeMapToJValue[String](eventTime, s => JString(s))) ~
     ("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~
     ("sources" -> JArray(sources.map(_.jsonValue).toList)) ~
     ("sink" -> sink.jsonValue) ~
-    ("observedMetrics" -> safeMapToJValue[Row](observedMetrics, row => row.jsonValue))
+    ("observedMetrics" ->
+      SafeJsonSerializer.safeMapToJValue[Row](observedMetrics, row => row.jsonValue))

Review comment:
       Done.






[GitHub] [spark] SparkQA commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-828001436


   **[Test build #138005 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138005/testReport)** for PR 31944 at commit [`b870302`](https://github.com/apache/spark/commit/b870302f4b4060d900468d2cff3fe67addaa68f9).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] yijiacui-db edited a comment on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db edited a comment on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-806074981






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-827852292


   Kubernetes integration test unable to build dist.
   
   exiting with code: 1
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42524/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] yijiacui-db commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-828836467


   > > > > I've tested it on a real cluster and it works fine.
   > > > > Just a question: how is this intended to be used for dynamic allocation?
   > > > 
   > > > 
   > > > Users can implement this interface in their customized SparkDataStream and learn how far they are falling behind through the progress listener. Maybe this can provide more useful information to guide/trigger autoscaling.
   > 
   > > This is a valid use case. But my point is that the current offsets in `SourceProgress` should already provide the information this use case needs (consumed offset, available offset).
   > 
   > That is what I understand as well - it is just a matter of "where" we want to put the calculation.
   > 
   > I have mixed feelings about this:
   > 
   > 1. If the target persona is human, then I'd rather not make them calculate by themselves. It would be helpful to let Spark calculate and provide the information instead.
   > 2. If the target persona is a "process" (maybe the Spark driver or some external app?), then it should not be that hard for it to calculate.
   > 
   > Not sure which is the actual use case for this PR.
   
   @HeartSaVioR This is a good question! I already updated my answer in the comment above, explaining how it works and why we need this metrics interface. Whether the target persona is a human or a process, it's always possible that what counts as "latest available" is internal to the customized Spark data stream and has no Offset representation, so the lag can't be computed from offsets or reported as offsets.
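   
   To make that concrete, here is a minimal sketch of a hypothetical stream whose backlog is an internal counter with no Offset representation (interface and method signature per the current revision of this PR, as far as I can tell; the stream itself is made up):

   ```scala
   import java.{util => ju}
   import java.util.Optional
   import scala.collection.JavaConverters._
   import org.apache.spark.sql.connector.read.streaming.{Offset, ReportsSourceMetrics, SparkDataStream}

   class InternalQueueStream extends SparkDataStream with ReportsSourceMetrics {
     // Backlog tracked internally; it never appears in any Offset.
     @volatile private var recordsBuffered: Long = 0L

     override def metrics(latestConsumedOffset: Optional[Offset]): ju.Map[String, String] =
       Map("recordsBufferedInternally" -> recordsBuffered.toString).asJava

     // SparkDataStream plumbing elided for brevity.
     override def initialOffset(): Offset = ???
     override def deserializeOffset(json: String): Offset = ???
     override def commit(end: Offset): Unit = ()
     override def stop(): Unit = ()
   }
   ```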


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gaborgsomogyi commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-808168948


   FYI, there was a PR to commit the offsets back from Spark itself, which didn't satisfy everybody's needs and so was not merged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] yijiacui-db commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r610805416



##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -218,3 +226,36 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 }
+
+object KafkaMicroBatchStream extends Logging {
+
+  /**
+   * Compute the difference of offset per partition between latestAvailablePartitionOffsets
+   * and latestConsumedPartitionOffsets.
+   * Because of rate limit, latest consumed offset per partition can be smaller than
+   * the latest available offset per partition.
+   * @param latestConsumedOffset latest consumed offset
+   * @param latestAvailablePartitionOffsets latest available offset per partition
+   * @return the generated metrics map
+   */
+  def metrics(latestConsumedOffset: Optional[Offset],
+              latestAvailablePartitionOffsets: PartitionOffsetMap): ju.Map[String, String] = {
+    val offset = Option(latestConsumedOffset.orElse(null))
+
+    if (offset.nonEmpty) {
+      val consumedPartitionOffsets = offset.map(KafkaSourceOffset(_)).get.partitionToOffsets
+      val offsetsBehindLatest = latestAvailablePartitionOffsets
+        .filter(partitionOffset => consumedPartitionOffsets.contains(partitionOffset._1))
+        .map(partitionOffset =>
+          partitionOffset._2 - consumedPartitionOffsets.get(partitionOffset._1).get)
+      if (offsetsBehindLatest.size > 0) {

Review comment:
       Done.

##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -218,3 +226,36 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 }
+
+object KafkaMicroBatchStream extends Logging {
+
+  /**
+   * Compute the difference of offset per partition between latestAvailablePartitionOffsets
+   * and latestConsumedPartitionOffsets.
+   * Because of rate limit, latest consumed offset per partition can be smaller than
+   * the latest available offset per partition.
+   * @param latestConsumedOffset latest consumed offset
+   * @param latestAvailablePartitionOffsets latest available offset per partition
+   * @return the generated metrics map
+   */
+  def metrics(latestConsumedOffset: Optional[Offset],

Review comment:
       Done.

##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -218,3 +226,36 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 }
+
+object KafkaMicroBatchStream extends Logging {
+
+  /**
+   * Compute the difference of offset per partition between latestAvailablePartitionOffsets
+   * and latestConsumedPartitionOffsets.
+   * Because of rate limit, latest consumed offset per partition can be smaller than
+   * the latest available offset per partition.
+   * @param latestConsumedOffset latest consumed offset
+   * @param latestAvailablePartitionOffsets latest available offset per partition
+   * @return the generated metrics map
+   */
+  def metrics(latestConsumedOffset: Optional[Offset],
+              latestAvailablePartitionOffsets: PartitionOffsetMap): ju.Map[String, String] = {
+    val offset = Option(latestConsumedOffset.orElse(null))
+
+    if (offset.nonEmpty) {
+      val consumedPartitionOffsets = offset.map(KafkaSourceOffset(_)).get.partitionToOffsets
+      val offsetsBehindLatest = latestAvailablePartitionOffsets
+        .filter(partitionOffset => consumedPartitionOffsets.contains(partitionOffset._1))
+        .map(partitionOffset =>
+          partitionOffset._2 - consumedPartitionOffsets.get(partitionOffset._1).get)
+      if (offsetsBehindLatest.size > 0) {
+        val avgOffsetBehindLatest = offsetsBehindLatest.sum.toDouble / offsetsBehindLatest.size
+        return Map[String, String](
+          "minOffsetsBehindLatest" -> offsetsBehindLatest.min.toString,
+          "maxOffsetsBehindLatest" -> offsetsBehindLatest.max.toString,
+          "avgOffsetsBehindLatest" -> avgOffsetBehindLatest.toString).asJava
+      }
+    }
+    Map[String, String]().asJava

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-806196978


   What other metrics do you have in mind? I see the flexibility in adding some sort of custom metrics, but I want to make sure there are clear, actual use cases, as we're adding a public API here.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] xuanyuanking commented on pull request #31944: [SPARK-34297][SQL] Expose source metrics API and Add Kafka metrics to report delay.

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-805583538


   It's OK to keep this PR open, just change the title to the new Jira number. :) @yijiacui-db 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] yijiacui-db commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-805588711


   > It's OK to keep this PR open, just change the title to the new Jira number. :) @yijiacui-db
   
   @xuanyuanking Thanks! I reopened it and changed the JIRA number and the description.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya edited a comment on pull request #31944: [SPARK-34297][SQL] Expose source metrics API and Add Kafka metrics to report delay.

Posted by GitBox <gi...@apache.org>.
viirya edited a comment on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-805259607


   hi @yijiacui-db, thanks for working on this. I'm still working on this and the related API. The community has an agreement (SPARK-34366, PR #31476) on a metric API for data sources, which was merged recently. As a next step I will implement the API in data sources, including the Kafka data source. If you are interested in this work, you are welcome to help review it later. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] yijiacui-db commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-805966497


   > Yeah I guess it's just a matter of parsing and calculation on the user's end.
   > 
   > * Do you have a specific use case leveraging this information?
   > * Are you planning to integrate this information into the Spark UI or somewhere else?
   > * Could you please try out a recent version of Spark and check the available information on SourceProgress, and see whether it could solve the same use case despite some more calculation?
   
   @viirya @HeartSaVioR I don't think this duplicates information already in the source progress. What the source progress records today is the latest offset consumed by the stream, not the latest offset available in the source. Take Kafka as an example: with a read limit we only consume a bounded number of offsets per batch, while the data available in Kafka can be far beyond that. The same applies to all the other streaming sources. Some users want to know whether they are falling behind so they can adjust the cluster size accordingly.
   
   I don't think the current Spark progress can derive this information, because the latest available offset is internal to the source; there's no way to know it from the current source progress.
   
   I didn't have a plan for integrating this information with the Spark UI. That's something I can work on after @viirya's PR is merged; I can refactor and adjust accordingly to see whether these metrics can be exposed through the Spark UI too.
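   
   For instance, once this is in, reading the lag off the progress report is straightforward (a sketch; `query` is a started StreamingQuery and the keys are the ones added in this PR):

   ```scala
   val source = query.lastProgress.sources.head
   // metrics is a java.util.Map[String, String]; get returns null when the key is absent
   val maxLag = Option(source.metrics.get("maxOffsetsBehindLatest")).map(_.toLong)
   maxLag.foreach(lag => println(s"max offsets behind latest: $lag"))
   ```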


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #31944: [SPARK-34297][SQL] Expose source metrics API and Add Kafka metrics to report delay.

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-805259607


   hi @yijiacui-db, thanks for working on this. I'm still working on this and the related API. The community has an agreement (SPARK-34366, PR #31476) on a metric API for data sources. As a next step I will implement the API in data sources (Kafka). If you are interested in this work, you are welcome to help review it later. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] yijiacui-db commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-806074981


   > The latest offset in source progress is the latest offset available in the source, not the latest consumed offset by the stream.
   
   @viirya That's a good point. I was referring to the latest consumed offset used in the metrics method, without realizing that the latest available offset is reported by Kafka through reportLatestOffset. That said, this metrics interface is more general: it also serves sources that don't implement reportLatestOffset and therefore have no latest available offset in the source progress.
   
   It's definitely true that for the Kafka source this API isn't strictly necessary, given the reported latest offset. @zsxwing @tdas Do you think we should remove this API for the Kafka source because it's somewhat duplicated? And if so, do we still want to merge only the metrics API to apache/spark?
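   
   For the record, here is a sketch of what an external process could already do with the latestOffset reported via #30988 (the JSON shape is assumed from Kafka's offset format, `{"topic":{"partition":offset}}`; this is not code from this PR):

   ```scala
   import org.json4s._
   import org.json4s.jackson.JsonMethods._

   // Compute per-partition lag from the endOffset and latestOffset JSON strings
   // found in SourceProgress.
   def lagPerPartition(endOffsetJson: String, latestOffsetJson: String): Map[String, Long] = {
     implicit val formats: Formats = DefaultFormats
     def flatten(js: String): Map[String, Long] =
       parse(js).extract[Map[String, Map[String, Long]]]
         .flatMap { case (topic, parts) => parts.map { case (p, o) => s"$topic-$p" -> o } }
     val end = flatten(endOffsetJson)
     flatten(latestOffsetJson).map { case (tp, latest) =>
       tp -> (latest - end.getOrElse(tp, latest))
     }
   }
   ```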


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR closed pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR closed pull request #31944:
URL: https://github.com/apache/spark/pull/31944


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] yijiacui-db commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r615586523



##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       CheckNewAnswer(32, 33, 34, 35, 36)
     )
   }
+
+
+  test("test custom metrics - with rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option("maxOffsetsPerTrigger", 1)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        // The rate limit is 1, so there must be some delay in offsets per partition.
+        val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find {
+          progress =>
+            !progress.metrics.isEmpty && {
+              // find the metrics that has non-zero average offsetsBehindLatest greater than 0.
+              progress.metrics.get("avgOffsetsBehindLatest").toDouble > 0
+            }
+        }
+        assert(progressWithDelay.nonEmpty)
+        val metrics = progressWithDelay.get.metrics
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble > 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - no rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        val progress = query.recentProgress.map(_.sources.head).lastOption
+        assert(progress.nonEmpty)
+        val metrics = progress.get.metrics
+        // When there is no rate limit, there shouldn't be any delay in the current stream.
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong == 0)

Review comment:
       Changed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-827861467


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42524/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] yijiacui-db commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-810488123


   @gaborgsomogyi @xuanyuanking We have some customized sources built to users' requirements. Those users want to dynamically adjust the cluster size based on how far they're falling behind the latest data. With this PR, these metrics are exposed to them through the source progress at the end of each batch.
   
   @tdas  @zsxwing  Feel free to correct me / add more details.
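   
   As an illustration, such a consumer could watch the lag from a listener and feed it into a scaling decision (a sketch; the threshold and the `requestScaleUp` callback are placeholders, not anything in this PR):

   ```scala
   import org.apache.spark.sql.streaming.StreamingQueryListener
   import org.apache.spark.sql.streaming.StreamingQueryListener._

   class LagBasedScalingListener(requestScaleUp: () => Unit) extends StreamingQueryListener {
     override def onQueryStarted(event: QueryStartedEvent): Unit = ()
     override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()

     override def onQueryProgress(event: QueryProgressEvent): Unit = {
       // Largest reported lag across all sources in this batch, if any.
       val lags = event.progress.sources
         .flatMap(s => Option(s.metrics.get("maxOffsetsBehindLatest")))
         .map(_.toLong)
       if (lags.nonEmpty && lags.max > 100000L) requestScaleUp()
     }
   }
   ```

   Registered via `spark.streams.addListener(new LagBasedScalingListener(() => ...))`.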


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] yijiacui-db commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r615595382



##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -218,3 +226,37 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 }
+
+object KafkaMicroBatchStream extends Logging {
+
+  /**
+   * Compute the difference of offset per partition between latestAvailablePartitionOffsets
+   * and latestConsumedPartitionOffsets.
+   * Because of rate limit, latest consumed offset per partition can be smaller than
+   * the latest available offset per partition.
+   * @param latestConsumedOffset latest consumed offset
+   * @param latestAvailablePartitionOffsets latest available offset per partition
+   * @return the generated metrics map
+   */
+  def metrics(
+      latestConsumedOffset: Optional[Offset],
+      latestAvailablePartitionOffsets: PartitionOffsetMap): ju.Map[String, String] = {
+    val offset = Option(latestConsumedOffset.orElse(null))
+
+    if (offset.nonEmpty) {
+      val consumedPartitionOffsets = offset.map(KafkaSourceOffset(_)).get.partitionToOffsets
+      val offsetsBehindLatest = latestAvailablePartitionOffsets
+        .filter(partitionOffset => consumedPartitionOffsets.contains(partitionOffset._1))
+        .map(partitionOffset =>

Review comment:
       Changed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gaborgsomogyi commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-823098027


   I see that the comments are resolved but no changes have been pushed. Are you still working on this, running some tests, or has it just been forgotten?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-832394666


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42684/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR edited a comment on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR edited a comment on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-829611545


   Yes, that sounds like a good rationale for a real case. Thanks!
   
   I looked into the changes on the API side, and I feel both #30988 and this can co-exist. #30988 covers specific cases where the latest offset can be provided by the data source in Offset format, and this covers more general ("arbitrary" might fit better) cases where the information a data source wants to provide is not limited to the latest offset.
   
   For sure, the actual behavioral change in #30988 could be implemented with the API being added here, but output that is uniform across data sources would ideally be more useful, e.g., for plotting in the UI. (I know the technical gap in making this general: the format of "Offset" varies across data sources, and the consumer has to take care of that.)
   
   For the newly added Kafka metrics, they still make sense when the target persona is human (convenient to check), but otherwise I agree with @viirya that they sound redundant. Even though the code change is not huge, it's probably good to split this into two PRs with two JIRA issues, 1) API changes and 2) Kafka metrics, and finalize reviewing 1) first, as there seems to be no outstanding concern on the API changes. We can still go with this single PR if @viirya is OK with adding redundant information.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] yijiacui-db edited a comment on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db edited a comment on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-829531520


   > Yeah I agree about the rationalization and benefits of "adding a public API on custom source metrics", though it'd be even better if we could talk through a real case which is not covered by #30988.
   > 
   > I feel that the reason the review keeps dragging is the Kafka use-case. Your explanation may make sense for "other" data sources (hypothetically, as you haven't provided an actual one), but for the Kafka case it's possible for a specific process to calculate the lag with the change in #30988. I agree it's bad for a human being to calculate the lag per topic partition and summarize it by him/herself, but it's still not that hard for a specific process to do that.
   
   @viirya @HeartSaVioR
   
   A good example is FileStreamSource, which doesn't implement reportLatestOffset, because what is latest available in the source doesn't map to the "Offset" representation in Spark streaming.
   
   In FileStreamSource, fetchMaxOffset returns the maximum offset that can be retrieved from the source, which can be rate limited. Only the file source itself knows internally how many files are left to be processed for the batch. Possible metrics to expose to users here are the number of files and the number of bytes remaining to be processed in the batch, i.e., how far the application is falling behind the stream. Those metrics can't be computed from the information currently in the source progress report, so we need the metrics API to expose to users metrics that can only be computed internally.
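   
   Concretely, a file-based source could report something like the following through the new API (a sketch; the key names and counters are hypothetical, not existing FileStreamSource code):

   ```scala
   import java.{util => ju}
   import scala.collection.JavaConverters._

   object FileBacklogMetrics {
     // Counters only the source knows internally; there is no Offset they map to.
     def metrics(filesOutstanding: Long, bytesOutstanding: Long): ju.Map[String, String] =
       Map(
         "numFilesOutstanding" -> filesOutstanding.toString,
         "numBytesOutstanding" -> bytesOutstanding.toString).asJava
   }
   ```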


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] yijiacui-db commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r615588004



##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       CheckNewAnswer(32, 33, 34, 35, 36)
     )
   }
+
+
+  test("test custom metrics - with rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option("maxOffsetsPerTrigger", 1)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        // The rate limit is 1, so there must be some delay in offsets per partition.
+        val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find {
+          progress =>
+            !progress.metrics.isEmpty && {
+              // find the metrics that has non-zero average offsetsBehindLatest greater than 0.
+              progress.metrics.get("avgOffsetsBehindLatest").toDouble > 0
+            }
+        }
+        assert(progressWithDelay.nonEmpty)
+        val metrics = progressWithDelay.get.metrics
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble > 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - no rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        val progress = query.recentProgress.map(_.sources.head).lastOption
+        assert(progress.nonEmpty)
+        val metrics = progress.get.metrics
+        // When there is no rate limit, there shouldn't be any delay in the current stream.
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong == 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong == 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble == 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - corner cases") {
+    val topic1 = new TopicPartition(newTopic(), 0)

Review comment:
       Changed.

##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       CheckNewAnswer(32, 33, 34, 35, 36)
     )
   }
+
+
+  test("test custom metrics - with rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option("maxOffsetsPerTrigger", 1)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        // The rate limit is 1, so there must be some delay in offsets per partition.
+        val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find {
+          progress =>
+            !progress.metrics.isEmpty && {
+              // find the metrics that has non-zero average offsetsBehindLatest greater than 0.
+              progress.metrics.get("avgOffsetsBehindLatest").toDouble > 0
+            }
+        }
+        assert(progressWithDelay.nonEmpty)
+        val metrics = progressWithDelay.get.metrics
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble > 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - no rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        val progress = query.recentProgress.map(_.sources.head).lastOption
+        assert(progress.nonEmpty)
+        val metrics = progress.get.metrics
+        // When there is no rate limit, there shouldn't be any delay in the current stream.
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong == 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong == 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble == 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - corner cases") {
+    val topic1 = new TopicPartition(newTopic(), 0)
+    val topic2 = new TopicPartition(newTopic(), 0)
+    val latestOffset = Map[TopicPartition, Long]((topic1, 3L), (topic2, 6L))
+
+    // test empty offset.
+    assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(null), latestOffset) ==

Review comment:
       Done.

##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       CheckNewAnswer(32, 33, 34, 35, 36)
     )
   }
+
+
+  test("test custom metrics - with rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option("maxOffsetsPerTrigger", 1)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        // The rate limit is 1, so there must be some delay in offsets per partition.
+        val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find {
+          progress =>
+            !progress.metrics.isEmpty && {
+              // find the metrics that has non-zero average offsetsBehindLatest greater than 0.
+              progress.metrics.get("avgOffsetsBehindLatest").toDouble > 0
+            }
+        }
+        assert(progressWithDelay.nonEmpty)
+        val metrics = progressWithDelay.get.metrics
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble > 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - no rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        val progress = query.recentProgress.map(_.sources.head).lastOption
+        assert(progress.nonEmpty)
+        val metrics = progress.get.metrics
+        // When there is no rate limit, there shouldn't be any delay in the current stream.
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong == 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong == 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble == 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - corner cases") {
+    val topic1 = new TopicPartition(newTopic(), 0)
+    val topic2 = new TopicPartition(newTopic(), 0)
+    val latestOffset = Map[TopicPartition, Long]((topic1, 3L), (topic2, 6L))
+
+    // test empty offset.
+    assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(null), latestOffset) ==
+      Map[String, String]().asJava)
+
+    // test empty offsetsBehindLatest && topics are missing in the latestConsumedOffset.
+    val emptyOffset = KafkaSourceOffset(Map[TopicPartition, Long]())
+    assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(emptyOffset), latestOffset) ==

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] yijiacui-db commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-834179633


   > Thanks all for thoughtful reviewing and thanks @yijiacui-db for the contribution! Merged to master.
   
   @HeartSaVioR @xuanyuanking @gaborgsomogyi @viirya Thank you so much for reviewing this PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gaborgsomogyi commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-828276415


   @viirya please make a judgement on how this fits into the whole picture, since I'm not involved in the metrics API development.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gaborgsomogyi commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r613997289



##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       CheckNewAnswer(32, 33, 34, 35, 36)
     )
   }
+
+
+  test("test custom metrics - with rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option("maxOffsetsPerTrigger", 1)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {

Review comment:
       Not sure why the curly brace at the end is needed?

##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       CheckNewAnswer(32, 33, 34, 35, 36)
     )
   }
+

Review comment:
       Nit: -1 newline needed.

##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       CheckNewAnswer(32, 33, 34, 35, 36)
     )
   }
+
+
+  test("test custom metrics - with rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option("maxOffsetsPerTrigger", 1)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        // The rate limit is 1, so there must be some delay in offsets per partition.
+        val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find {
+          progress =>
+            !progress.metrics.isEmpty && {
+              // find the metrics that has non-zero average offsetsBehindLatest greater than 0.
+              progress.metrics.get("avgOffsetsBehindLatest").toDouble > 0
+            }
+        }
+        assert(progressWithDelay.nonEmpty)
+        val metrics = progressWithDelay.get.metrics
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble > 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - no rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        val progress = query.recentProgress.map(_.sources.head).lastOption
+        assert(progress.nonEmpty)
+        val metrics = progress.get.metrics
+        // When there is no rate limit, there shouldn't be any delay in the current stream.
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong == 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong == 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble == 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - corner cases") {
+    val topic1 = new TopicPartition(newTopic(), 0)
+    val topic2 = new TopicPartition(newTopic(), 0)
+    val latestOffset = Map[TopicPartition, Long]((topic1, 3L), (topic2, 6L))
+
+    // test empty offset.
+    assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(null), latestOffset) ==
+      Map[String, String]().asJava)
+
+    // test empty offsetsBehindLatest && topics are missing in the latestConsumedOffset.
+    val emptyOffset = KafkaSourceOffset(Map[TopicPartition, Long]())
+    assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(emptyOffset), latestOffset) ==
+      Map[String, String]().asJava)
+
+    // test valid offsetsBehindLatest
+    val offset = KafkaSourceOffset(Map[TopicPartition, Long]((topic1, 1L), (topic2, 2L)))
+    assert(
+      KafkaMicroBatchStream.metrics(Optional.ofNullable(offset), latestOffset) ==
+        Map[String, String](
+          "minOffsetsBehindLatest" -> "2",
+          "maxOffsetsBehindLatest" -> "4",
+          "avgOffsetsBehindLatest" -> "3.0").asJava)
+
+    // test a topic is missing in both the latestConsumedOffset and latestAvailableOffset.
+    val topic3 = new TopicPartition(newTopic(), 0)
+    val offset2 =
+      KafkaSourceOffset(Map[TopicPartition, Long]((topic1, 1L), (topic3, 2L)))
+    val latestAvailableOffsets = Map[TopicPartition, Long]((topic2, 3L), (topic3, 6L))
+    assert(
+      KafkaMicroBatchStream.metrics(Optional.ofNullable(offset2), latestAvailableOffsets) ==

Review comment:
       Triple equals.

##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       CheckNewAnswer(32, 33, 34, 35, 36)
     )
   }
+
+
+  test("test custom metrics - with rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option("maxOffsetsPerTrigger", 1)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        // The rate limit is 1, so there must be some delay in offsets per partition.
+        val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find {
+          progress =>
+            !progress.metrics.isEmpty && {
+              // find the most recent progress whose avgOffsetsBehindLatest is greater than 0.
+              progress.metrics.get("avgOffsetsBehindLatest").toDouble > 0
+            }
+        }
+        assert(progressWithDelay.nonEmpty)
+        val metrics = progressWithDelay.get.metrics
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble > 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - no rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        val progress = query.recentProgress.map(_.sources.head).lastOption
+        assert(progress.nonEmpty)
+        val metrics = progress.get.metrics
+        // When there is no rate limit, there shouldn't be any delay in the current stream.
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong == 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong == 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble == 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - corner cases") {
+    val topic1 = new TopicPartition(newTopic(), 0)
+    val topic2 = new TopicPartition(newTopic(), 0)
+    val latestOffset = Map[TopicPartition, Long]((topic1, 3L), (topic2, 6L))
+
+    // test empty offset.
+    assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(null), latestOffset) ==
+      Map[String, String]().asJava)
+
+    // test empty offsetsBehindLatest && topics are missing in the latestConsumedOffset.
+    val emptyOffset = KafkaSourceOffset(Map[TopicPartition, Long]())
+    assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(emptyOffset), latestOffset) ==

Review comment:
       `isEmpty` would be simpler, right?

##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       CheckNewAnswer(32, 33, 34, 35, 36)
     )
   }
+
+
+  test("test custom metrics - with rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option("maxOffsetsPerTrigger", 1)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        // The rate limit is 1, so there must be some delay in offsets per partition.
+        val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find {
+          progress =>
+            !progress.metrics.isEmpty && {
+              // find the most recent progress whose avgOffsetsBehindLatest is greater than 0.
+              progress.metrics.get("avgOffsetsBehindLatest").toDouble > 0
+            }
+        }
+        assert(progressWithDelay.nonEmpty)
+        val metrics = progressWithDelay.get.metrics
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble > 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - no rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {

Review comment:
       Not sure why the curly brace at the end is needed?

##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       CheckNewAnswer(32, 33, 34, 35, 36)
     )
   }
+
+
+  test("test custom metrics - with rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option("maxOffsetsPerTrigger", 1)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        // The rate limit is 1, so there must be some delay in offsets per partition.
+        val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find {
+          progress =>
+            !progress.metrics.isEmpty && {
+              // find the most recent progress whose avgOffsetsBehindLatest is greater than 0.
+              progress.metrics.get("avgOffsetsBehindLatest").toDouble > 0
+            }
+        }
+        assert(progressWithDelay.nonEmpty)
+        val metrics = progressWithDelay.get.metrics
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble > 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - no rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        val progress = query.recentProgress.map(_.sources.head).lastOption
+        assert(progress.nonEmpty)
+        val metrics = progress.get.metrics
+        // When there is no rate limit, there shouldn't be any delay in the current stream.
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong == 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong == 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble == 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - corner cases") {
+    val topic1 = new TopicPartition(newTopic(), 0)
+    val topic2 = new TopicPartition(newTopic(), 0)
+    val latestOffset = Map[TopicPartition, Long]((topic1, 3L), (topic2, 6L))
+
+    // test empty offset.
+    assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(null), latestOffset) ==

Review comment:
       `isEmpty` would be simpler, right?

##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       CheckNewAnswer(32, 33, 34, 35, 36)
     )
   }
+
+
+  test("test custom metrics - with rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option("maxOffsetsPerTrigger", 1)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        // The rate limit is 1, so there must be some delay in offsets per partition.
+        val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find {
+          progress =>
+            !progress.metrics.isEmpty && {
+              // find the most recent progress whose avgOffsetsBehindLatest is greater than 0.
+              progress.metrics.get("avgOffsetsBehindLatest").toDouble > 0
+            }
+        }
+        assert(progressWithDelay.nonEmpty)
+        val metrics = progressWithDelay.get.metrics
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble > 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - no rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        val progress = query.recentProgress.map(_.sources.head).lastOption
+        assert(progress.nonEmpty)
+        val metrics = progress.get.metrics
+        // When there is no rate limit, there shouldn't be any delay in the current stream.
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong == 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong == 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble == 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - corner cases") {
+    val topic1 = new TopicPartition(newTopic(), 0)

Review comment:
       These are not topics but topicPartitions, right?

##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -218,3 +226,37 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 }
+
+object KafkaMicroBatchStream extends Logging {
+
+  /**
+   * Compute the difference of offset per partition between latestAvailablePartitionOffsets
+   * and latestConsumedPartitionOffsets.
+   * Because of rate limit, latest consumed offset per partition can be smaller than
+   * the latest available offset per partition.
+   * @param latestConsumedOffset latest consumed offset
+   * @param latestAvailablePartitionOffsets latest available offset per partition
+   * @return the generated metrics map
+   */
+  def metrics(
+      latestConsumedOffset: Optional[Offset],
+      latestAvailablePartitionOffsets: PartitionOffsetMap): ju.Map[String, String] = {
+    val offset = Option(latestConsumedOffset.orElse(null))
+
+    if (offset.nonEmpty) {
+      val consumedPartitionOffsets = offset.map(KafkaSourceOffset(_)).get.partitionToOffsets
+      val offsetsBehindLatest = latestAvailablePartitionOffsets
+        .filter(partitionOffset => consumedPartitionOffsets.contains(partitionOffset._1))
+        .map(partitionOffset =>

Review comment:
       This can be simplified, with no need to break the line:
   ```
   .map(partitionOffset => partitionOffset._2 - consumedPartitionOffsets(partitionOffset._1))
   ```
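   As a side note for readers, the hunk above is cut off before the min/max/avg aggregation. Here is a minimal self-contained sketch of what that aggregation amounts to, assuming the per-partition lags have already been computed (the helper name `lagMetrics` is made up for illustration and is not the PR's code):

```
import java.{util => ju}
import scala.collection.JavaConverters._

// Sketch only: turn the per-partition lags into the three reported metrics.
def lagMetrics(offsetsBehindLatest: Iterable[Long]): ju.Map[String, String] = {
  if (offsetsBehindLatest.nonEmpty) {
    Map(
      "minOffsetsBehindLatest" -> offsetsBehindLatest.min.toString,
      "maxOffsetsBehindLatest" -> offsetsBehindLatest.max.toString,
      "avgOffsetsBehindLatest" ->
        (offsetsBehindLatest.sum.toDouble / offsetsBehindLatest.size).toString).asJava
  } else {
    ju.Collections.emptyMap[String, String]()
  }
}
```

   With lags of 2 and 4 this yields exactly the `"2"`, `"4"`, `"3.0"` values asserted in the corner-case test.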

##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       CheckNewAnswer(32, 33, 34, 35, 36)
     )
   }
+
+
+  test("test custom metrics - with rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option("maxOffsetsPerTrigger", 1)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        // The rate limit is 1, so there must be some delay in offsets per partition.
+        val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find {
+          progress =>
+            !progress.metrics.isEmpty && {
+              // find the most recent progress whose avgOffsetsBehindLatest is greater than 0.
+              progress.metrics.get("avgOffsetsBehindLatest").toDouble > 0
+            }
+        }
+        assert(progressWithDelay.nonEmpty)
+        val metrics = progressWithDelay.get.metrics
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble > 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - no rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        val progress = query.recentProgress.map(_.sources.head).lastOption
+        assert(progress.nonEmpty)
+        val metrics = progress.get.metrics
+        // When there is no rate limit, there shouldn't be any delay in the current stream.
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong == 0)

Review comment:
       Is there a specific reason not to use triple equals? We normally prefer that in Scala.
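   For reference, the same check written with triple equals (ScalaTest's TripleEquals), which also yields a descriptive "x did not equal y" failure message:

```
assert(metrics.get("minOffsetsBehindLatest").toLong === 0)
```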

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
##########
@@ -138,31 +138,21 @@ class StreamingQueryProgress private[sql](
   override def toString: String = prettyJson
 
   private[sql] def jsonValue: JValue = {
-    def safeDoubleToJValue(value: Double): JValue = {
-      if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
-    }
-
-    /** Convert map to JValue while handling empty maps. Also, this sorts the keys. */
-    def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = {
-      if (map.isEmpty) return JNothing
-      val keys = map.asScala.keySet.toSeq.sorted
-      keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_ ~ _)
-    }
-
     ("id" -> JString(id.toString)) ~
     ("runId" -> JString(runId.toString)) ~
     ("name" -> JString(name)) ~
     ("timestamp" -> JString(timestamp)) ~
     ("batchId" -> JInt(batchId)) ~
     ("numInputRows" -> JInt(numInputRows)) ~
-    ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
-    ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
-    ("durationMs" -> safeMapToJValue[JLong](durationMs, v => JInt(v.toLong))) ~
-    ("eventTime" -> safeMapToJValue[String](eventTime, s => JString(s))) ~
+    ("inputRowsPerSecond" -> SafeJsonSerializer.safeDoubleToJValue(inputRowsPerSecond)) ~

Review comment:
       This can be added to reduce the boilerplate code, right?
   ```
   import org.apache.spark.sql.streaming.SafeJsonSerializer.safeDoubleToJValue
   ```
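   With that import (plus a matching one for `safeMapToJValue`) in scope, the call sites below could shrink back to the bare `safeDoubleToJValue(...)` / `safeMapToJValue(...)` form that the removed lines used.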
   

##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       CheckNewAnswer(32, 33, 34, 35, 36)
     )
   }
+
+
+  test("test custom metrics - with rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option("maxOffsetsPerTrigger", 1)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        // The rate limit is 1, so there must be some delay in offsets per partition.
+        val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find {
+          progress =>
+            !progress.metrics.isEmpty && {
+              // find the most recent progress whose avgOffsetsBehindLatest is greater than 0.
+              progress.metrics.get("avgOffsetsBehindLatest").toDouble > 0
+            }
+        }
+        assert(progressWithDelay.nonEmpty)
+        val metrics = progressWithDelay.get.metrics
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble > 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - no rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        val progress = query.recentProgress.map(_.sources.head).lastOption
+        assert(progress.nonEmpty)
+        val metrics = progress.get.metrics
+        // When there is no rate limit, there shouldn't be any delay in the current stream.
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong == 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong == 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble == 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - corner cases") {
+    val topic1 = new TopicPartition(newTopic(), 0)
+    val topic2 = new TopicPartition(newTopic(), 0)
+    val latestOffset = Map[TopicPartition, Long]((topic1, 3L), (topic2, 6L))
+
+    // test empty offset.
+    assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(null), latestOffset) ==
+      Map[String, String]().asJava)
+
+    // test empty offsetsBehindLatest && topics are missing in the latestConsumedOffset.
+    val emptyOffset = KafkaSourceOffset(Map[TopicPartition, Long]())
+    assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(emptyOffset), latestOffset) ==
+      Map[String, String]().asJava)
+
+    // test valid offsetsBehindLatest
+    val offset = KafkaSourceOffset(Map[TopicPartition, Long]((topic1, 1L), (topic2, 2L)))
+    assert(
+      KafkaMicroBatchStream.metrics(Optional.ofNullable(offset), latestOffset) ==
+        Map[String, String](
+          "minOffsetsBehindLatest" -> "2",
+          "maxOffsetsBehindLatest" -> "4",
+          "avgOffsetsBehindLatest" -> "3.0").asJava)
+
+    // test a topic is missing in both the latestConsumedOffset and latestAvailableOffset.
+    val topic3 = new TopicPartition(newTopic(), 0)

Review comment:
       This is not a topic but a topicPartition, right?

##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -218,3 +226,37 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 }
+
+object KafkaMicroBatchStream extends Logging {
+
+  /**
+   * Compute the difference of offset per partition between latestAvailablePartitionOffsets
+   * and latestConsumedPartitionOffsets.
+   * Because of rate limit, latest consumed offset per partition can be smaller than
+   * the latest available offset per partition.
+   * @param latestConsumedOffset latest consumed offset
+   * @param latestAvailablePartitionOffsets latest available offset per partition
+   * @return the generated metrics map
+   */
+  def metrics(
+      latestConsumedOffset: Optional[Offset],
+      latestAvailablePartitionOffsets: PartitionOffsetMap): ju.Map[String, String] = {
+    val offset = Option(latestConsumedOffset.orElse(null))
+
+    if (offset.nonEmpty) {
+      val consumedPartitionOffsets = offset.map(KafkaSourceOffset(_)).get.partitionToOffsets

Review comment:
       Had a deeper look and this maps to `None` and not `Some(null)` so this is fine.
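   A tiny illustration of the point, since `Option(...)` versus `Some(...)` is easy to mix up:

```
val a = Option(null) // None: Option.apply null-checks its argument
val b = Some(null)   // Some(null): the null survives and can NPE on a later .get-based use
```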

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
##########
@@ -138,31 +138,21 @@ class StreamingQueryProgress private[sql](
   override def toString: String = prettyJson
 
   private[sql] def jsonValue: JValue = {
-    def safeDoubleToJValue(value: Double): JValue = {
-      if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
-    }
-
-    /** Convert map to JValue while handling empty maps. Also, this sorts the keys. */
-    def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = {
-      if (map.isEmpty) return JNothing
-      val keys = map.asScala.keySet.toSeq.sorted
-      keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_ ~ _)
-    }
-
     ("id" -> JString(id.toString)) ~
     ("runId" -> JString(runId.toString)) ~
     ("name" -> JString(name)) ~
     ("timestamp" -> JString(timestamp)) ~
     ("batchId" -> JInt(batchId)) ~
     ("numInputRows" -> JInt(numInputRows)) ~
-    ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
-    ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
-    ("durationMs" -> safeMapToJValue[JLong](durationMs, v => JInt(v.toLong))) ~
-    ("eventTime" -> safeMapToJValue[String](eventTime, s => JString(s))) ~
+    ("inputRowsPerSecond" -> SafeJsonSerializer.safeDoubleToJValue(inputRowsPerSecond)) ~
+    ("processedRowsPerSecond" -> SafeJsonSerializer.safeDoubleToJValue(processedRowsPerSecond)) ~
+    ("durationMs" -> SafeJsonSerializer.safeMapToJValue[JLong](durationMs, v => JInt(v.toLong))) ~
+    ("eventTime" -> SafeJsonSerializer.safeMapToJValue[String](eventTime, s => JString(s))) ~
     ("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~
     ("sources" -> JArray(sources.map(_.jsonValue).toList)) ~
     ("sink" -> sink.jsonValue) ~
-    ("observedMetrics" -> safeMapToJValue[Row](observedMetrics, row => row.jsonValue))
+    ("observedMetrics" ->
+      SafeJsonSerializer.safeMapToJValue[Row](observedMetrics, row => row.jsonValue))

Review comment:
       Same applies here.

##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       CheckNewAnswer(32, 33, 34, 35, 36)
     )
   }
+
+
+  test("test custom metrics - with rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option("maxOffsetsPerTrigger", 1)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        // The rate limit is 1, so there must be some delay in offsets per partition.
+        val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find {

Review comment:
       This can be simplified like:
   ```
   val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find { progress =>
     // find the most recent progress whose avgOffsetsBehindLatest is greater than 0.
     !progress.metrics.isEmpty && progress.metrics.get("avgOffsetsBehindLatest").toDouble > 0
   }
   ```
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gaborgsomogyi commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-813908760


   I've tested it on a real cluster and it works fine.
   Just a question: how is this intended to be used with dynamic allocation?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r623398237



##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
##########
@@ -62,4 +63,17 @@ private[kafka010] object KafkaSourceOffset {
    */
   def apply(offset: SerializedOffset): KafkaSourceOffset =
     KafkaSourceOffset(JsonUtils.partitionOffsets(offset.json))
+
+  /**
+   * Returns [[KafkaSourceOffset]] from a streaming.Offset
+   */
+  def apply(offset: streaming.Offset): KafkaSourceOffset = {

Review comment:
       Nice to refer to the non-deprecated one :) I missed that the `Offset` type in the above method is deprecated.
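       Since the hunk is cut off at the opening brace, here is a rough sketch of what such a converter typically looks like; the match arms are an illustration, not the PR's actual body:

```
def apply(offset: streaming.Offset): KafkaSourceOffset = offset match {
  case k: KafkaSourceOffset => k // already the concrete type, no conversion needed
  case o => KafkaSourceOffset(JsonUtils.partitionOffsets(o.json)) // fall back to the JSON form
}
```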




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] yijiacui-db closed pull request #31944: [SPARK-34297][SQL] Expose source metrics API and Add Kafka metrics to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db closed pull request #31944:
URL: https://github.com/apache/spark/pull/31944


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-831004037


   Okay, those metrics for FileStreamSource sound like a use case that makes sense. I'm okay with you going ahead with this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] yijiacui-db commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r615588569



##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       CheckNewAnswer(32, 33, 34, 35, 36)
     )
   }
+
+
+  test("test custom metrics - with rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option("maxOffsetsPerTrigger", 1)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        // The rate limit is 1, so there must be some delay in offsets per partition.
+        val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find {
+          progress =>
+            !progress.metrics.isEmpty && {
+              // find the most recent progress whose avgOffsetsBehindLatest is greater than 0.
+              progress.metrics.get("avgOffsetsBehindLatest").toDouble > 0
+            }
+        }
+        assert(progressWithDelay.nonEmpty)
+        val metrics = progressWithDelay.get.metrics
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble > 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - no rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        val progress = query.recentProgress.map(_.sources.head).lastOption
+        assert(progress.nonEmpty)
+        val metrics = progress.get.metrics
+        // When there is no rate limit, there shouldn't be any delay in the current stream.
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong == 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong == 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble == 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - corner cases") {
+    val topic1 = new TopicPartition(newTopic(), 0)
+    val topic2 = new TopicPartition(newTopic(), 0)
+    val latestOffset = Map[TopicPartition, Long]((topic1, 3L), (topic2, 6L))
+
+    // test empty offset.
+    assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(null), latestOffset) ==
+      Map[String, String]().asJava)
+
+    // test empty offsetsBehindLatest && topics are missing in the latestConsumedOffset.
+    val emptyOffset = KafkaSourceOffset(Map[TopicPartition, Long]())
+    assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(emptyOffset), latestOffset) ==
+      Map[String, String]().asJava)
+
+    // test valid offsetsBehindLatest
+    val offset = KafkaSourceOffset(Map[TopicPartition, Long]((topic1, 1L), (topic2, 2L)))
+    assert(
+      KafkaMicroBatchStream.metrics(Optional.ofNullable(offset), latestOffset) ==
+        Map[String, String](
+          "minOffsetsBehindLatest" -> "2",
+          "maxOffsetsBehindLatest" -> "4",
+          "avgOffsetsBehindLatest" -> "3.0").asJava)
+
+    // test a topic is missing in both the latestConsumedOffset and latestAvailableOffset.
+    val topic3 = new TopicPartition(newTopic(), 0)

Review comment:
       Changed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] yijiacui-db commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r618975894



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
##########
@@ -188,7 +178,8 @@ class SourceProgress protected[sql](
   val latestOffset: String,
   val numInputRows: Long,
   val inputRowsPerSecond: Double,
-  val processedRowsPerSecond: Double) extends Serializable {
+  val processedRowsPerSecond: Double,
+  val metrics: ju.Map[String, String] = Map[String, String]().asJava) extends Serializable {

Review comment:
       I tried to add a type alias for this Map but found it hard to do, because we also use this Map[String, String] in Java: if we define the alias in this file, we can't import it into the Java ReportsSourceMetrics file.
   
   If we want to create an alias in Java by extending a map type, we need to make it public and add another file, then import it in the Scala files.
   
   The other option is to use the alias only in the two source metrics files, in which case having a dedicated type doesn't buy much.
   
   Do you have any suggestion about which way we should go to define this alias?
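   To make the constraint concrete, a sketch of the Scala-side option being discussed (the package object location here is hypothetical):

```
// Compile-time only: Java sources such as ReportsSourceMetrics.java cannot import a
// Scala type alias; they would still have to spell out java.util.Map<String, String>.
package object streaming {
  type MetricsMap = java.util.Map[String, String]
}
```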




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] yijiacui-db commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r622628911



##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -218,3 +226,35 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 }
+
+object KafkaMicroBatchStream extends Logging {
+
+  /**
+   * Compute the difference of offset per partition between latestAvailablePartitionOffsets
+   * and latestConsumedPartitionOffsets.

Review comment:
       I changed it to "partition offsets in the latestConsumedOffset" to make it clearer.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-832328322


   **[Test build #138163 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138163/testReport)** for PR 31944 at commit [`2f13f28`](https://github.com/apache/spark/commit/2f13f28d7bee15bed657858c53dd8495ba8f1cbc).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-829612381


   cc. @HyukjinKwon @dongjoon-hyun as they reviewed #30988.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r622381019



##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -218,3 +226,35 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 }
+
+object KafkaMicroBatchStream extends Logging {
+
+  /**
+   * Compute the difference of offset per partition between latestAvailablePartitionOffsets

Review comment:
       It returns the min/max/avg offsets behind the latest offset; could you describe the actual returned metric map in the doc?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r622372065



##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -218,3 +226,35 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 }
+
+object KafkaMicroBatchStream extends Logging {
+
+  /**
+   * Compute the difference of offset per partition between latestAvailablePartitionOffsets
+   * and latestConsumedPartitionOffsets.

Review comment:
       Mismatch between `latestConsumedOffset` and `latestConsumedPartitionOffsets` in doc and code.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-832433513


   **[Test build #138163 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138163/testReport)** for PR 31944 at commit [`2f13f28`](https://github.com/apache/spark/commit/2f13f28d7bee15bed657858c53dd8495ba8f1cbc).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] xuanyuanking edited a comment on pull request #31944: [SPARK-34297][SQL] Expose source metrics API and Add Kafka metrics to report delay.

Posted by GitBox <gi...@apache.org>.
xuanyuanking edited a comment on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-805517034


   Agree with @viirya on creating another JIRA. Let's link these two tickets together.
   IMO, we should have the metrics both in UI and progress reporter. We can do the follow-ups to combine the code (e.g., the metrics collection part).
   
   Also cc @HeartSaVioR @gaborgsomogyi @bozhang2820 @Ngone51 @zsxwing 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #31944: [SPARK-34297][SQL] Expose source metrics API and Add Kafka metrics to report delay.

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-805122921


   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #31944: [SPARK-34297][SQL] Expose source metrics API and Add Kafka metrics to report delay.

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-805307697


   Since this serves a different purpose, I think it is better to create another JIRA. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] yijiacui-db edited a comment on pull request #31944: [SPARK-34297][SQL] Expose source metrics API and Add Kafka metrics to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db edited a comment on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-805278090


   > hi @yijiacui-db, thanks for working on this. I'm still working on this and related API. The community has an agreement ([SPARK-34366](https://issues.apache.org/jira/browse/SPARK-34366), PR #31476) on metric API for data source. Next step I will implement the API at data source including kafka data source. If you are interested in this work, welcome to help review the work later. Thanks.
   
   Hi @viirya, this pull request exposes the backlog metrics to end users through the streaming query progress, not the metrics in the Spark UI. I think it makes sense to combine both of them to expose metrics at different levels. Sorry I used your JIRA number because I thought that was the number I should use. I can create another JIRA to track this work if you prefer.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-828018316


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138005/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-832456860


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138163/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] yijiacui-db commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r621001992



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
##########
@@ -138,31 +138,21 @@ class StreamingQueryProgress private[sql](
   override def toString: String = prettyJson
 
   private[sql] def jsonValue: JValue = {
-    def safeDoubleToJValue(value: Double): JValue = {
-      if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
-    }
-
-    /** Convert map to JValue while handling empty maps. Also, this sorts the keys. */
-    def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = {
-      if (map.isEmpty) return JNothing
-      val keys = map.asScala.keySet.toSeq.sorted
-      keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_ ~ _)
-    }
-
     ("id" -> JString(id.toString)) ~
     ("runId" -> JString(runId.toString)) ~
     ("name" -> JString(name)) ~
     ("timestamp" -> JString(timestamp)) ~
     ("batchId" -> JInt(batchId)) ~
     ("numInputRows" -> JInt(numInputRows)) ~
-    ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
-    ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
-    ("durationMs" -> safeMapToJValue[JLong](durationMs, v => JInt(v.toLong))) ~
-    ("eventTime" -> safeMapToJValue[String](eventTime, s => JString(s))) ~
+    ("inputRowsPerSecond" -> SafeJsonSerializer.safeDoubleToJValue(inputRowsPerSecond)) ~
+    ("processedRowsPerSecond" -> SafeJsonSerializer.safeDoubleToJValue(processedRowsPerSecond)) ~

Review comment:
       Sorry, my bad. I accidentally checked out over that change. It's updated now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] yijiacui-db commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r610752616



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
##########
@@ -175,14 +178,20 @@ trait ProgressReporter extends Logging {
 
     val sourceProgress = sources.distinct.map { source =>
       val numRecords = executionStats.inputRows.getOrElse(source, 0L)
+      val sourceMetrics = source match {
+        case withMetrics: ReportsSourceMetrics =>
+          withMetrics.metrics(Optional.ofNullable(latestStreamProgress.get(source).orNull))
+        case _ => Map[String, String]().asJava
+      }
       new SourceProgress(
         description = source.toString,
         startOffset = currentTriggerStartOffsets.get(source).orNull,
         endOffset = currentTriggerEndOffsets.get(source).orNull,
         latestOffset = currentTriggerLatestOffsets.get(source).orNull,
         numInputRows = numRecords,
         inputRowsPerSecond = numRecords / inputTimeSec,
-        processedRowsPerSecond = numRecords / processingTimeSec
+        processedRowsPerSecond = numRecords / processingTimeSec,

Review comment:
       I checked that `processingTimeSec` can't be 0. It's calculated from `processingTimeMills`, the difference on the trigger clock between the start and end of a trigger.
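   As a sketch of that derivation (hedged: the names and the 1 ms clamp approximate the `ProgressReporter` internals rather than quote them):
   ```
   // elapsed trigger time on the trigger clock, clamped so the divisor is
   // never zero and the resulting rate stays finite
   val processingTimeMills = currentTriggerEndTimestamp - currentTriggerStartTimestamp
   val processingTimeSec = math.max(1L, processingTimeMills).toDouble / 1000
   val processedRowsPerSecond = numRecords / processingTimeSec
   ```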




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] yijiacui-db commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r605797547



##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -133,6 +137,10 @@ private[kafka010] class KafkaMicroBatchStream(
 
   override def toString(): String = s"KafkaV2[$kafkaOffsetReader]"
 
+  override def metrics(latestOffset: Optional[Offset]): ju.Map[String, String] = {
+    KafkaMicroBatchStream.metrics(latestOffset, latestPartitionOffsets)

Review comment:
       Yes. It's to unit test the calculation. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] xuanyuanking commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r605661850



##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -218,3 +226,36 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 }
+
+object KafkaMicroBatchStream extends Logging {
+
+  /**
+   * Compute the difference of offset per partition between latestAvailablePartitionOffsets
+   * and latestConsumedPartitionOffsets.
+   * Because of rate limit, latest consumed offset per partition can be smaller than
+   * the latest available offset per partition.
+   * @param latestConsumedOffset latest consumed offset
+   * @param latestAvailablePartitionOffsets latest available offset per partition
+   * @return the generated metrics map
+   */
+  def metrics(latestConsumedOffset: Optional[Offset],
+              latestAvailablePartitionOffsets: PartitionOffsetMap): ju.Map[String, String] = {
+    val offset = Option(latestConsumedOffset.orElse(null))
+
+    if (offset.nonEmpty) {
+      val consumedPartitionOffsets = offset.map(KafkaSourceOffset(_)).get.partitionToOffsets
+      val offsetsBehindLatest = latestAvailablePartitionOffsets
+        .filter(partitionOffset => consumedPartitionOffsets.contains(partitionOffset._1))
+        .map(partitionOffset =>
+          partitionOffset._2 - consumedPartitionOffsets.get(partitionOffset._1).get)
+      if (offsetsBehindLatest.size > 0) {

Review comment:
       Maybe `offsetsBehindLatest.nonEmpty`

##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -133,6 +137,10 @@ private[kafka010] class KafkaMicroBatchStream(
 
   override def toString(): String = s"KafkaV2[$kafkaOffsetReader]"
 
+  override def metrics(latestOffset: Optional[Offset]): ju.Map[String, String] = {
+    KafkaMicroBatchStream.metrics(latestOffset, latestPartitionOffsets)

Review comment:
       I think it's OK to directly override `metrics` instead of adding the new companion object `KafkaMicroBatchStream`.

##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -218,3 +226,36 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 }
+
+object KafkaMicroBatchStream extends Logging {
+
+  /**
+   * Compute the difference of offset per partition between latestAvailablePartitionOffsets
+   * and latestConsumedPartitionOffsets.
+   * Because of rate limit, latest consumed offset per partition can be smaller than
+   * the latest available offset per partition.
+   * @param latestConsumedOffset latest consumed offset
+   * @param latestAvailablePartitionOffsets latest available offset per partition
+   * @return the generated metrics map
+   */
+  def metrics(latestConsumedOffset: Optional[Offset],

Review comment:
       Seems the right code style to follow is
   ```
   def metrics(
           latestConsumedOffset: ...
           latestAvailablePartitionOffsets: ...): ...
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-829611545


   Yes, that sounds like a good rationale for a real case. Thanks!
   
   I looked into the changes on the API side and feel that #30988 and this PR can co-exist. #30988 covers the specific case where the latest offset can be provided by the data source in Offset format, while this PR covers the more general ("arbitrary" might fit better) case where the information a data source wants to provide is not limited to the latest offset.
   
   For sure, the actual behavioral change in #30988 could be implemented with the API added here, but providing a uniform output across data sources would ideally be more useful, e.g. for plotting in the UI. (I know there is a technical gap in making it fully general, since the format of "Offset" varies across data sources and the consumer has to handle that.)
   
   For the newly added Kafka metrics, they still make sense when the target persona is a human (convenient to check); otherwise I agree with @viirya that they sound redundant. Although the code change is not huge, it would probably be good to split this into two PRs with two JIRA issues, 1) API changes and 2) Kafka metrics, and finalize reviewing 1) first, as there seems to be no outstanding concern on the API changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gaborgsomogyi commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r620995101



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
##########
@@ -188,7 +178,8 @@ class SourceProgress protected[sql](
   val latestOffset: String,
   val numInputRows: Long,
   val inputRowsPerSecond: Double,
-  val processedRowsPerSecond: Double) extends Serializable {
+  val processedRowsPerSecond: Double,
+  val metrics: ju.Map[String, String] = Map[String, String]().asJava) extends Serializable {

Review comment:
       I've had a deeper look and I think it's fine.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
##########
@@ -138,31 +138,21 @@ class StreamingQueryProgress private[sql](
   override def toString: String = prettyJson
 
   private[sql] def jsonValue: JValue = {
-    def safeDoubleToJValue(value: Double): JValue = {
-      if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
-    }
-
-    /** Convert map to JValue while handling empty maps. Also, this sorts the keys. */
-    def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = {
-      if (map.isEmpty) return JNothing
-      val keys = map.asScala.keySet.toSeq.sorted
-      keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_ ~ _)
-    }
-
     ("id" -> JString(id.toString)) ~
     ("runId" -> JString(runId.toString)) ~
     ("name" -> JString(name)) ~
     ("timestamp" -> JString(timestamp)) ~
     ("batchId" -> JInt(batchId)) ~
     ("numInputRows" -> JInt(numInputRows)) ~
-    ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
-    ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
-    ("durationMs" -> safeMapToJValue[JLong](durationMs, v => JInt(v.toLong))) ~
-    ("eventTime" -> safeMapToJValue[String](eventTime, s => JString(s))) ~
+    ("inputRowsPerSecond" -> SafeJsonSerializer.safeDoubleToJValue(inputRowsPerSecond)) ~
+    ("processedRowsPerSecond" -> SafeJsonSerializer.safeDoubleToJValue(processedRowsPerSecond)) ~

Review comment:
       My comment was resolved but I still see `SafeJsonSerializer.` prefixes all over the PR.
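   As an illustration, importing the helpers drops the prefixes at the call sites (a sketch; the package location of `SafeJsonSerializer` is an assumption):
   ```
   import org.apache.spark.sql.streaming.SafeJsonSerializer.safeDoubleToJValue
   
   // inside StreamingQueryProgress.jsonValue, as in the diff above
   ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
   ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond))
   ```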




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] xuanyuanking commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r605670964



##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -133,6 +137,10 @@ private[kafka010] class KafkaMicroBatchStream(
 
   override def toString(): String = s"KafkaV2[$kafkaOffsetReader]"
 
+  override def metrics(latestOffset: Optional[Offset]): ju.Map[String, String] = {
+    KafkaMicroBatchStream.metrics(latestOffset, latestPartitionOffsets)

Review comment:
       Never mind; I think the object was added for testing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-828842015


   Yeah, I agree about the rationale and benefits of adding a public API for custom source metrics, though it would be even better if we could discuss a real case that is not covered by #30988.
   
   I feel the reason the review keeps dragging is the Kafka use-case. Your explanation may make sense for "other" data sources (hypothetically, as you haven't provided an actual one), but in the Kafka case a dedicated process can already calculate the lag with the change from #30988. I agree it's bad for a human being to calculate the lag per topic partition and summarize it by him/herself, but it's still not that hard for a dedicated process to do that.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] yijiacui-db commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r612926596



##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -218,3 +226,37 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 }
+
+object KafkaMicroBatchStream extends Logging {
+
+  /**
+   * Compute the difference of offset per partition between latestAvailablePartitionOffsets
+   * and latestConsumedPartitionOffsets.
+   * Because of rate limit, latest consumed offset per partition can be smaller than
+   * the latest available offset per partition.
+   * @param latestConsumedOffset latest consumed offset
+   * @param latestAvailablePartitionOffsets latest available offset per partition
+   * @return the generated metrics map
+   */
+  def metrics(
+      latestConsumedOffset: Optional[Offset],
+      latestAvailablePartitionOffsets: PartitionOffsetMap): ju.Map[String, String] = {
+    val offset = Option(latestConsumedOffset.orElse(null))
+
+    if (offset.nonEmpty) {
+      val consumedPartitionOffsets = offset.map(KafkaSourceOffset(_)).get.partitionToOffsets

Review comment:
       Yes, `latestConsumedOffset` can be None, in which case the computation is skipped; that is what the `offset.nonEmpty` check guards.
   
   Also, this is the test covering the case where `latestConsumedOffset` is none: https://github.com/apache/spark/pull/31944/files#diff-4ddf27dccbb57c9b29179090d9430bbbd3d1bc8d57ab8100901fb45971d23810R1395
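   As a minimal usage sketch of that corner case (mirroring the linked test; the topic name and offsets are arbitrary):
   ```
   import java.util.Optional
   import org.apache.kafka.common.TopicPartition
   
   val latestOffset = Map(new TopicPartition("topic", 0) -> 3L)
   // an absent consumed offset short-circuits before any KafkaSourceOffset is built
   assert(KafkaMicroBatchStream.metrics(Optional.empty(), latestOffset).isEmpty)
   ```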




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-831034365


   @yijiacui-db 
   I'd appreciate it if you could rerun your GitHub Actions test in your fork. I guess rerunning the action here wouldn't retrigger the test on your fork. We don't have a good way for anyone other than the author to retrigger tests automatically yet.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] yijiacui-db commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r615586599



##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       CheckNewAnswer(32, 33, 34, 35, 36)
     )
   }
+
+
+  test("test custom metrics - with rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option("maxOffsetsPerTrigger", 1)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        // The rate limit is 1, so there must be some delay in offsets per partition.
+        val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find {
+          progress =>
+            !progress.metrics.isEmpty && {
+              // find the metrics whose avgOffsetsBehindLatest is greater than 0.
+              progress.metrics.get("avgOffsetsBehindLatest").toDouble > 0
+            }
+        }
+        assert(progressWithDelay.nonEmpty)
+        val metrics = progressWithDelay.get.metrics
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble > 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - no rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {

Review comment:
       Removed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-828018316


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138005/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gaborgsomogyi commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-808166003


   > Take Kafka as an example: we can have a read limit while consuming offsets, so we may only consume a certain number of offsets even though more data is available in Kafka. The same applies to all the other streaming sources. Some users want to know, through the listener, whether they are falling behind, and want to adjust the cluster size accordingly.
   
   If I understand correctly, the plan is to monitor whether Spark is falling behind in Kafka processing. If so, there is an existing solution for this that works like a charm: the user can commit the offsets back to Kafka with a listener, and the delta between available and committed offsets can be monitored. If that were the only use-case, I'm not 100% sure it's worth the ~300-line change set in the Kafka part.
   
   I would like to emphasize that I'm not against making this better; it would just be good to see a bit more from the use-case perspective.
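   To make that workaround concrete, here is a rough sketch of such a listener (hedged: the group id handling, the single-Kafka-source assumption, and the offset-JSON parsing are mine, not part of this PR; the Kafka source's endOffset JSON has the shape {"topic":{"0":42}}):
   ```
   import java.util.Properties
   import scala.collection.JavaConverters._
   import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
   import org.apache.kafka.common.TopicPartition
   import org.json4s.{DefaultFormats, Formats}
   import org.json4s.jackson.JsonMethods.parse
   import org.apache.spark.sql.streaming.StreamingQueryListener
   import org.apache.spark.sql.streaming.StreamingQueryListener._
   
   // Commits each batch's end offsets back to Kafka so lag can be watched with
   // standard consumer-group tooling. A sketch, not production code.
   class CommitOffsetsListener(bootstrap: String, groupId: String)
       extends StreamingQueryListener {
   
     override def onQueryStarted(event: QueryStartedEvent): Unit = ()
     override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
   
     override def onQueryProgress(event: QueryProgressEvent): Unit = {
       // assumes a single Kafka source; endOffset is its JSON offset map
       val json = event.progress.sources.head.endOffset
       if (json != null) {
         implicit val formats: Formats = DefaultFormats
         val offsets = parse(json).extract[Map[String, Map[String, Long]]].flatMap {
           case (topic, parts) => parts.map { case (partition, offset) =>
             new TopicPartition(topic, partition.toInt) -> new OffsetAndMetadata(offset)
           }
         }
         val props = new Properties()
         props.put("bootstrap.servers", bootstrap)
         props.put("group.id", groupId)
         props.put("key.deserializer",
           "org.apache.kafka.common.serialization.ByteArrayDeserializer")
         props.put("value.deserializer",
           "org.apache.kafka.common.serialization.ByteArrayDeserializer")
         val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
         try consumer.commitSync(offsets.asJava) finally consumer.close()
       }
     }
   }
   ```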
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gaborgsomogyi commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-805655228


   * Do you have a specific use case leveraging this information?
   
   +1 on @HeartSaVioR's comment; it would be good to see the big picture before we go into the details.
   Regarding the UI, it's worth creating a separate Jira.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-832507837


   Thanks! Merging to master!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] yijiacui-db commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-829531520


   > Yeah, I agree about the rationale and benefits of adding a public API for custom source metrics, though it would be even better if we could discuss a real case that is not covered by #30988.
   > 
   > I feel the reason the review keeps dragging is the Kafka use-case. Your explanation may make sense for "other" data sources (hypothetically, as you haven't provided an actual one), but in the Kafka case a dedicated process can already calculate the lag with the change from #30988. I agree it's bad for a human being to calculate the lag per topic partition and summarize it by him/herself, but it's still not that hard for a dedicated process to do that.
   
   @viirya @HeartSaVioR
   
   A good example is FileStreamSource, which doesn't implement reportLatestOffset because its notion of the latest available data doesn't map to the "Offset" representation in Spark streaming.
   
   In FileStreamSource, fetchMaxOffset returns the maximum offset that can be retrieved from the source, which can be rate limited. Only the file source itself knows internally how many files are left to be processed in the batch. Possible metrics to expose to users are the number of files and the number of bytes remaining to be processed in the batch, i.e. how far the application is falling behind the stream.
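   To sketch that idea (hedged: `ReportsSourceMetrics` is the interface added in this PR, but `FileBacklogMetrics` and its `pendingFiles` hook are hypothetical, not actual FileStreamSource code):
   ```
   import java.{util => ju}
   import java.util.Optional
   import scala.collection.JavaConverters._
   import org.apache.spark.sql.connector.read.streaming.{Offset, ReportsSourceMetrics}
   
   // A file-based stream mixing in the new interface to report a backlog that
   // has no natural Offset form; only the source knows which files remain.
   trait FileBacklogMetrics extends ReportsSourceMetrics {
     protected def pendingFiles: Seq[(String, Long)] // hypothetical (path, sizeInBytes) hook
   
     override def metrics(latestConsumedOffset: Optional[Offset]): ju.Map[String, String] = {
       Map(
         "numFilesOutstanding" -> pendingFiles.size.toString,
         "numBytesOutstanding" -> pendingFiles.map(_._2).sum.toString).asJava
     }
   }
   ```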


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gaborgsomogyi commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r607611872



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
##########
@@ -188,7 +178,8 @@ class SourceProgress protected[sql](
   val latestOffset: String,
   val numInputRows: Long,
   val inputRowsPerSecond: Double,
-  val processedRowsPerSecond: Double) extends Serializable {
+  val processedRowsPerSecond: Double,
+  val metrics: ju.Map[String, String] = Map[String, String]().asJava) extends Serializable {

Review comment:
       Do we need the type def here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] yijiacui-db commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r622628591



##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -133,6 +137,10 @@ private[kafka010] class KafkaMicroBatchStream(
 
   override def toString(): String = s"KafkaV2[$kafkaOffsetReader]"
 
+  override def metrics(latestOffset: Optional[Offset]): ju.Map[String, String] = {
+    KafkaMicroBatchStream.metrics(latestOffset, latestPartitionOffsets)

Review comment:
       Thanks for your suggestion! Yes, it's really confusing. I've changed the doc and the parameter name.

##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -218,3 +226,35 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 }
+
+object KafkaMicroBatchStream extends Logging {
+
+  /**
+   * Compute the difference of offset per partition between latestAvailablePartitionOffsets

Review comment:
       Added.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-832394666


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42684/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] yijiacui-db edited a comment on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db edited a comment on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-828832285


   > > > I've tested it on a real cluster and it works fine.
   > > > Just a question: how is this intended to be used for dynamic allocation?
   > > 
   > > 
   > > Users can implement this interface in their customized SparkDataStream and learn how far they are falling behind through the progress listener. Maybe this can provide more useful information to guide/trigger autoscaling.
   > 
   > This is a valid use-case. But my question is that the current offsets in `SourceProgress` should already provide the information this use-case needs (consumed offset, available offset). The source progress should also be available to the customized SparkDataStream. Do you mean the metrics from the customized SparkDataStream are not offset related?
   
   Yes. The available offset is retrieved through reportLatestOffset, which Kafka has already implemented, so for Kafka this is duplicated: the latest consumed offset and the latest available offset are enough to compute how far it is falling behind.
   But for other customized Spark data streams, reportLatestOffset may not be implemented, so from the source progress report there's no way to know the latest available offset to do the computation. Also, custom metrics, for example how far the application is falling behind the latest, can be represented in other ways (not only as a number of offsets), which all depends on how the stream defines them.
   
   We want to introduce this metrics interface to let users implement it for their data streams and obtain the metrics they want from the source progress report. The Kafka stream is just an example of how users can implement this and retrieve that information; it happens to already expose the latest available offset, which makes it look a bit duplicated and harder to reason about.
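   As a sketch of that consumption path (the metric key matches the Kafka metrics added in this PR; the threshold and the scaling hook are placeholders):
   ```
   import org.apache.spark.sql.streaming.StreamingQueryListener
   import org.apache.spark.sql.streaming.StreamingQueryListener._
   
   // Reads the new per-source metrics map from each progress event and reacts
   // when the reported lag crosses a threshold.
   class LagListener extends StreamingQueryListener {
     override def onQueryStarted(event: QueryStartedEvent): Unit = ()
     override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
   
     override def onQueryProgress(event: QueryProgressEvent): Unit = {
       event.progress.sources.foreach { source =>
         Option(source.metrics.get("avgOffsetsBehindLatest")).foreach { lag =>
           if (lag.toDouble > 10000) requestScaleUp() // hypothetical autoscaling hook
         }
       }
     }
   
     private def requestScaleUp(): Unit = () // placeholder
   }
   ```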


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-827826656


   **[Test build #138005 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138005/testReport)** for PR 31944 at commit [`b870302`](https://github.com/apache/spark/commit/b870302f4b4060d900468d2cff3fe67addaa68f9).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r622376735



##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -133,6 +137,10 @@ private[kafka010] class KafkaMicroBatchStream(
 
   override def toString(): String = s"KafkaV2[$kafkaOffsetReader]"
 
+  override def metrics(latestOffset: Optional[Offset]): ju.Map[String, String] = {
+    KafkaMicroBatchStream.metrics(latestOffset, latestPartitionOffsets)

Review comment:
       Because `MicroBatchStream` already has a `latestOffset` API, which is the latest available offset, I think we need to be clearer about this API item to avoid confusion.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] yijiacui-db commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r615588991



##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       CheckNewAnswer(32, 33, 34, 35, 36)
     )
   }
+
+
+  test("test custom metrics - with rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option("maxOffsetsPerTrigger", 1)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        // The rate limit is 1, so there must be some delay in offsets per partition.
+        val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find {
+          progress =>
+            !progress.metrics.isEmpty && {
+              // find the metrics that has non-zero average offsetsBehindLatest greater than 0.
+              // find the metrics whose avgOffsetsBehindLatest is greater than 0.
+            }
+        }
+        assert(progressWithDelay.nonEmpty)
+        val metrics = progressWithDelay.get.metrics
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble > 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - no rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        val progress = query.recentProgress.map(_.sources.head).lastOption
+        assert(progress.nonEmpty)
+        val metrics = progress.get.metrics
+        // When there is no rate limit, there shouldn't be any delay in the current stream.
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong == 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong == 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble == 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - corner cases") {
+    val topic1 = new TopicPartition(newTopic(), 0)
+    val topic2 = new TopicPartition(newTopic(), 0)
+    val latestOffset = Map[TopicPartition, Long]((topic1, 3L), (topic2, 6L))
+
+    // test empty offset.
+    assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(null), latestOffset) ==
+      Map[String, String]().asJava)
+
+    // test empty offsetsBehindLatest && topics are missing in the latestConsumedOffset.
+    val emptyOffset = KafkaSourceOffset(Map[TopicPartition, Long]())
+    assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(emptyOffset), latestOffset) ==
+      Map[String, String]().asJava)
+
+    // test valid offsetsBehindLatest
+    val offset = KafkaSourceOffset(Map[TopicPartition, Long]((topic1, 1L), (topic2, 2L)))
+    assert(
+      KafkaMicroBatchStream.metrics(Optional.ofNullable(offset), latestOffset) ==
+        Map[String, String](
+          "minOffsetsBehindLatest" -> "2",
+          "maxOffsetsBehindLatest" -> "4",
+          "avgOffsetsBehindLatest" -> "3.0").asJava)
+
+    // test a topic is missing in both the latestConsumedOffset and latestAvailableOffset.
+    val topic3 = new TopicPartition(newTopic(), 0)
+    val offset2 =
+      KafkaSourceOffset(Map[TopicPartition, Long]((topic1, 1L), (topic3, 2L)))
+    val latestAvailableOffsets = Map[TopicPartition, Long]((topic2, 3L), (topic3, 6L))
+    assert(
+      KafkaMicroBatchStream.metrics(Optional.ofNullable(offset2), latestAvailableOffsets) ==

Review comment:
       Changed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gaborgsomogyi commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r611551597



##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -218,3 +226,37 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 }
+
+object KafkaMicroBatchStream extends Logging {
+
+  /**
+   * Compute the difference of offset per partition between latestAvailablePartitionOffsets
+   * and latestConsumedPartitionOffsets.
+   * Because of rate limit, latest consumed offset per partition can be smaller than
+   * the latest available offset per partition.
+   * @param latestConsumedOffset latest consumed offset
+   * @param latestAvailablePartitionOffsets latest available offset per partition
+   * @return the generated metrics map
+   */
+  def metrics(
+      latestConsumedOffset: Optional[Offset],
+      latestAvailablePartitionOffsets: PartitionOffsetMap): ju.Map[String, String] = {
+    val offset = Option(latestConsumedOffset.orElse(null))
+
+    if (offset.nonEmpty) {
+      val consumedPartitionOffsets = offset.map(KafkaSourceOffset(_)).get.partitionToOffsets

Review comment:
       `latestConsumedOffset` can technically be none, right? Wondering what will happen in such a case (`KafkaSourceOffset(null)`)?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] yijiacui-db commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-816064025


   > I've tested it on a real cluster and it works fine.
   > Just a question: how is this intended to be used for dynamic allocation?
   
   Users can implement this interface in their customized SparkDataStream and learn how far they are falling behind through the progress listener. Maybe this can provide more useful information to guide/trigger autoscaling.
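   For completeness, wiring such a listener into a session is a one-liner (sketch; `LagListener` refers to the hypothetical listener sketched earlier in this thread):
   ```
   spark.streams.addListener(new LagListener())
   ```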


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-806201324


   This is not only true for the Kafka source; we need to assume it can be true for all data sources migrated to Spark 3.2.0, since the API is available to them.
   
   That said, if we only imagine the use case of the new API as serving the same purpose as #30988, it feels duplicated. That's the reason I'd like to see more use cases.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya edited a comment on pull request #31944: [SPARK-34297][SQL] Expose source metrics API and Add Kafka metrics to report delay.

Posted by GitBox <gi...@apache.org>.
viirya edited a comment on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-805259607


   Hi @yijiacui-db, thanks for working on this. I'm still working on this and the related API. The community has an agreement (SPARK-34366, PR #31476) on a metric API for data sources. As a next step, I will implement the API in data sources, including the Kafka data source. If you are interested in this work, you're welcome to help review it later. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] yijiacui-db commented on pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
yijiacui-db commented on pull request #31944:
URL: https://github.com/apache/spark/pull/31944#issuecomment-828832285


   > > > I've tested it on a real cluster and it works fine.
   > > > Just a question: how is this intended to be used for dynamic allocation?
   > > 
   > > 
   > > Users can implement this interface in their customized SparkDataStream and learn how far they are falling behind through the progress listener. Maybe this can provide more useful information to guide/trigger autoscaling.
   > 
   > This is a valid use-case. But my question is that the current offsets in `SourceProgress` should already provide the information this use-case needs (consumed offset, available offset). The source progress should also be available to the customized SparkDataStream. Do you mean the metrics from the customized SparkDataStream are not offset related?
   
   Yes. The available offset is retrieved through reportLatestOffset, which Kafka has already implemented, so for Kafka this is duplicated: the latest consumed offset and the latest available offset are enough to compute how far it is falling behind.
   But for other customized Spark data streams, reportLatestOffset may not be implemented, so from the source progress report there's no way to know the latest available offset to do the computation. Also, custom metrics, for example how far the application is falling behind the latest, can be represented in other ways (not only as a number of offsets), which all depends on how the stream defines them.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gaborgsomogyi commented on a change in pull request #31944: [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay.

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r607608797



##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -218,3 +226,36 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 }
+
+object KafkaMicroBatchStream extends Logging {
+
+  /**
+   * Compute the difference of offset per partition between latestAvailablePartitionOffsets
+   * and latestConsumedPartitionOffsets.
+   * Because of rate limit, latest consumed offset per partition can be smaller than
+   * the latest available offset per partition.
+   * @param latestConsumedOffset latest consumed offset
+   * @param latestAvailablePartitionOffsets latest available offset per partition
+   * @return the generated metrics map
+   */
+  def metrics(latestConsumedOffset: Optional[Offset],
+              latestAvailablePartitionOffsets: PartitionOffsetMap): ju.Map[String, String] = {
+    val offset = Option(latestConsumedOffset.orElse(null))
+
+    if (offset.nonEmpty) {
+      val consumedPartitionOffsets = offset.map(KafkaSourceOffset(_)).get.partitionToOffsets
+      val offsetsBehindLatest = latestAvailablePartitionOffsets
+        .filter(partitionOffset => consumedPartitionOffsets.contains(partitionOffset._1))
+        .map(partitionOffset =>
+          partitionOffset._2 - consumedPartitionOffsets.get(partitionOffset._1).get)
+      if (offsetsBehindLatest.size > 0) {
+        val avgOffsetBehindLatest = offsetsBehindLatest.sum.toDouble / offsetsBehindLatest.size
+        return Map[String, String](
+          "minOffsetsBehindLatest" -> offsetsBehindLatest.min.toString,
+          "maxOffsetsBehindLatest" -> offsetsBehindLatest.max.toString,
+          "avgOffsetsBehindLatest" -> avgOffsetBehindLatest.toString).asJava
+      }
+    }
+    Map[String, String]().asJava

Review comment:
       `Collections.emptyMap()`?
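   For reference, the suggested form avoids building and converting a fresh Scala map on each call:
   ```
   import java.{util => ju}
   import java.util.Collections
   
   val noMetrics: ju.Map[String, String] = Collections.emptyMap[String, String]()
   ```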




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org