Posted to reviews@spark.apache.org by tdas <gi...@git.apache.org> on 2015/02/05 05:07:55 UTC

[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/4384

    [SPARK-4964][Streaming][Kafka] More updates to Exactly-once Kafka stream

    Changes
    - Added example
    - Added a critical unit test that verifies that offset ranges can be recovered through checkpoints

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark new-kafka-fixes

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/4384.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4384
    
----
commit 6a91cab3172701e1c286dbcdadee825a33230913
Author: Tathagata Das <ta...@gmail.com>
Date:   2015-02-05T03:56:46Z

    Added example

commit 49867846e72329c849002706441d8165825f4a6b
Author: Tathagata Das <ta...@gmail.com>
Date:   2015-02-05T04:01:17Z

    Added unit test to kafka offset recovery

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4384#discussion_r24216559
  
    --- Diff: examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala ---
    @@ -0,0 +1,60 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.examples.streaming
    +
    +import kafka.serializer.StringDecoder
    +
    +import org.apache.spark.streaming._
    +import org.apache.spark.streaming.kafka._
    +import org.apache.spark.SparkConf
    +
    +/**
    + * Consumes messages from one or more topics in Kafka and does wordcount.
    + * Usage: DirectKafkaWordCount <brokers> <topics>
    + *   <brokers> is a list of one or more zookeeper servers that make quorum
    --- End diff --
    
    Right. Thanks!



[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4384#issuecomment-73653301
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27178/



[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/4384#issuecomment-72992469
  
    @koeninger Can you please take a look?
    @pwendell (Optional) Take a look if you are interested.



[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4384#issuecomment-73164892
  
      [Test build #26878 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26878/consoleFull) for   PR 4384 at commit [`50f2b56`](https://github.com/apache/spark/commit/50f2b56f57b00845d006361049dd2c4bac89957c).
     * This patch merges cleanly.



[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4384#issuecomment-73335178
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26966/



[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4384#issuecomment-73326747
  
      [Test build #26966 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26966/consoleFull) for   PR 4384 at commit [`26df23c`](https://github.com/apache/spark/commit/26df23c5578e52dc594557cfaf2170b1e0d49169).
     * This patch merges cleanly.



[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/4384



[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4384#issuecomment-73170936
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26878/



[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4384#issuecomment-73182394
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26891/



[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4384#discussion_r24142354
  
    --- Diff: examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala ---
    @@ -0,0 +1,60 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.examples.streaming
    +
    +import kafka.serializer.StringDecoder
    +
    +import org.apache.spark.streaming._
    +import org.apache.spark.streaming.kafka._
    +import org.apache.spark.SparkConf
    +
    +/**
    + * Consumes messages from one or more topics in Kafka and does wordcount.
    + * Usage: DirectKafkaWordCount <brokers> <topics>
    + *   <brokers> is a list of one or more zookeeper servers that make quorum
    --- End diff --
    
    These are kafka servers, not zookeeper servers
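
To make the distinction concrete, here is a minimal sketch in plain Scala (no Spark or Kafka dependency) of checking a broker list in host1:port1,host2:port2 form and placing it under "metadata.broker.list", the key named in the KafkaUtils docs quoted in this thread. The helper names `parseBrokers` and `directKafkaParams` are hypothetical and are not part of this pull request.

```scala
object BrokerList {
  // Hypothetical helper: parse "host1:port1,host2:port2" into (host, port) pairs.
  // Returns Left with a message describing the first malformed entry.
  def parseBrokers(brokers: String): Either[String, Seq[(String, Int)]] = {
    val entries = brokers.split(",").map(_.trim).filter(_.nonEmpty).toSeq
    if (entries.isEmpty) Left("empty broker list")
    else {
      val parsed: Seq[Either[String, (String, Int)]] = entries.map { entry =>
        entry.split(":") match {
          // Accept only a non-empty host and a short, all-digit port.
          case Array(host, port)
              if host.nonEmpty && port.nonEmpty && port.length <= 5 && port.forall(_.isDigit) =>
            Right((host, port.toInt))
          case _ =>
            Left(s"expected host:port, got '$entry'")
        }
      }
      parsed.collectFirst { case Left(err) => err } match {
        case Some(err) => Left(err)
        case None      => Right(parsed.collect { case Right(hostPort) => hostPort })
      }
    }
  }

  // Hypothetical helper: build the parameter map only if the list is well formed.
  def directKafkaParams(brokers: String): Either[String, Map[String, String]] =
    parseBrokers(brokers) match {
      case Right(_)  => Right(Map("metadata.broker.list" -> brokers))
      case Left(err) => Left(err)
    }
}
```

Note that a format check like this cannot tell a Kafka broker address from a ZooKeeper one, since both are host:port pairs; it only catches malformed input, so the usage text still has to say which servers are expected.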



[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4384#issuecomment-72994801
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26822/



[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4384#discussion_r24386248
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
    @@ -179,121 +182,194 @@ object KafkaUtils {
           errs => throw new SparkException(errs.mkString("\n")),
           ok => ok
         )
    -    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
    +    new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
       }
     
    -  /** A batch-oriented interface for consuming from Kafka.
    -   * Starting and ending offsets are specified in advance,
    -   * so that you can control exactly-once semantics.
    +  /**
    +   * :: Experimental ::
    +   * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you
    +   * to specify the Kafka leader to connect to (to optimize fetching) and access the message as well
    +   * as the metadata.
    +   *
        * @param sc SparkContext object
        * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    -   * configuration parameters</a>.
    -   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
    -   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
    +   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
    +   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
    +   *    host1:port1,host2:port2 form.
        * @param offsetRanges Each OffsetRange in the batch corresponds to a
        *   range of offsets for a given Kafka topic/partition
        * @param leaders Kafka leaders for each offset range in batch
    -   * @param messageHandler function for translating each message into the desired type
    +   * @param messageHandler Function for translating each message and metadata into the desired type
        */
       @Experimental
       def createRDD[
         K: ClassTag,
         V: ClassTag,
    -    U <: Decoder[_]: ClassTag,
    -    T <: Decoder[_]: ClassTag,
    -    R: ClassTag] (
    +    KD <: Decoder[K]: ClassTag,
    +    VD <: Decoder[V]: ClassTag,
    +    R: ClassTag](
           sc: SparkContext,
           kafkaParams: Map[String, String],
           offsetRanges: Array[OffsetRange],
           leaders: Array[Leader],
           messageHandler: MessageAndMetadata[K, V] => R
    -  ): RDD[R] = {
    -
    +    ): RDD[R] = {
         val leaderMap = leaders
           .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
           .toMap
    -    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
    +    new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
       }
     
    +
       /**
    -   * This stream can guarantee that each message from Kafka is included in transformations
    -   * (as opposed to output actions) exactly once, even in most failure situations.
    +   * Create a RDD from Kafka using offset ranges for each topic and partition.
        *
    -   * Points to note:
    -   *
    -   * Failure Recovery - You must checkpoint this stream, or save offsets yourself and provide them
    -   * as the fromOffsets parameter on restart.
    -   * Kafka must have sufficient log retention to obtain messages after failure.
    -   *
    -   * Getting offsets from the stream - see programming guide
    +   * @param jsc JavaSparkContext object
    +   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    +   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
    +   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
    +   *    host1:port1,host2:port2 form.
    +   * @param offsetRanges Each OffsetRange in the batch corresponds to a
    +   *   range of offsets for a given Kafka topic/partition
    +   */
    +  @Experimental
    +  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
    +      jsc: JavaSparkContext,
    +      keyClass: Class[K],
    +      valueClass: Class[V],
    +      keyDecoderClass: Class[KD],
    +      valueDecoderClass: Class[VD],
    +      kafkaParams: JMap[String, String],
    +      offsetRanges: Array[OffsetRange]
    +    ): JavaPairRDD[K, V] = {
    +    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
    +    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
    +    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
    +    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
    +    new JavaPairRDD(createRDD[K, V, KD, VD](
    +      jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges))
    +  }
    +
    +  /**
    +   * :: Experimental ::
    +   * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you
    +   * to specify the Kafka leader to connect to (to optimize fetching) and access the message as well
    +   * as the metadata.
        *
    -.  * Zookeeper - This does not use Zookeeper to store offsets.  For interop with Kafka monitors
    -   * that depend on Zookeeper, you must store offsets in ZK yourself.
    +   * @param jsc JavaSparkContext object
    +   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    +   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
    +   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
    +   *    host1:port1,host2:port2 form.
    +   * @param offsetRanges Each OffsetRange in the batch corresponds to a
    +   *   range of offsets for a given Kafka topic/partition
    +   * @param leaders Kafka leaders for each offset range in batch
    +   * @param messageHandler Function for translating each message and metadata into the desired type
    +   */
    +  @Experimental
    +  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
    +      jsc: JavaSparkContext,
    +      keyClass: Class[K],
    +      valueClass: Class[V],
    +      keyDecoderClass: Class[KD],
    +      valueDecoderClass: Class[VD],
    +      recordClass: Class[R],
    +      kafkaParams: JMap[String, String],
    +      offsetRanges: Array[OffsetRange],
    +      leaders: Array[Leader],
    +      messageHandler: JFunction[MessageAndMetadata[K, V], R]
    +    ): JavaRDD[R] = {
    +    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
    +    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
    +    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
    +    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
    +    implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
    +    createRDD[K, V, KD, VD, R](
    +      jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaders, messageHandler.call _)
    +  }
    +
    +  /**
    +   * :: Experimental ::
    +   * Create an input stream that pulls messages from a Kafka Broker. This stream can guarantee
    --- End diff --
    
    Does the other one pull messages from a Kafka Broker as well? It might be good to focus on something that is distinctive.
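
For readers skimming the diff above, the leader handling in the quoted createRDD body can be sketched in isolation as follows. This is a rough illustration only: `TopicAndPartition` and `Leader` here are hypothetical stand-in case classes defined for the sketch, not the real Kafka or Spark types, and `toLeaderMap` is a made-up name.

```scala
object LeaderMapSketch {
  // Hypothetical stand-ins for the types named in the diff.
  final case class TopicAndPartition(topic: String, partition: Int)
  final case class Leader(topic: String, partition: Int, host: String, port: Int)

  // Mirrors the conversion in the quoted createRDD body: key each leader by its
  // topic/partition so the leader for an offset range is a single map lookup.
  def toLeaderMap(leaders: Array[Leader]): Map[TopicAndPartition, (String, Int)] =
    leaders
      .map(l => TopicAndPartition(l.topic, l.partition) -> ((l.host, l.port)))
      .toMap
}
```

If the same topic/partition appears twice in the input, `toMap` keeps the later entry, so callers of a conversion like this would presumably want to pass exactly one leader per offset range.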



[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4384#discussion_r24267609
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
    @@ -179,121 +182,190 @@ object KafkaUtils {
           errs => throw new SparkException(errs.mkString("\n")),
           ok => ok
         )
    -    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
    +    new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
       }
     
    -  /** A batch-oriented interface for consuming from Kafka.
    -   * Starting and ending offsets are specified in advance,
    -   * so that you can control exactly-once semantics.
    +  /**
    +   * :: Experimental ::
    +   * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you
    +   * to specify the Kafka leader to connect to (to optimize fetching) and access the message as well
    +   * as the metadata.
    +   *
        * @param sc SparkContext object
        * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    -   * configuration parameters</a>.
    -   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
    -   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
    +   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
    +   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
    +   *    host1:port1,host2:port2 form.
        * @param offsetRanges Each OffsetRange in the batch corresponds to a
        *   range of offsets for a given Kafka topic/partition
        * @param leaders Kafka leaders for each offset range in batch
    -   * @param messageHandler function for translating each message into the desired type
    +   * @param messageHandler function for translating each message and metadata into the desired type
        */
       @Experimental
       def createRDD[
         K: ClassTag,
         V: ClassTag,
    -    U <: Decoder[_]: ClassTag,
    -    T <: Decoder[_]: ClassTag,
    -    R: ClassTag] (
    +    KD <: Decoder[K]: ClassTag,
    +    VD <: Decoder[V]: ClassTag,
    +    R: ClassTag](
           sc: SparkContext,
           kafkaParams: Map[String, String],
           offsetRanges: Array[OffsetRange],
           leaders: Array[Leader],
           messageHandler: MessageAndMetadata[K, V] => R
    -  ): RDD[R] = {
    -
    +    ): RDD[R] = {
         val leaderMap = leaders
           .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
           .toMap
    -    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
    +    new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
       }
     
    +
       /**
    -   * This stream can guarantee that each message from Kafka is included in transformations
    -   * (as opposed to output actions) exactly once, even in most failure situations.
    +   * Create a RDD from Kafka using offset ranges for each topic and partition.
        *
    -   * Points to note:
    -   *
    -   * Failure Recovery - You must checkpoint this stream, or save offsets yourself and provide them
    -   * as the fromOffsets parameter on restart.
    -   * Kafka must have sufficient log retention to obtain messages after failure.
    -   *
    -   * Getting offsets from the stream - see programming guide
    +   * @param jsc JavaSparkContext object
    +   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    +   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
    +   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
    +   *    host1:port1,host2:port2 form.
    +   * @param offsetRanges Each OffsetRange in the batch corresponds to a
    +   *   range of offsets for a given Kafka topic/partition
    +   */
    +  @Experimental
    +  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
    +      jsc: JavaSparkContext,
    +      keyClass: Class[K],
    +      valueClass: Class[V],
    +      keyDecoderClass: Class[KD],
    +      valueDecoderClass: Class[VD],
    +      kafkaParams: JMap[String, String],
    +      offsetRanges: Array[OffsetRange]
    +    ): JavaPairRDD[K, V] = {
    +    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
    +    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
    +    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
    +    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
    +    new JavaPairRDD(createRDD[K, V, KD, VD](
    +      jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges))
    +  }
    +
    +  /**
    +   * :: Experimental ::
    +   * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you
    +   * to specify the Kafka leader to connect to (to optimize fetching) and access the message as well
    +   * as the metadata.
        *
    -.  * Zookeeper - This does not use Zookeeper to store offsets.  For interop with Kafka monitors
    -   * that depend on Zookeeper, you must store offsets in ZK yourself.
    +   * @param jsc JavaSparkContext object
    +   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    +   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
    +   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
    +   *    host1:port1,host2:port2 form.
    +   * @param offsetRanges Each OffsetRange in the batch corresponds to a
    +   *   range of offsets for a given Kafka topic/partition
    +   * @param leaders Kafka leaders for each offset range in batch
    +   * @param messageHandler function for translating each message and metadata into the desired type
    +   */
    +  @Experimental
    +  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
    +      jsc: JavaSparkContext,
    +      keyClass: Class[K],
    +      valueClass: Class[V],
    +      keyDecoderClass: Class[KD],
    +      valueDecoderClass: Class[VD],
    +      recordClass: Class[R],
    +      kafkaParams: JMap[String, String],
    +      offsetRanges: Array[OffsetRange],
    +      leaders: Array[Leader],
    +      messageHandler: JFunction[MessageAndMetadata[K, V], R]
    +    ): JavaRDD[R] = {
    +    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
    +    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
    +    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
    +    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
    +    implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
    +    createRDD[K, V, KD, VD, R](
    +      jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaders, messageHandler.call _)
    +  }
    +
    +  /**
    +   * :: Experimental ::
    +   * Create an input stream that pulls messages from a Kafka Broker. This stream can guarantee
    +   * that each message from Kafka is included in transformations exactly once (see points below).
        *
    -   * End-to-end semantics - This does not guarantee that any output operation will push each record
    -   * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and
    -   * outputting exactly once), you have to either ensure that the output operation is
    -   * idempotent, or transactionally store offsets with the output. See the programming guide for
    -   * more details.
    +   * Points to note:
    +   *  - No receivers: This stream does not use any receiver. It directly queries Kafka
    +   *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
    +   *    by the stream itself. For interoperability with Kafka monitoring tools that depend on 
    +   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
    +   *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
    +   *    in the [[StreamingContext]]. The information on consumed offset can be
    +   *    recovered from the checkpoint. See the programming guide for details (constraints, etc.).
    +   *  - End-to-end semantics: This stream ensures that every record is effectively received and
    +   *    transformed exactly once, but gives no guarantees on whether the transformed data are
    +   *    outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
    +   *    that the output operation is idempotent, or use transactions to output records atomically.
    +   *    See the programming guide for more details.
        *
        * @param ssc StreamingContext object
        * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    -   * configuration parameters</a>.
    -   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
    -   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
    -   * @param messageHandler function for translating each message into the desired type
    -   * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
    -   *  starting point of the stream
    +   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
    +   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
    +   *    host1:port1,host2:port2 form.
    +   * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive)
    +   *    starting point of the stream
    +   * @param messageHandler Function for translating each raw message into the desired type
    --- End diff --
    
    Good catch. 
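
One way to read the end-to-end semantics note in the scaladoc quoted above: if each output record is keyed by its Kafka coordinates, replaying a batch after a failure overwrites the same entries instead of duplicating them. A minimal sketch in plain Scala, assuming an upsert-style store; `RecordKey`, `IdempotentSink`, and the in-memory map are hypothetical stand-ins and are not part of this patch:

```scala
import scala.collection.mutable

// Hypothetical key: the Kafka coordinates of a record (an assumption, not from the patch).
final case class RecordKey(topic: String, partition: Int, offset: Long)

// Hypothetical sink standing in for an external store with upsert semantics.
final class IdempotentSink {
  private val store = mutable.Map.empty[RecordKey, String]

  // Writing the same (key, value) pair twice leaves the store unchanged,
  // so a replayed batch does not create duplicates.
  def write(batch: Seq[(RecordKey, String)]): Unit =
    batch.foreach { case (key, value) => store.update(key, value) }

  def size: Int = store.size
}

object IdempotentOutputSketch {
  val batch: Seq[(RecordKey, String)] = Seq(
    RecordKey("events", 0, 10L) -> "a",
    RecordKey("events", 0, 11L) -> "b")

  val sink = new IdempotentSink
  sink.write(batch)
  sink.write(batch) // simulated replay after a driver failure
}
```

The alternative named in the scaladoc, storing results and offsets in one transaction, would need a transactional store and is not shown here.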


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4384#issuecomment-73459269
  
      [Test build #27078 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27078/consoleFull) for   PR 4384 at commit [`83d0402`](https://github.com/apache/spark/commit/83d04025da1cac3d1ec8565015dbe492f17c3b79).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `public final class JavaDirectKafkaWordCount `



[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4384#issuecomment-73175650
  
      [Test build #26888 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26888/consoleFull) for   PR 4384 at commit [`bb65232`](https://github.com/apache/spark/commit/bb65232c008d66c7895e83e9736353881b5d719e).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4384#issuecomment-72991452
  
      [Test build #26822 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26822/consoleFull) for   PR 4384 at commit [`4986784`](https://github.com/apache/spark/commit/49867846e72329c849002706441d8165825f4a6b).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4384#issuecomment-73003057
  
      [Test build #586 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/586/consoleFull) for   PR 4384 at commit [`4986784`](https://github.com/apache/spark/commit/49867846e72329c849002706441d8165825f4a6b).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/4384#issuecomment-73448717
  
    @pwendell Could you take a look at the scala docs?


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4384#issuecomment-73180459
  
      [Test build #26888 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26888/consoleFull) for   PR 4384 at commit [`bb65232`](https://github.com/apache/spark/commit/bb65232c008d66c7895e83e9736353881b5d719e).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4384#discussion_r24251761
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala ---
    @@ -36,14 +36,12 @@ import kafka.utils.VerifiableProperties
      * Starting and ending offsets are specified in advance,
      * so that you can control exactly-once semantics.
      * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    - * configuration parameters</a>.
    - *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
    - *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
    - * @param batch Each KafkaRDDPartition in the batch corresponds to a
    - *   range of offsets for a given Kafka topic/partition
    + * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set
    + * with Kafka broker(s) specified in host1:port1,host2:port2 form.
    --- End diff --
    
    You removed the "not zookeeper servers" note here, but left it in for the other methods that take kafkaParams
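
For context on why the note matters, here is a small sketch of what the `kafkaParams` entry is expected to look like. The host names, ports, and the `parseBrokers` helper are illustrative assumptions; the helper is not part of KafkaUtils and only demonstrates the `host1:port1,host2:port2` form:

```scala
object BrokerListSketch {
  // Hypothetical helper (not part of KafkaUtils): parses "host1:port1,host2:port2".
  def parseBrokers(brokerList: String): Seq[(String, Int)] =
    brokerList.split(",").toSeq.map { hostPort =>
      val parts = hostPort.trim.split(":")
      require(parts.length == 2, s"expected host:port, got '$hostPort'")
      (parts(0), parts(1).toInt)
    }

  // Kafka broker addresses, NOT ZooKeeper servers; hosts and ports are placeholders.
  val kafkaParams: Map[String, String] =
    Map("metadata.broker.list" -> "host1:9092,host2:9092")

  val brokers: Seq[(String, Int)] = parseBrokers(kafkaParams("metadata.broker.list"))
}
```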


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4384#issuecomment-72997883
  
      [Test build #586 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/586/consoleFull) for   PR 4384 at commit [`4986784`](https://github.com/apache/spark/commit/49867846e72329c849002706441d8165825f4a6b).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4384#issuecomment-73180462
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26888/


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4384#discussion_r24252680
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
    @@ -179,121 +182,190 @@ object KafkaUtils {
           errs => throw new SparkException(errs.mkString("\n")),
           ok => ok
         )
    -    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
    +    new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
       }
     
    -  /** A batch-oriented interface for consuming from Kafka.
    -   * Starting and ending offsets are specified in advance,
    -   * so that you can control exactly-once semantics.
    +  /**
    +   * :: Experimental ::
    +   * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you
    +   * to specify the Kafka leader to connect to (to optimize fetching) and access the message as well
    +   * as the metadata.
    +   *
        * @param sc SparkContext object
        * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    -   * configuration parameters</a>.
    -   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
    -   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
    +   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
    +   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
    +   *    host1:port1,host2:port2 form.
        * @param offsetRanges Each OffsetRange in the batch corresponds to a
        *   range of offsets for a given Kafka topic/partition
        * @param leaders Kafka leaders for each offset range in batch
    -   * @param messageHandler function for translating each message into the desired type
    +   * @param messageHandler function for translating each message and metadata into the desired type
        */
       @Experimental
       def createRDD[
         K: ClassTag,
         V: ClassTag,
    -    U <: Decoder[_]: ClassTag,
    -    T <: Decoder[_]: ClassTag,
    -    R: ClassTag] (
    +    KD <: Decoder[K]: ClassTag,
    +    VD <: Decoder[V]: ClassTag,
    +    R: ClassTag](
           sc: SparkContext,
           kafkaParams: Map[String, String],
           offsetRanges: Array[OffsetRange],
           leaders: Array[Leader],
           messageHandler: MessageAndMetadata[K, V] => R
    -  ): RDD[R] = {
    -
    +    ): RDD[R] = {
         val leaderMap = leaders
           .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
           .toMap
    -    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
    +    new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
       }
     
    +
       /**
    -   * This stream can guarantee that each message from Kafka is included in transformations
    -   * (as opposed to output actions) exactly once, even in most failure situations.
    +   * Create a RDD from Kafka using offset ranges for each topic and partition.
        *
    -   * Points to note:
    -   *
    -   * Failure Recovery - You must checkpoint this stream, or save offsets yourself and provide them
    -   * as the fromOffsets parameter on restart.
    -   * Kafka must have sufficient log retention to obtain messages after failure.
    -   *
    -   * Getting offsets from the stream - see programming guide
    +   * @param jsc JavaSparkContext object
    +   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    +   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
    +   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
    +   *    host1:port1,host2:port2 form.
    +   * @param offsetRanges Each OffsetRange in the batch corresponds to a
    +   *   range of offsets for a given Kafka topic/partition
    +   */
    +  @Experimental
    +  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
    +      jsc: JavaSparkContext,
    +      keyClass: Class[K],
    +      valueClass: Class[V],
    +      keyDecoderClass: Class[KD],
    +      valueDecoderClass: Class[VD],
    +      kafkaParams: JMap[String, String],
    +      offsetRanges: Array[OffsetRange]
    +    ): JavaPairRDD[K, V] = {
    +    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
    +    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
    +    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
    +    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
    +    new JavaPairRDD(createRDD[K, V, KD, VD](
    +      jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges))
    +  }
    +
    +  /**
    +   * :: Experimental ::
    +   * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you
    +   * to specify the Kafka leader to connect to (to optimize fetching) and access the message as well
    +   * as the metadata.
        *
    -.  * Zookeeper - This does not use Zookeeper to store offsets.  For interop with Kafka monitors
    -   * that depend on Zookeeper, you must store offsets in ZK yourself.
    +   * @param jsc JavaSparkContext object
    +   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    +   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
    +   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
    +   *    host1:port1,host2:port2 form.
    +   * @param offsetRanges Each OffsetRange in the batch corresponds to a
    +   *   range of offsets for a given Kafka topic/partition
    +   * @param leaders Kafka leaders for each offset range in batch
    +   * @param messageHandler function for translating each message and metadata into the desired type
    +   */
    +  @Experimental
    +  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
    +      jsc: JavaSparkContext,
    +      keyClass: Class[K],
    +      valueClass: Class[V],
    +      keyDecoderClass: Class[KD],
    +      valueDecoderClass: Class[VD],
    +      recordClass: Class[R],
    +      kafkaParams: JMap[String, String],
    +      offsetRanges: Array[OffsetRange],
    +      leaders: Array[Leader],
    +      messageHandler: JFunction[MessageAndMetadata[K, V], R]
    +    ): JavaRDD[R] = {
    +    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
    +    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
    +    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
    +    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
    +    implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
    +    createRDD[K, V, KD, VD, R](
    +      jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaders, messageHandler.call _)
    +  }
    +
    +  /**
    +   * :: Experimental ::
    +   * Create an input stream that pulls messages from a Kafka Broker. This stream can guarantee
    +   * that each message from Kafka is included in transformations exactly once (see points below).
        *
    -   * End-to-end semantics - This does not guarantee that any output operation will push each record
    -   * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and
    -   * outputting exactly once), you have to either ensure that the output operation is
    -   * idempotent, or transactionally store offsets with the output. See the programming guide for
    -   * more details.
    +   * Points to note:
    +   *  - No receivers: This stream does not use any receiver. It directly queries Kafka
    +   *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
    +   *    by the stream itself. For interoperability with Kafka monitoring tools that depend on 
    +   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
    +   *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
    +   *    in the [[StreamingContext]]. The information on consumed offset can be
    +   *    recovered from the checkpoint. See the programming guide for details (constraints, etc.).
    +   *  - End-to-end semantics: This stream ensures that every record is effectively received and
    +   *    transformed exactly once, but gives no guarantees on whether the transformed data are
    +   *    outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
    +   *    that the output operation is idempotent, or use transactions to output records atomically.
    +   *    See the programming guide for more details.
        *
        * @param ssc StreamingContext object
        * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    -   * configuration parameters</a>.
    -   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
    -   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
    -   * @param messageHandler function for translating each message into the desired type
    -   * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
    -   *  starting point of the stream
    +   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
    +   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
    +   *    host1:port1,host2:port2 form.
    +   * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive)
    +   *    starting point of the stream
    +   * @param messageHandler Function for translating each raw message into the desired type
    --- End diff --
    
    The other messageHandler documentation was changed to say "translating each message and metadata"; this should probably be consistent.
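
To illustrate what "message and metadata" means for a handler, here is a rough sketch. `MessageAndMetadataStub` is a hypothetical, simplified stand-in for Kafka's `MessageAndMetadata` class (only a few fields are modeled), so the code runs without Kafka on the classpath:

```scala
// Hypothetical stand-in for kafka.message.MessageAndMetadata, reduced to a few fields.
final case class MessageAndMetadataStub[K, V](
    topic: String,
    partition: Int,
    offset: Long,
    key: K,
    message: V)

object MessageHandlerSketch {
  // A handler that keeps the payload together with where it came from.
  val messageHandler: MessageAndMetadataStub[String, String] => (String, Long, String) =
    mmd => (mmd.topic, mmd.offset, mmd.message)

  val out: (String, Long, String) =
    messageHandler(MessageAndMetadataStub("events", 0, 42L, "k", "hello"))
}
```

The handler has access to the topic, partition, and offset as well as the payload, which is why describing it as translating only "each raw message" undersells it.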


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4384#discussion_r24253794
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala ---
    @@ -19,16 +19,35 @@ package org.apache.spark.streaming.kafka
     
     import kafka.common.TopicAndPartition
     
    -/** Something that has a collection of OffsetRanges */
    +import org.apache.spark.annotation.Experimental
    +
    +/**
    + * :: Experimental ::
    + * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the
    + * offset ranges in RDDs generated by the direct Kafka DStream (see
    + * [[KafkaUtils.createDirectStream()]]).
    --- End diff --
    
    It's probably good for the doc for createDirectStream to link to here, in addition to the other way around
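
A cross-link would help because the access pattern is not obvious from the stream's type alone: the caller sees a general type and has to recover the offset ranges with a type test. A minimal sketch of that pattern, using hypothetical `*Stub` types in place of the Spark classes and assuming `untilOffset` is exclusive:

```scala
// Hypothetical stand-ins mirroring the shapes named in the diff; not the Spark classes.
final case class OffsetRangeStub(
    topic: String,
    partition: Int,
    fromOffset: Long,
    untilOffset: Long)

trait HasOffsetRangesStub {
  def offsetRanges: Array[OffsetRangeStub]
}

// Stand-in for an RDD produced by the direct stream.
final class BatchStub(val offsetRanges: Array[OffsetRangeStub]) extends HasOffsetRangesStub

object OffsetRangesSketch {
  // Recover the ranges with a type test; anything else yields no ranges.
  def describe(batch: AnyRef): Seq[String] = batch match {
    case h: HasOffsetRangesStub =>
      h.offsetRanges.toSeq.map { r =>
        s"${r.topic}-${r.partition}: [${r.fromOffset}, ${r.untilOffset})"
      }
    case _ => Seq.empty
  }

  val lines: Seq[String] =
    describe(new BatchStub(Array(OffsetRangeStub("events", 0, 10L, 20L))))
}
```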


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4384#issuecomment-73182388
  
      [Test build #26891 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26891/consoleFull) for   PR 4384 at commit [`e4abf69`](https://github.com/apache/spark/commit/e4abf69b63bb6bfa94823bcefd27bcbe821b1f2e).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4384#issuecomment-73195603
  
      [Test build #589 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/589/consoleFull) for   PR 4384 at commit [`e4abf69`](https://github.com/apache/spark/commit/e4abf69b63bb6bfa94823bcefd27bcbe821b1f2e).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on the pull request:

    https://github.com/apache/spark/pull/4384#issuecomment-73269084
  
    Thanks for adding the Java-friendly KafkaUtils methods.
    
    Your original reason for wanting Array[Leader] rather than Map[TopicAndPartition, Broker] was Java compatibility.
    
    But since there are now separate Scala and Java-specific KafkaUtils.createRDD methods, couldn't the Scala one take a Map and the Java one take a JMap?
    
    As it stands, what does it mean if someone passes in an array with multiple Leader objects that have the same topic and partition but different brokers?
    
    The first thing we do with the array of leaders is convert it to a map, so it seems better to just take a map and avoid both the possibility of confusion and the extra conversion.
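
The ambiguity can be shown with a small sketch. `Leader` and `TopicAndPartition` below are hypothetical, simplified stand-ins for the classes in the diff; the conversion itself mirrors the `.map(...).toMap` call quoted from `createRDD`:

```scala
// Hypothetical stand-ins for the classes referenced in the diff.
final case class TopicAndPartition(topic: String, partition: Int)
final case class Leader(topic: String, partition: Int, host: String, port: Int)

object LeaderMapSketch {
  // Same shape as the conversion in the quoted createRDD body.
  def toLeaderMap(leaders: Array[Leader]): Map[TopicAndPartition, (String, Int)] =
    leaders
      .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
      .toMap

  // Two leaders for the same topic/partition but different brokers.
  val leaders: Array[Leader] = Array(
    Leader("events", 0, "broker-a", 9092),
    Leader("events", 0, "broker-b", 9092))

  // toMap keeps one entry per key; the later element silently replaces the earlier one.
  val leaderMap: Map[TopicAndPartition, (String, Int)] = toLeaderMap(leaders)
}
```

With an array, the duplicate is accepted and one broker silently wins; a Map parameter would make that state unrepresentable at the call site.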


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4384#discussion_r24142682
  
    --- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
    @@ -0,0 +1,201 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.io.File
    +
    +import scala.collection.mutable
    +import scala.concurrent.duration._
    +import scala.language.postfixOps
    +
    +import kafka.serializer.StringDecoder
    +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
    +import org.scalatest.concurrent.{Eventually, Timeouts}
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
    +import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    +import org.apache.spark.util.Utils
    +
    +class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
    --- End diff --
    
    Just renamed the file and testsuite class from KafkaDirectStreamSuite to DirectKafkaStreamSuite, but Git/Github considered it to be a move. The first unit test is unmodified.


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4384#issuecomment-73455353
  
      [Test build #27078 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27078/consoleFull) for   PR 4384 at commit [`83d0402`](https://github.com/apache/spark/commit/83d04025da1cac3d1ec8565015dbe492f17c3b79).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4384#issuecomment-73653296
  
      [Test build #27178 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27178/consoleFull) for   PR 4384 at commit [`7c931c3`](https://github.com/apache/spark/commit/7c931c3fd174376fc04a436a64ec414dbe8eac46).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4384#discussion_r24386265
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
    @@ -179,121 +182,194 @@ object KafkaUtils {
           errs => throw new SparkException(errs.mkString("\n")),
           ok => ok
         )
    -    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
    +    new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
       }
     
    -  /** A batch-oriented interface for consuming from Kafka.
    -   * Starting and ending offsets are specified in advance,
    -   * so that you can control exactly-once semantics.
    +  /**
    +   * :: Experimental ::
    +   * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you
    +   * specify the Kafka leader to connect to (to optimize fetching) and access the message as well
    +   * as the metadata.
    +   *
        * @param sc SparkContext object
        * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    -   * configuration parameters</a>.
    -   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
    -   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
    +   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
    +   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
    +   *    host1:port1,host2:port2 form.
        * @param offsetRanges Each OffsetRange in the batch corresponds to a
        *   range of offsets for a given Kafka topic/partition
        * @param leaders Kafka leaders for each offset range in batch
    -   * @param messageHandler function for translating each message into the desired type
    +   * @param messageHandler Function for translating each message and metadata into the desired type
        */
       @Experimental
       def createRDD[
         K: ClassTag,
         V: ClassTag,
    -    U <: Decoder[_]: ClassTag,
    -    T <: Decoder[_]: ClassTag,
    -    R: ClassTag] (
    +    KD <: Decoder[K]: ClassTag,
    +    VD <: Decoder[V]: ClassTag,
    +    R: ClassTag](
           sc: SparkContext,
           kafkaParams: Map[String, String],
           offsetRanges: Array[OffsetRange],
           leaders: Array[Leader],
           messageHandler: MessageAndMetadata[K, V] => R
    -  ): RDD[R] = {
    -
    +    ): RDD[R] = {
         val leaderMap = leaders
           .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
           .toMap
    -    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
    +    new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
       }
     
    +
       /**
    -   * This stream can guarantee that each message from Kafka is included in transformations
    -   * (as opposed to output actions) exactly once, even in most failure situations.
    +   * Create a RDD from Kafka using offset ranges for each topic and partition.
        *
    -   * Points to note:
    -   *
    -   * Failure Recovery - You must checkpoint this stream, or save offsets yourself and provide them
    -   * as the fromOffsets parameter on restart.
    -   * Kafka must have sufficient log retention to obtain messages after failure.
    -   *
    -   * Getting offsets from the stream - see programming guide
    +   * @param jsc JavaSparkContext object
    +   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    +   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
    +   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
    +   *    host1:port1,host2:port2 form.
    +   * @param offsetRanges Each OffsetRange in the batch corresponds to a
    +   *   range of offsets for a given Kafka topic/partition
    +   */
    +  @Experimental
    +  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
    +      jsc: JavaSparkContext,
    +      keyClass: Class[K],
    +      valueClass: Class[V],
    +      keyDecoderClass: Class[KD],
    +      valueDecoderClass: Class[VD],
    +      kafkaParams: JMap[String, String],
    +      offsetRanges: Array[OffsetRange]
    +    ): JavaPairRDD[K, V] = {
    +    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
    +    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
    +    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
    +    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
    +    new JavaPairRDD(createRDD[K, V, KD, VD](
    +      jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges))
    +  }
    +
    +  /**
    +   * :: Experimental ::
    +   * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you
    +   * specify the Kafka leader to connect to (to optimize fetching) and access the message as well
    +   * as the metadata.
        *
    -.  * Zookeeper - This does not use Zookeeper to store offsets.  For interop with Kafka monitors
    -   * that depend on Zookeeper, you must store offsets in ZK yourself.
    +   * @param jsc JavaSparkContext object
    +   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    +   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
    +   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
    +   *    host1:port1,host2:port2 form.
    +   * @param offsetRanges Each OffsetRange in the batch corresponds to a
    +   *   range of offsets for a given Kafka topic/partition
    +   * @param leaders Kafka leaders for each offset range in batch
    +   * @param messageHandler Function for translating each message and metadata into the desired type
    +   */
    +  @Experimental
    +  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
    +      jsc: JavaSparkContext,
    +      keyClass: Class[K],
    +      valueClass: Class[V],
    +      keyDecoderClass: Class[KD],
    +      valueDecoderClass: Class[VD],
    +      recordClass: Class[R],
    +      kafkaParams: JMap[String, String],
    +      offsetRanges: Array[OffsetRange],
    +      leaders: Array[Leader],
    +      messageHandler: JFunction[MessageAndMetadata[K, V], R]
    +    ): JavaRDD[R] = {
    +    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
    +    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
    +    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
    +    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
    +    implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
    +    createRDD[K, V, KD, VD, R](
    +      jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaders, messageHandler.call _)
    +  }
    +
    +  /**
    +   * :: Experimental ::
    +   * Create an input stream that pulls messages from a Kafka Broker. This stream can guarantee
    --- End diff --
    
    What about saying "without using receivers" at the end?




[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4384#issuecomment-73647726
  
      [Test build #27178 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27178/consoleFull) for   PR 4384 at commit [`7c931c3`](https://github.com/apache/spark/commit/7c931c3fd174376fc04a436a64ec414dbe8eac46).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4384#issuecomment-73335167
  
      [Test build #26966 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26966/consoleFull) for   PR 4384 at commit [`26df23c`](https://github.com/apache/spark/commit/26df23c5578e52dc594557cfaf2170b1e0d49169).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4384#discussion_r24268035
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
    @@ -179,121 +182,190 @@ object KafkaUtils {
           errs => throw new SparkException(errs.mkString("\n")),
           ok => ok
         )
    -    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
    +    new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
       }
     
    -  /** A batch-oriented interface for consuming from Kafka.
    -   * Starting and ending offsets are specified in advance,
    -   * so that you can control exactly-once semantics.
    +  /**
    +   * :: Experimental ::
    +   * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you
    +   * specify the Kafka leader to connect to (to optimize fetching) and access the message as well
    +   * as the metadata.
    +   *
        * @param sc SparkContext object
        * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    -   * configuration parameters</a>.
    -   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
    -   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
    +   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
    +   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
    +   *    host1:port1,host2:port2 form.
        * @param offsetRanges Each OffsetRange in the batch corresponds to a
        *   range of offsets for a given Kafka topic/partition
        * @param leaders Kafka leaders for each offset range in batch
    -   * @param messageHandler function for translating each message into the desired type
    +   * @param messageHandler function for translating each message and metadata into the desired type
        */
       @Experimental
       def createRDD[
         K: ClassTag,
         V: ClassTag,
    -    U <: Decoder[_]: ClassTag,
    -    T <: Decoder[_]: ClassTag,
    -    R: ClassTag] (
    +    KD <: Decoder[K]: ClassTag,
    +    VD <: Decoder[V]: ClassTag,
    +    R: ClassTag](
           sc: SparkContext,
           kafkaParams: Map[String, String],
           offsetRanges: Array[OffsetRange],
           leaders: Array[Leader],
           messageHandler: MessageAndMetadata[K, V] => R
    -  ): RDD[R] = {
    -
    +    ): RDD[R] = {
         val leaderMap = leaders
           .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
           .toMap
    -    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
    +    new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
       }
     
    +
       /**
    -   * This stream can guarantee that each message from Kafka is included in transformations
    -   * (as opposed to output actions) exactly once, even in most failure situations.
    +   * Create a RDD from Kafka using offset ranges for each topic and partition.
        *
    -   * Points to note:
    -   *
    -   * Failure Recovery - You must checkpoint this stream, or save offsets yourself and provide them
    -   * as the fromOffsets parameter on restart.
    -   * Kafka must have sufficient log retention to obtain messages after failure.
    -   *
    -   * Getting offsets from the stream - see programming guide
    +   * @param jsc JavaSparkContext object
    +   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    +   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
    +   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
    +   *    host1:port1,host2:port2 form.
    +   * @param offsetRanges Each OffsetRange in the batch corresponds to a
    +   *   range of offsets for a given Kafka topic/partition
    +   */
    +  @Experimental
    +  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
    +      jsc: JavaSparkContext,
    +      keyClass: Class[K],
    +      valueClass: Class[V],
    +      keyDecoderClass: Class[KD],
    +      valueDecoderClass: Class[VD],
    +      kafkaParams: JMap[String, String],
    +      offsetRanges: Array[OffsetRange]
    +    ): JavaPairRDD[K, V] = {
    +    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
    +    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
    +    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
    +    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
    +    new JavaPairRDD(createRDD[K, V, KD, VD](
    +      jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges))
    +  }
    +
    +  /**
    +   * :: Experimental ::
    +   * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you
    +   * specify the Kafka leader to connect to (to optimize fetching) and access the message as well
    +   * as the metadata.
        *
    -.  * Zookeeper - This does not use Zookeeper to store offsets.  For interop with Kafka monitors
    -   * that depend on Zookeeper, you must store offsets in ZK yourself.
    +   * @param jsc JavaSparkContext object
    +   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    +   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
    +   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
    +   *    host1:port1,host2:port2 form.
    +   * @param offsetRanges Each OffsetRange in the batch corresponds to a
    +   *   range of offsets for a given Kafka topic/partition
    +   * @param leaders Kafka leaders for each offset range in batch
    +   * @param messageHandler function for translating each message and metadata into the desired type
    +   */
    +  @Experimental
    +  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
    +      jsc: JavaSparkContext,
    +      keyClass: Class[K],
    +      valueClass: Class[V],
    +      keyDecoderClass: Class[KD],
    +      valueDecoderClass: Class[VD],
    +      recordClass: Class[R],
    +      kafkaParams: JMap[String, String],
    +      offsetRanges: Array[OffsetRange],
    +      leaders: Array[Leader],
    +      messageHandler: JFunction[MessageAndMetadata[K, V], R]
    +    ): JavaRDD[R] = {
    +    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
    +    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
    +    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
    +    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
    +    implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
    +    createRDD[K, V, KD, VD, R](
    +      jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaders, messageHandler.call _)
    +  }
    +
    +  /**
    +   * :: Experimental ::
    +   * Create an input stream that pulls messages from a Kafka Broker. This stream can guarantee
    +   * that each message from Kafka is included in transformations exactly once (see points below).
        *
    -   * End-to-end semantics - This does not guarantee that any output operation will push each record
    -   * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and
    -   * outputting exactly once), you have to either ensure that the output operation is
    -   * idempotent, or transactionally store offsets with the output. See the programming guide for
    -   * more details.
    +   * Points to note:
    +   *  - No receivers: This stream does not use any receiver. It directly queries Kafka
    +   *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
    +   *    by the stream itself. For interoperability with Kafka monitoring tools that depend on 
    +   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
    +   *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
    --- End diff --
    
    Streaming won't let you do windowing or updateStateByKey without checkpointing, so that's a moot point anyway, right?
    
    It's a pretty small nuance, but my point is that people who are storing offsets themselves don't necessarily need to turn on checkpointing if their job doesn't otherwise require it, and get the advantage of not having to worry about lost / unrecoverable checkpoints.
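
The "store offsets yourself" approach described above can be sketched in plain Scala. This is a minimal toy model, not Spark or Kafka code: `TopicPartition`, `OffsetRange`, `OffsetStore`, `read`, and `runBatch` are hypothetical stand-ins defined here for illustration, and the store is assumed to commit results and offsets together. It only shows why a restarted job can resume from the last committed offset without a checkpoint.

```scala
object ManualOffsets {
  // Hypothetical stand-ins for illustration; these are NOT Spark's or Kafka's classes.
  final case class TopicPartition(topic: String, partition: Int)
  final case class OffsetRange(tp: TopicPartition, fromOffset: Long, untilOffset: Long)

  // Toy external store: results and offsets are committed in one synchronized step,
  // standing in for a transactional write to a real database.
  final class OffsetStore {
    private var offsets = Map.empty[TopicPartition, Long]
    private var results = Vector.empty[String]

    def commit(batch: Seq[String], ranges: Seq[OffsetRange]): Unit = synchronized {
      results = results ++ batch
      offsets = offsets ++ ranges.map(r => r.tp -> r.untilOffset)
    }

    def fromOffsets: Map[TopicPartition, Long] = synchronized { offsets }
    def stored: Vector[String] = synchronized { results }
  }

  // Simulated partition log in which the vector index plays the role of the offset.
  def read(log: Vector[String], range: OffsetRange): Seq[String] =
    log.slice(range.fromOffset.toInt, range.untilOffset.toInt)

  // One batch: start at the last committed offset (or 0), read up to `until`, then commit.
  def runBatch(store: OffsetStore, tp: TopicPartition, log: Vector[String], until: Long): Unit = {
    val from = store.fromOffsets.getOrElse(tp, 0L)
    val range = OffsetRange(tp, from, until)
    store.commit(read(log, range), Seq(range))
  }
}
```

Calling `runBatch` twice with the same store, as if the driver had restarted between the calls, picks up where the first call stopped, so no record is read twice in this model.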




[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4384#discussion_r24272277
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
    @@ -179,121 +182,190 @@ object KafkaUtils {
           errs => throw new SparkException(errs.mkString("\n")),
           ok => ok
         )
    -    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
    +    new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
       }
     
    -  /** A batch-oriented interface for consuming from Kafka.
    -   * Starting and ending offsets are specified in advance,
    -   * so that you can control exactly-once semantics.
    +  /**
    +   * :: Experimental ::
    +   * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you
    +   * specify the Kafka leader to connect to (to optimize fetching) and access the message as well
    +   * as the metadata.
    +   *
        * @param sc SparkContext object
        * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    -   * configuration parameters</a>.
    -   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
    -   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
    +   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
    +   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
    +   *    host1:port1,host2:port2 form.
        * @param offsetRanges Each OffsetRange in the batch corresponds to a
        *   range of offsets for a given Kafka topic/partition
        * @param leaders Kafka leaders for each offset range in batch
    -   * @param messageHandler function for translating each message into the desired type
    +   * @param messageHandler function for translating each message and metadata into the desired type
        */
       @Experimental
       def createRDD[
         K: ClassTag,
         V: ClassTag,
    -    U <: Decoder[_]: ClassTag,
    -    T <: Decoder[_]: ClassTag,
    -    R: ClassTag] (
    +    KD <: Decoder[K]: ClassTag,
    +    VD <: Decoder[V]: ClassTag,
    +    R: ClassTag](
           sc: SparkContext,
           kafkaParams: Map[String, String],
           offsetRanges: Array[OffsetRange],
           leaders: Array[Leader],
           messageHandler: MessageAndMetadata[K, V] => R
    -  ): RDD[R] = {
    -
    +    ): RDD[R] = {
         val leaderMap = leaders
           .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
           .toMap
    -    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
    +    new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
       }
     
    +
       /**
    -   * This stream can guarantee that each message from Kafka is included in transformations
    -   * (as opposed to output actions) exactly once, even in most failure situations.
    +   * Create a RDD from Kafka using offset ranges for each topic and partition.
        *
    -   * Points to note:
    -   *
    -   * Failure Recovery - You must checkpoint this stream, or save offsets yourself and provide them
    -   * as the fromOffsets parameter on restart.
    -   * Kafka must have sufficient log retention to obtain messages after failure.
    -   *
    -   * Getting offsets from the stream - see programming guide
    +   * @param jsc JavaSparkContext object
    +   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    +   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
    +   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
    +   *    host1:port1,host2:port2 form.
    +   * @param offsetRanges Each OffsetRange in the batch corresponds to a
    +   *   range of offsets for a given Kafka topic/partition
    +   */
    +  @Experimental
    +  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
    +      jsc: JavaSparkContext,
    +      keyClass: Class[K],
    +      valueClass: Class[V],
    +      keyDecoderClass: Class[KD],
    +      valueDecoderClass: Class[VD],
    +      kafkaParams: JMap[String, String],
    +      offsetRanges: Array[OffsetRange]
    +    ): JavaPairRDD[K, V] = {
    +    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
    +    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
    +    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
    +    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
    +    new JavaPairRDD(createRDD[K, V, KD, VD](
    +      jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges))
    +  }
    +
    +  /**
    +   * :: Experimental ::
    +   * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you
    +   * specify the Kafka leader to connect to (to optimize fetching) and access the message as well
    +   * as the metadata.
        *
    -.  * Zookeeper - This does not use Zookeeper to store offsets.  For interop with Kafka monitors
    -   * that depend on Zookeeper, you must store offsets in ZK yourself.
    +   * @param jsc JavaSparkContext object
    +   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    +   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
    +   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
    +   *    host1:port1,host2:port2 form.
    +   * @param offsetRanges Each OffsetRange in the batch corresponds to a
    +   *   range of offsets for a given Kafka topic/partition
    +   * @param leaders Kafka leaders for each offset range in batch
    +   * @param messageHandler function for translating each message and metadata into the desired type
    +   */
    +  @Experimental
    +  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
    +      jsc: JavaSparkContext,
    +      keyClass: Class[K],
    +      valueClass: Class[V],
    +      keyDecoderClass: Class[KD],
    +      valueDecoderClass: Class[VD],
    +      recordClass: Class[R],
    +      kafkaParams: JMap[String, String],
    +      offsetRanges: Array[OffsetRange],
    +      leaders: Array[Leader],
    +      messageHandler: JFunction[MessageAndMetadata[K, V], R]
    +    ): JavaRDD[R] = {
    +    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
    +    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
    +    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
    +    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
    +    implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
    +    createRDD[K, V, KD, VD, R](
    +      jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaders, messageHandler.call _)
    +  }
    +
    +  /**
    +   * :: Experimental ::
    +   * Create an input stream that pulls messages from a Kafka Broker. This stream can guarantee
    +   * that each message from Kafka is included in transformations exactly once (see points below).
        *
    -   * End-to-end semantics - This does not guarantee that any output operation will push each record
    -   * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and
    -   * outputting exactly once), you have to either ensure that the output operation is
    -   * idempotent, or transactionally store offsets with the output. See the programming guide for
    -   * more details.
    +   * Points to note:
    +   *  - No receivers: This stream does not use any receiver. It directly queries Kafka
    +   *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
    +   *    by the stream itself. For interoperability with Kafka monitoring tools that depend on 
    +   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
    +   *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
    --- End diff --
    
    Of course not! You can do all windowing (except reduceByKeyAndWindow with inverse functions) without checkpointing enabled. And lots of people use Spark Streaming without checkpointing, but use windows.
    
    It is a small nuance, but here is how I reason about it. If you care about recovering through driver failures, you HAVE to enable checkpointing first. Only then do you get driver fault tolerance for all operations, as long as your code does not change. So you get recoverability but not upgradability. If you also want upgradability across code and Spark versions, you can save offsets somewhere else and recover from them. Yes, in some limited cases you can get recoverability using that mechanism alone. But that is delving into too much detail for most readers. So if there is ONE thing that a developer needs to know to get driver fault tolerance, it is "enable checkpointing", not "save offsets somewhere else".
    
    Does that make sense?
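
The distinction between recoverability and upgradability drawn above can be illustrated with a small plain-Scala sketch. It is a toy model under stated assumptions, not how Spark implements checkpoints: `Checkpoint`, `recoverFromCheckpoint`, and `startingOffsets` are hypothetical names, and the assumption that a checkpoint is readable only by the code version that wrote it is a simplification of "as long as your code does not change".

```scala
object RecoveryModes {
  // Hypothetical model: a checkpoint is tagged with the code version that wrote it.
  final case class Checkpoint(codeVersion: String, offsets: Map[String, Long])

  // Recoverability: the checkpoint is usable only if the running code is unchanged.
  def recoverFromCheckpoint(cp: Checkpoint, runningVersion: String): Option[Map[String, Long]] =
    if (cp.codeVersion == runningVersion) Some(cp.offsets) else None

  // Upgradability: when the checkpoint is missing or unusable, fall back to
  // offsets the application saved in an external store of its own.
  def startingOffsets(
      cp: Option[Checkpoint],
      external: Map[String, Long],
      runningVersion: String): Map[String, Long] =
    cp.flatMap(recoverFromCheckpoint(_, runningVersion)).getOrElse(external)
}
```

In this model a restart with unchanged code resumes from the checkpoint, while a restart after an upgrade falls back to the externally saved offsets.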





[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4384#issuecomment-73178166
  
      [Test build #26891 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26891/consoleFull) for   PR 4384 at commit [`e4abf69`](https://github.com/apache/spark/commit/e4abf69b63bb6bfa94823bcefd27bcbe821b1f2e).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4384#discussion_r24268021
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala ---
    @@ -19,16 +19,35 @@ package org.apache.spark.streaming.kafka
     
     import kafka.common.TopicAndPartition
     
    -/** Something that has a collection of OffsetRanges */
    +import org.apache.spark.annotation.Experimental
    +
    +/**
    + * :: Experimental ::
    + * Represents any object that has a collection of [[OffsetRange]]s. This can be used access the
    + * offset ranges in RDDs generated by the direct Kafka DStream (see
    + * [[KafkaUtils.createDirectStream()]]).
    --- End diff --
    
    Good call. Let me add the references.




[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4384#discussion_r24267569
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
    @@ -179,121 +182,190 @@ object KafkaUtils {
           errs => throw new SparkException(errs.mkString("\n")),
           ok => ok
         )
    -    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
    +    new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
       }
     
    -  /** A batch-oriented interface for consuming from Kafka.
    -   * Starting and ending offsets are specified in advance,
    -   * so that you can control exactly-once semantics.
    +  /**
    +   * :: Experimental ::
    +   * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you
    +   * specify the Kafka leader to connect to (to optimize fetching) and access the message as well
    +   * as the metadata.
    +   *
        * @param sc SparkContext object
        * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    -   * configuration parameters</a>.
    -   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
    -   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
    +   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
    +   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
    +   *    host1:port1,host2:port2 form.
        * @param offsetRanges Each OffsetRange in the batch corresponds to a
        *   range of offsets for a given Kafka topic/partition
        * @param leaders Kafka leaders for each offset range in batch
    -   * @param messageHandler function for translating each message into the desired type
    +   * @param messageHandler function for translating each message and metadata into the desired type
        */
       @Experimental
       def createRDD[
         K: ClassTag,
         V: ClassTag,
    -    U <: Decoder[_]: ClassTag,
    -    T <: Decoder[_]: ClassTag,
    -    R: ClassTag] (
    +    KD <: Decoder[K]: ClassTag,
    +    VD <: Decoder[V]: ClassTag,
    +    R: ClassTag](
           sc: SparkContext,
           kafkaParams: Map[String, String],
           offsetRanges: Array[OffsetRange],
           leaders: Array[Leader],
           messageHandler: MessageAndMetadata[K, V] => R
    -  ): RDD[R] = {
    -
    +    ): RDD[R] = {
         val leaderMap = leaders
           .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
           .toMap
    -    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
    +    new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
       }
     
    +
       /**
    -   * This stream can guarantee that each message from Kafka is included in transformations
    -   * (as opposed to output actions) exactly once, even in most failure situations.
    +   * Create a RDD from Kafka using offset ranges for each topic and partition.
        *
    -   * Points to note:
    -   *
    -   * Failure Recovery - You must checkpoint this stream, or save offsets yourself and provide them
    -   * as the fromOffsets parameter on restart.
    -   * Kafka must have sufficient log retention to obtain messages after failure.
    -   *
    -   * Getting offsets from the stream - see programming guide
    +   * @param jsc JavaSparkContext object
    +   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    +   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
    +   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
    +   *    host1:port1,host2:port2 form.
    +   * @param offsetRanges Each OffsetRange in the batch corresponds to a
    +   *   range of offsets for a given Kafka topic/partition
    +   */
    +  @Experimental
    +  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
    +      jsc: JavaSparkContext,
    +      keyClass: Class[K],
    +      valueClass: Class[V],
    +      keyDecoderClass: Class[KD],
    +      valueDecoderClass: Class[VD],
    +      kafkaParams: JMap[String, String],
    +      offsetRanges: Array[OffsetRange]
    +    ): JavaPairRDD[K, V] = {
    +    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
    +    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
    +    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
    +    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
    +    new JavaPairRDD(createRDD[K, V, KD, VD](
    +      jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges))
    +  }
    +
    +  /**
    +   * :: Experimental ::
    +   * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you
    +   * specify the Kafka leader to connect to (to optimize fetching) and access the message as well
    +   * as the metadata.
        *
    -.  * Zookeeper - This does not use Zookeeper to store offsets.  For interop with Kafka monitors
    -   * that depend on Zookeeper, you must store offsets in ZK yourself.
    +   * @param jsc JavaSparkContext object
    +   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    +   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
    +   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
    +   *    host1:port1,host2:port2 form.
    +   * @param offsetRanges Each OffsetRange in the batch corresponds to a
    +   *   range of offsets for a given Kafka topic/partition
    +   * @param leaders Kafka leaders for each offset range in batch
    +   * @param messageHandler function for translating each message and metadata into the desired type
    +   */
    +  @Experimental
    +  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
    +      jsc: JavaSparkContext,
    +      keyClass: Class[K],
    +      valueClass: Class[V],
    +      keyDecoderClass: Class[KD],
    +      valueDecoderClass: Class[VD],
    +      recordClass: Class[R],
    +      kafkaParams: JMap[String, String],
    +      offsetRanges: Array[OffsetRange],
    +      leaders: Array[Leader],
    +      messageHandler: JFunction[MessageAndMetadata[K, V], R]
    +    ): JavaRDD[R] = {
    +    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
    +    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
    +    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
    +    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
    +    implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
    +    createRDD[K, V, KD, VD, R](
    +      jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaders, messageHandler.call _)
    +  }
    +
    +  /**
    +   * :: Experimental ::
    +   * Create an input stream that pulls messages from a Kafka Broker. This stream can guarantee
    +   * that each message from Kafka is included in transformations exactly once (see points below).
        *
    -   * End-to-end semantics - This does not guarantee that any output operation will push each record
    -   * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and
    -   * outputting exactly once), you have to either ensure that the output operation is
    -   * idempotent, or transactionally store offsets with the output. See the programming guide for
    -   * more details.
    +   * Points to note:
    +   *  - No receivers: This stream does not use any receiver. It directly queries Kafka
    +   *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
    +   *    by the stream itself. For interoperability with Kafka monitoring tools that depend on 
    +   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
    +   *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
    --- End diff --
    
    That's an additional step that the developer has to do. The stream created using this function does not automatically support storing offsets somewhere else. But it does support automatically recovering the offsets from checkpoints if checkpointing is enabled (within the usual constraints, of course, such as the code not having been updated).
    
    Also, even if you saved the latest completed offsets to some storage (e.g. Postgres), how would you recover past batches in the case of window operations? If a window operation requires 10 batches, how would you recover the last 10 batches without checkpointing enabled?
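    
    To make the window concern concrete, here is a minimal sketch in plain Scala. It does not use Spark; `BatchRange` and `WindowRecoverySketch` are hypothetical names made up for illustration, and the offsets are invented. It only contrasts what a "latest offset" store knows with what a window over several batches would need after a restart.

```scala
// Hypothetical stand-in for a per-batch offset range; this is NOT the
// connector's OffsetRange class, just a local model for illustration.
final case class BatchRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

object WindowRecoverySketch {
  // Offset ranges of completed batches, oldest first (one range per batch).
  val batches: Vector[BatchRange] = Vector(
    BatchRange("events", 0, 0L, 100L),
    BatchRange("events", 0, 100L, 200L),
    BatchRange("events", 0, 200L, 300L),
    BatchRange("events", 0, 300L, 400L)
  )

  // What a store holding only the latest completed offset can tell you:
  // where to resume, but nothing about earlier batch boundaries.
  def latestOnly(history: Vector[BatchRange]): Long = history.last.untilOffset

  // What a window of `n` batches needs after a restart: the ranges of the
  // last `n` batches, which only a per-batch record (such as a checkpoint) keeps.
  def windowRanges(history: Vector[BatchRange], n: Int): Vector[BatchRange] =
    history.takeRight(n)

  def main(args: Array[String]): Unit = {
    println(s"resume point from latest-offset store: ${latestOnly(batches)}")
    println(s"ranges a 3-batch window must re-read: ${windowRanges(batches, 3)}")
  }
}
```

    In this model the single saved offset is enough to resume consumption, but the per-batch boundaries needed to rebuild the window exist only if the batch history itself was persisted.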
    





[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4384#discussion_r24252952
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
    @@ -179,121 +182,190 @@ object KafkaUtils {
           errs => throw new SparkException(errs.mkString("\n")),
           ok => ok
         )
    -    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
    +    new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
       }
     
    -  /** A batch-oriented interface for consuming from Kafka.
    -   * Starting and ending offsets are specified in advance,
    -   * so that you can control exactly-once semantics.
    +  /**
    +   * :: Experimental ::
    +   * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you
    +   * specify the Kafka leader to connect to (to optimize fetching) and access the message as well
    +   * as the metadata.
    +   *
        * @param sc SparkContext object
        * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    -   * configuration parameters</a>.
    -   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
    -   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
    +   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
    +   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
    +   *    host1:port1,host2:port2 form.
        * @param offsetRanges Each OffsetRange in the batch corresponds to a
        *   range of offsets for a given Kafka topic/partition
        * @param leaders Kafka leaders for each offset range in batch
    -   * @param messageHandler function for translating each message into the desired type
    +   * @param messageHandler function for translating each message and metadata into the desired type
        */
       @Experimental
       def createRDD[
         K: ClassTag,
         V: ClassTag,
    -    U <: Decoder[_]: ClassTag,
    -    T <: Decoder[_]: ClassTag,
    -    R: ClassTag] (
    +    KD <: Decoder[K]: ClassTag,
    +    VD <: Decoder[V]: ClassTag,
    +    R: ClassTag](
           sc: SparkContext,
           kafkaParams: Map[String, String],
           offsetRanges: Array[OffsetRange],
           leaders: Array[Leader],
           messageHandler: MessageAndMetadata[K, V] => R
    -  ): RDD[R] = {
    -
    +    ): RDD[R] = {
         val leaderMap = leaders
           .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
           .toMap
    -    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
    +    new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
       }
     
    +
       /**
    -   * This stream can guarantee that each message from Kafka is included in transformations
    -   * (as opposed to output actions) exactly once, even in most failure situations.
    +   * Create a RDD from Kafka using offset ranges for each topic and partition.
        *
    -   * Points to note:
    -   *
    -   * Failure Recovery - You must checkpoint this stream, or save offsets yourself and provide them
    -   * as the fromOffsets parameter on restart.
    -   * Kafka must have sufficient log retention to obtain messages after failure.
    -   *
    -   * Getting offsets from the stream - see programming guide
    +   * @param jsc JavaSparkContext object
    +   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    +   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
    +   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
    +   *    host1:port1,host2:port2 form.
    +   * @param offsetRanges Each OffsetRange in the batch corresponds to a
    +   *   range of offsets for a given Kafka topic/partition
    +   */
    +  @Experimental
    +  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
    +      jsc: JavaSparkContext,
    +      keyClass: Class[K],
    +      valueClass: Class[V],
    +      keyDecoderClass: Class[KD],
    +      valueDecoderClass: Class[VD],
    +      kafkaParams: JMap[String, String],
    +      offsetRanges: Array[OffsetRange]
    +    ): JavaPairRDD[K, V] = {
    +    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
    +    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
    +    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
    +    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
    +    new JavaPairRDD(createRDD[K, V, KD, VD](
    +      jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges))
    +  }
    +
    +  /**
    +   * :: Experimental ::
    +   * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you
    +   * specify the Kafka leader to connect to (to optimize fetching) and access the message as well
    +   * as the metadata.
        *
    -.  * Zookeeper - This does not use Zookeeper to store offsets.  For interop with Kafka monitors
    -   * that depend on Zookeeper, you must store offsets in ZK yourself.
    +   * @param jsc JavaSparkContext object
    +   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    +   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
    +   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
    +   *    host1:port1,host2:port2 form.
    +   * @param offsetRanges Each OffsetRange in the batch corresponds to a
    +   *   range of offsets for a given Kafka topic/partition
    +   * @param leaders Kafka leaders for each offset range in batch
    +   * @param messageHandler function for translating each message and metadata into the desired type
    +   */
    +  @Experimental
    +  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
    +      jsc: JavaSparkContext,
    +      keyClass: Class[K],
    +      valueClass: Class[V],
    +      keyDecoderClass: Class[KD],
    +      valueDecoderClass: Class[VD],
    +      recordClass: Class[R],
    +      kafkaParams: JMap[String, String],
    +      offsetRanges: Array[OffsetRange],
    +      leaders: Array[Leader],
    +      messageHandler: JFunction[MessageAndMetadata[K, V], R]
    +    ): JavaRDD[R] = {
    +    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
    +    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
    +    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
    +    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
    +    implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
    +    createRDD[K, V, KD, VD, R](
    +      jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaders, messageHandler.call _)
    +  }
    +
    +  /**
    +   * :: Experimental ::
    +   * Create an input stream that pulls messages from a Kafka Broker. This stream can guarantee
    +   * that each message from Kafka is included in transformations exactly once (see points below).
        *
    -   * End-to-end semantics - This does not guarantee that any output operation will push each record
    -   * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and
    -   * outputting exactly once), you have to either ensure that the output operation is
    -   * idempotent, or transactionally store offsets with the output. See the programming guide for
    -   * more details.
    +   * Points to note:
    +   *  - No receivers: This stream does not use any receiver. It directly queries Kafka
    +   *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
    +   *    by the stream itself. For interoperability with Kafka monitoring tools that depend on 
    +   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
    +   *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
    --- End diff --
    
    The methods that take fromOffsets don't actually require checkpointing to recover from driver failure: you can save offsets yourself and provide them as the fromOffsets parameter on restart, as the docs originally noted.
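    
    A rough sketch of that save-and-restart pattern, in plain Scala with no Spark dependency. `TopicPartitionKey` and `SavedOffsets` are hypothetical names, and the in-memory map stands in for whatever durable storage an application would actually use; the resulting map is the kind of value one would pass as fromOffsets on restart.

```scala
// Hypothetical key type; illustrative only, not a Spark or Kafka class.
final case class TopicPartitionKey(topic: String, partition: Int)

// Hypothetical offset store. A real application would write to durable
// storage; an in-memory map keeps this sketch self-contained.
final class SavedOffsets {
  private var saved = Map.empty[TopicPartitionKey, Long]

  // Record the end offset of a completed batch for one topic/partition.
  def commit(tp: TopicPartitionKey, untilOffset: Long): Unit =
    saved = saved.updated(tp, untilOffset)

  // On restart, these become the starting offsets for each topic/partition.
  def fromOffsets: Map[TopicPartitionKey, Long] = saved
}

object SavedOffsetsSketch {
  def run(): Map[TopicPartitionKey, Long] = {
    val store = new SavedOffsets
    store.commit(TopicPartitionKey("events", 0), 100L)
    store.commit(TopicPartitionKey("events", 1), 80L)
    store.commit(TopicPartitionKey("events", 0), 200L) // a later batch overwrites
    store.fromOffsets
  }

  def main(args: Array[String]): Unit = println(run())
}
```

    The sketch covers resuming consumption only; it says nothing about state that spans several batches.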




[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4384#discussion_r24265028
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala ---
    @@ -36,14 +36,12 @@ import kafka.utils.VerifiableProperties
      * Starting and ending offsets are specified in advance,
      * so that you can control exactly-once semantics.
      * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    - * configuration parameters</a>.
    - *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
    - *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
    - * @param batch Each KafkaRDDPartition in the batch corresponds to a
    - *   range of offsets for a given Kafka topic/partition
    + * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set
    + * with Kafka broker(s) specified in host1:port1,host2:port2 form.
    --- End diff --
    
    The only reason we were writing "not zookeepers" is to make the difference from the earlier stream clear for people who want to switch from the old one to the new one. That applies to the public API; this is an internal, private API. I can add it back, no issues.
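    
    For readers switching over, a small illustration of the broker-list form the doc comment describes. This is plain Scala; `BrokerListSketch` and `parseBrokers` are hypothetical helpers, and the port 9092 is an assumed value. Note that a syntax check like this cannot tell a broker address from a Zookeeper address that happens to be in plain host:port form; it only rejects entries that are not host:port at all.

```scala
object BrokerListSketch {
  // Example parameters in the documented form; host names and ports are assumptions.
  val kafkaParams: Map[String, String] =
    Map("metadata.broker.list" -> "host1:9092,host2:9092")

  // Hypothetical helper: parse "host1:port1,host2:port2" into (host, port)
  // pairs, or return a message naming the first malformed entry.
  def parseBrokers(list: String): Either[String, Seq[(String, Int)]] = {
    val parts = list.split(",").map(_.trim).toSeq
    val parsed: Seq[Either[String, (String, Int)]] = parts.map { p =>
      p.split(":") match {
        case Array(host, port)
            if host.nonEmpty && port.nonEmpty && port.length <= 5 && port.forall(_.isDigit) =>
          Right((host, port.toInt))
        case _ =>
          Left(s"not in host:port form: '$p'")
      }
    }
    parsed.collectFirst { case Left(err) => err } match {
      case Some(err) => Left(err)
      case None      => Right(parsed.collect { case Right(hp) => hp })
    }
  }

  def main(args: Array[String]): Unit =
    println(parseBrokers(kafkaParams("metadata.broker.list")))
}
```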




[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4384#issuecomment-73170929
  
      [Test build #26878 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26878/consoleFull) for   PR 4384 at commit [`50f2b56`](https://github.com/apache/spark/commit/50f2b56f57b00845d006361049dd2c4bac89957c).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4384#issuecomment-72994795
  
      [Test build #26822 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26822/consoleFull) for   PR 4384 at commit [`4986784`](https://github.com/apache/spark/commit/49867846e72329c849002706441d8165825f4a6b).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4384#issuecomment-73207199
  
      [Test build #589 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/589/consoleFull) for   PR 4384 at commit [`e4abf69`](https://github.com/apache/spark/commit/e4abf69b63bb6bfa94823bcefd27bcbe821b1f2e).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4384#issuecomment-73459273
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27078/

