You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2018/02/27 14:21:14 UTC

spark git commit: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive offsets

Repository: spark
Updated Branches:
  refs/heads/master 649ed9c57 -> eac0b0672


[SPARK-17147][STREAMING][KAFKA] Allow non-consecutive offsets

## What changes were proposed in this pull request?

Add a configuration spark.streaming.kafka.allowNonConsecutiveOffsets to allow streaming jobs to proceed on compacted topics (or other situations involving gaps between offsets in the log).

## How was this patch tested?

Added new unit test

justinrmiller has been testing this branch in production for a few weeks

Author: cody koeninger <co...@koeninger.org>

Closes #20572 from koeninger/SPARK-17147.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eac0b067
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eac0b067
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eac0b067

Branch: refs/heads/master
Commit: eac0b067222a3dfa52be20360a453cb7bd420bf2
Parents: 649ed9c
Author: cody koeninger <co...@koeninger.org>
Authored: Tue Feb 27 08:21:11 2018 -0600
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Feb 27 08:21:11 2018 -0600

----------------------------------------------------------------------
 .../kafka010/CachedKafkaConsumer.scala          |  55 ++++-
 .../spark/streaming/kafka010/KafkaRDD.scala     | 236 +++++++++++++------
 .../streaming/kafka010/KafkaRDDSuite.scala      | 106 +++++++++
 .../streaming/kafka010/KafkaTestUtils.scala     |  25 +-
 .../kafka010/mocks/MockScheduler.scala          |  96 ++++++++
 .../streaming/kafka010/mocks/MockTime.scala     |  51 ++++
 6 files changed, 487 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/eac0b067/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
index fa3ea61..aeb8c1d 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
@@ -22,10 +22,8 @@ import java.{ util => ju }
 import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, KafkaConsumer }
 import org.apache.kafka.common.{ KafkaException, TopicPartition }
 
-import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 
-
 /**
  * Consumer of single topicpartition, intended for cached reuse.
  * Underlying consumer is not threadsafe, so neither is this,
@@ -38,7 +36,7 @@ class CachedKafkaConsumer[K, V] private(
   val partition: Int,
   val kafkaParams: ju.Map[String, Object]) extends Logging {
 
-  assert(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
     "groupId used for cache key must match the groupId in kafkaParams")
 
   val topicPartition = new TopicPartition(topic, partition)
@@ -53,7 +51,7 @@ class CachedKafkaConsumer[K, V] private(
 
   // TODO if the buffer was kept around as a random-access structure,
   // could possibly optimize re-calculating of an RDD in the same batch
-  protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, V]]().iterator
+  protected var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
   protected var nextOffset = -2L
 
   def close(): Unit = consumer.close()
@@ -71,7 +69,7 @@ class CachedKafkaConsumer[K, V] private(
     }
 
     if (!buffer.hasNext()) { poll(timeout) }
-    assert(buffer.hasNext(),
+    require(buffer.hasNext(),
       s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
     var record = buffer.next()
 
@@ -79,17 +77,56 @@ class CachedKafkaConsumer[K, V] private(
       logInfo(s"Buffer miss for $groupId $topic $partition $offset")
       seek(offset)
       poll(timeout)
-      assert(buffer.hasNext(),
+      require(buffer.hasNext(),
         s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
       record = buffer.next()
-      assert(record.offset == offset,
-        s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")
+      require(record.offset == offset,
+        s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset " +
+          s"got offset ${record.offset} instead. If this is a compacted topic, consider enabling " +
+          "spark.streaming.kafka.allowNonConsecutiveOffsets"
+      )
     }
 
     nextOffset = offset + 1
     record
   }
 
+  /**
+   * Start a batch on a compacted topic
+   */
+  def compactedStart(offset: Long, timeout: Long): Unit = {
+    logDebug(s"compacted start $groupId $topic $partition starting $offset")
+    // This seek may not be necessary, but it's hard to tell due to gaps in compacted topics
+    if (offset != nextOffset) {
+      logInfo(s"Initial fetch for compacted $groupId $topic $partition $offset")
+      seek(offset)
+      poll(timeout)
+    }
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   */
+  def compactedNext(timeout: Long): ConsumerRecord[K, V] = {
+    if (!buffer.hasNext()) {
+      poll(timeout)
+    }
+    require(buffer.hasNext(),
+      s"Failed to get records for compacted $groupId $topic $partition after polling for $timeout")
+    val record = buffer.next()
+    nextOffset = record.offset + 1
+    record
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+    buffer.previous()
+  }
+
   private def seek(offset: Long): Unit = {
     logDebug(s"Seeking to $topicPartition $offset")
     consumer.seek(topicPartition, offset)
@@ -99,7 +136,7 @@ class CachedKafkaConsumer[K, V] private(
     val p = consumer.poll(timeout)
     val r = p.records(topicPartition)
     logDebug(s"Polled ${p.partitions()}  ${r.size}")
-    buffer = r.iterator
+    buffer = r.listIterator
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/eac0b067/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
index d9fc9cc..07239ed 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
@@ -55,12 +55,12 @@ private[spark] class KafkaRDD[K, V](
     useConsumerCache: Boolean
 ) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges {
 
-  assert("none" ==
+  require("none" ==
     kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).asInstanceOf[String],
     ConsumerConfig.AUTO_OFFSET_RESET_CONFIG +
       " must be set to none for executor kafka params, else messages may not match offsetRange")
 
-  assert(false ==
+  require(false ==
     kafkaParams.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).asInstanceOf[Boolean],
     ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG +
       " must be set to false for executor kafka params, else offsets may commit before processing")
@@ -74,6 +74,8 @@ private[spark] class KafkaRDD[K, V](
     conf.getInt("spark.streaming.kafka.consumer.cache.maxCapacity", 64)
   private val cacheLoadFactor =
     conf.getDouble("spark.streaming.kafka.consumer.cache.loadFactor", 0.75).toFloat
+  private val compacted =
+    conf.getBoolean("spark.streaming.kafka.allowNonConsecutiveOffsets", false)
 
   override def persist(newLevel: StorageLevel): this.type = {
     logError("Kafka ConsumerRecord is not serializable. " +
@@ -87,48 +89,63 @@ private[spark] class KafkaRDD[K, V](
     }.toArray
   }
 
-  override def count(): Long = offsetRanges.map(_.count).sum
+  override def count(): Long =
+    if (compacted) {
+      super.count()
+    } else {
+      offsetRanges.map(_.count).sum
+    }
 
   override def countApprox(
       timeout: Long,
       confidence: Double = 0.95
-  ): PartialResult[BoundedDouble] = {
-    val c = count
-    new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
-  }
-
-  override def isEmpty(): Boolean = count == 0L
-
-  override def take(num: Int): Array[ConsumerRecord[K, V]] = {
-    val nonEmptyPartitions = this.partitions
-      .map(_.asInstanceOf[KafkaRDDPartition])
-      .filter(_.count > 0)
+  ): PartialResult[BoundedDouble] =
+    if (compacted) {
+      super.countApprox(timeout, confidence)
+    } else {
+      val c = count
+      new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
+    }
 
-    if (num < 1 || nonEmptyPartitions.isEmpty) {
-      return new Array[ConsumerRecord[K, V]](0)
+  override def isEmpty(): Boolean =
+    if (compacted) {
+      super.isEmpty()
+    } else {
+      count == 0L
     }
 
-    // Determine in advance how many messages need to be taken from each partition
-    val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) =>
-      val remain = num - result.values.sum
-      if (remain > 0) {
-        val taken = Math.min(remain, part.count)
-        result + (part.index -> taken.toInt)
+  override def take(num: Int): Array[ConsumerRecord[K, V]] =
+    if (compacted) {
+      super.take(num)
+    } else if (num < 1) {
+      Array.empty[ConsumerRecord[K, V]]
+    } else {
+      val nonEmptyPartitions = this.partitions
+        .map(_.asInstanceOf[KafkaRDDPartition])
+        .filter(_.count > 0)
+
+      if (nonEmptyPartitions.isEmpty) {
+        Array.empty[ConsumerRecord[K, V]]
       } else {
-        result
+        // Determine in advance how many messages need to be taken from each partition
+        val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) =>
+          val remain = num - result.values.sum
+          if (remain > 0) {
+            val taken = Math.min(remain, part.count)
+            result + (part.index -> taken.toInt)
+          } else {
+            result
+          }
+        }
+
+        context.runJob(
+          this,
+          (tc: TaskContext, it: Iterator[ConsumerRecord[K, V]]) =>
+          it.take(parts(tc.partitionId)).toArray, parts.keys.toArray
+        ).flatten
       }
     }
 
-    val buf = new ArrayBuffer[ConsumerRecord[K, V]]
-    val res = context.runJob(
-      this,
-      (tc: TaskContext, it: Iterator[ConsumerRecord[K, V]]) =>
-      it.take(parts(tc.partitionId)).toArray, parts.keys.toArray
-    )
-    res.foreach(buf ++= _)
-    buf.toArray
-  }
-
   private def executors(): Array[ExecutorCacheTaskLocation] = {
     val bm = sparkContext.env.blockManager
     bm.master.getPeers(bm.blockManagerId).toArray
@@ -172,57 +189,138 @@ private[spark] class KafkaRDD[K, V](
 
   override def compute(thePart: Partition, context: TaskContext): Iterator[ConsumerRecord[K, V]] = {
     val part = thePart.asInstanceOf[KafkaRDDPartition]
-    assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
+    require(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
     if (part.fromOffset == part.untilOffset) {
       logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
         s"skipping ${part.topic} ${part.partition}")
       Iterator.empty
     } else {
-      new KafkaRDDIterator(part, context)
+      logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
+        s"offsets ${part.fromOffset} -> ${part.untilOffset}")
+      if (compacted) {
+        new CompactedKafkaRDDIterator[K, V](
+          part,
+          context,
+          kafkaParams,
+          useConsumerCache,
+          pollTimeout,
+          cacheInitialCapacity,
+          cacheMaxCapacity,
+          cacheLoadFactor
+        )
+      } else {
+        new KafkaRDDIterator[K, V](
+          part,
+          context,
+          kafkaParams,
+          useConsumerCache,
+          pollTimeout,
+          cacheInitialCapacity,
+          cacheMaxCapacity,
+          cacheLoadFactor
+        )
+      }
     }
   }
+}
 
-  /**
-   * An iterator that fetches messages directly from Kafka for the offsets in partition.
-   * Uses a cached consumer where possible to take advantage of prefetching
-   */
-  private class KafkaRDDIterator(
-      part: KafkaRDDPartition,
-      context: TaskContext) extends Iterator[ConsumerRecord[K, V]] {
-
-    logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
-      s"offsets ${part.fromOffset} -> ${part.untilOffset}")
-
-    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+/**
+ * An iterator that fetches messages directly from Kafka for the offsets in partition.
+ * Uses a cached consumer where possible to take advantage of prefetching
+ */
+private class KafkaRDDIterator[K, V](
+  part: KafkaRDDPartition,
+  context: TaskContext,
+  kafkaParams: ju.Map[String, Object],
+  useConsumerCache: Boolean,
+  pollTimeout: Long,
+  cacheInitialCapacity: Int,
+  cacheMaxCapacity: Int,
+  cacheLoadFactor: Float
+) extends Iterator[ConsumerRecord[K, V]] {
+
+  val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+  context.addTaskCompletionListener(_ => closeIfNeeded())
+
+  val consumer = if (useConsumerCache) {
+    CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
+    if (context.attemptNumber >= 1) {
+      // just in case the prior attempt failures were cache related
+      CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
+    }
+    CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams)
+  } else {
+    CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams)
+  }
 
-    context.addTaskCompletionListener{ context => closeIfNeeded() }
+  var requestOffset = part.fromOffset
 
-    val consumer = if (useConsumerCache) {
-      CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
-      if (context.attemptNumber >= 1) {
-        // just in case the prior attempt failures were cache related
-        CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
-      }
-      CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams)
-    } else {
-      CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams)
+  def closeIfNeeded(): Unit = {
+    if (!useConsumerCache && consumer != null) {
+      consumer.close()
     }
+  }
 
-    var requestOffset = part.fromOffset
+  override def hasNext(): Boolean = requestOffset < part.untilOffset
 
-    def closeIfNeeded(): Unit = {
-      if (!useConsumerCache && consumer != null) {
-        consumer.close
-      }
+  override def next(): ConsumerRecord[K, V] = {
+    if (!hasNext) {
+      throw new ju.NoSuchElementException("Can't call getNext() once untilOffset has been reached")
     }
+    val r = consumer.get(requestOffset, pollTimeout)
+    requestOffset += 1
+    r
+  }
+}
 
-    override def hasNext(): Boolean = requestOffset < part.untilOffset
-
-    override def next(): ConsumerRecord[K, V] = {
-      assert(hasNext(), "Can't call getNext() once untilOffset has been reached")
-      val r = consumer.get(requestOffset, pollTimeout)
-      requestOffset += 1
-      r
+/**
+ * An iterator that fetches messages directly from Kafka for the offsets in partition.
+ * Uses a cached consumer where possible to take advantage of prefetching.
+ * Intended for compacted topics, or other cases when non-consecutive offsets are ok.
+ */
+private class CompactedKafkaRDDIterator[K, V](
+    part: KafkaRDDPartition,
+    context: TaskContext,
+    kafkaParams: ju.Map[String, Object],
+    useConsumerCache: Boolean,
+    pollTimeout: Long,
+    cacheInitialCapacity: Int,
+    cacheMaxCapacity: Int,
+    cacheLoadFactor: Float
+  ) extends KafkaRDDIterator[K, V](
+    part,
+    context,
+    kafkaParams,
+    useConsumerCache,
+    pollTimeout,
+    cacheInitialCapacity,
+    cacheMaxCapacity,
+    cacheLoadFactor
+  ) {
+
+  consumer.compactedStart(part.fromOffset, pollTimeout)
+
+  private var nextRecord = consumer.compactedNext(pollTimeout)
+
+  private var okNext: Boolean = true
+
+  override def hasNext(): Boolean = okNext
+
+  override def next(): ConsumerRecord[K, V] = {
+    if (!hasNext) {
+      throw new ju.NoSuchElementException("Can't call getNext() once untilOffset has been reached")
+    }
+    val r = nextRecord
+    if (r.offset + 1 >= part.untilOffset) {
+      okNext = false
+    } else {
+      nextRecord = consumer.compactedNext(pollTimeout)
+      if (nextRecord.offset >= part.untilOffset) {
+        okNext = false
+        consumer.compactedPrevious()
+      }
     }
+    r
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/eac0b067/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
index be373af..271adea 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
@@ -18,16 +18,22 @@
 package org.apache.spark.streaming.kafka010
 
 import java.{ util => ju }
+import java.io.File
 
 import scala.collection.JavaConverters._
 import scala.util.Random
 
+import kafka.common.TopicAndPartition
+import kafka.log._
+import kafka.message._
+import kafka.utils.Pool
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark._
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.streaming.kafka010.mocks.MockTime
 
 class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
 
@@ -64,6 +70,41 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
 
   private val preferredHosts = LocationStrategies.PreferConsistent
 
+  private def compactLogs(topic: String, partition: Int, messages: Array[(String, String)]) {
+    val mockTime = new MockTime()
+    // LogCleaner in 0.10 version of Kafka is still expecting the old TopicAndPartition api
+    val logs = new Pool[TopicAndPartition, Log]()
+    val logDir = kafkaTestUtils.brokerLogDir
+    val dir = new File(logDir, topic + "-" + partition)
+    dir.mkdirs()
+    val logProps = new ju.Properties()
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+    logProps.put(LogConfig.MinCleanableDirtyRatioProp, java.lang.Float.valueOf(0.1f))
+    val log = new Log(
+      dir,
+      LogConfig(logProps),
+      0L,
+      mockTime.scheduler,
+      mockTime
+    )
+    messages.foreach { case (k, v) =>
+      val msg = new ByteBufferMessageSet(
+        NoCompressionCodec,
+        new Message(v.getBytes, k.getBytes, Message.NoTimestamp, Message.CurrentMagicValue))
+      log.append(msg)
+    }
+    log.roll()
+    logs.put(TopicAndPartition(topic, partition), log)
+
+    val cleaner = new LogCleaner(CleanerConfig(), logDirs = Array(dir), logs = logs)
+    cleaner.startup()
+    cleaner.awaitCleaned(topic, partition, log.activeSegment.baseOffset, 1000)
+
+    cleaner.shutdown()
+    mockTime.scheduler.shutdown()
+  }
+
+
   test("basic usage") {
     val topic = s"topicbasic-${Random.nextInt}-${System.currentTimeMillis}"
     kafkaTestUtils.createTopic(topic)
@@ -102,6 +143,71 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
     }
   }
 
+  test("compacted topic") {
+    val compactConf = sparkConf.clone()
+    compactConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")
+    sc.stop()
+    sc = new SparkContext(compactConf)
+    val topic = s"topiccompacted-${Random.nextInt}-${System.currentTimeMillis}"
+
+    val messages = Array(
+      ("a", "1"),
+      ("a", "2"),
+      ("b", "1"),
+      ("c", "1"),
+      ("c", "2"),
+      ("b", "2"),
+      ("b", "3")
+    )
+    val compactedMessages = Array(
+      ("a", "2"),
+      ("b", "3"),
+      ("c", "2")
+    )
+
+    compactLogs(topic, 0, messages)
+
+    val props = new ju.Properties()
+    props.put("cleanup.policy", "compact")
+    props.put("flush.messages", "1")
+    props.put("segment.ms", "1")
+    props.put("segment.bytes", "256")
+    kafkaTestUtils.createTopic(topic, 1, props)
+
+
+    val kafkaParams = getKafkaParams()
+
+    val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
+
+    val rdd = KafkaUtils.createRDD[String, String](
+      sc, kafkaParams, offsetRanges, preferredHosts
+    ).map(m => m.key -> m.value)
+
+    val received = rdd.collect.toSet
+    assert(received === compactedMessages.toSet)
+
+    // size-related method optimizations return sane results
+    assert(rdd.count === compactedMessages.size)
+    assert(rdd.countApprox(0).getFinalValue.mean === compactedMessages.size)
+    assert(!rdd.isEmpty)
+    assert(rdd.take(1).size === 1)
+    assert(rdd.take(1).head === compactedMessages.head)
+    assert(rdd.take(messages.size + 10).size === compactedMessages.size)
+
+    val emptyRdd = KafkaUtils.createRDD[String, String](
+      sc, kafkaParams, Array(OffsetRange(topic, 0, 0, 0)), preferredHosts)
+
+    assert(emptyRdd.isEmpty)
+
+    // invalid offset ranges throw exceptions
+    val badRanges = Array(OffsetRange(topic, 0, 0, messages.size + 1))
+    intercept[SparkException] {
+      val result = KafkaUtils.createRDD[String, String](sc, kafkaParams, badRanges, preferredHosts)
+        .map(_.value)
+        .collect()
+    }
+  }
+
   test("iterator boundary conditions") {
     // the idea is to find e.g. off-by-one errors between what kafka has available and the rdd
     val topic = s"topicboundary-${Random.nextInt}-${System.currentTimeMillis}"

http://git-wip-us.apache.org/repos/asf/spark/blob/eac0b067/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
index 6c7024e..70b579d 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -162,8 +162,8 @@ private[kafka010] class KafkaTestUtils extends Logging {
   }
 
   /** Create a Kafka topic and wait until it is propagated to the whole cluster */
-  def createTopic(topic: String, partitions: Int): Unit = {
-    AdminUtils.createTopic(zkUtils, topic, partitions, 1)
+  def createTopic(topic: String, partitions: Int, config: Properties): Unit = {
+    AdminUtils.createTopic(zkUtils, topic, partitions, 1, config)
     // wait until metadata is propagated
     (0 until partitions).foreach { p =>
       waitUntilMetadataIsPropagated(topic, p)
@@ -171,8 +171,13 @@ private[kafka010] class KafkaTestUtils extends Logging {
   }
 
   /** Create a Kafka topic and wait until it is propagated to the whole cluster */
+  def createTopic(topic: String, partitions: Int): Unit = {
+    createTopic(topic, partitions, new Properties())
+  }
+
+  /** Create a Kafka topic and wait until it is propagated to the whole cluster */
   def createTopic(topic: String): Unit = {
-    createTopic(topic, 1)
+    createTopic(topic, 1, new Properties())
   }
 
   /** Java-friendly function for sending messages to the Kafka broker */
@@ -196,12 +201,24 @@ private[kafka010] class KafkaTestUtils extends Logging {
     producer = null
   }
 
+  /** Send the array of (key, value) messages to the Kafka broker */
+  def sendMessages(topic: String, messages: Array[(String, String)]): Unit = {
+    producer = new KafkaProducer[String, String](producerConfiguration)
+    messages.foreach { message =>
+      producer.send(new ProducerRecord[String, String](topic, message._1, message._2))
+    }
+    producer.close()
+    producer = null
+  }
+
+  val brokerLogDir = Utils.createTempDir().getAbsolutePath
+
   private def brokerConfiguration: Properties = {
     val props = new Properties()
     props.put("broker.id", "0")
     props.put("host.name", "localhost")
     props.put("port", brokerPort.toString)
-    props.put("log.dir", Utils.createTempDir().getAbsolutePath)
+    props.put("log.dir", brokerLogDir)
     props.put("zookeeper.connect", zkAddress)
     props.put("log.flush.interval.messages", "1")
     props.put("replica.socket.timeout.ms", "1500")

http://git-wip-us.apache.org/repos/asf/spark/blob/eac0b067/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala
new file mode 100644
index 0000000..928e1a6
--- /dev/null
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010.mocks
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.PriorityQueue
+
+import kafka.utils.{Scheduler, Time}
+
+/**
+ * A mock scheduler that executes tasks synchronously using a mock time instance.
+ * Tasks are executed synchronously when the time is advanced.
+ * This class is meant to be used in conjunction with MockTime.
+ *
+ * Example usage
+ * <code>
+ *   val time = new MockTime
+ *   time.scheduler.schedule("a task", println("hello world: " + time.milliseconds), delay = 1000)
+ *   time.sleep(1001) // this should cause our scheduled task to fire
+ * </code>
+ *
+ * Incrementing the time to the exact next execution time of a task will result in that task
+ * executing (it as if execution itself takes no time).
+ */
+private[kafka010] class MockScheduler(val time: Time) extends Scheduler {
+
+  /* a priority queue of tasks ordered by next execution time */
+  var tasks = new PriorityQueue[MockTask]()
+
+  def isStarted: Boolean = true
+
+  def startup(): Unit = {}
+
+  def shutdown(): Unit = synchronized {
+    tasks.foreach(_.fun())
+    tasks.clear()
+  }
+
+  /**
+   * Check for any tasks that need to execute. Since this is a mock scheduler this check only occurs
+   * when this method is called and the execution happens synchronously in the calling thread.
+   * If you are using the scheduler associated with a MockTime instance this call
+   * will be triggered automatically.
+   */
+  def tick(): Unit = synchronized {
+    val now = time.milliseconds
+    while(!tasks.isEmpty && tasks.head.nextExecution <= now) {
+      /* pop and execute the task with the lowest next execution time */
+      val curr = tasks.dequeue
+      curr.fun()
+      /* if the task is periodic, reschedule it and re-enqueue */
+      if(curr.periodic) {
+        curr.nextExecution += curr.period
+        this.tasks += curr
+      }
+    }
+  }
+
+  def schedule(
+      name: String,
+      fun: () => Unit,
+      delay: Long = 0,
+      period: Long = -1,
+      unit: TimeUnit = TimeUnit.MILLISECONDS): Unit = synchronized {
+    tasks += MockTask(name, fun, time.milliseconds + delay, period = period)
+    tick()
+  }
+
+}
+
+case class MockTask(
+    val name: String,
+    val fun: () => Unit,
+    var nextExecution: Long,
+    val period: Long) extends Ordered[MockTask] {
+  def periodic: Boolean = period >= 0
+  def compare(t: MockTask): Int = {
+    java.lang.Long.compare(t.nextExecution, nextExecution)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/eac0b067/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala
new file mode 100644
index 0000000..a68f94d
--- /dev/null
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010.mocks
+
+import java.util.concurrent._
+
+import kafka.utils.Time
+
+/**
+ * A class used for unit testing things which depend on the Time interface.
+ *
+ * This class never manually advances the clock, it only does so when you call
+ *   sleep(ms)
+ *
+ * It also comes with an associated scheduler instance for managing background tasks in
+ * a deterministic way.
+ */
+private[kafka010] class MockTime(@volatile private var currentMs: Long) extends Time {
+
+  val scheduler = new MockScheduler(this)
+
+  def this() = this(System.currentTimeMillis)
+
+  def milliseconds: Long = currentMs
+
+  def nanoseconds: Long =
+    TimeUnit.NANOSECONDS.convert(currentMs, TimeUnit.MILLISECONDS)
+
+  def sleep(ms: Long) {
+    this.currentMs += ms
+    scheduler.tick()
+  }
+
+  override def toString(): String = s"MockTime($milliseconds)"
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org