Posted to commits@spark.apache.org by sr...@apache.org on 2017/08/30 09:33:28 UTC

spark git commit: [SPARK-21873][SS] - Avoid using `return` inside `CachedKafkaConsumer.get`

Repository: spark
Updated Branches:
  refs/heads/master d4895c9de -> 8f0df6bc1


[SPARK-21873][SS] - Avoid using `return` inside `CachedKafkaConsumer.get`

During profiling of a Structured Streaming application with Kafka as the source, I came across this exception:

![Structured Streaming Kafka Exceptions](https://user-images.githubusercontent.com/3448320/29743366-4149ef78-8a99-11e7-94d6-f0cbb691134a.png)

This is a 1-minute sample, during which 106K `NonLocalReturnControl` exceptions were thrown.
This happens because `CachedKafkaConsumer.get` is run inside:

`private def runUninterruptiblyIfPossible[T](body: => T): T`

where `body: => T` is the `get` method. Because the method is passed as a by-name function, escaping the `while` loop defined in `get` with `return` forces the compiler to implement the non-local return by throwing the above exception and catching it at the method boundary.
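
As a minimal, self-contained sketch of the mechanism (hypothetical names, not Spark code): a `return` inside a by-name argument can only unwind to its enclosing method by throwing `scala.runtime.NonLocalReturnControl`.

```scala
object NonLocalReturnDemo {
  // Stands in for `runUninterruptiblyIfPossible`: evaluates a by-name body.
  def run[T](body: => T): T = body

  def findFirstEven(xs: Seq[Int]): Option[Int] = run {
    var i = 0
    while (i < xs.length) {
      // This `return` executes inside the thunk passed to `run`, so the
      // compiler implements it by throwing NonLocalReturnControl and
      // catching it at the boundary of `findFirstEven`.
      if (xs(i) % 2 == 0) return Some(xs(i))
      i += 1
    }
    None
  }

  def main(args: Array[String]): Unit =
    println(findFirstEven(Seq(1, 3, 4, 5))) // Some(4)
}
```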

## What changes were proposed in this pull request?

Instead of using `return` (which is generally discouraged in Scala), we store the result of `fetchData` in a local variable and use a boolean flag to indicate whether the fetch has completed; that flag becomes part of the `while` loop's predicate, as sketched below.
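
A self-contained sketch of that pattern (simplified, with hypothetical names; the actual change, including the retry on `OffsetOutOfRangeException`, is in the diff below):

```scala
// Hypothetical stand-in for `get`: same flag-based shape, minus Kafka.
def firstEven(xs: Seq[Int]): Option[Int] = {
  var result: Option[Int] = None
  // The flag replaces `return`: once set, the loop predicate fails and
  // the loop exits normally, with no NonLocalReturnControl thrown.
  var isFetchComplete = false
  var i = 0
  while (i < xs.length && !isFetchComplete) {
    if (xs(i) % 2 == 0) {
      result = Some(xs(i))
      isFetchComplete = true
    }
    i += 1
  }
  result
}
```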

## How was this patch tested?

I ran the `KafkaSourceSuite` to make sure there are no regressions. Since the exception isn't visible from user code, there is no way (at least none that I could think of) to add a test for this to the existing suite.

Author: Yuval Itzchakov <yu...@clicktale.com>

Closes #19059 from YuvalItzchakov/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8f0df6bc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8f0df6bc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8f0df6bc

Branch: refs/heads/master
Commit: 8f0df6bc1092c0c75b41e91e4ffc41a5525c8274
Parents: d4895c9
Author: Yuval Itzchakov <yu...@clicktale.com>
Authored: Wed Aug 30 10:33:23 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Aug 30 10:33:23 2017 +0100

----------------------------------------------------------------------
 .../spark/sql/kafka010/CachedKafkaConsumer.scala | 19 +++++++++++++++----
 1 file changed, 15 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8f0df6bc/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
index 7c4f38e..90ed7b1 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
@@ -112,9 +112,15 @@ private[kafka010] case class CachedKafkaConsumer private(
     // we will move to the next available offset within `[offset, untilOffset)` and retry.
     // If `failOnDataLoss` is `true`, the loop body will be executed only once.
     var toFetchOffset = offset
-    while (toFetchOffset != UNKNOWN_OFFSET) {
+    var consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]] = null
+    // We want to break out of the while loop on a successful fetch to avoid using "return"
+    // which may cause a NonLocalReturnControl exception when this method is used as a function.
+    var isFetchComplete = false
+
+    while (toFetchOffset != UNKNOWN_OFFSET && !isFetchComplete) {
       try {
-        return fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
+        consumerRecord = fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
+        isFetchComplete = true
       } catch {
         case e: OffsetOutOfRangeException =>
           // When there is some error thrown, it's better to use a new consumer to drop all cached
@@ -125,8 +131,13 @@ private[kafka010] case class CachedKafkaConsumer private(
           toFetchOffset = getEarliestAvailableOffsetBetween(toFetchOffset, untilOffset)
       }
     }
-    resetFetchedData()
-    null
+
+    if (isFetchComplete) {
+      consumerRecord
+    } else {
+      resetFetchedData()
+      null
+    }
   }
 
   /**

