You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ko...@apache.org on 2018/07/13 13:37:33 UTC

spark git commit: [SPARK-24713] AppMaster of spark streaming kafka OOM if there are hund…

Repository: spark
Updated Branches:
  refs/heads/master 0ce11d0e3 -> 0f24c6f8a


[SPARK-24713] AppMaster of spark streaming kafka OOM if there are hund…

We have hundreds of Kafka topics that need to be consumed in one application. The application master throws an OOM exception after hanging for nearly half an hour.

The OOM happens only in an environment with a large number of topics, and it is not convenient to set up that kind of environment in a unit test, so I didn't change or add any test cases.

Author: Yuanbo Liu <yu...@Yuanbos-MacBook-Air.local>
Author: yuanbo <yu...@apache.org>

Closes #21690 from yuanboliu/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0f24c6f8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0f24c6f8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0f24c6f8

Branch: refs/heads/master
Commit: 0f24c6f8abc11c4525c21ac5cd25991bfed36dc4
Parents: 0ce11d0
Author: Yuanbo Liu <yu...@Yuanbos-MacBook-Air.local>
Authored: Fri Jul 13 07:37:24 2018 -0600
Committer: cody koeninger <co...@koeninger.org>
Committed: Fri Jul 13 07:37:24 2018 -0600

----------------------------------------------------------------------
 .../spark/streaming/kafka010/DirectKafkaInputDStream.scala    | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0f24c6f8/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index c322148..0246006 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -166,6 +166,8 @@ private[spark] class DirectKafkaInputDStream[K, V](
    * which would throw off consumer position.  Fix position if this happens.
    */
   private def paranoidPoll(c: Consumer[K, V]): Unit = {
+    // don't actually want to consume any messages, so pause all partitions
+    c.pause(c.assignment())
     val msgs = c.poll(0)
     if (!msgs.isEmpty) {
       // position should be minimum offset per topicpartition
@@ -204,8 +206,6 @@ private[spark] class DirectKafkaInputDStream[K, V](
     // position for new partitions determined by auto.offset.reset if no commit
     currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
 
-    // don't want to consume messages, so pause
-    c.pause(newPartitions.asJava)
     // find latest available offsets
     c.seekToEnd(currentOffsets.keySet.asJava)
     parts.map(tp => tp -> c.position(tp)).toMap
@@ -262,9 +262,6 @@ private[spark] class DirectKafkaInputDStream[K, V](
         tp -> c.position(tp)
       }.toMap
     }
-
-    // don't actually want to consume any messages, so pause all partitions
-    c.pause(currentOffsets.keySet.asJava)
   }
 
   override def stop(): Unit = this.synchronized {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org