Posted to commits@spark.apache.org by td...@apache.org on 2015/02/10 07:45:53 UTC

spark git commit: [SPARK-4964][Streaming][Kafka] More updates to Exactly-once Kafka stream

Repository: spark
Updated Branches:
  refs/heads/master ef2f55b97 -> c15134632


[SPARK-4964][Streaming][Kafka] More updates to Exactly-once Kafka stream

Changes
- Added example
- Added a critical unit test that verifies that offset ranges can be recovered through checkpoints

Might add more changes.

Author: Tathagata Das <ta...@gmail.com>

Closes #4384 from tdas/new-kafka-fixes and squashes the following commits:

7c931c3 [Tathagata Das] Small update
3ed9284 [Tathagata Das] updated scala doc
83d0402 [Tathagata Das] Added JavaDirectKafkaWordCount example.
26df23c [Tathagata Das] Updates based on PR comments from Cody
e4abf69 [Tathagata Das] Scala doc improvements and stuff.
bb65232 [Tathagata Das] Fixed test bug and refactored KafkaStreamSuite
50f2b56 [Tathagata Das] Added Java API and added more Scala and Java unit tests. Also updated docs.
e73589c [Tathagata Das] Minor changes.
4986784 [Tathagata Das] Added unit test to kafka offset recovery
6a91cab [Tathagata Das] Added example


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1513463
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1513463
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1513463

Branch: refs/heads/master
Commit: c15134632e74e3dee05eda20c6ef79915e15d02e
Parents: ef2f55b
Author: Tathagata Das <ta...@gmail.com>
Authored: Mon Feb 9 22:45:48 2015 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Feb 9 22:45:48 2015 -0800

----------------------------------------------------------------------
 .../streaming/JavaDirectKafkaWordCount.java     | 113 ++++++
 .../streaming/DirectKafkaWordCount.scala        |  71 ++++
 .../kafka/DirectKafkaInputDStream.scala         |   5 +-
 .../spark/streaming/kafka/KafkaCluster.scala    |   3 +
 .../apache/spark/streaming/kafka/KafkaRDD.scala |  12 +-
 .../streaming/kafka/KafkaRDDPartition.scala     |  23 +-
 .../spark/streaming/kafka/KafkaUtils.scala      | 353 ++++++++++++++-----
 .../apache/spark/streaming/kafka/Leader.scala   |  21 +-
 .../spark/streaming/kafka/OffsetRange.scala     |  53 ++-
 .../kafka/JavaDirectKafkaStreamSuite.java       | 159 +++++++++
 .../streaming/kafka/JavaKafkaStreamSuite.java   |   5 +-
 .../kafka/DirectKafkaStreamSuite.scala          | 302 ++++++++++++++++
 .../streaming/kafka/KafkaClusterSuite.scala     |  24 +-
 .../kafka/KafkaDirectStreamSuite.scala          |  92 -----
 .../spark/streaming/kafka/KafkaRDDSuite.scala   |   8 +-
 .../streaming/kafka/KafkaStreamSuite.scala      |  62 ++--
 .../kafka/ReliableKafkaStreamSuite.scala        |   4 +-
 17 files changed, 1048 insertions(+), 262 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c1513463/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
new file mode 100644
index 0000000..bab9f24
--- /dev/null
+++ b/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Arrays;
+import java.util.regex.Pattern;
+
+import scala.Tuple2;
+
+import com.google.common.collect.Lists;
+import kafka.serializer.StringDecoder;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.streaming.api.java.*;
+import org.apache.spark.streaming.kafka.KafkaUtils;
+import org.apache.spark.streaming.Durations;
+
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ * Usage: JavaDirectKafkaWordCount <brokers> <topics>
+ *   <brokers> is a list of one or more Kafka brokers
+ *   <topics> is a list of one or more kafka topics to consume from
+ *
+ * Example:
+ *    $ bin/run-example streaming.JavaDirectKafkaWordCount broker1-host:port,broker2-host:port topic1,topic2
+ */
+
+public final class JavaDirectKafkaWordCount {
+  private static final Pattern SPACE = Pattern.compile(" ");
+
+  public static void main(String[] args) {
+    if (args.length < 2) {
+      System.err.println("Usage: DirectKafkaWordCount <brokers> <topics>\n" +
+          "  <brokers> is a list of one or more Kafka brokers\n" +
+          "  <topics> is a list of one or more kafka topics to consume from\n\n");
+      System.exit(1);
+    }
+
+    StreamingExamples.setStreamingLogLevels();
+
+    String brokers = args[0];
+    String topics = args[1];
+
+    // Create context with 2 second batch interval
+    SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
+    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
+
+    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
+    HashMap<String, String> kafkaParams = new HashMap<String, String>();
+    kafkaParams.put("metadata.broker.list", brokers);
+
+    // Create direct kafka stream with brokers and topics
+    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
+        jssc,
+        String.class,
+        String.class,
+        StringDecoder.class,
+        StringDecoder.class,
+        kafkaParams,
+        topicsSet
+    );
+
+    // Get the lines, split them into words, count the words and print
+    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
+      @Override
+      public String call(Tuple2<String, String> tuple2) {
+        return tuple2._2();
+      }
+    });
+    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+      @Override
+      public Iterable<String> call(String x) {
+        return Lists.newArrayList(SPACE.split(x));
+      }
+    });
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+      new PairFunction<String, String, Integer>() {
+        @Override
+        public Tuple2<String, Integer> call(String s) {
+          return new Tuple2<String, Integer>(s, 1);
+        }
+      }).reduceByKey(
+        new Function2<Integer, Integer, Integer>() {
+        @Override
+        public Integer call(Integer i1, Integer i2) {
+          return i1 + i2;
+        }
+      });
+    wordCounts.print();
+
+    // Start the computation
+    jssc.start();
+    jssc.awaitTermination();
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c1513463/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
new file mode 100644
index 0000000..deb08fd
--- /dev/null
+++ b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import kafka.serializer.StringDecoder
+
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.kafka._
+import org.apache.spark.SparkConf
+
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ * Usage: DirectKafkaWordCount <brokers> <topics>
+ *   <brokers> is a list of one or more Kafka brokers
+ *   <topics> is a list of one or more kafka topics to consume from
+ *
+ * Example:
+ *    $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port topic1,topic2
+ */
+object DirectKafkaWordCount {
+  def main(args: Array[String]) {
+    if (args.length < 2) {
+      System.err.println(s"""
+        |Usage: DirectKafkaWordCount <brokers> <topics>
+        |  <brokers> is a list of one or more Kafka brokers
+        |  <topics> is a list of one or more kafka topics to consume from
+        |
+        """".stripMargin)
+      System.exit(1)
+    }
+
+    StreamingExamples.setStreamingLogLevels()
+
+    val Array(brokers, topics) = args
+
+    // Create context with 2 second batch interval
+    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
+    val ssc =  new StreamingContext(sparkConf, Seconds(2))
+
+    // Create direct kafka stream with brokers and topics
+    val topicsSet = topics.split(",").toSet
+    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
+    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+      ssc, kafkaParams, topicsSet)
+
+    // Get the lines, split them into words, count the words and print
+    val lines = messages.map(_._2)
+    val words = lines.flatMap(_.split(" "))
+    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
+    wordCounts.print()
+
+    // Start the computation
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}

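The direct stream also exposes the Kafka offsets behind each batch. A minimal sketch of how the example above could surface them, assuming the `messages` stream created in DirectKafkaWordCount and the HasOffsetRanges trait added later in this patch (output format is illustrative):

    import org.apache.spark.streaming.kafka.HasOffsetRanges

    // Inside main(), after `val messages = ...`: each batch's RDD is backed by a
    // KafkaRDD, so it can be cast to HasOffsetRanges to read the consumed offsets.
    messages.foreachRDD { rdd =>
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach { r =>
        println(s"${r.topic} [${r.partition}] offsets ${r.fromOffset} -> ${r.untilOffset}")
      }
    }
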
http://git-wip-us.apache.org/repos/asf/spark/blob/c1513463/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index c7bca43..04e65cb 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -50,14 +50,13 @@ import org.apache.spark.streaming.dstream._
  * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
  *  starting point of the stream
  * @param messageHandler function for translating each message into the desired type
- * @param maxRetries maximum number of times in a row to retry getting leaders' offsets
  */
 private[streaming]
 class DirectKafkaInputDStream[
   K: ClassTag,
   V: ClassTag,
-  U <: Decoder[_]: ClassTag,
-  T <: Decoder[_]: ClassTag,
+  U <: Decoder[K]: ClassTag,
+  T <: Decoder[V]: ClassTag,
   R: ClassTag](
     @transient ssc_ : StreamingContext,
     val kafkaParams: Map[String, String],

http://git-wip-us.apache.org/repos/asf/spark/blob/c1513463/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
index ccc62bf..2f7e0ab 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
@@ -332,6 +332,9 @@ object KafkaCluster {
       extends ConsumerConfig(originalProps) {
     val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp =>
       val hpa = hp.split(":")
+      if (hpa.size == 1) {
+        throw new SparkException(s"Broker not the in correct format of <host>:<port> [$brokers]")
+      }
       (hpa(0), hpa(1).toInt)
     }
   }

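To make the new validation concrete, a small illustrative snippet (not from the patch) of the host:port parsing that SimpleConsumerConfig performs; the broker string is a made-up value:

    // Each entry must be "host:port". Before this change a bare hostname failed at
    // hpa(1) with an ArrayIndexOutOfBoundsException; now the error names the input.
    val brokers = "broker1:9092,broker2:9092"
    val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp =>
      val hpa = hp.split(":")
      if (hpa.size == 1) {
        throw new IllegalArgumentException(
          s"Broker not in the correct format of <host>:<port> [$brokers]")
      }
      (hpa(0), hpa(1).toInt)
    }
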
http://git-wip-us.apache.org/repos/asf/spark/blob/c1513463/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
index 50bf7cb..d56cc01 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
@@ -36,14 +36,12 @@ import kafka.utils.VerifiableProperties
  * Starting and ending offsets are specified in advance,
  * so that you can control exactly-once semantics.
  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>.
- *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
- *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
- * @param batch Each KafkaRDDPartition in the batch corresponds to a
- *   range of offsets for a given Kafka topic/partition
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
  * @param messageHandler function for translating each message into the desired type
  */
-private[spark]
+private[kafka]
 class KafkaRDD[
   K: ClassTag,
   V: ClassTag,
@@ -183,7 +181,7 @@ class KafkaRDD[
   }
 }
 
-private[spark]
+private[kafka]
 object KafkaRDD {
   import KafkaCluster.LeaderOffset
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c1513463/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
index 36372e0..a842a6f 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
@@ -26,7 +26,7 @@ import org.apache.spark.Partition
   * @param host preferred kafka host, i.e. the leader at the time the rdd was created
   * @param port preferred kafka host's port
   */
-private[spark]
+private[kafka]
 class KafkaRDDPartition(
   val index: Int,
   val topic: String,
@@ -36,24 +36,3 @@ class KafkaRDDPartition(
   val host: String,
   val port: Int
 ) extends Partition
-
-private[spark]
-object KafkaRDDPartition {
-  def apply(
-    index: Int,
-    topic: String,
-    partition: Int,
-    fromOffset: Long,
-    untilOffset: Long,
-    host: String,
-    port: Int
-  ): KafkaRDDPartition = new KafkaRDDPartition(
-    index,
-    topic,
-    partition,
-    fromOffset,
-    untilOffset,
-    host,
-    port
-  )
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/c1513463/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index f8aa6c5..7a2c3ab 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.streaming.kafka
 
 import java.lang.{Integer => JInt}
+import java.lang.{Long => JLong}
 import java.util.{Map => JMap}
+import java.util.{Set => JSet}
 
 import scala.reflect.ClassTag
 import scala.collection.JavaConversions._
@@ -27,18 +29,19 @@ import kafka.common.TopicAndPartition
 import kafka.message.MessageAndMetadata
 import kafka.serializer.{Decoder, StringDecoder}
 
-
+import org.apache.spark.api.java.function.{Function => JFunction}
 import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.api.java.{JavaPairInputDStream, JavaInputDStream, JavaPairReceiverInputDStream, JavaStreamingContext}
 import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
+import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
 
 object KafkaUtils {
   /**
-   * Create an input stream that pulls messages from a Kafka Broker.
+   * Create an input stream that pulls messages from Kafka Brokers.
    * @param ssc       StreamingContext object
    * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)
    * @param groupId   The group id for this consumer
@@ -62,7 +65,7 @@ object KafkaUtils {
   }
 
   /**
-   * Create an input stream that pulls messages from a Kafka Broker.
+   * Create an input stream that pulls messages from Kafka Brokers.
    * @param ssc         StreamingContext object
    * @param kafkaParams Map of kafka configuration parameters,
    *                    see http://kafka.apache.org/08/configuration.html
@@ -81,7 +84,7 @@ object KafkaUtils {
   }
 
   /**
-   * Create an input stream that pulls messages from a Kafka Broker.
+   * Create an input stream that pulls messages from Kafka Brokers.
    * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param jssc      JavaStreamingContext object
    * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)
@@ -99,7 +102,7 @@ object KafkaUtils {
   }
 
   /**
-   * Create an input stream that pulls messages from a Kafka Broker.
+   * Create an input stream that pulls messages from Kafka Brokers.
    * @param jssc      JavaStreamingContext object
    * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..).
    * @param groupId   The group id for this consumer.
@@ -119,10 +122,10 @@ object KafkaUtils {
   }
 
   /**
-   * Create an input stream that pulls messages from a Kafka Broker.
+   * Create an input stream that pulls messages from Kafka Brokers.
    * @param jssc      JavaStreamingContext object
-   * @param keyTypeClass Key type of RDD
-   * @param valueTypeClass value type of RDD
+   * @param keyTypeClass Key type of DStream
+   * @param valueTypeClass value type of DStream
    * @param keyDecoderClass Type of kafka key decoder
    * @param valueDecoderClass Type of kafka value decoder
    * @param kafkaParams Map of kafka configuration parameters,
@@ -151,14 +154,14 @@ object KafkaUtils {
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
 
-  /** A batch-oriented interface for consuming from Kafka.
-   * Starting and ending offsets are specified in advance,
-   * so that you can control exactly-once semantics.
+  /**
+   * Create an RDD from Kafka using offset ranges for each topic and partition.
+   *
    * @param sc SparkContext object
    * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
-   * configuration parameters</a>.
-   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
-   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+   *    host1:port1,host2:port2 form.
    * @param offsetRanges Each OffsetRange in the batch corresponds to a
    *   range of offsets for a given Kafka topic/partition
    */
@@ -166,12 +169,12 @@ object KafkaUtils {
   def createRDD[
     K: ClassTag,
     V: ClassTag,
-    U <: Decoder[_]: ClassTag,
-    T <: Decoder[_]: ClassTag] (
+    KD <: Decoder[K]: ClassTag,
+    VD <: Decoder[V]: ClassTag](
       sc: SparkContext,
       kafkaParams: Map[String, String],
       offsetRanges: Array[OffsetRange]
-  ): RDD[(K, V)] = {
+    ): RDD[(K, V)] = {
     val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
     val kc = new KafkaCluster(kafkaParams)
     val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
@@ -179,121 +182,196 @@ object KafkaUtils {
       errs => throw new SparkException(errs.mkString("\n")),
       ok => ok
     )
-    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+    new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
   }
 
-  /** A batch-oriented interface for consuming from Kafka.
-   * Starting and ending offsets are specified in advance,
-   * so that you can control exactly-once semantics.
+  /**
+   * :: Experimental ::
+   * Create an RDD from Kafka using offset ranges for each topic and partition. This allows you to
+   * specify the Kafka leader to connect to (to optimize fetching) and access the message as well
+   * as the metadata.
+   *
    * @param sc SparkContext object
    * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
-   * configuration parameters</a>.
-   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
-   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+   *    host1:port1,host2:port2 form.
    * @param offsetRanges Each OffsetRange in the batch corresponds to a
    *   range of offsets for a given Kafka topic/partition
    * @param leaders Kafka leaders for each offset range in batch
-   * @param messageHandler function for translating each message into the desired type
+   * @param messageHandler Function for translating each message and metadata into the desired type
    */
   @Experimental
   def createRDD[
     K: ClassTag,
     V: ClassTag,
-    U <: Decoder[_]: ClassTag,
-    T <: Decoder[_]: ClassTag,
-    R: ClassTag] (
+    KD <: Decoder[K]: ClassTag,
+    VD <: Decoder[V]: ClassTag,
+    R: ClassTag](
       sc: SparkContext,
       kafkaParams: Map[String, String],
       offsetRanges: Array[OffsetRange],
       leaders: Array[Leader],
       messageHandler: MessageAndMetadata[K, V] => R
-  ): RDD[R] = {
-
+    ): RDD[R] = {
     val leaderMap = leaders
       .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
       .toMap
-    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
+    new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
   }
 
+
   /**
-   * This stream can guarantee that each message from Kafka is included in transformations
-   * (as opposed to output actions) exactly once, even in most failure situations.
+   * Create an RDD from Kafka using offset ranges for each topic and partition.
    *
-   * Points to note:
-   *
-   * Failure Recovery - You must checkpoint this stream, or save offsets yourself and provide them
-   * as the fromOffsets parameter on restart.
-   * Kafka must have sufficient log retention to obtain messages after failure.
-   *
-   * Getting offsets from the stream - see programming guide
+   * @param jsc JavaSparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+   *    host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
+      jsc: JavaSparkContext,
+      keyClass: Class[K],
+      valueClass: Class[V],
+      keyDecoderClass: Class[KD],
+      valueDecoderClass: Class[VD],
+      kafkaParams: JMap[String, String],
+      offsetRanges: Array[OffsetRange]
+    ): JavaPairRDD[K, V] = {
+    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
+    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
+    new JavaPairRDD(createRDD[K, V, KD, VD](
+      jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges))
+  }
+
+  /**
+   * :: Experimental ::
+   * Create an RDD from Kafka using offset ranges for each topic and partition. This allows you to
+   * specify the Kafka leader to connect to (to optimize fetching) and access the message as well
+   * as the metadata.
    *
-.  * Zookeeper - This does not use Zookeeper to store offsets.  For interop with Kafka monitors
-   * that depend on Zookeeper, you must store offsets in ZK yourself.
+   * @param jsc JavaSparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+   *    host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler Function for translating each message and metadata into the desired type
+   */
+  @Experimental
+  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
+      jsc: JavaSparkContext,
+      keyClass: Class[K],
+      valueClass: Class[V],
+      keyDecoderClass: Class[KD],
+      valueDecoderClass: Class[VD],
+      recordClass: Class[R],
+      kafkaParams: JMap[String, String],
+      offsetRanges: Array[OffsetRange],
+      leaders: Array[Leader],
+      messageHandler: JFunction[MessageAndMetadata[K, V], R]
+    ): JavaRDD[R] = {
+    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
+    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
+    implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
+    createRDD[K, V, KD, VD, R](
+      jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaders, messageHandler.call _)
+  }
+
+  /**
+   * :: Experimental ::
+   * Create an input stream that directly pulls messages from Kafka Brokers
+   * without using any receiver. This stream can guarantee that each message
+   * from Kafka is included in transformations exactly once (see points below).
    *
-   * End-to-end semantics - This does not guarantee that any output operation will push each record
-   * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and
-   * outputting exactly once), you have to either ensure that the output operation is
-   * idempotent, or transactionally store offsets with the output. See the programming guide for
-   * more details.
+   * Points to note:
+   *  - No receivers: This stream does not use any receiver. It directly queries Kafka
+   *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
+   *    by the stream itself. For interoperability with Kafka monitoring tools that depend on 
+   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
+   *    You can access the offsets used in each batch from the generated RDDs (see
+   *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
+   *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
+   *    in the [[StreamingContext]]. The information on consumed offset can be
+   *    recovered from the checkpoint. See the programming guide for details (constraints, etc.).
+   *  - End-to-end semantics: This stream ensures that every record is effectively received and
+   *    transformed exactly once, but gives no guarantees on whether the transformed data are
+   *    outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
+   *    that the output operation is idempotent, or use transactions to output records atomically.
+   *    See the programming guide for more details.
    *
    * @param ssc StreamingContext object
    * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
-   * configuration parameters</a>.
-   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
-   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
-   * @param messageHandler function for translating each message into the desired type
-   * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
-   *  starting point of the stream
+   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+   *    host1:port1,host2:port2 form.
+   * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive)
+   *    starting point of the stream
+   * @param messageHandler Function for translating each message and metadata into the desired type
    */
   @Experimental
   def createDirectStream[
     K: ClassTag,
     V: ClassTag,
-    U <: Decoder[_]: ClassTag,
-    T <: Decoder[_]: ClassTag,
+    KD <: Decoder[K]: ClassTag,
+    VD <: Decoder[V]: ClassTag,
     R: ClassTag] (
       ssc: StreamingContext,
       kafkaParams: Map[String, String],
       fromOffsets: Map[TopicAndPartition, Long],
       messageHandler: MessageAndMetadata[K, V] => R
   ): InputDStream[R] = {
-    new DirectKafkaInputDStream[K, V, U, T, R](
+    new DirectKafkaInputDStream[K, V, KD, VD, R](
       ssc, kafkaParams, fromOffsets, messageHandler)
   }
 
   /**
-   * This stream can guarantee that each message from Kafka is included in transformations
-   * (as opposed to output actions) exactly once, even in most failure situations.
+   * :: Experimental ::
+   * Create an input stream that directly pulls messages from Kafka Brokers
+   * without using any receiver. This stream can guarantee that each message
+   * from Kafka is included in transformations exactly once (see points below).
    *
    * Points to note:
-   *
-   * Failure Recovery - You must checkpoint this stream.
-   * Kafka must have sufficient log retention to obtain messages after failure.
-   *
-   * Getting offsets from the stream - see programming guide
-   *
-.  * Zookeeper - This does not use Zookeeper to store offsets.  For interop with Kafka monitors
-   * that depend on Zookeeper, you must store offsets in ZK yourself.
-   *
-   * End-to-end semantics - This does not guarantee that any output operation will push each record
-   * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and
-   * outputting exactly once), you have to ensure that the output operation is idempotent.
+   *  - No receivers: This stream does not use any receiver. It directly queries Kafka
+   *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
+   *    by the stream itself. For interoperability with Kafka monitoring tools that depend on 
+   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
+   *    You can access the offsets used in each batch from the generated RDDs (see
+   *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
+   *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
+   *    in the [[StreamingContext]]. The information on consumed offset can be
+   *    recovered from the checkpoint. See the programming guide for details (constraints, etc.).
+   *  - End-to-end semantics: This stream ensures that every record is effectively received and
+   *    transformed exactly once, but gives no guarantees on whether the transformed data are
+   *    outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
+   *    that the output operation is idempotent, or use transactions to output records atomically.
+   *    See the programming guide for more details.
    *
    * @param ssc StreamingContext object
    * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
-   * configuration parameters</a>.
-   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
-   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
-   *   If starting without a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
+   *   configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+   *   to be set with Kafka broker(s) (NOT zookeeper servers), specified in
+   *   host1:port1,host2:port2 form.
+   *   If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
    *   to determine where the stream starts (defaults to "largest")
-   * @param topics names of the topics to consume
+   * @param topics Names of the topics to consume
    */
   @Experimental
   def createDirectStream[
     K: ClassTag,
     V: ClassTag,
-    U <: Decoder[_]: ClassTag,
-    T <: Decoder[_]: ClassTag] (
+    KD <: Decoder[K]: ClassTag,
+    VD <: Decoder[V]: ClassTag] (
       ssc: StreamingContext,
       kafkaParams: Map[String, String],
       topics: Set[String]
@@ -313,11 +391,128 @@ object KafkaUtils {
       val fromOffsets = leaderOffsets.map { case (tp, lo) =>
           (tp, lo.offset)
       }
-      new DirectKafkaInputDStream[K, V, U, T, (K, V)](
+      new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
         ssc, kafkaParams, fromOffsets, messageHandler)
     }).fold(
       errs => throw new SparkException(errs.mkString("\n")),
       ok => ok
     )
   }
+
+  /**
+   * :: Experimental ::
+   * Create an input stream that directly pulls messages from Kafka Brokers
+   * without using any receiver. This stream can guarantee that each message
+   * from Kafka is included in transformations exactly once (see points below).
+   *
+   * Points to note:
+   *  - No receivers: This stream does not use any receiver. It directly queries Kafka
+   *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
+   *    by the stream itself. For interoperability with Kafka monitoring tools that depend on 
+   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
+   *    You can access the offsets used in each batch from the generated RDDs (see
+   *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
+   *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
+   *    in the [[StreamingContext]]. The information on consumed offset can be
+   *    recovered from the checkpoint. See the programming guide for details (constraints, etc.).
+   *  - End-to-end semantics: This stream ensures that every record is effectively received and
+   *    transformed exactly once, but gives no guarantees on whether the transformed data are
+   *    outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
+   *    that the output operation is idempotent, or use transactions to output records atomically.
+   *    See the programming guide for more details.
+   *
+   * @param jssc JavaStreamingContext object
+   * @param keyClass Class of the keys in the Kafka records
+   * @param valueClass Class of the values in the Kafka records
+   * @param keyDecoderClass Class of the key decoder
+   * @param valueDecoderClass Class of the value decoder
+   * @param recordClass Class of the records in DStream
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   *   configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+   *   to be set with Kafka broker(s) (NOT zookeeper servers), specified in
+   *   host1:port1,host2:port2 form.
+   * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive)
+   *    starting point of the stream
+   * @param messageHandler Function for translating each message and metadata into the desired type
+   */
+  @Experimental
+  def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
+      jssc: JavaStreamingContext,
+      keyClass: Class[K],
+      valueClass: Class[V],
+      keyDecoderClass: Class[KD],
+      valueDecoderClass: Class[VD],
+      recordClass: Class[R],
+      kafkaParams: JMap[String, String],
+      fromOffsets: JMap[TopicAndPartition, JLong],
+      messageHandler: JFunction[MessageAndMetadata[K, V], R]
+    ): JavaInputDStream[R] = {
+    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
+    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
+    implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
+    createDirectStream[K, V, KD, VD, R](
+      jssc.ssc,
+      Map(kafkaParams.toSeq: _*),
+      Map(fromOffsets.mapValues { _.longValue() }.toSeq: _*),
+      messageHandler.call _
+    )
+  }
+
+  /**
+   * :: Experimental ::
+   * Create an input stream that directly pulls messages from Kafka Brokers
+   * without using any receiver. This stream can guarantee that each message
+   * from Kafka is included in transformations exactly once (see points below).
+   *
+   * Points to note:
+   *  - No receivers: This stream does not use any receiver. It directly queries Kafka
+   *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
+   *    by the stream itself. For interoperability with Kafka monitoring tools that depend on 
+   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
+   *    You can access the offsets used in each batch from the generated RDDs (see
+   *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
+   *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
+   *    in the [[StreamingContext]]. The information on consumed offset can be
+   *    recovered from the checkpoint. See the programming guide for details (constraints, etc.).
+   *  - End-to-end semantics: This stream ensures that every record is effectively received and
+   *    transformed exactly once, but gives no guarantees on whether the transformed data are
+   *    outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
+   *    that the output operation is idempotent, or use transactions to output records atomically.
+   *    See the programming guide for more details.
+   *
+   * @param jssc JavaStreamingContext object
+   * @param keyClass Class of the keys in the Kafka records
+   * @param valueClass Class of the values in the Kafka records
+   * @param keyDecoderClass Class of the key decoder
+   * @param valueDecoderClass Class of the value decoder
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   *   configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+   *   to be set with Kafka broker(s) (NOT zookeeper servers), specified in
+   *   host1:port1,host2:port2 form.
+   *   If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
+   *   to determine where the stream starts (defaults to "largest")
+   * @param topics Names of the topics to consume
+   */
+  @Experimental
+  def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
+      jssc: JavaStreamingContext,
+      keyClass: Class[K],
+      valueClass: Class[V],
+      keyDecoderClass: Class[KD],
+      valueDecoderClass: Class[VD],
+      kafkaParams: JMap[String, String],
+      topics: JSet[String]
+    ): JavaPairInputDStream[K, V] = {
+    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
+    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
+    createDirectStream[K, V, KD, VD](
+      jssc.ssc,
+      Map(kafkaParams.toSeq: _*),
+      Set(topics.toSeq: _*)
+    )
+  }
 }

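The commit message highlights recovering offset ranges through checkpoints, which the scaladoc above describes under "Failure Recovery". A minimal sketch of that pattern, assuming a hypothetical broker at broker-1:9092, a topic "events" and a local checkpoint directory (all names illustrative, not from the patch):

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

    val checkpointDir = "/tmp/direct-kafka-checkpoint"  // hypothetical path

    def createContext(): StreamingContext = {
      val ssc = new StreamingContext(
        new SparkConf().setAppName("DirectKafkaCheckpointSketch"), Seconds(2))
      ssc.checkpoint(checkpointDir)
      val kafkaParams = Map("metadata.broker.list" -> "broker-1:9092")
      val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Set("events"))
      stream.foreachRDD { rdd =>
        // Offset ranges travel with the RDD and are restored together with the checkpoint.
        val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        ranges.foreach(r => println(s"${r.topic}-${r.partition}: ${r.fromOffset} -> ${r.untilOffset}"))
      }
      ssc
    }

    // After a driver failure, getOrCreate rebuilds the stream (and its consumed
    // offsets) from the checkpoint instead of creating it from scratch.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
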
http://git-wip-us.apache.org/repos/asf/spark/blob/c1513463/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala
index 3454d92..c129a26 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala
@@ -19,17 +19,28 @@ package org.apache.spark.streaming.kafka
 
 import kafka.common.TopicAndPartition
 
-/** Host info for the leader of a Kafka TopicAndPartition */
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Represents the host info for the leader of a Kafka partition.
+ */
+@Experimental
 final class Leader private(
-    /** kafka topic name */
+    /** Kafka topic name */
     val topic: String,
-    /** kafka partition id */
+    /** Kafka partition id */
     val partition: Int,
-    /** kafka hostname */
+    /** Leader's hostname */
     val host: String,
-    /** kafka host's port */
+    /** Leader's port */
     val port: Int) extends Serializable
 
+/**
+ * :: Experimental ::
+ * Companion object that provides methods to create instances of [[Leader]].
+ */
+@Experimental
 object Leader {
   def create(topic: String, partition: Int, host: String, port: Int): Leader =
     new Leader(topic, partition, host, port)

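The leader-aware createRDD overload in KafkaUtils takes these Leader instances so a caller that already knows each partition's leader can skip the metadata lookup. A hedged sketch, with broker host, topic and offsets all illustrative:

    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.kafka.{KafkaUtils, Leader, OffsetRange}

    val sc = new SparkContext(new SparkConf().setAppName("LeaderAwareCreateRDDSketch"))
    val kafkaParams = Map("metadata.broker.list" -> "broker-1:9092")
    // Read offsets [0, 100) of partition 0 of "events" directly from its known leader,
    // and keep the offset of each message alongside its payload.
    val ranges = Array(OffsetRange.create("events", 0, 0L, 100L))
    val leaders = Array(Leader.create("events", 0, "broker-1", 9092))
    val handler = (mmd: MessageAndMetadata[String, String]) => (mmd.offset, mmd.message)
    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, (Long, String)](
      sc, kafkaParams, ranges, leaders, handler)
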
http://git-wip-us.apache.org/repos/asf/spark/blob/c1513463/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
index 334c12e..9c3dfeb 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
@@ -19,16 +19,35 @@ package org.apache.spark.streaming.kafka
 
 import kafka.common.TopicAndPartition
 
-/** Something that has a collection of OffsetRanges */
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the
+ * offset ranges in RDDs generated by the direct Kafka DStream (see
+ * [[KafkaUtils.createDirectStream()]]).
+ * {{{
+ *   KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
+ *      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ *      ...
+ *   }
+ * }}}
+ */
+@Experimental
 trait HasOffsetRanges {
   def offsetRanges: Array[OffsetRange]
 }
 
-/** Represents a range of offsets from a single Kafka TopicAndPartition */
+/**
+ * :: Experimental ::
+ * Represents a range of offsets from a single Kafka TopicAndPartition. Instances of this class
+ * can be created with `OffsetRange.create()`.
+ */
+@Experimental
 final class OffsetRange private(
-    /** kafka topic name */
+    /** Kafka topic name */
     val topic: String,
-    /** kafka partition id */
+    /** Kafka partition id */
     val partition: Int,
     /** inclusive starting offset */
     val fromOffset: Long,
@@ -36,11 +55,33 @@ final class OffsetRange private(
     val untilOffset: Long) extends Serializable {
   import OffsetRange.OffsetRangeTuple
 
+  override def equals(obj: Any): Boolean = obj match {
+    case that: OffsetRange =>
+      this.topic == that.topic &&
+        this.partition == that.partition &&
+        this.fromOffset == that.fromOffset &&
+        this.untilOffset == that.untilOffset
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    toTuple.hashCode()
+  }
+
+  override def toString(): String = {
+    s"OffsetRange(topic: '$topic', partition: $partition, range: [$fromOffset -> $untilOffset]"
+  }
+
   /** this is to avoid ClassNotFoundException during checkpoint restore */
   private[streaming]
   def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset)
 }
 
+/**
+ * :: Experimental ::
+ * Companion object that provides methods to create instances of [[OffsetRange]].
+ */
+@Experimental
 object OffsetRange {
   def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
     new OffsetRange(topic, partition, fromOffset, untilOffset)
@@ -61,10 +102,10 @@ object OffsetRange {
     new OffsetRange(topicAndPartition.topic, topicAndPartition.partition, fromOffset, untilOffset)
 
   /** this is to avoid ClassNotFoundException during checkpoint restore */
-  private[spark]
+  private[kafka]
   type OffsetRangeTuple = (String, Int, Long, Long)
 
-  private[streaming]
+  private[kafka]
   def apply(t: OffsetRangeTuple) =
     new OffsetRange(t._1, t._2, t._3, t._4)
 }

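The new equals/hashCode make OffsetRange instances comparable by value, e.g. when comparing ranges recovered from a checkpoint with the originally generated ones. A small sketch (topic name and offsets are illustrative):

    import org.apache.spark.streaming.kafka.OffsetRange

    val before = OffsetRange.create("events", 0, 0L, 100L)
    val after  = OffsetRange.create("events", 0, 0L, 100L)
    assert(before == after)              // value equality, not reference equality
    assert(Set(before).contains(after))  // hashCode agrees with equals
    println(before)  // OffsetRange(topic: 'events', partition: 0, range: [0 -> 100])
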
http://git-wip-us.apache.org/repos/asf/spark/blob/c1513463/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
new file mode 100644
index 0000000..1334cc8
--- /dev/null
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Arrays;
+
+import org.apache.spark.SparkConf;
+
+import scala.Tuple2;
+
+import junit.framework.Assert;
+
+import kafka.common.TopicAndPartition;
+import kafka.message.MessageAndMetadata;
+import kafka.serializer.StringDecoder;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+import org.junit.Test;
+import org.junit.After;
+import org.junit.Before;
+
+public class JavaDirectKafkaStreamSuite implements Serializable {
+  private transient JavaStreamingContext ssc = null;
+  private transient Random random = new Random();
+  private transient KafkaStreamSuiteBase suiteBase = null;
+
+  @Before
+  public void setUp() {
+      suiteBase = new KafkaStreamSuiteBase() { };
+      suiteBase.setupKafka();
+      System.clearProperty("spark.driver.port");
+      SparkConf sparkConf = new SparkConf()
+              .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
+      ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200));
+  }
+
+  @After
+  public void tearDown() {
+      ssc.stop();
+      ssc = null;
+      System.clearProperty("spark.driver.port");
+      suiteBase.tearDownKafka();
+  }
+
+  @Test
+  public void testKafkaStream() throws InterruptedException {
+    String topic1 = "topic1";
+    String topic2 = "topic2";
+
+    String[] topic1data = createTopicAndSendData(topic1);
+    String[] topic2data = createTopicAndSendData(topic2);
+
+    HashSet<String> sent = new HashSet<String>();
+    sent.addAll(Arrays.asList(topic1data));
+    sent.addAll(Arrays.asList(topic2data));
+
+    HashMap<String, String> kafkaParams = new HashMap<String, String>();
+    kafkaParams.put("metadata.broker.list", suiteBase.brokerAddress());
+    kafkaParams.put("auto.offset.reset", "smallest");
+
+    JavaDStream<String> stream1 = KafkaUtils.createDirectStream(
+        ssc,
+        String.class,
+        String.class,
+        StringDecoder.class,
+        StringDecoder.class,
+        kafkaParams,
+        topicToSet(topic1)
+    ).map(
+        new Function<Tuple2<String, String>, String>() {
+          @Override
+          public String call(scala.Tuple2<String, String> kv) throws Exception {
+            return kv._2();
+          }
+        }
+    );
+
+    JavaDStream<String> stream2 = KafkaUtils.createDirectStream(
+        ssc,
+        String.class,
+        String.class,
+        StringDecoder.class,
+        StringDecoder.class,
+        String.class,
+        kafkaParams,
+        topicOffsetToMap(topic2, (long) 0),
+        new Function<MessageAndMetadata<String, String>, String>() {
+          @Override
+          public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception {
+            return msgAndMd.message();
+          }
+        }
+    );
+    JavaDStream<String> unifiedStream = stream1.union(stream2);
+
+    final HashSet<String> result = new HashSet<String>();
+    unifiedStream.foreachRDD(
+        new Function<JavaRDD<String>, Void>() {
+          @Override
+          public Void call(org.apache.spark.api.java.JavaRDD<String> rdd) throws Exception {
+            result.addAll(rdd.collect());
+            return null;
+          }
+        }
+    );
+    ssc.start();
+    long startTime = System.currentTimeMillis();
+    boolean matches = false;
+    while (!matches && System.currentTimeMillis() - startTime < 20000) {
+      matches = sent.size() == result.size();
+      Thread.sleep(50);
+    }
+    Assert.assertEquals(sent, result);
+    ssc.stop();
+  }
+
+  private HashSet<String> topicToSet(String topic) {
+    HashSet<String> topicSet = new HashSet<String>();
+    topicSet.add(topic);
+    return topicSet;
+  }
+
+  private HashMap<TopicAndPartition, Long> topicOffsetToMap(String topic, Long offsetToStart) {
+    HashMap<TopicAndPartition, Long> topicMap = new HashMap<TopicAndPartition, Long>();
+    topicMap.put(new TopicAndPartition(topic, 0), offsetToStart);
+    return topicMap;
+  }
+
+  private  String[] createTopicAndSendData(String topic) {
+    String[] data = { topic + "-1", topic + "-2", topic + "-3"};
+    suiteBase.createTopic(topic);
+    suiteBase.sendMessages(topic, data);
+    return data;
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c1513463/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 6e1abf3..208cc51 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -79,9 +79,10 @@ public class JavaKafkaStreamSuite implements Serializable {
 
     suiteBase.createTopic(topic);
     HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
-    suiteBase.produceAndSendMessage(topic,
+    suiteBase.sendMessages(topic,
         JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
-            Predef.<Tuple2<String, Object>>conforms()));
+            Predef.<Tuple2<String, Object>>conforms())
+    );
 
     HashMap<String, String> kafkaParams = new HashMap<String, String>();
     kafkaParams.put("zookeeper.connect", suiteBase.zkAddress());

http://git-wip-us.apache.org/repos/asf/spark/blob/c1513463/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
new file mode 100644
index 0000000..b25c212
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.io.File
+
+import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import kafka.serializer.StringDecoder
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+import org.scalatest.concurrent.{Eventually, Timeouts}
+
+import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
+import org.apache.spark.streaming.dstream.{DStream, InputDStream}
+import org.apache.spark.util.Utils
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+
+class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
+  with BeforeAndAfter with BeforeAndAfterAll with Eventually {
+  val sparkConf = new SparkConf()
+    .setMaster("local[4]")
+    .setAppName(this.getClass.getSimpleName)
+
+  var sc: SparkContext = _
+  var ssc: StreamingContext = _
+  var testDir: File = _
+
+  override def beforeAll {
+    setupKafka()
+  }
+
+  override def afterAll {
+    tearDownKafka()
+  }
+
+  after {
+    if (ssc != null) {
+      ssc.stop()
+      sc = null  // ssc.stop() also stops the underlying SparkContext, so don't stop it again below
+    }
+    if (sc != null) {
+      sc.stop()
+    }
+    if (testDir != null) {
+      Utils.deleteRecursively(testDir)
+    }
+  }
+
+
+  test("basic stream receiving with multiple topics and smallest starting offset") {
+    val topics = Set("basic1", "basic2", "basic3")
+    val data = Map("a" -> 7, "b" -> 9)
+    topics.foreach { t =>
+      createTopic(t)
+      sendMessages(t, data)
+    }
+    val kafkaParams = Map(
+      "metadata.broker.list" -> s"$brokerAddress",
+      "auto.offset.reset" -> "smallest"
+    )
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(200))
+    val stream = withClue("Error creating direct stream") {
+      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+        ssc, kafkaParams, topics)
+    }
+    var total = 0L
+
+    stream.foreachRDD { rdd =>
+      // Get the offset ranges in the RDD
+      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+      val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
+        // For each partition, get size of the range in the partition,
+        // and the number of items in the partition
+        val off = offsets(i)
+        val all = iter.toSeq
+        val partSize = all.size
+        val rangeSize = off.untilOffset - off.fromOffset
+        Iterator((partSize, rangeSize))
+      }.collect
+
+      // Verify whether number of elements in each partition
+      // matches with the corresponding offset range
+      collected.foreach { case (partSize, rangeSize) =>
+        assert(partSize === rangeSize, "offset ranges are wrong")
+      }
+      total += collected.map { case (partSize, _) => partSize }.sum  // Add up all the collected items
+    }
+    ssc.start()
+    eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
+      assert(total === data.values.sum * topics.size, "didn't get all messages")
+    }
+    ssc.stop()
+  }
+
+  test("receiving from largest starting offset") {
+    val topic = "largest"
+    val topicPartition = TopicAndPartition(topic, 0)
+    val data = Map("a" -> 10)
+    createTopic(topic)
+    val kafkaParams = Map(
+      "metadata.broker.list" -> s"$brokerAddress",
+      "auto.offset.reset" -> "largest"
+    )
+    val kc = new KafkaCluster(kafkaParams)
+    def getLatestOffset(): Long = {
+      kc.getLatestLeaderOffsets(Set(topicPartition)).right.get(topicPartition).offset
+    }
+
+    // Send some initial messages before starting context
+    sendMessages(topic, data)
+    eventually(timeout(10 seconds), interval(20 milliseconds)) {
+      assert(getLatestOffset() > 3)
+    }
+    val offsetBeforeStart = getLatestOffset()
+
+    // Setup context and kafka stream with largest offset
+    ssc = new StreamingContext(sparkConf, Milliseconds(200))
+    val stream = withClue("Error creating direct stream") {
+      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+        ssc, kafkaParams, Set(topic))
+    }
+    assert(
+      stream.asInstanceOf[DirectKafkaInputDStream[_, _, _, _, _]]
+        .fromOffsets(topicPartition) >= offsetBeforeStart,
+      "Start offset not from latest"
+    )
+
+    val collectedData = new mutable.ArrayBuffer[String]()
+    stream.map { _._2 }.foreachRDD { rdd => collectedData ++= rdd.collect() }
+    ssc.start()
+    val newData = Map("b" -> 10)
+    sendMessages(topic, newData)
+    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      assert(collectedData.contains("b"))
+    }
+    assert(!collectedData.contains("a"))
+  }
+
+
+  test("creating stream by offset") {
+    val topic = "offset"
+    val topicPartition = TopicAndPartition(topic, 0)
+    val data = Map("a" -> 10)
+    createTopic(topic)
+    val kafkaParams = Map(
+      "metadata.broker.list" -> s"$brokerAddress",
+      "auto.offset.reset" -> "largest"
+    )
+    val kc = new KafkaCluster(kafkaParams)
+    def getLatestOffset(): Long = {
+      kc.getLatestLeaderOffsets(Set(topicPartition)).right.get(topicPartition).offset
+    }
+
+    // Send some initial messages before starting context
+    sendMessages(topic, data)
+    eventually(timeout(10 seconds), interval(20 milliseconds)) {
+      assert(getLatestOffset() >= 10)
+    }
+    val offsetBeforeStart = getLatestOffset()
+
+    // Setup context and kafka stream with largest offset
+    ssc = new StreamingContext(sparkConf, Milliseconds(200))
+    val stream = withClue("Error creating direct stream") {
+      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
+        ssc, kafkaParams, Map(topicPartition -> 11L),
+        (m: MessageAndMetadata[String, String]) => m.message())
+    }
+    assert(
+      stream.asInstanceOf[DirectKafkaInputDStream[_, _, _, _, _]]
+        .fromOffsets(topicPartition) >= offsetBeforeStart,
+      "Start offset not from latest"
+    )
+
+    val collectedData = new mutable.ArrayBuffer[String]()
+    stream.foreachRDD { rdd => collectedData ++= rdd.collect() }
+    ssc.start()
+    val newData = Map("b" -> 10)
+    sendMessages(topic, newData)
+    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      assert(collectedData.contains("b"))
+    }
+    assert(!collectedData.contains("a"))
+  }
+
+  // Test to verify that offset ranges can be recovered from checkpoints
+  test("offset recovery") {
+    val topic = "recovery"
+    createTopic(topic)
+    testDir = Utils.createTempDir()
+
+    val kafkaParams = Map(
+      "metadata.broker.list" -> s"$brokerAddress",
+      "auto.offset.reset" -> "smallest"
+    )
+
+    // Send data to Kafka and wait for it to be received
+    def sendDataAndWaitForReceive(data: Seq[Int]) {
+      val strings = data.map { _.toString }
+      sendMessages(topic, strings.map { _ -> 1 }.toMap)
+      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+        assert(strings.forall { DirectKafkaStreamSuite.collectedData.contains })
+      }
+    }
+
+    // Setup the streaming context
+    ssc = new StreamingContext(sparkConf, Milliseconds(100))
+    val kafkaStream = withClue("Error creating direct stream") {
+      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+        ssc, kafkaParams, Set(topic))
+    }
+    val keyedStream = kafkaStream.map { v => "key" -> v._2.toInt }
+    val stateStream = keyedStream.updateStateByKey { (values: Seq[Int], state: Option[Int]) =>
+      Some(values.sum + state.getOrElse(0))
+    }
+    ssc.checkpoint(testDir.getAbsolutePath)
+
+    // This is to collect the raw data received from Kafka
+    kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
+      val data = rdd.map { _._2 }.collect()
+      DirectKafkaStreamSuite.collectedData.appendAll(data)
+    }
+
+    // This is to verify that all the data is eventually received exactly once
+    stateStream.foreachRDD { (rdd: RDD[(String, Int)]) =>
+      rdd.collect().headOption.foreach { x => DirectKafkaStreamSuite.total = x._2 }
+    }
+    ssc.start()
+
+    // Send some data and wait for them to be received
+    for (i <- (1 to 10).grouped(4)) {
+      sendDataAndWaitForReceive(i)
+    }
+
+    // Verify that offset ranges were generated
+    val offsetRangesBeforeStop = getOffsetRanges(kafkaStream)
+    assert(offsetRangesBeforeStop.size >= 1, "No offset ranges generated")
+    assert(
+      offsetRangesBeforeStop.head._2.forall { _.fromOffset === 0 },
+      "starting offset not zero"
+    )
+    ssc.stop()
+    logInfo("====== RESTARTING ========")
+
+    // Recover context from checkpoints
+    ssc = new StreamingContext(testDir.getAbsolutePath)
+    val recoveredStream = ssc.graph.getInputStreams().head.asInstanceOf[DStream[(String, String)]]
+
+    // Verify offset ranges have been recovered
+    val recoveredOffsetRanges = getOffsetRanges(recoveredStream)
+    assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered")
+    val earlierOffsetRangesAsSets = offsetRangesBeforeStop.map { x => (x._1, x._2.toSet) }
+    assert(
+      recoveredOffsetRanges.forall { or =>
+        earlierOffsetRangesAsSets.contains((or._1, or._2.toSet))
+      },
+      "Recovered ranges are not the same as the ones generated"
+    )
+
+    // Restart context, send more data and verify the total at the end
+    // If the total is right, it means each record has been received only once
+    ssc.start()
+    sendDataAndWaitForReceive(11 to 20)
+    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      assert(DirectKafkaStreamSuite.total === (1 to 20).sum)
+    }
+    ssc.stop()
+  }
+
+  /** Get the generated offset ranges from the DirectKafkaStream */
+  private def getOffsetRanges[K, V](
+      kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = {
+    kafkaStream.generatedRDDs.mapValues { rdd =>
+      rdd.asInstanceOf[KafkaRDD[K, V, _, _, (K, V)]].offsetRanges
+    }.toSeq.sortBy { _._1 }
+  }
+}
+
+object DirectKafkaStreamSuite {
+  val collectedData = new mutable.ArrayBuffer[String]()
+  var total = -1L
+}

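For reference, the application-side pattern that DirectKafkaStreamSuite exercises looks roughly like the sketch below. This is not part of the patch: the broker address and topic name are placeholders, and only the createDirectStream / HasOffsetRanges calls already shown in the suite above are used.

    import kafka.serializer.StringDecoder

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

    object DirectKafkaSketch {
      def main(args: Array[String]) {
        val brokers = "localhost:9092"  // placeholder broker address
        val conf = new SparkConf().setMaster("local[4]").setAppName("DirectKafkaSketch")
        val ssc = new StreamingContext(conf, Milliseconds(500))
        val kafkaParams = Map(
          "metadata.broker.list" -> brokers,
          "auto.offset.reset" -> "smallest")

        val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, Set("mytopic"))  // "mytopic" is a placeholder topic

        stream.foreachRDD { rdd =>
          // Each batch RDD carries the exact Kafka offsets it covers; this is what the
          // "offset recovery" test above checkpoints and recovers.
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          offsetRanges.foreach { o =>
            println(s"${o.topic} ${o.partition} ${o.fromOffset} -> ${o.untilOffset}")
          }
        }
        ssc.start()
        ssc.awaitTermination()
      }
    }
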
http://git-wip-us.apache.org/repos/asf/spark/blob/c1513463/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
index e57c8f6..fc9275b 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
@@ -19,33 +19,29 @@ package org.apache.spark.streaming.kafka
 
 import scala.util.Random
 
-import org.scalatest.BeforeAndAfter
 import kafka.common.TopicAndPartition
+import org.scalatest.BeforeAndAfterAll
 
-class KafkaClusterSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
-  val brokerHost = "localhost"
-
-  val kafkaParams = Map("metadata.broker.list" -> s"$brokerHost:$brokerPort")
-
-  val kc = new KafkaCluster(kafkaParams)
-
+class KafkaClusterSuite extends KafkaStreamSuiteBase with BeforeAndAfterAll {
   val topic = "kcsuitetopic" + Random.nextInt(10000)
-
   val topicAndPartition = TopicAndPartition(topic, 0)
+  var kc: KafkaCluster = null
 
-  before {
+  override def beforeAll() {
     setupKafka()
     createTopic(topic)
-    produceAndSendMessage(topic, Map("a" -> 1))
+    sendMessages(topic, Map("a" -> 1))
+    kc = new KafkaCluster(Map("metadata.broker.list" -> s"$brokerAddress"))
   }
 
-  after {
+  override def afterAll() {
     tearDownKafka()
   }
 
   test("metadata apis") {
-    val leader = kc.findLeaders(Set(topicAndPartition)).right.get
-    assert(leader(topicAndPartition) === (brokerHost, brokerPort), "didn't get leader")
+    val leader = kc.findLeaders(Set(topicAndPartition)).right.get(topicAndPartition)
+    val leaderAddress = s"${leader._1}:${leader._2}"
+    assert(leaderAddress === brokerAddress, "didn't get leader")
 
     val parts = kc.getPartitions(Set(topic)).right.get
     assert(parts(topicAndPartition), "didn't get partitions")

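The metadata calls this suite exercises can be sketched in isolation as follows. This is a hedged example, not part of the patch: the broker address and topic are placeholders, and the file would have to live under org.apache.spark.streaming.kafka (as the suite does) for KafkaCluster to be visible.

    package org.apache.spark.streaming.kafka

    import kafka.common.TopicAndPartition

    object KafkaClusterSketch {
      def main(args: Array[String]) {
        val tp = TopicAndPartition("kcsuitetopic", 0)
        val kc = new KafkaCluster(Map("metadata.broker.list" -> "localhost:9092"))

        // Partitions of the topic
        val parts = kc.getPartitions(Set("kcsuitetopic")).right.get
        // Leader (host, port) for each partition
        val leaders = kc.findLeaders(Set(tp)).right.get
        // Latest offset of the partition
        val latest = kc.getLatestLeaderOffsets(Set(tp)).right.get(tp).offset

        println(s"partitions=$parts leaders=$leaders latestOffset=$latest")
      }
    }
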
http://git-wip-us.apache.org/repos/asf/spark/blob/c1513463/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala
deleted file mode 100644
index 0891ce3..0000000
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import scala.util.Random
-import scala.concurrent.duration._
-
-import org.scalatest.BeforeAndAfter
-import org.scalatest.concurrent.Eventually
-
-import kafka.serializer.StringDecoder
-
-import org.apache.spark.SparkConf
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Milliseconds, StreamingContext}
-
-class KafkaDirectStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
-  val sparkConf = new SparkConf()
-    .setMaster("local[4]")
-    .setAppName(this.getClass.getSimpleName)
-
-  val brokerHost = "localhost"
-
-  val kafkaParams = Map(
-    "metadata.broker.list" -> s"$brokerHost:$brokerPort",
-    "auto.offset.reset" -> "smallest"
-  )
-
-  var ssc: StreamingContext = _
-
-  before {
-    setupKafka()
-
-    ssc = new StreamingContext(sparkConf, Milliseconds(500))
-  }
-
-  after {
-    if (ssc != null) {
-      ssc.stop()
-    }
-    tearDownKafka()
-  }
-
-  test("multi topic stream") {
-    val topics = Set("newA", "newB")
-    val data = Map("a" -> 7, "b" -> 9)
-    topics.foreach { t =>
-      createTopic(t)
-      produceAndSendMessage(t, data)
-    }
-    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
-      ssc, kafkaParams, topics)
-    var total = 0L;
-
-    stream.foreachRDD { rdd =>
-      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
-      val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
-        val off = offsets(i)
-        val all = iter.toSeq
-        val partSize = all.size
-        val rangeSize = off.untilOffset - off.fromOffset
-        all.map { _ =>
-          (partSize, rangeSize)
-        }.toIterator
-      }.collect
-      collected.foreach { case (partSize, rangeSize) =>
-          assert(partSize === rangeSize, "offset ranges are wrong")
-      }
-      total += collected.size
-    }
-    ssc.start()
-    eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
-      assert(total === data.values.sum * topics.size, "didn't get all messages")
-    }
-    ssc.stop()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/c1513463/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
index 9b9e3f5..6774db8 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
@@ -46,9 +46,9 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
     val topic = "topic1"
     val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
     createTopic(topic)
-    produceAndSendMessage(topic, sent)
+    sendMessages(topic, sent)
 
-    val kafkaParams = Map("metadata.broker.list" -> s"localhost:$brokerPort",
+    val kafkaParams = Map("metadata.broker.list" -> brokerAddress,
       "group.id" -> s"test-consumer-${Random.nextInt(10000)}")
 
     val kc = new KafkaCluster(kafkaParams)
@@ -65,14 +65,14 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
 
     val rdd2 = getRdd(kc, Set(topic))
     val sent2 = Map("d" -> 1)
-    produceAndSendMessage(topic, sent2)
+    sendMessages(topic, sent2)
     // this is the "0 messages" case
     // make sure we dont get anything, since messages were sent after rdd was defined
     assert(rdd2.isDefined)
     assert(rdd2.get.count === 0)
 
     val rdd3 = getRdd(kc, Set(topic))
-    produceAndSendMessage(topic, Map("extra" -> 22))
+    sendMessages(topic, Map("extra" -> 22))
     // this is the "exactly 1 message" case
     // make sure we get exactly one message, despite there being lots more available
     assert(rdd3.isDefined)

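The suite's getRdd helper goes through the new batch API in KafkaUtils. A hedged sketch of calling that API directly, assuming the Scala createRDD overload added to KafkaUtils.scala in this patch, a broker at localhost:9092, and a topic1/partition 0 holding at least five messages:

    import kafka.serializer.StringDecoder

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

    object KafkaRDDSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("KafkaRDDSketch"))
        val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")

        // Read exactly offsets [0, 5) of topic1/partition 0; the resulting RDD contains
        // those five messages and nothing else, which is what makes replays deterministic.
        val ranges = Array(OffsetRange("topic1", 0, 0, 5))
        val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
          sc, kafkaParams, ranges)
        println(s"read ${rdd.count()} messages")
        sc.stop()
      }
    }
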
http://git-wip-us.apache.org/repos/asf/spark/blob/c1513463/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index f207dc6..e4966ee 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -48,30 +48,41 @@ import org.apache.spark.util.Utils
  */
 abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Logging {
 
-  var zkAddress: String = _
-  var zkClient: ZkClient = _
-
   private val zkHost = "localhost"
+  private var zkPort: Int = 0
   private val zkConnectionTimeout = 6000
   private val zkSessionTimeout = 6000
   private var zookeeper: EmbeddedZookeeper = _
-  private var zkPort: Int = 0
-  protected var brokerPort = 9092
+  private val brokerHost = "localhost"
+  private var brokerPort = 9092
   private var brokerConf: KafkaConfig = _
   private var server: KafkaServer = _
   private var producer: Producer[String, String] = _
+  private var zkReady = false
+  private var brokerReady = false
+
+  protected var zkClient: ZkClient = _
+
+  def zkAddress: String = {
+    assert(zkReady, "Zookeeper not set up yet or already torn down, cannot get zookeeper address")
+    s"$zkHost:$zkPort"
+  }
+
+  def brokerAddress: String = {
+    assert(brokerReady, "Kafka not set up yet or already torn down, cannot get broker address")
+    s"$brokerHost:$brokerPort"
+  }
 
   def setupKafka() {
     // Zookeeper server startup
     zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
     // Get the actual zookeeper binding port
     zkPort = zookeeper.actualPort
-    zkAddress = s"$zkHost:$zkPort"
-    logInfo("==================== 0 ====================")
+    zkReady = true
+    logInfo("==================== Zookeeper Started ====================")
 
-    zkClient = new ZkClient(zkAddress, zkSessionTimeout, zkConnectionTimeout,
-      ZKStringSerializer)
-    logInfo("==================== 1 ====================")
+    zkClient = new ZkClient(zkAddress, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
+    logInfo("==================== Zookeeper Client Created ====================")
 
     // Kafka broker startup
     var bindSuccess: Boolean = false
@@ -80,9 +91,8 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
         val brokerProps = getBrokerConfig()
         brokerConf = new KafkaConfig(brokerProps)
         server = new KafkaServer(brokerConf)
-        logInfo("==================== 2 ====================")
         server.startup()
-        logInfo("==================== 3 ====================")
+        logInfo("==================== Kafka Broker Started ====================")
         bindSuccess = true
       } catch {
         case e: KafkaException =>
@@ -94,10 +104,13 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
     }
 
     Thread.sleep(2000)
-    logInfo("==================== 4 ====================")
+    logInfo("==================== Kafka + Zookeeper Ready ====================")
+    brokerReady = true
   }
 
   def tearDownKafka() {
+    brokerReady = false
+    zkReady = false
     if (producer != null) {
       producer.close()
       producer = null
@@ -121,26 +134,23 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
     }
   }
 
-  private def createTestMessage(topic: String, sent: Map[String, Int])
-    : Seq[KeyedMessage[String, String]] = {
-    val messages = for ((s, freq) <- sent; i <- 0 until freq) yield {
-      new KeyedMessage[String, String](topic, s)
-    }
-    messages.toSeq
-  }
-
   def createTopic(topic: String) {
     AdminUtils.createTopic(zkClient, topic, 1, 1)
-    logInfo("==================== 5 ====================")
     // wait until metadata is propagated
     waitUntilMetadataIsPropagated(topic, 0)
+    logInfo(s"==================== Topic $topic Created ====================")
   }
 
-  def produceAndSendMessage(topic: String, sent: Map[String, Int]) {
+  def sendMessages(topic: String, messageToFreq: Map[String, Int]) {
+    val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray
+    sendMessages(topic, messages)
+  }
+
+  def sendMessages(topic: String, messages: Array[String]) {
     producer = new Producer[String, String](new ProducerConfig(getProducerConfig()))
-    producer.send(createTestMessage(topic, sent): _*)
+    producer.send(messages.map { new KeyedMessage[String, String](topic, _) }: _*)
     producer.close()
-    logInfo("==================== 6 ====================")
+    logInfo(s"==================== Sent Messages: ${messages.mkString(", ")} ====================")
   }
 
   private def getBrokerConfig(): Properties = {
@@ -218,7 +228,7 @@ class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
     val topic = "topic1"
     val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
     createTopic(topic)
-    produceAndSendMessage(topic, sent)
+    sendMessages(topic, sent)
 
     val kafkaParams = Map("zookeeper.connect" -> zkAddress,
       "group.id" -> s"test-consumer-${Random.nextInt(10000)}",

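A minimal sketch of how a suite builds on the refactored base class, using the renamed sendMessages helpers and the new zkAddress/brokerAddress accessors. The suite name and topic below are made up for illustration; only methods shown in the diff above are called.

    package org.apache.spark.streaming.kafka

    import org.scalatest.BeforeAndAfterAll

    class ExampleKafkaSuite extends KafkaStreamSuiteBase with BeforeAndAfterAll {

      override def beforeAll() { setupKafka() }    // starts embedded Zookeeper + Kafka broker
      override def afterAll() { tearDownKafka() }  // clears zkReady/brokerReady and shuts down

      test("messages reach the embedded broker") {
        val topic = "example"
        createTopic(topic)
        sendMessages(topic, Map("a" -> 2, "b" -> 1))  // frequency-map overload
        sendMessages(topic, Array("c", "d"))          // raw-message overload

        // brokerAddress / zkAddress are only valid between setupKafka() and tearDownKafka()
        val kafkaParams = Map(
          "metadata.broker.list" -> brokerAddress,
          "zookeeper.connect" -> zkAddress)
        // ... consume with KafkaUtils using kafkaParams and assert on what was received ...
        assert(kafkaParams("metadata.broker.list") === brokerAddress)
      }
    }
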
http://git-wip-us.apache.org/repos/asf/spark/blob/c1513463/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
index 64ccc92..fc53c23 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
@@ -79,7 +79,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
   test("Reliable Kafka input stream with single topic") {
     var topic = "test-topic"
     createTopic(topic)
-    produceAndSendMessage(topic, data)
+    sendMessages(topic, data)
 
     // Verify whether the offset of this group/topic/partition is 0 before starting.
     assert(getCommitOffset(groupId, topic, 0) === None)
@@ -111,7 +111,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
     val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
     topics.foreach { case (t, _) =>
       createTopic(t)
-      produceAndSendMessage(t, data)
+      sendMessages(t, data)
     }
 
     // Before started, verify all the group/topic/partition offsets are 0.

