Posted to commits@spark.apache.org by ka...@apache.org on 2023/06/20 22:34:02 UTC

[spark] branch master updated: [SPARK-44012][SS] KafkaDataConsumer to print some read status

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ef8f22e53d3 [SPARK-44012][SS] KafkaDataConsumer to print some read status
ef8f22e53d3 is described below

commit ef8f22e53d31225b3429e2f24ca588d113fc7462
Author: Siying Dong <si...@databricks.com>
AuthorDate: Wed Jun 21 07:33:46 2023 +0900

    [SPARK-44012][SS] KafkaDataConsumer to print some read status
    
    ### What changes were proposed in this pull request?
    At the end of each KafkaDataConsumer's lifetime, it logs some read stats. Here is a sample log line:
    
    23/06/08 23:48:14 INFO KafkaDataConsumer: From Kafka topicPartition=topic-121-2 groupId=spark-kafka-source-623fa0a8-04a5-4f34-a9ad-adbf31494e85-711383366-executor read 1 records, taking 504554479 nanos, during time span of 504620999 nanos
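
    The numbers come from plain wall-clock accumulation around each Kafka call, flushed when the consumer is released. Below is a condensed, illustrative sketch of that pattern only (a simplified stand-in, not the actual class; the real fields and log message are in the diff further down):

    ```scala
    // Simplified stand-in for KafkaDataConsumer's accounting; field names mirror the patch.
    class TimedReader {
      private var totalTimeReadNanos = 0L
      private var totalRecordsRead = 0L
      private val startTimestampNano = System.nanoTime()

      // Measure how long `body` takes and add it to the running total.
      private def timeNanos[T](body: => T): T = {
        val start = System.nanoTime()
        val result = body
        totalTimeReadNanos += System.nanoTime() - start
        result
      }

      def read(): Unit = timeNanos {
        totalRecordsRead += 1 // stand-in for an actual Kafka fetch
      }

      // Called once at the end, mirroring KafkaDataConsumer.release().
      def release(): Unit = {
        val span = System.nanoTime() - startTimestampNano
        println(s"read $totalRecordsRead records, taking $totalTimeReadNanos nanos, " +
          s"during time span of $span nanos")
      }
    }
    ```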
    
    ### Why are the changes needed?
    For each task, the Kafka source should report the fraction of time spent in KafkaConsumer fetching records. It should also report the overall read bandwidth (bytes or records read divided by time spent fetching).
    
    This will be useful in verifying if fetching is the bottleneck.
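
    Both metrics can be derived directly from the logged numbers. A minimal sketch (not part of the patch; the object name is illustrative and the values are taken from the sample log line above):

    ```scala
    object KafkaFetchStats {
      def main(args: Array[String]): Unit = {
        val recordsRead = 1L          // "read 1 records"
        val fetchNanos  = 504554479L  // "taking ... nanos"
        val spanNanos   = 504620999L  // "during time span of ... nanos"

        // Fraction of the consumer's lifetime spent inside Kafka fetch calls.
        val fetchFraction = fetchNanos.toDouble / spanNanos
        // Read bandwidth expressed as records per second of fetch time.
        val recordsPerSec = recordsRead.toDouble / (fetchNanos / 1e9)

        println(f"fetch fraction = $fetchFraction%.3f, throughput = $recordsPerSec%.2f records/s")
      }
    }
    ```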
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    1. Ran unit tests and validated that the log line is correct.
    2. Ran some benchmarks and verified the added logging doesn't show up much in CPU profiling.
    
    Closes #41525 from siying/kafka_logging2.
    
    Authored-by: Siying Dong <si...@databricks.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 .../sql/kafka010/consumer/KafkaDataConsumer.scala  | 55 ++++++++++++++++++++--
 1 file changed, 51 insertions(+), 4 deletions(-)

diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
index d88e9821489..a9e394d3c88 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
@@ -258,6 +258,17 @@ private[kafka010] class KafkaDataConsumer(
    */
   private val fetchedRecord: FetchedRecord = FetchedRecord(null, UNKNOWN_OFFSET)
 
+  // Total duration spent on reading from Kafka
+  private var totalTimeReadNanos: Long = 0
+  // Number of times we poll Kafka consumers.
+  private var numPolls: Long = 0
+  // Number of records returned from polling Kafka consumers.
+  private var numRecordsPolled: Long = 0
+  // Total number of records fetched from Kafka
+  private var totalRecordsRead: Long = 0
+  // Starting timestamp when the consumer is created.
+  private var startTimestampNano: Long = System.nanoTime()
+
   /**
    * Get the record for the given offset if available.
    *
@@ -343,6 +354,7 @@ private[kafka010] class KafkaDataConsumer(
     }
 
     if (isFetchComplete) {
+      totalRecordsRead += 1
       fetchedRecord.record
     } else {
       fetchedData.reset()
@@ -356,7 +368,9 @@ private[kafka010] class KafkaDataConsumer(
    */
   def getAvailableOffsetRange(): AvailableOffsetRange = runUninterruptiblyIfPossible {
     val consumer = getOrRetrieveConsumer()
-    consumer.getAvailableOffsetRange()
+    timeNanos {
+      consumer.getAvailableOffsetRange()
+    }
   }
 
   def getNumOffsetOutOfRange(): Long = offsetOutOfRange
@@ -367,6 +381,17 @@ private[kafka010] class KafkaDataConsumer(
    * must call method after using the instance to make sure resources are not leaked.
    */
   def release(): Unit = {
+    val kafkaMeta = _consumer
+      .map(c => s"topicPartition=${c.topicPartition} groupId=${c.groupId}")
+      .getOrElse("")
+    val walTime = System.nanoTime() - startTimestampNano
+
+    logInfo(
+      s"From Kafka $kafkaMeta read $totalRecordsRead records through $numPolls polls (polled " +
+      s" out $numRecordsPolled records), taking $totalTimeReadNanos nanos, during time span of " +
+      s"$walTime nanos."
+    )
+
     releaseConsumer()
     releaseFetchedData()
   }
@@ -394,7 +419,9 @@ private[kafka010] class KafkaDataConsumer(
       consumer: InternalKafkaConsumer,
       offset: Long,
       untilOffset: Long): Long = {
-    val range = consumer.getAvailableOffsetRange()
+    val range = timeNanos {
+      consumer.getAvailableOffsetRange()
+    }
     logWarning(s"Some data may be lost. Recovering from the earliest offset: ${range.earliest}")
 
     val topicPartition = consumer.topicPartition
@@ -548,7 +575,11 @@ private[kafka010] class KafkaDataConsumer(
       fetchedData: FetchedData,
       offset: Long,
       pollTimeoutMs: Long): Unit = {
-    val (records, offsetAfterPoll, range) = consumer.fetch(offset, pollTimeoutMs)
+    val (records, offsetAfterPoll, range) = timeNanos {
+      consumer.fetch(offset, pollTimeoutMs)
+    }
+    numPolls += 1
+    numRecordsPolled += records.size
     fetchedData.withNewPoll(records.listIterator, offsetAfterPoll, range)
   }
 
@@ -569,7 +600,15 @@ private[kafka010] class KafkaDataConsumer(
   }
 
   private def retrieveConsumer(): Unit = {
-    _consumer = Option(consumerPool.borrowObject(cacheKey, kafkaParams))
+    _consumer = timeNanos {
+      Option(consumerPool.borrowObject(cacheKey, kafkaParams))
+    }
+    startTimestampNano = System.nanoTime()
+    totalTimeReadNanos = 0
+    numPolls = 0
+    numRecordsPolled = 0
+    totalRecordsRead = 0
+
     require(_consumer.isDefined, "borrowing consumer from pool must always succeed.")
   }
 
@@ -620,6 +659,14 @@ private[kafka010] class KafkaDataConsumer(
         "It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894")
       body
   }
+
+  /** Records the duration of running `body` and increases totalTimeReadNanos accordingly. */
+  private def timeNanos[T](body: => T): T = {
+    val startTime = System.nanoTime()
+    val result = body
+    totalTimeReadNanos += System.nanoTime() - startTime
+    result
+  }
 }
 
 private[kafka010] object KafkaDataConsumer extends Logging {

