Posted to commits@spark.apache.org by ko...@apache.org on 2018/05/11 18:40:52 UTC
spark git commit: [SPARK-24067][BACKPORT-2.3][STREAMING][KAFKA] Allow non-consecutive offsets
Repository: spark
Updated Branches:
refs/heads/branch-2.3 414e4e3d7 -> 1d598b771
[SPARK-24067][BACKPORT-2.3][STREAMING][KAFKA] Allow non-consecutive offsets
## What changes were proposed in this pull request?
Backport of the bugfix in SPARK-17147.
Add a configuration, spark.streaming.kafka.allowNonConsecutiveOffsets, to allow streaming jobs to proceed on compacted topics (or in other situations involving gaps between offsets in the log).
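Below is a minimal usage sketch (not part of this patch) of how a job might opt in to the new behavior. The configuration key is the one added by this change; the broker address, topic name, group id, master URL, and offset range are placeholder assumptions.
```scala
import scala.collection.JavaConverters._

import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, OffsetRange}

object CompactedTopicExample {
  def main(args: Array[String]): Unit = {
    // Opt in to offset ranges with gaps (e.g. compacted topics);
    // KafkaRDD reads this flag from the SparkConf.
    val conf = new SparkConf()
      .setAppName("compacted-topic-example")
      .setMaster("local[2]") // local master only for illustration
      .set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")
    val sc = new SparkContext(conf)

    // Placeholder Kafka params; auto.offset.reset=none and enable.auto.commit=false
    // are required for executor kafka params, as enforced in KafkaRDD.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example-group",
      "auto.offset.reset" -> "none",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    ).asJava

    // Read a fixed offset range from a (possibly compacted) topic and print key/value pairs.
    val offsetRanges = Array(OffsetRange("example-topic", 0, 0L, 100L))
    val rdd = KafkaUtils.createRDD[String, String](
      sc, kafkaParams, offsetRanges, LocationStrategies.PreferConsistent)

    rdd.map(r => (r.key, r.value)).collect().foreach(println)
    sc.stop()
  }
}
```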
## How was this patch tested?
Added a new unit test.
justinrmiller has been testing this branch in production for a few weeks.
Author: cody koeninger <co...@koeninger.org>
Closes #21300 from koeninger/branch-2.3_kafkafix.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1d598b77
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1d598b77
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1d598b77
Branch: refs/heads/branch-2.3
Commit: 1d598b771de3b588a2f377ae7ccf8193156641f2
Parents: 414e4e3
Author: cody koeninger <co...@koeninger.org>
Authored: Fri May 11 13:40:36 2018 -0500
Committer: cody koeninger <co...@koeninger.org>
Committed: Fri May 11 13:40:36 2018 -0500
----------------------------------------------------------------------
.../kafka010/CachedKafkaConsumer.scala | 55 ++++-
.../spark/streaming/kafka010/KafkaRDD.scala | 236 +++++++++++++------
.../streaming/kafka010/KafkaRDDSuite.scala | 106 +++++++++
.../streaming/kafka010/KafkaTestUtils.scala | 25 +-
.../kafka010/mocks/MockScheduler.scala | 96 ++++++++
.../streaming/kafka010/mocks/MockTime.scala | 51 ++++
6 files changed, 487 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/1d598b77/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
index fa3ea61..aeb8c1d 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
@@ -22,10 +22,8 @@ import java.{ util => ju }
import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, KafkaConsumer }
import org.apache.kafka.common.{ KafkaException, TopicPartition }
-import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
-
/**
* Consumer of single topicpartition, intended for cached reuse.
* Underlying consumer is not threadsafe, so neither is this,
@@ -38,7 +36,7 @@ class CachedKafkaConsumer[K, V] private(
val partition: Int,
val kafkaParams: ju.Map[String, Object]) extends Logging {
- assert(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+ require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
"groupId used for cache key must match the groupId in kafkaParams")
val topicPartition = new TopicPartition(topic, partition)
@@ -53,7 +51,7 @@ class CachedKafkaConsumer[K, V] private(
// TODO if the buffer was kept around as a random-access structure,
// could possibly optimize re-calculating of an RDD in the same batch
- protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, V]]().iterator
+ protected var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
protected var nextOffset = -2L
def close(): Unit = consumer.close()
@@ -71,7 +69,7 @@ class CachedKafkaConsumer[K, V] private(
}
if (!buffer.hasNext()) { poll(timeout) }
- assert(buffer.hasNext(),
+ require(buffer.hasNext(),
s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
var record = buffer.next()
@@ -79,17 +77,56 @@ class CachedKafkaConsumer[K, V] private(
logInfo(s"Buffer miss for $groupId $topic $partition $offset")
seek(offset)
poll(timeout)
- assert(buffer.hasNext(),
+ require(buffer.hasNext(),
s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
record = buffer.next()
- assert(record.offset == offset,
- s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")
+ require(record.offset == offset,
+ s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset " +
+ s"got offset ${record.offset} instead. If this is a compacted topic, consider enabling " +
+ "spark.streaming.kafka.allowNonConsecutiveOffsets"
+ )
}
nextOffset = offset + 1
record
}
+ /**
+ * Start a batch on a compacted topic
+ */
+ def compactedStart(offset: Long, timeout: Long): Unit = {
+ logDebug(s"compacted start $groupId $topic $partition starting $offset")
+ // This seek may not be necessary, but it's hard to tell due to gaps in compacted topics
+ if (offset != nextOffset) {
+ logInfo(s"Initial fetch for compacted $groupId $topic $partition $offset")
+ seek(offset)
+ poll(timeout)
+ }
+ }
+
+ /**
+ * Get the next record in the batch from a compacted topic.
+ * Assumes compactedStart has been called first, and ignores gaps.
+ */
+ def compactedNext(timeout: Long): ConsumerRecord[K, V] = {
+ if (!buffer.hasNext()) {
+ poll(timeout)
+ }
+ require(buffer.hasNext(),
+ s"Failed to get records for compacted $groupId $topic $partition after polling for $timeout")
+ val record = buffer.next()
+ nextOffset = record.offset + 1
+ record
+ }
+
+ /**
+ * Rewind to previous record in the batch from a compacted topic.
+ * @throws NoSuchElementException if no previous element
+ */
+ def compactedPrevious(): ConsumerRecord[K, V] = {
+ buffer.previous()
+ }
+
private def seek(offset: Long): Unit = {
logDebug(s"Seeking to $topicPartition $offset")
consumer.seek(topicPartition, offset)
@@ -99,7 +136,7 @@ class CachedKafkaConsumer[K, V] private(
val p = consumer.poll(timeout)
val r = p.records(topicPartition)
logDebug(s"Polled ${p.partitions()} ${r.size}")
- buffer = r.iterator
+ buffer = r.listIterator
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/1d598b77/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
index d9fc9cc..07239ed 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
@@ -55,12 +55,12 @@ private[spark] class KafkaRDD[K, V](
useConsumerCache: Boolean
) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges {
- assert("none" ==
+ require("none" ==
kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).asInstanceOf[String],
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG +
" must be set to none for executor kafka params, else messages may not match offsetRange")
- assert(false ==
+ require(false ==
kafkaParams.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).asInstanceOf[Boolean],
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG +
" must be set to false for executor kafka params, else offsets may commit before processing")
@@ -74,6 +74,8 @@ private[spark] class KafkaRDD[K, V](
conf.getInt("spark.streaming.kafka.consumer.cache.maxCapacity", 64)
private val cacheLoadFactor =
conf.getDouble("spark.streaming.kafka.consumer.cache.loadFactor", 0.75).toFloat
+ private val compacted =
+ conf.getBoolean("spark.streaming.kafka.allowNonConsecutiveOffsets", false)
override def persist(newLevel: StorageLevel): this.type = {
logError("Kafka ConsumerRecord is not serializable. " +
@@ -87,48 +89,63 @@ private[spark] class KafkaRDD[K, V](
}.toArray
}
- override def count(): Long = offsetRanges.map(_.count).sum
+ override def count(): Long =
+ if (compacted) {
+ super.count()
+ } else {
+ offsetRanges.map(_.count).sum
+ }
override def countApprox(
timeout: Long,
confidence: Double = 0.95
- ): PartialResult[BoundedDouble] = {
- val c = count
- new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
- }
-
- override def isEmpty(): Boolean = count == 0L
-
- override def take(num: Int): Array[ConsumerRecord[K, V]] = {
- val nonEmptyPartitions = this.partitions
- .map(_.asInstanceOf[KafkaRDDPartition])
- .filter(_.count > 0)
+ ): PartialResult[BoundedDouble] =
+ if (compacted) {
+ super.countApprox(timeout, confidence)
+ } else {
+ val c = count
+ new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
+ }
- if (num < 1 || nonEmptyPartitions.isEmpty) {
- return new Array[ConsumerRecord[K, V]](0)
+ override def isEmpty(): Boolean =
+ if (compacted) {
+ super.isEmpty()
+ } else {
+ count == 0L
}
- // Determine in advance how many messages need to be taken from each partition
- val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) =>
- val remain = num - result.values.sum
- if (remain > 0) {
- val taken = Math.min(remain, part.count)
- result + (part.index -> taken.toInt)
+ override def take(num: Int): Array[ConsumerRecord[K, V]] =
+ if (compacted) {
+ super.take(num)
+ } else if (num < 1) {
+ Array.empty[ConsumerRecord[K, V]]
+ } else {
+ val nonEmptyPartitions = this.partitions
+ .map(_.asInstanceOf[KafkaRDDPartition])
+ .filter(_.count > 0)
+
+ if (nonEmptyPartitions.isEmpty) {
+ Array.empty[ConsumerRecord[K, V]]
} else {
- result
+ // Determine in advance how many messages need to be taken from each partition
+ val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) =>
+ val remain = num - result.values.sum
+ if (remain > 0) {
+ val taken = Math.min(remain, part.count)
+ result + (part.index -> taken.toInt)
+ } else {
+ result
+ }
+ }
+
+ context.runJob(
+ this,
+ (tc: TaskContext, it: Iterator[ConsumerRecord[K, V]]) =>
+ it.take(parts(tc.partitionId)).toArray, parts.keys.toArray
+ ).flatten
}
}
- val buf = new ArrayBuffer[ConsumerRecord[K, V]]
- val res = context.runJob(
- this,
- (tc: TaskContext, it: Iterator[ConsumerRecord[K, V]]) =>
- it.take(parts(tc.partitionId)).toArray, parts.keys.toArray
- )
- res.foreach(buf ++= _)
- buf.toArray
- }
-
private def executors(): Array[ExecutorCacheTaskLocation] = {
val bm = sparkContext.env.blockManager
bm.master.getPeers(bm.blockManagerId).toArray
@@ -172,57 +189,138 @@ private[spark] class KafkaRDD[K, V](
override def compute(thePart: Partition, context: TaskContext): Iterator[ConsumerRecord[K, V]] = {
val part = thePart.asInstanceOf[KafkaRDDPartition]
- assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
+ require(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
if (part.fromOffset == part.untilOffset) {
logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
s"skipping ${part.topic} ${part.partition}")
Iterator.empty
} else {
- new KafkaRDDIterator(part, context)
+ logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
+ s"offsets ${part.fromOffset} -> ${part.untilOffset}")
+ if (compacted) {
+ new CompactedKafkaRDDIterator[K, V](
+ part,
+ context,
+ kafkaParams,
+ useConsumerCache,
+ pollTimeout,
+ cacheInitialCapacity,
+ cacheMaxCapacity,
+ cacheLoadFactor
+ )
+ } else {
+ new KafkaRDDIterator[K, V](
+ part,
+ context,
+ kafkaParams,
+ useConsumerCache,
+ pollTimeout,
+ cacheInitialCapacity,
+ cacheMaxCapacity,
+ cacheLoadFactor
+ )
+ }
}
}
+}
- /**
- * An iterator that fetches messages directly from Kafka for the offsets in partition.
- * Uses a cached consumer where possible to take advantage of prefetching
- */
- private class KafkaRDDIterator(
- part: KafkaRDDPartition,
- context: TaskContext) extends Iterator[ConsumerRecord[K, V]] {
-
- logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
- s"offsets ${part.fromOffset} -> ${part.untilOffset}")
-
- val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+/**
+ * An iterator that fetches messages directly from Kafka for the offsets in partition.
+ * Uses a cached consumer where possible to take advantage of prefetching
+ */
+private class KafkaRDDIterator[K, V](
+ part: KafkaRDDPartition,
+ context: TaskContext,
+ kafkaParams: ju.Map[String, Object],
+ useConsumerCache: Boolean,
+ pollTimeout: Long,
+ cacheInitialCapacity: Int,
+ cacheMaxCapacity: Int,
+ cacheLoadFactor: Float
+) extends Iterator[ConsumerRecord[K, V]] {
+
+ val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+ context.addTaskCompletionListener(_ => closeIfNeeded())
+
+ val consumer = if (useConsumerCache) {
+ CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
+ if (context.attemptNumber >= 1) {
+ // just in case the prior attempt failures were cache related
+ CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
+ }
+ CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams)
+ } else {
+ CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams)
+ }
- context.addTaskCompletionListener{ context => closeIfNeeded() }
+ var requestOffset = part.fromOffset
- val consumer = if (useConsumerCache) {
- CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
- if (context.attemptNumber >= 1) {
- // just in case the prior attempt failures were cache related
- CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
- }
- CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams)
- } else {
- CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams)
+ def closeIfNeeded(): Unit = {
+ if (!useConsumerCache && consumer != null) {
+ consumer.close()
}
+ }
- var requestOffset = part.fromOffset
+ override def hasNext(): Boolean = requestOffset < part.untilOffset
- def closeIfNeeded(): Unit = {
- if (!useConsumerCache && consumer != null) {
- consumer.close
- }
+ override def next(): ConsumerRecord[K, V] = {
+ if (!hasNext) {
+ throw new ju.NoSuchElementException("Can't call getNext() once untilOffset has been reached")
}
+ val r = consumer.get(requestOffset, pollTimeout)
+ requestOffset += 1
+ r
+ }
+}
- override def hasNext(): Boolean = requestOffset < part.untilOffset
-
- override def next(): ConsumerRecord[K, V] = {
- assert(hasNext(), "Can't call getNext() once untilOffset has been reached")
- val r = consumer.get(requestOffset, pollTimeout)
- requestOffset += 1
- r
+/**
+ * An iterator that fetches messages directly from Kafka for the offsets in partition.
+ * Uses a cached consumer where possible to take advantage of prefetching.
+ * Intended for compacted topics, or other cases when non-consecutive offsets are ok.
+ */
+private class CompactedKafkaRDDIterator[K, V](
+ part: KafkaRDDPartition,
+ context: TaskContext,
+ kafkaParams: ju.Map[String, Object],
+ useConsumerCache: Boolean,
+ pollTimeout: Long,
+ cacheInitialCapacity: Int,
+ cacheMaxCapacity: Int,
+ cacheLoadFactor: Float
+ ) extends KafkaRDDIterator[K, V](
+ part,
+ context,
+ kafkaParams,
+ useConsumerCache,
+ pollTimeout,
+ cacheInitialCapacity,
+ cacheMaxCapacity,
+ cacheLoadFactor
+ ) {
+
+ consumer.compactedStart(part.fromOffset, pollTimeout)
+
+ private var nextRecord = consumer.compactedNext(pollTimeout)
+
+ private var okNext: Boolean = true
+
+ override def hasNext(): Boolean = okNext
+
+ override def next(): ConsumerRecord[K, V] = {
+ if (!hasNext) {
+ throw new ju.NoSuchElementException("Can't call getNext() once untilOffset has been reached")
+ }
+ val r = nextRecord
+ if (r.offset + 1 >= part.untilOffset) {
+ okNext = false
+ } else {
+ nextRecord = consumer.compactedNext(pollTimeout)
+ if (nextRecord.offset >= part.untilOffset) {
+ okNext = false
+ consumer.compactedPrevious()
+ }
}
+ r
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/1d598b77/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
index be373af..271adea 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
@@ -18,16 +18,22 @@
package org.apache.spark.streaming.kafka010
import java.{ util => ju }
+import java.io.File
import scala.collection.JavaConverters._
import scala.util.Random
+import kafka.common.TopicAndPartition
+import kafka.log._
+import kafka.message._
+import kafka.utils.Pool
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.scalatest.BeforeAndAfterAll
import org.apache.spark._
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.streaming.kafka010.mocks.MockTime
class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
@@ -64,6 +70,41 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
private val preferredHosts = LocationStrategies.PreferConsistent
+ private def compactLogs(topic: String, partition: Int, messages: Array[(String, String)]) {
+ val mockTime = new MockTime()
+ // LogCleaner in 0.10 version of Kafka is still expecting the old TopicAndPartition api
+ val logs = new Pool[TopicAndPartition, Log]()
+ val logDir = kafkaTestUtils.brokerLogDir
+ val dir = new File(logDir, topic + "-" + partition)
+ dir.mkdirs()
+ val logProps = new ju.Properties()
+ logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+ logProps.put(LogConfig.MinCleanableDirtyRatioProp, java.lang.Float.valueOf(0.1f))
+ val log = new Log(
+ dir,
+ LogConfig(logProps),
+ 0L,
+ mockTime.scheduler,
+ mockTime
+ )
+ messages.foreach { case (k, v) =>
+ val msg = new ByteBufferMessageSet(
+ NoCompressionCodec,
+ new Message(v.getBytes, k.getBytes, Message.NoTimestamp, Message.CurrentMagicValue))
+ log.append(msg)
+ }
+ log.roll()
+ logs.put(TopicAndPartition(topic, partition), log)
+
+ val cleaner = new LogCleaner(CleanerConfig(), logDirs = Array(dir), logs = logs)
+ cleaner.startup()
+ cleaner.awaitCleaned(topic, partition, log.activeSegment.baseOffset, 1000)
+
+ cleaner.shutdown()
+ mockTime.scheduler.shutdown()
+ }
+
+
test("basic usage") {
val topic = s"topicbasic-${Random.nextInt}-${System.currentTimeMillis}"
kafkaTestUtils.createTopic(topic)
@@ -102,6 +143,71 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
}
}
+ test("compacted topic") {
+ val compactConf = sparkConf.clone()
+ compactConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")
+ sc.stop()
+ sc = new SparkContext(compactConf)
+ val topic = s"topiccompacted-${Random.nextInt}-${System.currentTimeMillis}"
+
+ val messages = Array(
+ ("a", "1"),
+ ("a", "2"),
+ ("b", "1"),
+ ("c", "1"),
+ ("c", "2"),
+ ("b", "2"),
+ ("b", "3")
+ )
+ val compactedMessages = Array(
+ ("a", "2"),
+ ("b", "3"),
+ ("c", "2")
+ )
+
+ compactLogs(topic, 0, messages)
+
+ val props = new ju.Properties()
+ props.put("cleanup.policy", "compact")
+ props.put("flush.messages", "1")
+ props.put("segment.ms", "1")
+ props.put("segment.bytes", "256")
+ kafkaTestUtils.createTopic(topic, 1, props)
+
+
+ val kafkaParams = getKafkaParams()
+
+ val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
+
+ val rdd = KafkaUtils.createRDD[String, String](
+ sc, kafkaParams, offsetRanges, preferredHosts
+ ).map(m => m.key -> m.value)
+
+ val received = rdd.collect.toSet
+ assert(received === compactedMessages.toSet)
+
+ // size-related method optimizations return sane results
+ assert(rdd.count === compactedMessages.size)
+ assert(rdd.countApprox(0).getFinalValue.mean === compactedMessages.size)
+ assert(!rdd.isEmpty)
+ assert(rdd.take(1).size === 1)
+ assert(rdd.take(1).head === compactedMessages.head)
+ assert(rdd.take(messages.size + 10).size === compactedMessages.size)
+
+ val emptyRdd = KafkaUtils.createRDD[String, String](
+ sc, kafkaParams, Array(OffsetRange(topic, 0, 0, 0)), preferredHosts)
+
+ assert(emptyRdd.isEmpty)
+
+ // invalid offset ranges throw exceptions
+ val badRanges = Array(OffsetRange(topic, 0, 0, messages.size + 1))
+ intercept[SparkException] {
+ val result = KafkaUtils.createRDD[String, String](sc, kafkaParams, badRanges, preferredHosts)
+ .map(_.value)
+ .collect()
+ }
+ }
+
test("iterator boundary conditions") {
// the idea is to find e.g. off-by-one errors between what kafka has available and the rdd
val topic = s"topicboundary-${Random.nextInt}-${System.currentTimeMillis}"
http://git-wip-us.apache.org/repos/asf/spark/blob/1d598b77/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
index 6c7024e..70b579d 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -162,8 +162,8 @@ private[kafka010] class KafkaTestUtils extends Logging {
}
/** Create a Kafka topic and wait until it is propagated to the whole cluster */
- def createTopic(topic: String, partitions: Int): Unit = {
- AdminUtils.createTopic(zkUtils, topic, partitions, 1)
+ def createTopic(topic: String, partitions: Int, config: Properties): Unit = {
+ AdminUtils.createTopic(zkUtils, topic, partitions, 1, config)
// wait until metadata is propagated
(0 until partitions).foreach { p =>
waitUntilMetadataIsPropagated(topic, p)
@@ -171,8 +171,13 @@ private[kafka010] class KafkaTestUtils extends Logging {
}
/** Create a Kafka topic and wait until it is propagated to the whole cluster */
+ def createTopic(topic: String, partitions: Int): Unit = {
+ createTopic(topic, partitions, new Properties())
+ }
+
+ /** Create a Kafka topic and wait until it is propagated to the whole cluster */
def createTopic(topic: String): Unit = {
- createTopic(topic, 1)
+ createTopic(topic, 1, new Properties())
}
/** Java-friendly function for sending messages to the Kafka broker */
@@ -196,12 +201,24 @@ private[kafka010] class KafkaTestUtils extends Logging {
producer = null
}
+ /** Send the array of (key, value) messages to the Kafka broker */
+ def sendMessages(topic: String, messages: Array[(String, String)]): Unit = {
+ producer = new KafkaProducer[String, String](producerConfiguration)
+ messages.foreach { message =>
+ producer.send(new ProducerRecord[String, String](topic, message._1, message._2))
+ }
+ producer.close()
+ producer = null
+ }
+
+ val brokerLogDir = Utils.createTempDir().getAbsolutePath
+
private def brokerConfiguration: Properties = {
val props = new Properties()
props.put("broker.id", "0")
props.put("host.name", "localhost")
props.put("port", brokerPort.toString)
- props.put("log.dir", Utils.createTempDir().getAbsolutePath)
+ props.put("log.dir", brokerLogDir)
props.put("zookeeper.connect", zkAddress)
props.put("log.flush.interval.messages", "1")
props.put("replica.socket.timeout.ms", "1500")
http://git-wip-us.apache.org/repos/asf/spark/blob/1d598b77/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala
new file mode 100644
index 0000000..928e1a6
--- /dev/null
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010.mocks
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.PriorityQueue
+
+import kafka.utils.{Scheduler, Time}
+
+/**
+ * A mock scheduler that executes tasks synchronously using a mock time instance.
+ * Tasks are executed synchronously when the time is advanced.
+ * This class is meant to be used in conjunction with MockTime.
+ *
+ * Example usage
+ * <code>
+ * val time = new MockTime
+ * time.scheduler.schedule("a task", println("hello world: " + time.milliseconds), delay = 1000)
+ * time.sleep(1001) // this should cause our scheduled task to fire
+ * </code>
+ *
+ * Incrementing the time to the exact next execution time of a task will result in that task
+ * executing (it is as if execution itself takes no time).
+ */
+private[kafka010] class MockScheduler(val time: Time) extends Scheduler {
+
+ /* a priority queue of tasks ordered by next execution time */
+ var tasks = new PriorityQueue[MockTask]()
+
+ def isStarted: Boolean = true
+
+ def startup(): Unit = {}
+
+ def shutdown(): Unit = synchronized {
+ tasks.foreach(_.fun())
+ tasks.clear()
+ }
+
+ /**
+ * Check for any tasks that need to execute. Since this is a mock scheduler this check only occurs
+ * when this method is called and the execution happens synchronously in the calling thread.
+ * If you are using the scheduler associated with a MockTime instance this call
+ * will be triggered automatically.
+ */
+ def tick(): Unit = synchronized {
+ val now = time.milliseconds
+ while(!tasks.isEmpty && tasks.head.nextExecution <= now) {
+ /* pop and execute the task with the lowest next execution time */
+ val curr = tasks.dequeue
+ curr.fun()
+ /* if the task is periodic, reschedule it and re-enqueue */
+ if(curr.periodic) {
+ curr.nextExecution += curr.period
+ this.tasks += curr
+ }
+ }
+ }
+
+ def schedule(
+ name: String,
+ fun: () => Unit,
+ delay: Long = 0,
+ period: Long = -1,
+ unit: TimeUnit = TimeUnit.MILLISECONDS): Unit = synchronized {
+ tasks += MockTask(name, fun, time.milliseconds + delay, period = period)
+ tick()
+ }
+
+}
+
+case class MockTask(
+ val name: String,
+ val fun: () => Unit,
+ var nextExecution: Long,
+ val period: Long) extends Ordered[MockTask] {
+ def periodic: Boolean = period >= 0
+ def compare(t: MockTask): Int = {
+ java.lang.Long.compare(t.nextExecution, nextExecution)
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/1d598b77/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala
new file mode 100644
index 0000000..a68f94d
--- /dev/null
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010.mocks
+
+import java.util.concurrent._
+
+import kafka.utils.Time
+
+/**
+ * A class used for unit testing things which depend on the Time interface.
+ *
+ * This class never manually advances the clock, it only does so when you call
+ * sleep(ms)
+ *
+ * It also comes with an associated scheduler instance for managing background tasks in
+ * a deterministic way.
+ */
+private[kafka010] class MockTime(@volatile private var currentMs: Long) extends Time {
+
+ val scheduler = new MockScheduler(this)
+
+ def this() = this(System.currentTimeMillis)
+
+ def milliseconds: Long = currentMs
+
+ def nanoseconds: Long =
+ TimeUnit.NANOSECONDS.convert(currentMs, TimeUnit.MILLISECONDS)
+
+ def sleep(ms: Long) {
+ this.currentMs += ms
+ scheduler.tick()
+ }
+
+ override def toString(): String = s"MockTime($milliseconds)"
+
+}