Posted to reviews@spark.apache.org by "siying (via GitHub)" <gi...@apache.org> on 2023/04/21 21:38:34 UTC

[GitHub] [spark] siying opened a new pull request, #40905: [SPARK-43233] [SS] Add logging for Kafka Batch Reading for topic partition, offset range and task ID

siying opened a new pull request, #40905:
URL: https://github.com/apache/spark/pull/40905

   ### What changes were proposed in this pull request?
   We add a log line, emitted when the batch reader is created, that includes the task ID, topic, partition, and offset range.
   The log line looks like the following:
   
   23/04/18 22:35:38 INFO KafkaBatchReaderFactory: Creating Kafka reader partitionId=1 partition=StreamingDustTest-KafkaToKafkaTopic-4ccf8662-c3ca-4f3b-871e-1853c0e61765-source-2 fromOffset=0 untilOffset=3 queryId=b5b806c3-ebf3-432e-a9a7-d882d474c0f5 batchId=0 taskId=1
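   Because the log line is a flat list of key=value pairs, it can be grepped or parsed to find which task read which topic partition and offset range. Below is a minimal sketch that assumes only the format shown above. The `KafkaReaderLogParser` object is hypothetical and is not part of this PR:

```scala
// Hypothetical helper (not part of Spark): extracts the key=value fields that
// follow the "Creating Kafka reader " prefix in a reader-creation log line.
object KafkaReaderLogParser {
  private val Prefix = "Creating Kafka reader "

  /** Returns the fields of a reader-creation log line, or None if the line
    * does not contain the expected prefix. */
  def parse(line: String): Option[Map[String, String]] = {
    val idx = line.indexOf(Prefix)
    if (idx < 0) None
    else {
      val fields = line
        .substring(idx + Prefix.length)
        .split("\\s+")
        .iterator
        .flatMap { token =>
          // Split on the first '=' only; tokens without '=' are skipped.
          token.split("=", 2) match {
            case Array(k, v) => Some(k -> v)
            case _           => None
          }
        }
        .toMap
      Some(fields)
    }
  }
}
```

   For example, `KafkaReaderLogParser.parse(line).map(_("taskId"))` yields the task ID for a matching line, and lines from other loggers return `None`.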
   
   
   ### Why are the changes needed?
   Right now, for Structured Streaming from Kafka, it is hard to find which task is handling which topic partition and offset range.
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Ran KafkaMicroBatchV2SourceSuite and verified that the logging output contains the needed information. Also ran a small cluster test and observed the logs.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] anishshri-db commented on pull request #40905: [SPARK-43233] [SS] Add logging for Kafka Batch Reading for topic partition, offset range and task ID

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on PR #40905:
URL: https://github.com/apache/spark/pull/40905#issuecomment-1518359441

   @HeartSaVioR - please take a look and merge after builds pass, thx !



[GitHub] [spark] HeartSaVioR closed pull request #40905: [SPARK-43233] [SS] Add logging for Kafka Batch Reading for topic partition, offset range and task ID

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR closed pull request #40905:  [SPARK-43233] [SS] Add logging for Kafka Batch Reading for topic partition, offset range and task ID
URL: https://github.com/apache/spark/pull/40905



[GitHub] [spark] anishshri-db commented on pull request #40905: [SPARK-43233] [SS] Add logging for Kafka Batch Reading for topic partition, offset range and task ID

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on PR #40905:
URL: https://github.com/apache/spark/pull/40905#issuecomment-1518358413

   @siying - you might need to enable GitHub Actions for the tests to run



[GitHub] [spark] HeartSaVioR commented on pull request #40905: [SPARK-43233] [SS] Add logging for Kafka Batch Reading for topic partition, offset range and task ID

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #40905:
URL: https://github.com/apache/spark/pull/40905#issuecomment-1520959155

   Thanks! Merging to master.



[GitHub] [spark] HeartSaVioR commented on pull request #40905: [SPARK-43233] [SS] Add logging for Kafka Batch Reading for topic partition, offset range and task ID

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #40905:
URL: https://github.com/apache/spark/pull/40905#issuecomment-1520997483

   @siying Thanks for your first contribution to Apache Spark project. Welcome!



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #40905: [SPARK-43233] [SS] Add logging for Kafka Batch Reading for topic partition, offset range and task ID

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #40905:
URL: https://github.com/apache/spark/pull/40905#discussion_r1174808903


##########
oss_spark/pom.xml:
##########
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>

Review Comment:
   Maybe added by accident?




[GitHub] [spark] HeartSaVioR commented on a diff in pull request #40905: [SPARK-43233] [SS] Add logging for Kafka Batch Reading for topic partition, offset range and task ID

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #40905:
URL: https://github.com/apache/spark/pull/40905#discussion_r1174729496


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala:
##########
@@ -34,9 +36,17 @@ private[kafka010] case class KafkaBatchInputPartition(
     failOnDataLoss: Boolean,
     includeHeaders: Boolean) extends InputPartition
 
-private[kafka010] object KafkaBatchReaderFactory extends PartitionReaderFactory {
+private[kafka010] object KafkaBatchReaderFactory extends PartitionReaderFactory with Logging {
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
     val p = partition.asInstanceOf[KafkaBatchInputPartition]
+
+    val queryId = TaskContext.get().getLocalProperty(StreamExecution.QUERY_ID_KEY)

Review Comment:
   nit: shall we simply add a new local variable for `TaskContext.get()`? Something like `val taskCtx = TaskContext.get()` or even just `ctx`.
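   A minimal sketch of the suggested refactor. It is written against a hypothetical `TaskCtx` trait that stands in for the `TaskContext` methods used in the diff, so it runs without Spark on the classpath. The property key strings and the use of `taskAttemptId()` for `taskId` are assumptions for illustration only:

```scala
// Hypothetical stand-in for the parts of org.apache.spark.TaskContext used here.
trait TaskCtx {
  def getLocalProperty(key: String): String
  def partitionId(): Int
  def taskAttemptId(): Long
}

object ReaderLogSketch {
  // Hypothetical key strings; the real code uses StreamExecution.QUERY_ID_KEY
  // and MicroBatchExecution.BATCH_ID_KEY.
  val QueryIdKey = "example.queryId"
  val BatchIdKey = "example.batchId"

  // `getCtx` plays the role of TaskContext.get(). It is called once and the
  // result is reused for every lookup, which is what the nit suggests.
  def describe(getCtx: () => TaskCtx): String = {
    val ctx = getCtx()
    val queryId = ctx.getLocalProperty(QueryIdKey)
    val batchId = ctx.getLocalProperty(BatchIdKey)
    s"partitionId=${ctx.partitionId()} queryId=$queryId batchId=$batchId taskId=${ctx.taskAttemptId()}"
  }
}
```

   Besides reading better, hoisting the context into a local avoids repeating the thread-local lookup for each field.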



##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala:
##########
@@ -34,9 +36,17 @@ private[kafka010] case class KafkaBatchInputPartition(
     failOnDataLoss: Boolean,
     includeHeaders: Boolean) extends InputPartition
 
-private[kafka010] object KafkaBatchReaderFactory extends PartitionReaderFactory {
+private[kafka010] object KafkaBatchReaderFactory extends PartitionReaderFactory with Logging {
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
     val p = partition.asInstanceOf[KafkaBatchInputPartition]
+
+    val queryId = TaskContext.get().getLocalProperty(StreamExecution.QUERY_ID_KEY)
+    val batchId = TaskContext.get().getLocalProperty(MicroBatchExecution.BATCH_ID_KEY)
+    logInfo(s"Creating Kafka reader partitionId=${TaskContext.get().partitionId()} " +

Review Comment:
   minor: placing partitionId next to topicPartition may lead readers to think that partitionId refers to Kafka's partition (hence redundant info). It's Spark's partition ID.
   
   I'd suggest placing it after taskId, and maybe adding ` for the query ` between `untilOffset` and `queryId` to differentiate Kafka-related info from Spark/query-related info.
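   A sketch of the suggested ordering, assuming a hypothetical `ReaderLogFields` container. The field names and key spellings are illustrative and are not the message that was merged:

```scala
// Hypothetical container for the values the reader-creation log line reports.
final case class ReaderLogFields(
    topicPartition: String,
    fromOffset: Long,
    untilOffset: Long,
    queryId: String,
    batchId: String,
    taskId: Long,
    partitionId: Int)

object SuggestedReaderLog {
  // Kafka-related fields first, then " for the query ", then Spark/query
  // fields, with Spark's partitionId moved after taskId.
  def message(f: ReaderLogFields): String =
    s"Creating Kafka reader topicPartition=${f.topicPartition} " +
      s"fromOffset=${f.fromOffset} untilOffset=${f.untilOffset} for the query " +
      s"queryId=${f.queryId} batchId=${f.batchId} taskId=${f.taskId} " +
      s"partitionId=${f.partitionId}"
}
```

   With this layout, a reader scanning the line sees the Kafka coordinates as one group, and `partitionId` sits next to `taskId` where it is clearly a Spark identifier.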


