Posted to commits@spark.apache.org by rx...@apache.org on 2016/10/18 21:01:41 UTC

spark git commit: [SPARK-17841][STREAMING][KAFKA] drain commitQueue

Repository: spark
Updated Branches:
  refs/heads/master cd662bc7a -> cd106b050


[SPARK-17841][STREAMING][KAFKA] drain commitQueue

## What changes were proposed in this pull request?

Drain the commit queue instead of just iterating over it.
iterator() on a concurrent linked queue does not remove items from the queue; poll() does.
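
The difference between the two calls can be sketched in isolation. The snippet below is only an illustration, not the patched Spark code: `Pending`, `DrainSketch`, `visitOnly`, and `drain` are hypothetical names, and a plain Scala `Map` stands in for the `ju.HashMap[TopicPartition, OffsetAndMetadata]` used in `commitAll()`. It assumes nothing beyond `java.util.concurrent.ConcurrentLinkedQueue`.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical stand-in for a queued offset range; not the Spark or Kafka class.
final case class Pending(partition: String, untilOffset: Long)

object DrainSketch {
  // Walks the queue with iterator(); every element stays in the queue.
  def visitOnly(q: ConcurrentLinkedQueue[Pending]): Int = {
    var seen = 0
    val it = q.iterator()
    while (it.hasNext) {
      it.next()
      seen += 1
    }
    seen
  }

  // Removes elements with poll() until it returns null (queue empty),
  // keeping the highest untilOffset seen for each partition.
  def drain(q: ConcurrentLinkedQueue[Pending]): Map[String, Long] = {
    var highest = Map.empty[String, Long]
    var p = q.poll()
    while (p != null) {
      val prev = highest.getOrElse(p.partition, Long.MinValue)
      highest = highest.updated(p.partition, math.max(prev, p.untilOffset))
      p = q.poll()
    }
    highest
  }

  def main(args: Array[String]): Unit = {
    val q = new ConcurrentLinkedQueue[Pending]()
    q.add(Pending("t-0", 10L))
    q.add(Pending("t-0", 25L))
    q.add(Pending("t-1", 7L))

    println(s"visited=${visitOnly(q)} sizeAfterIterating=${q.size}")
    println(s"drained=${drain(q)} emptyAfterPolling=${q.isEmpty}")
  }
}
```

With the iterator-based loop, the same entries would be seen again on every later pass over the queue; with poll(), each entry is handled once and the queue is empty afterwards.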

## How was this patch tested?
Unit tests

Author: cody koeninger <co...@koeninger.org>

Closes #15407 from koeninger/SPARK-17841.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cd106b05
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cd106b05
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cd106b05

Branch: refs/heads/master
Commit: cd106b050ff789b6de539956a7f01159ab15c820
Parents: cd662bc
Author: cody koeninger <co...@koeninger.org>
Authored: Tue Oct 18 14:01:49 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Oct 18 14:01:49 2016 -0700

----------------------------------------------------------------------
 .../spark/streaming/kafka010/DirectKafkaInputDStream.scala     | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cd106b05/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index 432537e..7e57bb1 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -282,13 +282,13 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
   protected def commitAll(): Unit = {
     val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]()
-    val it = commitQueue.iterator()
-    while (it.hasNext) {
-      val osr = it.next
+    var osr = commitQueue.poll()
+    while (null != osr) {
       val tp = osr.topicPartition
       val x = m.get(tp)
       val offset = if (null == x) { osr.untilOffset } else { Math.max(x.offset, osr.untilOffset) }
       m.put(tp, new OffsetAndMetadata(offset))
+      osr = commitQueue.poll()
     }
     if (!m.isEmpty) {
       consumer.commitAsync(m, commitCallback.get)

