You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/10/18 21:01:41 UTC
spark git commit: [SPARK-17841][STREAMING][KAFKA] drain commitQueue
Repository: spark
Updated Branches:
refs/heads/master cd662bc7a -> cd106b050
[SPARK-17841][STREAMING][KAFKA] drain commitQueue
## What changes were proposed in this pull request?
Actually drain commit queue rather than just iterating it.
iterator() on a concurrent linked queue won't remove items from the queue, poll() will.
## How was this patch tested?
Unit tests
Author: cody koeninger <co...@koeninger.org>
Closes #15407 from koeninger/SPARK-17841.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cd106b05
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cd106b05
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cd106b05
Branch: refs/heads/master
Commit: cd106b050ff789b6de539956a7f01159ab15c820
Parents: cd662bc
Author: cody koeninger <co...@koeninger.org>
Authored: Tue Oct 18 14:01:49 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Oct 18 14:01:49 2016 -0700
----------------------------------------------------------------------
.../spark/streaming/kafka010/DirectKafkaInputDStream.scala | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/cd106b05/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index 432537e..7e57bb1 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -282,13 +282,13 @@ private[spark] class DirectKafkaInputDStream[K, V](
protected def commitAll(): Unit = {
val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]()
- val it = commitQueue.iterator()
- while (it.hasNext) {
- val osr = it.next
+ var osr = commitQueue.poll()
+ while (null != osr) {
val tp = osr.topicPartition
val x = m.get(tp)
val offset = if (null == x) { osr.untilOffset } else { Math.max(x.offset, osr.untilOffset) }
m.put(tp, new OffsetAndMetadata(offset))
+ osr = commitQueue.poll()
}
if (!m.isEmpty) {
consumer.commitAsync(m, commitCallback.get)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org