Posted to reviews@spark.apache.org by harishreedharan <gi...@git.apache.org> on 2014/10/29 07:13:59 UTC

[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

GitHub user harishreedharan opened a pull request:

    https://github.com/apache/spark/pull/2994

    [SPARK-4122][STREAMING] Add a library that can write data back to Kafka ...

    ...from Spark Streaming.
    
    This adds a library that can write DStreams to Kafka. An implicit has also been added so users
    can call dstream.writeToKafka(..)
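
The `dstream.writeToKafka(..)` call relies on Scala's implicit-conversion ("enrich my library") pattern. Below is a minimal sketch of that pattern using stand-in types only: `FakeStream`, `FakeWriter`, `writeToSink` and the in-memory `sink` are hypothetical names chosen for illustration and are not part of the patch, Spark, or Kafka.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.language.implicitConversions

object ImplicitWriterSketch {
  // Hypothetical stand-in for a DStream: just a batch of elements.
  final case class FakeStream[T](elems: Seq[T])

  // Hypothetical stand-in for a Kafka topic: records what was "sent".
  val sink: ArrayBuffer[(String, String)] = ArrayBuffer.empty

  // Wrapper class that carries the extra method.
  class FakeWriter[T](stream: FakeStream[T]) {
    def writeToSink(topic: String, serialize: T => String): Unit =
      stream.elems.foreach(e => sink += ((topic, serialize(e))))
  }

  // The implicit conversion that lets stream.writeToSink(..) compile.
  implicit def toFakeWriter[T](stream: FakeStream[T]): FakeWriter[T] =
    new FakeWriter[T](stream)

  def demo(): Seq[(String, String)] = {
    sink.clear()
    // FakeStream has no writeToSink method; the compiler inserts toFakeWriter.
    FakeStream(Seq(1, 2, 3)).writeToSink("default", (i: Int) => s"value-$i")
    sink.toList
  }
}
```

In the real patch the wrapper would hold a `DStream[T]` and the serializer would return a Kafka message type; the sketch only shows how the extra method becomes callable on the stream.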

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/harishreedharan/spark Kafka-output

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2994.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2994
    
----
commit f61d82f3152f8e6cf758fa349c8198289e0deae8
Author: Hari Shreedharan <hs...@apache.org>
Date:   2014-10-29T06:06:33Z

    [SPARK-4122][STREAMING] Add a library that can write data back to Kafka from Spark Streaming.
    
    This adds a library that can write DStreams to Kafka. An implicit has also been added so users
    can call dstream.writeToKafka(..)

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-60883766
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22434/
    Test PASSed.




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-73113758
  
    @harishreedharan Thanks for the design doc. Can you open it up for public comments? :)





[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-60883762
  
      [Test build #22434 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22434/consoleFull) for   PR 2994 at commit [`f61d82f`](https://github.com/apache/spark/commit/f61d82f3152f8e6cf758fa349c8198289e0deae8).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class KafkaWriter[T: ClassTag](@transient dstream: DStream[T]) `





[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-97581093
  
    I have a version of this here: https://github.com/harishreedharan/spark-streaming-kafka-output
    
    I am not sure this is going to make it into core Spark, but you can use that one. At some point, I will publish it to the Cloudera Maven repo for ease of use.




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by BenFradet <gi...@git.apache.org>.
Github user BenFradet commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-97577120
  
    @tdas @harishreedharan Any updates on this?
    
    Since we're incorporating [Kafka 0.8.2.1](https://github.com/apache/spark/pull/4537) and there is a [new Producer API](http://kafka.apache.org/082/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html), there might be a need to start over.
    
    It might also be interesting to think about pooling producers (as well as consumers, for that matter).
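
One possible reading of "pooling producers" is a per-JVM cache keyed by the broker list, so that tasks on the same executor reuse a single instance. The sketch below illustrates that idea with a stand-in class; `FakeProducer`, `producerFor` and the `created` counter are hypothetical and do not come from the patch or from the Kafka API.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.{Function => JFunction}

object ProducerPoolSketch {
  // Counts how many stand-in producers have been constructed.
  val created = new AtomicInteger(0)

  // Hypothetical stand-in for a Kafka producer.
  final class FakeProducer(val brokers: String) {
    created.incrementAndGet()
    def send(msg: String): String = s"$brokers <- $msg"
  }

  // One producer per distinct broker list, created on first use.
  private val pool = new ConcurrentHashMap[String, FakeProducer]()

  def producerFor(brokers: String): FakeProducer =
    // computeIfAbsent runs the factory at most once per key, even under contention.
    pool.computeIfAbsent(brokers, new JFunction[String, FakeProducer] {
      def apply(b: String): FakeProducer = new FakeProducer(b)
    })
}
```

A real pool would also need a shutdown path that closes the cached producers; that is left out here.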




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-60884111
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22435/
    Test PASSed.




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-61034107
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22489/
    Test FAILed.




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-60879163
  
      [Test build #22434 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22434/consoleFull) for   PR 2994 at commit [`f61d82f`](https://github.com/apache/spark/commit/f61d82f3152f8e6cf758fa349c8198289e0deae8).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-107701172
  
    Yes, I plan to do so when I update the documentation for Spark 1.4 soon.
    
    On Mon, Jun 1, 2015 at 2:33 AM, Ben Fradet <no...@github.com> wrote:
    
    > @tdas <https://github.com/tdas>
    >
    > I was thinking we should maybe document this use case in the
    > checkpointing section of the streaming programming guide
    > <http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing>,
    > since it's basically the same issue with broadcast variables, accumulators
    > and external RDDs.
    > What do you think?





[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-61045263
  
      [Test build #22512 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22512/consoleFull) for   PR 2994 at commit [`0a45f1a`](https://github.com/apache/spark/commit/0a45f1ab5ba5f9440a78e47e48b48f0321d440c1).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class KafkaWriter[T: ClassTag](@transient dstream: DStream[T]) extends Serializable with Logging `





[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-143944627
  
    I will get back to this one in a week or so




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by mkuthan <gi...@git.apache.org>.
Github user mkuthan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2994#discussion_r43876592
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala ---
    @@ -0,0 +1,126 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +
    +import scala.reflect.ClassTag
    +
    +import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * Import this object in this form:
    + * {{{
    + *   import org.apache.spark.streaming.kafka.KafkaWriter._
    + * }}}
    + *
    + * Once imported, the `writeToKafka` can be called on any [[DStream]] object in this form:
    + * {{{
    + *   dstream.writeToKafka(producerConfig, f)
    + * }}}
    + */
    +object KafkaWriter {
    +  import scala.language.implicitConversions
    +  /**
    +   * This implicit method allows the user to call dstream.writeToKafka(..)
    +   * @param dstream - DStream to write to Kafka
    +   * @tparam T - The type of the DStream
    +   * @tparam K - The type of the key to serialize to
    +   * @tparam V - The type of the value to serialize to
    +   * @return
    +   */
    +  implicit def createKafkaOutputWriter[T: ClassTag, K, V](dstream: DStream[T]): KafkaWriter[T] = {
    +    new KafkaWriter[T](dstream)
    +  }
    +}
    +
    +/**
    + *
    + * This class can be used to write data to Kafka from Spark Streaming. To write data to Kafka
    + * simply `import org.apache.spark.streaming.kafka.KafkaWriter._` in your application and call
    + * `dstream.writeToKafka(producerConf, func)`
    + *
    + * Here is an example:
    + * {{{
    + * // Adding this line allows the user to call dstream.writeToKafka(..)
    + * import org.apache.spark.streaming.kafka.KafkaWriter._
    + *
    + * class ExampleWriter {
    + *   val instream = ssc.queueStream(toBe)
    + *   val producerConf = new Properties()
    + *   producerConf.put("serializer.class", "kafka.serializer.DefaultEncoder")
    + *   producerConf.put("key.serializer.class", "kafka.serializer.StringEncoder")
    + *   producerConf.put("metadata.broker.list", "kafka.example.com:5545")
    + *   producerConf.put("request.required.acks", "1")
    + *   instream.writeToKafka(producerConf,
    + *    (x: String) => new KeyedMessage[String, String]("default", null, x))
    + *   ssc.start()
    + * }
    + *
    + * }}}
    + * @param dstream - The [[DStream]] to be written to Kafka
    + *
    + */
    +class KafkaWriter[T: ClassTag](@transient dstream: DStream[T]) extends Serializable with Logging {
    +
    +  /**
    +   * To write data from a DStream to Kafka, call this function after creating the DStream. Once
    +   * the DStream is passed into this function, all data coming from the DStream is written out to
    +   * Kafka. The properties instance takes the configuration required to connect to the Kafka
    +   * brokers in the standard Kafka format. The serializerFunc is a function that converts each
    +   * element of the RDD to a Kafka [[KeyedMessage]]. This closure should be serializable - so it
    +   * should use only instances of Serializables.
    +   * @param producerConfig The configuration that can be used to connect to Kafka
    +   * @param serializerFunc The function to convert the data from the stream into Kafka
    +   *                       [[KeyedMessage]]s.
    +   * @tparam K The type of the key
    +   * @tparam V The type of the value
    +   *
    +   */
    +  def writeToKafka[K, V](producerConfig: Properties,
    +    serializerFunc: T => KeyedMessage[K, V]): Unit = {
    +
    +    // Broadcast the producer config to avoid sending it every time.
    +    val broadcastedConfig = dstream.ssc.sc.broadcast(producerConfig)
    +
    +    def func = (rdd: RDD[T]) => {
    +      rdd.foreachPartition(events => {
    +        // The ForEachDStream runs the function locally on the driver.
    +        // This code can alternatively use sc.runJob, but this approach seemed cleaner.
    +        val producer: Producer[K, V] =
    +          new Producer[K, V](new ProducerConfig(broadcastedConfig.value))
    +        try {
    +          producer.send(events.map(serializerFunc).toArray: _*)
    --- End diff --
    
    Please remember that the Kafka producer is asynchronous. Processing should wait until the results have actually been published to the Kafka cluster. Please check how it is implemented in Samza:
    https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
    
    I'm not sure that we need a retry policy, but at least we should check the result of `producer.send`.
    
    My $.02 cents :-)
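
The point about waiting for asynchronous sends can be sketched as follows. This uses `scala.concurrent.Future` as a stand-in for whatever handle a real producer returns; `sendAsync`, `writePartition` and the 10-second timeout are hypothetical choices for illustration, not the patch's code or Kafka's API.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object AwaitSendsSketch {
  // Hypothetical stand-in for an asynchronous send: fails on empty payloads.
  def sendAsync(msg: String): Future[Int] = Future {
    if (msg.isEmpty) throw new IllegalArgumentException("empty message")
    msg.length // stand-in for the metadata a broker would acknowledge
  }

  // Send a whole partition, then block until every send has completed.
  // A failed send surfaces as a Failure instead of being silently dropped.
  def writePartition(events: Iterator[String]): Try[Seq[Int]] = {
    // toList forces the lazy iterator so that all sends are started up front.
    val pending = events.map(sendAsync).toList
    Try(Await.result(Future.sequence(pending), 10.seconds))
  }
}
```

Returning a `Try` leaves the decision to retry or fail the task to the caller, which matches the comment's suggestion to at least inspect the result.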




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-61024633
  
      [Test build #22489 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22489/consoleFull) for   PR 2994 at commit [`aa1f10d`](https://github.com/apache/spark/commit/aa1f10dc5684c3dcb647048fe5651a1c9ab79154).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-61020918
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22480/
    Test PASSed.




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2994#discussion_r19579681
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ProducerCache.scala ---
    @@ -0,0 +1,34 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.kafka
    +
    +object ProducerCache {
    +
    +  private var producerOpt: Option[Any] = None
    --- End diff --
    
    My understanding is that the tasks are all part of the same application - so it should be fine (we are not sharing data between applications).
    
    The Producer is thread-safe (I tested that while writing a KafkaChannel for Flume), so that should be fine.
    
    But if it is not that heavy to create per partition (it would be around once per 2 seconds or so), then I will go ahead and do that. That does have the benefit of avoiding some of the complex code meant to handle reusing the producer.
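
The per-partition alternative mentioned above (create a producer, use it for one partition, then close it) can be sketched with a stand-in class. `FakeProducer` and the `log` buffer are hypothetical and exist only to make the lifecycle visible; a real implementation would construct and close an actual Kafka producer.

```scala
import scala.collection.mutable.ArrayBuffer

object PerPartitionProducerSketch {
  // Hypothetical stand-in for a Kafka producer that records its lifecycle.
  final class FakeProducer(log: ArrayBuffer[String]) {
    log += "open"
    def send(msg: String): Unit = log += s"send:$msg"
    def close(): Unit = log += "close"
  }

  // Create one producer for the partition and always close it afterwards,
  // even if one of the sends throws.
  def writePartition(events: Iterator[String], log: ArrayBuffer[String]): Unit = {
    val producer = new FakeProducer(log)
    try events.foreach(producer.send)
    finally producer.close()
  }
}
```

This trades some connection setup cost per batch for not needing any shared, cached state between tasks.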




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-97582163
  
    THAT IS A NEAT IDEA. We should have bindings for both RDD and DStream.




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by BenFradet <gi...@git.apache.org>.
Github user BenFradet commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-97580261
  
    Interesting, thanks a lot for sharing your findings.
    Any info regarding producers?




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-97581420
  
    @BenFradet That's a good point. This dropped off my radar, but I really would love to have this in. @harishreedharan Let's plan to get this into the 1.5.0 release. Ping me after the 1.4.0 release so that it does not fall off my radar again. :)




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2994#discussion_r19523702
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala ---
    @@ -0,0 +1,126 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +
    +import scala.reflect.ClassTag
    +
    +import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * Import this object in this form:
    + * {{{
    + *   import org.apache.spark.streaming.kafka.KafkaWriter._
    + * }}}
    + *
    + * Once imported, the `writeToKafka` can be called on any [[DStream]] object in this form:
    + * {{{
    + *   dstream.writeToKafka(producerConfig, f)
    + * }}}
    + */
    +object KafkaWriter {
    +  import scala.language.implicitConversions
    +  /**
    +   * This implicit method allows the user to call dstream.writeToKafka(..)
    +   * @param dstream - DStream to write to Kafka
    +   * @tparam T - The type of the DStream
    +   * @tparam K - The type of the key to serialize to
    +   * @tparam V - The type of the value to serialize to
    +   * @return
    +   */
    +  implicit def createKafkaOutputWriter[T: ClassTag, K, V](dstream: DStream[T]): KafkaWriter[T] = {
    +    new KafkaWriter[T](dstream)
    +  }
    +}
    +
    +/**
    + *
    + * This class can be used to write data to Kafka from Spark Streaming. To write data to Kafka
    + * simply `import org.apache.spark.streaming.kafka.KafkaWriter._` in your application and call
    + * `dstream.writeToKafka(producerConf, func)`
    + *
    + * Here is an example:
    + * {{{
    + * // Adding this line allows the user to call dstream.writeToKafka(..)
    + * import org.apache.spark.streaming.kafka.KafkaWriter._
    + *
    + * class ExampleWriter {
    + *   val instream = ssc.queueStream(toBe)
    + *   val producerConf = new Properties()
    + *   producerConf.put("serializer.class", "kafka.serializer.DefaultEncoder")
    + *   producerConf.put("key.serializer.class", "kafka.serializer.StringEncoder")
    + *   producerConf.put("metadata.broker.list", "kafka.example.com:5545")
    + *   producerConf.put("request.required.acks", "1")
    + *   instream.writeToKafka(producerConf,
    + *    (x: String) => new KeyedMessage[String, String]("default", null, x))
    + *   ssc.start()
    + * }
    + *
    + * }}}
    + * @param dstream - The [[DStream]] to be written to Kafka
    + *
    + */
    +class KafkaWriter[T: ClassTag](@transient dstream: DStream[T]) {
    +
    +  /**
    +   * To write data from a DStream to Kafka, call this function after creating the DStream. Once
    +   * the DStream is passed into this function, all data coming from the DStream is written out to
    +   * Kafka. The properties instance takes the configuration required to connect to the Kafka
    +   * brokers in the standard Kafka format. The serializerFunc is a function that converts each
    +   * element of the RDD to a Kafka [[KeyedMessage]]. This closure should be serializable - so it
    +   * should use only instances of Serializables.
    +   * @param producerConfig The configuration that can be used to connect to Kafka
    +   * @param serializerFunc The function to convert the data from the stream into Kafka
    +   *                       [[KeyedMessage]]s.
    +   * @tparam K The type of the key
    +   * @tparam V The type of the value
    +   *
    +   */
    +  def writeToKafka[K, V](producerConfig: Properties,
    +    serializerFunc: T => KeyedMessage[K, V]): Unit = {
    +
    +    // Broadcast the producer to avoid sending it every time.
    +    val broadcastedConfig = dstream.ssc.sc.broadcast(producerConfig)
    +
    +    def func = (rdd: RDD[T]) => {
    +      rdd.foreachPartition(events => {
    +        // The ForEachDStream runs the function locally on the driver. So the
    +        // ProducerCache from the driver is likely to get serialized and
    +        // sent, which is fine - because at that point the Producer itself is
    +        // not initialized, so a None is sent over the wire.
    +        // Get the producer from that local executor and write!
    +        val producer: Producer[K, V] = {
    +          if (ProducerCache.isCached) {
    --- End diff --
    
    Also, from my understanding, this `if (...) else (...)` code may have a concurrency issue; it would be better to change `ProducerCache` into a singleton factory.
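
A minimal sketch of the singleton-factory idea raised in this comment. `ProducerHolder` and `StubProducer` are hypothetical names, not part of the PR, and `StubProducer` stands in for `kafka.producer.Producer` so the snippet runs without Kafka on the classpath. The check and the creation happen inside one `synchronized` block, so two tasks on the same executor cannot both observe "not cached" and each build a producer.

```scala
// Hypothetical stand-in for kafka.producer.Producer (assumption, not the PR's code).
final class StubProducer(val id: Int)

object ProducerHolder {
  // Counts constructions; kept only to make the behaviour observable.
  private var created = 0
  private var instance: Option[StubProducer] = None

  // Check-then-create is a single atomic step under the object's monitor.
  def getOrCreate(): StubProducer = synchronized {
    instance.getOrElse {
      created += 1
      val p = new StubProducer(created)
      instance = Some(p)
      p
    }
  }

  def timesCreated: Int = synchronized { created }
}
```

Callers inside `foreachPartition` would then ask `ProducerHolder.getOrCreate()` instead of branching on an `isCached` flag themselves.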




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-68116052
  
    @harishreedharan Since this feature involves a public API, it requires a design doc and some discussion. Could you write one, so that a few of us can take a look and discuss the naming scheme and other API details?




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by BenFradet <gi...@git.apache.org>.
Github user BenFradet commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-97582182
  
    @harishreedharan Thanks a lot for sharing, I will have to test that.




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by zzcclp <gi...@git.apache.org>.
Github user zzcclp commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-153951643
  
    @harishreedharan, will this PR be released with 1.6?




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2994#discussion_r20106590
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala ---
    @@ -0,0 +1,126 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +
    +import scala.reflect.ClassTag
    +
    +import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * Import this object in this form:
    + * {{{
    + *   import org.apache.spark.streaming.kafka.KafkaWriter._
    + * }}}
    + *
    + * Once imported, the `writeToKafka` can be called on any [[DStream]] object in this form:
    + * {{{
    + *   dstream.writeToKafka(producerConfig, f)
    + * }}}
    + */
    +object KafkaWriter {
    +  import scala.language.implicitConversions
    +  /**
    +   * This implicit method allows the user to call dstream.writeToKafka(..)
    +   * @param dstream - DStream to write to Kafka
    +   * @tparam T - The type of the DStream
    +   * @tparam K - The type of the key to serialize to
    +   * @tparam V - The type of the value to serialize to
    +   * @return
    +   */
    +  implicit def createKafkaOutputWriter[T: ClassTag, K, V](dstream: DStream[T]): KafkaWriter[T] = {
    +    new KafkaWriter[T](dstream)
    +  }
    +}
    +
    +/**
    + *
    + * This class can be used to write data to Kafka from Spark Streaming. To write data to Kafka
    + * simply `import org.apache.spark.streaming.kafka.KafkaWriter._` in your application and call
    + * `dstream.writeToKafka(producerConf, func)`
    + *
    + * Here is an example:
    + * {{{
    + * // Adding this line allows the user to call dstream.writeDStreamToKafka(..)
    + * import org.apache.spark.streaming.kafka.KafkaWriter._
    + *
    + * class ExampleWriter {
    + *   val instream = ssc.queueStream(toBe)
    + *   val producerConf = new Properties()
    + *   producerConf.put("serializer.class", "kafka.serializer.DefaultEncoder")
    + *   producerConf.put("key.serializer.class", "kafka.serializer.StringEncoder")
    + *   producerConf.put("metadata.broker.list", "kafka.example.com:5545")
    + *   producerConf.put("request.required.acks", "1")
    + *   instream.writeToKafka(producerConf,
    + *    (x: String) => new KeyedMessage[String, String]("default", null, x))
    + *   ssc.start()
    + * }
    + *
    + * }}}
    + * @param dstream - The [[DStream]] to be written to Kafka
    + *
    + */
    +class KafkaWriter[T: ClassTag](@transient dstream: DStream[T]) extends Serializable with Logging {
    +
    +  /**
    +   * To write data from a DStream to Kafka, call this function after creating the DStream. Once
    +   * the DStream is passed into this function, all data coming from the DStream is written out to
    +   * Kafka. The properties instance takes the configuration required to connect to the Kafka
    +   * brokers in the standard Kafka format. The serializerFunc is a function that converts each
    +   * element of the RDD to a Kafka [[KeyedMessage]]. This closure should be serializable - so it
    +   * should use only instances of Serializables.
    +   * @param producerConfig The configuration that can be used to connect to Kafka
    +   * @param serializerFunc The function to convert the data from the stream into Kafka
    +   *                       [[KeyedMessage]]s.
    +   * @tparam K The type of the key
    +   * @tparam V The type of the value
    +   *
    +   */
    +  def writeToKafka[K, V](producerConfig: Properties,
    +    serializerFunc: T => KeyedMessage[K, V]): Unit = {
    +
    +    // Broadcast the producer to avoid sending it every time.
    +    val broadcastedConfig = dstream.ssc.sc.broadcast(producerConfig)
    +
    +    def func = (rdd: RDD[T]) => {
    +      rdd.foreachPartition(events => {
    +        // The ForEachDStream runs the function locally on the driver.
    +        // This code can alternatively use sc.runJob, but this approach seemed cleaner.
    +        val producer: Producer[K, V] =
    --- End diff --
    
    @helena - Please see discussion earlier on this thread. For now, we can do this for simplicity, but an earlier commit does cache the Producer. If we see this as too inefficient once we run it and test it out on real use-cases, we can bring back the cached Producer code.




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by BenFradet <gi...@git.apache.org>.
Github user BenFradet commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-107380655
  
    @tdas 
    
    I was thinking we should maybe document this use case in [the checkpointing section of the streaming programming guide](http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing), since it's basically the same issue with broadcast variables, accumulators and external RDDs.
    What do you think?




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2994#discussion_r19553018
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala ---
    @@ -0,0 +1,126 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +
    +import scala.reflect.ClassTag
    +
    +import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * Import this object in this form:
    + * {{{
    + *   import org.apache.spark.streaming.kafka.KafkaWriter._
    + * }}}
    + *
    + * Once imported, the `writeToKafka` can be called on any [[DStream]] object in this form:
    + * {{{
    + *   dstream.writeToKafka(producerConfig, f)
    + * }}}
    + */
    +object KafkaWriter {
    +  import scala.language.implicitConversions
    +  /**
    +   * This implicit method allows the user to call dstream.writeToKafka(..)
    +   * @param dstream - DStream to write to Kafka
    +   * @tparam T - The type of the DStream
    +   * @tparam K - The type of the key to serialize to
    +   * @tparam V - The type of the value to serialize to
    +   * @return
    +   */
    +  implicit def createKafkaOutputWriter[T: ClassTag, K, V](dstream: DStream[T]): KafkaWriter[T] = {
    +    new KafkaWriter[T](dstream)
    +  }
    +}
    +
    +/**
    + *
    + * This class can be used to write data to Kafka from Spark Streaming. To write data to Kafka
    + * simply `import org.apache.spark.streaming.kafka.KafkaWriter._` in your application and call
    + * `dstream.writeToKafka(producerConf, func)`
    + *
    + * Here is an example:
    + * {{{
    + * // Adding this line allows the user to call dstream.writeDStreamToKafka(..)
    + * import org.apache.spark.streaming.kafka.KafkaWriter._
    + *
    + * class ExampleWriter {
    + *   val instream = ssc.queueStream(toBe)
    + *   val producerConf = new Properties()
    + *   producerConf.put("serializer.class", "kafka.serializer.DefaultEncoder")
    + *   producerConf.put("key.serializer.class", "kafka.serializer.StringEncoder")
    + *   producerConf.put("metadata.broker.list", "kafka.example.com:5545")
    + *   producerConf.put("request.required.acks", "1")
    + *   instream.writeToKafka(producerConf,
    + *    (x: String) => new KeyedMessage[String, String]("default", null, x))
    + *   ssc.start()
    + * }
    + *
    + * }}}
    + * @param dstream - The [[DStream]] to be written to Kafka
    + *
    + */
    +class KafkaWriter[T: ClassTag](@transient dstream: DStream[T]) {
    +
    +  /**
    +   * To write data from a DStream to Kafka, call this function after creating the DStream. Once
    +   * the DStream is passed into this function, all data coming from the DStream is written out to
    +   * Kafka. The properties instance takes the configuration required to connect to the Kafka
    +   * brokers in the standard Kafka format. The serializerFunc is a function that converts each
    +   * element of the RDD to a Kafka [[KeyedMessage]]. This closure should be serializable - so it
    +   * should use only instances of Serializables.
    +   * @param producerConfig The configuration that can be used to connect to Kafka
    +   * @param serializerFunc The function to convert the data from the stream into Kafka
    +   *                       [[KeyedMessage]]s.
    +   * @tparam K The type of the key
    +   * @tparam V The type of the value
    +   *
    +   */
    +  def writeToKafka[K, V](producerConfig: Properties,
    +    serializerFunc: T => KeyedMessage[K, V]): Unit = {
    +
    +    // Broadcast the producer to avoid sending it every time.
    +    val broadcastedConfig = dstream.ssc.sc.broadcast(producerConfig)
    +
    +    def func = (rdd: RDD[T]) => {
    +      rdd.foreachPartition(events => {
    +        // The ForEachDStream runs the function locally on the driver. So the
    +        // ProducerCache from the driver is likely to get serialized and
    +        // sent, which is fine - because at that point the Producer itself is
    +        // not initialized, so a None is sent over the wire.
    +        // Get the producer from that local executor and write!
    +        val producer: Producer[K, V] = {
    +          if (ProducerCache.isCached) {
    --- End diff --
    
    The idea is to avoid creating a new Producer instance for each partition and instead reuse a producer already available on that executor. The drawback is this slightly complex caching mechanism; the advantage is less overhead from creating a new producer instance, connecting to Kafka, etc.
    
    I am not too familiar with the internals of Kafka, so I am erring on the side of caution.
    
    You are right that there is a concurrency issue here. I will fix that in an update later today.




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-68764128
  
    Sure. I will work on a short design doc soon




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2994#discussion_r20108077
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala ---
    @@ -0,0 +1,126 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +
    +import scala.reflect.ClassTag
    +
    +import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * Import this object in this form:
    + * {{{
    + *   import org.apache.spark.streaming.kafka.KafkaWriter._
    + * }}}
    + *
    + * Once imported, the `writeToKafka` can be called on any [[DStream]] object in this form:
    + * {{{
    + *   dstream.writeToKafka(producerConfig, f)
    + * }}}
    + */
    +object KafkaWriter {
    +  import scala.language.implicitConversions
    +  /**
    +   * This implicit method allows the user to call dstream.writeToKafka(..)
    +   * @param dstream - DStream to write to Kafka
    +   * @tparam T - The type of the DStream
    +   * @tparam K - The type of the key to serialize to
    +   * @tparam V - The type of the value to serialize to
    +   * @return
    +   */
    +  implicit def createKafkaOutputWriter[T: ClassTag, K, V](dstream: DStream[T]): KafkaWriter[T] = {
    +    new KafkaWriter[T](dstream)
    +  }
    +}
    +
    +/**
    + *
    + * This class can be used to write data to Kafka from Spark Streaming. To write data to Kafka
    + * simply `import org.apache.spark.streaming.kafka.KafkaWriter._` in your application and call
    + * `dstream.writeToKafka(producerConf, func)`
    + *
    + * Here is an example:
    + * {{{
    + * // Adding this line allows the user to call dstream.writeDStreamToKafka(..)
    + * import org.apache.spark.streaming.kafka.KafkaWriter._
    + *
    + * class ExampleWriter {
    + *   val instream = ssc.queueStream(toBe)
    + *   val producerConf = new Properties()
    + *   producerConf.put("serializer.class", "kafka.serializer.DefaultEncoder")
    + *   producerConf.put("key.serializer.class", "kafka.serializer.StringEncoder")
    + *   producerConf.put("metadata.broker.list", "kafka.example.com:5545")
    + *   producerConf.put("request.required.acks", "1")
    + *   instream.writeToKafka(producerConf,
    + *    (x: String) => new KeyedMessage[String, String]("default", null, x))
    + *   ssc.start()
    + * }
    + *
    + * }}}
    + * @param dstream - The [[DStream]] to be written to Kafka
    + *
    + */
    +class KafkaWriter[T: ClassTag](@transient dstream: DStream[T]) extends Serializable with Logging {
    +
    +  /**
    +   * To write data from a DStream to Kafka, call this function after creating the DStream. Once
    +   * the DStream is passed into this function, all data coming from the DStream is written out to
    +   * Kafka. The properties instance takes the configuration required to connect to the Kafka
    +   * brokers in the standard Kafka format. The serializerFunc is a function that converts each
    +   * element of the RDD to a Kafka [[KeyedMessage]]. This closure should be serializable - so it
    +   * should use only instances of Serializables.
    +   * @param producerConfig The configuration that can be used to connect to Kafka
    +   * @param serializerFunc The function to convert the data from the stream into Kafka
    +   *                       [[KeyedMessage]]s.
    +   * @tparam K The type of the key
    +   * @tparam V The type of the value
    +   *
    +   */
    +  def writeToKafka[K, V](producerConfig: Properties,
    +    serializerFunc: T => KeyedMessage[K, V]): Unit = {
    +
    +    // Broadcast the producer to avoid sending it every time.
    +    val broadcastedConfig = dstream.ssc.sc.broadcast(producerConfig)
    +
    +    def func = (rdd: RDD[T]) => {
    +      rdd.foreachPartition(events => {
    +        // The ForEachDStream runs the function locally on the driver.
    +        // This code can alternatively use sc.runJob, but this approach seemed cleaner.
    +        val producer: Producer[K, V] =
    --- End diff --
    
    The broadcast is only for the configuration -- to avoid shipping that with each task in the closure (since that is reused). Do you see an alternate way of doing it?




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2994#discussion_r19523818
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ProducerCache.scala ---
    @@ -0,0 +1,34 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.kafka
    +
    +object ProducerCache {
    +
    +  private var producerOpt: Option[Any] = None
    --- End diff --
    
    Isn't this going to share one Producer across the entire JVM, and potentially unrelated applications?
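
One way to address the sharing concern, sketched under assumptions: key the cache by the producer configuration, so callers with different settings in the same JVM never receive each other's producer. `KeyedProducerCache` and `ConfiguredProducer` are hypothetical names, and `ConfiguredProducer` is a stand-in for a real Kafka producer. The mutable `Properties` is copied into an immutable `Map` before being used as a hash key.

```scala
import java.util.Properties
import scala.collection.mutable

// Hypothetical stand-in for a Kafka producer built from a given configuration.
final class ConfiguredProducer(val config: Map[String, String])

object KeyedProducerCache {
  private val cache =
    mutable.HashMap.empty[Map[String, String], ConfiguredProducer]

  // Snapshot the mutable Properties so later changes cannot corrupt the key.
  private def keyOf(props: Properties): Map[String, String] = {
    val builder = Map.newBuilder[String, String]
    val names = props.stringPropertyNames().iterator()
    while (names.hasNext) {
      val name = names.next()
      builder += (name -> props.getProperty(name))
    }
    builder.result()
  }

  // One producer per distinct configuration; lookups are serialized.
  def getOrCreate(props: Properties): ConfiguredProducer = synchronized {
    val key = keyOf(props)
    cache.getOrElseUpdate(key, new ConfiguredProducer(key))
  }
}
```

Two callers that pass equal settings share an instance, while a caller pointing at a different broker list gets its own. Only properties stored as strings take part in the key, which is an assumption of this sketch.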




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-61034102
  
    **[Test build #22489 timed out](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22489/consoleFull)** for PR 2994 at commit [`aa1f10d`](https://github.com/apache/spark/commit/aa1f10dc5684c3dcb647048fe5651a1c9ab79154) after a configured wait of `120m`.




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by helena <gi...@git.apache.org>.
Github user helena commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2994#discussion_r20059145
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala ---
    @@ -0,0 +1,126 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +
    +import scala.reflect.ClassTag
    +
    +import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * Import this object in this form:
    + * {{{
    + *   import org.apache.spark.streaming.kafka.KafkaWriter._
    + * }}}
    + *
    + * Once imported, the `writeToKafka` can be called on any [[DStream]] object in this form:
    + * {{{
    + *   dstream.writeToKafka(producerConfig, f)
    + * }}}
    + */
    +object KafkaWriter {
    +  import scala.language.implicitConversions
    +  /**
    +   * This implicit method allows the user to call dstream.writeToKafka(..)
    +   * @param dstream - DStream to write to Kafka
    +   * @tparam T - The type of the DStream
    +   * @tparam K - The type of the key to serialize to
    +   * @tparam V - The type of the value to serialize to
    +   * @return
    +   */
    +  implicit def createKafkaOutputWriter[T: ClassTag, K, V](dstream: DStream[T]): KafkaWriter[T] = {
    +    new KafkaWriter[T](dstream)
    +  }
    +}
    +
    +/**
    + *
    + * This class can be used to write data to Kafka from Spark Streaming. To write data to Kafka
    + * simply `import org.apache.spark.streaming.kafka.KafkaWriter._` in your application and call
    + * `dstream.writeToKafka(producerConf, func)`
    + *
    + * Here is an example:
    + * {{{
    + * // Adding this line allows the user to call dstream.writeDStreamToKafka(..)
    + * import org.apache.spark.streaming.kafka.KafkaWriter._
    + *
    + * class ExampleWriter {
    + *   val instream = ssc.queueStream(toBe)
    + *   val producerConf = new Properties()
    + *   producerConf.put("serializer.class", "kafka.serializer.DefaultEncoder")
    + *   producerConf.put("key.serializer.class", "kafka.serializer.StringEncoder")
    + *   producerConf.put("metadata.broker.list", "kafka.example.com:5545")
    + *   producerConf.put("request.required.acks", "1")
    + *   instream.writeToKafka(producerConf,
    + *    (x: String) => new KeyedMessage[String, String]("default", null, x))
    + *   ssc.start()
    + * }
    + *
    + * }}}
    + * @param dstream - The [[DStream]] to be written to Kafka
    + *
    + */
    +class KafkaWriter[T: ClassTag](@transient dstream: DStream[T]) extends Serializable with Logging {
    +
    +  /**
    +   * To write data from a DStream to Kafka, call this function after creating the DStream. Once
    +   * the DStream is passed into this function, all data coming from the DStream is written out to
    +   * Kafka. The properties instance takes the configuration required to connect to the Kafka
    +   * brokers in the standard Kafka format. The serializerFunc is a function that converts each
    +   * element of the RDD to a Kafka [[KeyedMessage]]. This closure should be serializable - so it
    +   * should use only instances of Serializables.
    +   * @param producerConfig The configuration that can be used to connect to Kafka
    +   * @param serializerFunc The function to convert the data from the stream into Kafka
    +   *                       [[KeyedMessage]]s.
    +   * @tparam K The type of the key
    +   * @tparam V The type of the value
    +   *
    +   */
    +  def writeToKafka[K, V](producerConfig: Properties,
    +    serializerFunc: T => KeyedMessage[K, V]): Unit = {
    +
    +    // Broadcast the producer configuration to avoid shipping it with every task.
    +    val broadcastedConfig = dstream.ssc.sc.broadcast(producerConfig)
    +
    +    def func = (rdd: RDD[T]) => {
    +      rdd.foreachPartition(events => {
    +        // The ForEachDStream runs the function locally on the driver.
    +        // This code can alternatively use sc.runJob, but this approach seemed cleaner.
    +        val producer: Producer[K, V] =
    --- End diff --
    
    Why create a new producer for each write?




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-60879456
  
      [Test build #22435 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22435/consoleFull) for   PR 2994 at commit [`372c749`](https://github.com/apache/spark/commit/372c749458e22ba1a9acd2badbfad51e4dda3968).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-61176487
  
    Yes, sure. I'd like to keep the API really simple and adding the implicit helps in that case.
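
The role of the implicit can be illustrated with a minimal sketch. It assumes stand-in types so that it runs without Spark or Kafka: `StubStream`, `StubWriter`, `writeToSink` and `ImplicitDemo` are hypothetical names, not part of the PR. Importing the conversion is what makes the extra method appear on the stream type.

```scala
import scala.language.implicitConversions

// Hypothetical stand-in for DStream[T]; it only holds one batch of elements.
final class StubStream[T](val elements: Seq[T])

// Hypothetical writer; it returns the converted messages instead of sending them to Kafka.
final class StubWriter[T](stream: StubStream[T]) {
  def writeToSink[M](serializerFunc: T => M): Seq[M] =
    stream.elements.map(serializerFunc)
}

object StubWriter {
  // The implicit conversion: wraps a stream in a writer when a writer method is called on it.
  implicit def createWriter[T](stream: StubStream[T]): StubWriter[T] =
    new StubWriter[T](stream)
}

object ImplicitDemo {
  // Without this import the call to writeToSink below would not compile.
  import StubWriter._

  def run(): Seq[String] =
    new StubStream(Seq(1, 2, 3)).writeToSink((x: Int) => s"msg-$x")
}
```

The trade-off is that the method is only visible where the import is present, which keeps the core stream type untouched.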




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-97578847
  
    I have a branch of the directStream api that caches consumers.
    
    It had no noticeable impact on processing time, even at 100 partitions /
    200ms batches on a production-like workload.
    
    
    On Wed, Apr 29, 2015 at 3:37 PM, Ben Fradet <no...@github.com>
    wrote:
    
    > @tdas <https://github.com/tdas> @harishreedharan
    > <https://github.com/harishreedharan> Any updates on this?
    >
    > Since we're incorporating Kafka 0.8.2.1
    > <https://github.com/apache/spark/pull/4537> and that there is a new
    > Producer API
    > <http://kafka.apache.org/082/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html>,
    > there might be a need to start over.
    >
    > It might be interesting to think about pooling producers (as well as
    > consumers, for that matter) also.
    >
    > —
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/spark/pull/2994#issuecomment-97577120>.
    >





[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-61011944
  
      [Test build #22480 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22480/consoleFull) for   PR 2994 at commit [`e6ef32f`](https://github.com/apache/spark/commit/e6ef32f54d77c706f7b36b199b58655b052111dc).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2994#discussion_r19553035
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala ---
    @@ -0,0 +1,126 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    --- End diff --
    
    Will fix.




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2994#discussion_r19553217
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ProducerCache.scala ---
    @@ -0,0 +1,34 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.kafka
    +
    +object ProducerCache {
    +
    +  private var producerOpt: Option[Any] = None
    --- End diff --
    
    AFAIK, each executor runs a single JVM per application. The idea is to just reuse the Producer in the JVM.
    
    I may be wrong, though (it seems that way in YARN mode, at least: the executors are allocated when the application starts, and I don't think they are shared between applications).
    
    If that is not the case, I think we'd have to create a new Producer per partition, which has a higher overhead though.
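
The per-JVM reuse idea described above can be sketched as follows. This is a minimal sketch under stated assumptions, not the code from the PR: `StubProducer` and `ProducerCacheSketch` are hypothetical names, and the stub stands in for a Kafka `Producer` so the example runs without Kafka on the classpath. It assumes a single cached producer per executor JVM and ignores the configuration passed on later calls.

```scala
import java.util.Properties

// Hypothetical stand-in for a Kafka producer.
final class StubProducer(val config: Properties) {
  def send(message: String): Unit = ()
}

// A Scala object is instantiated once per JVM, so all tasks running in the
// same executor JVM share this cache.
object ProducerCacheSketch {
  private var producerOpt: Option[StubProducer] = None
  private var creations = 0

  // Returns the cached producer, creating it on first use.
  // synchronized guards against two tasks creating it concurrently.
  def getOrCreate(config: Properties): StubProducer = synchronized {
    producerOpt.getOrElse {
      val producer = new StubProducer(config)
      creations += 1
      producerOpt = Some(producer)
      producer
    }
  }

  // Number of producers actually constructed in this JVM.
  def creationCount: Int = synchronized(creations)
}
```

Sharing one instance across tasks is only safe if the real producer is thread-safe, and a cache keyed by configuration would be needed if different streams use different brokers.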




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2994#discussion_r20324513
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala ---
    @@ -0,0 +1,126 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +
    +import scala.reflect.ClassTag
    +
    +import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * Import this object in this form:
    + * {{{
    + *   import org.apache.spark.streaming.kafka.KafkaWriter._
    + * }}}
    + *
    + * Once imported, `writeToKafka` can be called on any [[DStream]] object in this form:
    + * {{{
    + *   dstream.writeToKafka(producerConfig, f)
    + * }}}
    + */
    +object KafkaWriter {
    +  import scala.language.implicitConversions
    +  /**
    +   * This implicit method allows the user to call dstream.writeToKafka(..)
    +   * @param dstream - DStream to write to Kafka
    +   * @tparam T - The type of the DStream
    +   * @tparam K - The type of the key to serialize to
    +   * @tparam V - The type of the value to serialize to
    +   * @return
    +   */
    +  implicit def createKafkaOutputWriter[T: ClassTag, K, V](dstream: DStream[T]): KafkaWriter[T] = {
    +    new KafkaWriter[T](dstream)
    +  }
    +}
    +
    +/**
    + *
    + * This class can be used to write data to Kafka from Spark Streaming. To write data to Kafka
    + * simply `import org.apache.spark.streaming.kafka.KafkaWriter._` in your application and call
    + * `dstream.writeToKafka(producerConf, func)`
    + *
    + * Here is an example:
    + * {{{
    + * // Adding this line allows the user to call dstream.writeToKafka(..)
    + * import org.apache.spark.streaming.kafka.KafkaWriter._
    + *
    + * class ExampleWriter {
    + *   val instream = ssc.queueStream(toBe)
    + *   val producerConf = new Properties()
    + *   producerConf.put("serializer.class", "kafka.serializer.DefaultEncoder")
    + *   producerConf.put("key.serializer.class", "kafka.serializer.StringEncoder")
    + *   producerConf.put("metadata.broker.list", "kafka.example.com:5545")
    + *   producerConf.put("request.required.acks", "1")
    + *   instream.writeToKafka(producerConf,
    + *    (x: String) => new KeyedMessage[String, String]("default", null, x))
    + *   ssc.start()
    + * }
    + *
    + * }}}
    + * @param dstream - The [[DStream]] to be written to Kafka
    + *
    + */
    +class KafkaWriter[T: ClassTag](@transient dstream: DStream[T]) extends Serializable with Logging {
    +
    +  /**
    +   * To write data from a DStream to Kafka, call this function after creating the DStream. Once
    +   * the DStream is passed into this function, all data coming from the DStream is written out to
    +   * Kafka. The properties instance takes the configuration required to connect to the Kafka
    +   * brokers in the standard Kafka format. The serializerFunc is a function that converts each
    +   * element of the RDD to a Kafka [[KeyedMessage]]. This closure should be serializable - so it
    +   * should use only instances of Serializables.
    +   * @param producerConfig The configuration that can be used to connect to Kafka
    +   * @param serializerFunc The function to convert the data from the stream into Kafka
    +   *                       [[KeyedMessage]]s.
    +   * @tparam K The type of the key
    +   * @tparam V The type of the value
    +   *
    +   */
    +  def writeToKafka[K, V](producerConfig: Properties,
    +    serializerFunc: T => KeyedMessage[K, V]): Unit = {
    +
    +    // Broadcast the producer configuration to avoid shipping it with every task.
    +    val broadcastedConfig = dstream.ssc.sc.broadcast(producerConfig)
    +
    +    def func = (rdd: RDD[T]) => {
    +      rdd.foreachPartition(events => {
    +        // The ForEachDStream runs the function locally on the driver.
    +        // This code can alternatively use sc.runJob, but this approach seemed cleaner.
    +        val producer: Producer[K, V] =
    --- End diff --
    
    Also, this is likely going in after 1.2 -- so we can wait for you to get back and discuss this further.




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-97581484
  
    If there is interest, I might change the code to do writes from an RDD and use the RDD as the basis for the stream (a la Direct Kafka Stream - @koeninger route)
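
One way to picture the RDD-first design mentioned above is the sketch below. It is only an illustration under assumptions: a batch is modelled as a plain list of partitions and a stream as a sequence of batches, and `RddFirstSketch`, `saveBatch` and `saveStream` are hypothetical names rather than Spark or PR APIs.

```scala
object RddFirstSketch {
  // Hypothetical model: a batch (standing in for an RDD) is a list of partitions.
  type Batch[T] = Seq[Seq[T]]

  // Batch-level write: convert each element and hand it to the sink,
  // one partition at a time.
  def saveBatch[T, M](batch: Batch[T], toMessage: T => M, sink: M => Unit): Unit =
    batch.foreach { partition =>
      partition.foreach(elem => sink(toMessage(elem)))
    }

  // Stream-level write: the batch-level write applied to every batch in order.
  def saveStream[T, M](stream: Seq[Batch[T]], toMessage: T => M, sink: M => Unit): Unit =
    stream.foreach(batch => saveBatch(batch, toMessage, sink))
}
```

With this layering the stream API adds no write logic of its own, so batch jobs and streaming jobs share the same code path.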




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-61045266
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22512/




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-61041536
  
      [Test build #22512 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22512/consoleFull) for   PR 2994 at commit [`0a45f1a`](https://github.com/apache/spark/commit/0a45f1ab5ba5f9440a78e47e48b48f0321d440c1).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-136910279
  
    @harishreedharan any updates on this? Could you rebase to master?




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by WangTaoTheTonic <gi...@git.apache.org>.
Github user WangTaoTheTonic commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-143799798
  
    Hi @harishreedharan, I think this is a nice feature that is very helpful for users who want to write a DStream back into Kafka. The implementation is very neat too.
    
    Could you please rebase the code so that we have the opportunity to push it into Spark?




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-61020908
  
      [Test build #22480 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22480/consoleFull) for   PR 2994 at commit [`e6ef32f`](https://github.com/apache/spark/commit/e6ef32f54d77c706f7b36b199b58655b052111dc).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class KafkaWriter[T: ClassTag](@transient dstream: DStream[T]) `





[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-61173172
  
    Hey @harishreedharan, 
    
    This is a good contribution! However, I think we need to take a call on the right API and go through a proper design discussion regarding this. Let's come back to this after the Spark 1.2 release craze is over.




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by BenFradet <gi...@git.apache.org>.
Github user BenFradet commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-108002092
  
    Great!




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by helena <gi...@git.apache.org>.
Github user helena commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2994#discussion_r20160540
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala ---
    @@ -0,0 +1,126 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +
    +import scala.reflect.ClassTag
    +
    +import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * Import this object in this form:
    + * {{{
    + *   import org.apache.spark.streaming.kafka.KafkaWriter._
    + * }}}
    + *
    + * Once imported, `writeToKafka` can be called on any [[DStream]] object in this form:
    + * {{{
    + *   dstream.writeToKafka(producerConfig, f)
    + * }}}
    + */
    +object KafkaWriter {
    +  import scala.language.implicitConversions
    +  /**
    +   * This implicit method allows the user to call dstream.writeToKafka(..)
    +   * @param dstream - DStream to write to Kafka
    +   * @tparam T - The type of the DStream
    +   * @tparam K - The type of the key to serialize to
    +   * @tparam V - The type of the value to serialize to
    +   * @return
    +   */
    +  implicit def createKafkaOutputWriter[T: ClassTag, K, V](dstream: DStream[T]): KafkaWriter[T] = {
    +    new KafkaWriter[T](dstream)
    +  }
    +}
    +
    +/**
    + *
    + * This class can be used to write data to Kafka from Spark Streaming. To write data to Kafka
    + * simply `import org.apache.spark.streaming.kafka.KafkaWriter._` in your application and call
    + * `dstream.writeToKafka(producerConf, func)`
    + *
    + * Here is an example:
    + * {{{
    + * // Adding this line allows the user to call dstream.writeToKafka(..)
    + * import org.apache.spark.streaming.kafka.KafkaWriter._
    + *
    + * class ExampleWriter {
    + *   val instream = ssc.queueStream(toBe)
    + *   val producerConf = new Properties()
    + *   producerConf.put("serializer.class", "kafka.serializer.DefaultEncoder")
    + *   producerConf.put("key.serializer.class", "kafka.serializer.StringEncoder")
    + *   producerConf.put("metadata.broker.list", "kafka.example.com:5545")
    + *   producerConf.put("request.required.acks", "1")
    + *   instream.writeToKafka(producerConf,
    + *    (x: String) => new KeyedMessage[String, String]("default", null, x))
    + *   ssc.start()
    + * }
    + *
    + * }}}
    + * @param dstream - The [[DStream]] to be written to Kafka
    + *
    + */
    +class KafkaWriter[T: ClassTag](@transient dstream: DStream[T]) extends Serializable with Logging {
    +
    +  /**
    +   * To write data from a DStream to Kafka, call this function after creating the DStream. Once
    +   * the DStream is passed into this function, all data coming from the DStream is written out to
    +   * Kafka. The properties instance takes the configuration required to connect to the Kafka
    +   * brokers in the standard Kafka format. The serializerFunc is a function that converts each
    +   * element of the RDD to a Kafka [[KeyedMessage]]. This closure should be serializable - so it
    +   * should use only instances of Serializables.
    +   * @param producerConfig The configuration that can be used to connect to Kafka
    +   * @param serializerFunc The function to convert the data from the stream into Kafka
    +   *                       [[KeyedMessage]]s.
    +   * @tparam K The type of the key
    +   * @tparam V The type of the value
    +   *
    +   */
    +  def writeToKafka[K, V](producerConfig: Properties,
    +    serializerFunc: T => KeyedMessage[K, V]): Unit = {
    +
    +    // Broadcast the producer configuration to avoid shipping it with every task.
    +    val broadcastedConfig = dstream.ssc.sc.broadcast(producerConfig)
    +
    +    def func = (rdd: RDD[T]) => {
    +      rdd.foreachPartition(events => {
    +        // The ForEachDStream runs the function locally on the driver.
    +        // This code can alternatively use sc.runJob, but this approach seemed cleaner.
    +        val producer: Producer[K, V] =
    --- End diff --
    
    I do, but it is a completely different design for doing this. I'm working on something that allows writing to Kafka from both DStream and RDD, but I'm headed to talk at a conference and won't be able to get back to this code until next week. Getting this functionality in ASAP is important :) so I'm glad you are doing this.
    I might suggest that 'writeToKafka' be changed to 'saveToKafka' for API naming convention reasons.




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-61033489
  
    Hi Hari, can you confirm that `Producer` is thread-safe?
    
    Besides, I have one concern: the overhead of creating a `Producer` compared to the time it takes to write a partition to Kafka. An RDD in a DStream may sometimes have a large number of partitions (`batchInterval / blockInterval`), and each partition is often not very large (depending on the input stream).
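
The concern above can be made concrete with a little arithmetic. The sketch below is illustrative only: `PartitionMath` and its functions are hypothetical names, and the setup and write times used with it are made-up values, not measurements.

```scala
object PartitionMath {
  // Partitions per batch for a single receiver: batchInterval / blockInterval.
  def partitionsPerBatch(batchIntervalMs: Long, blockIntervalMs: Long): Long = {
    require(batchIntervalMs > 0 && blockIntervalMs > 0, "intervals must be positive")
    batchIntervalMs / blockIntervalMs
  }

  // Share of per-partition time spent creating the producer when a new
  // producer is created for every partition.
  def setupOverheadFraction(setupMs: Double, writeMsPerPartition: Double): Double =
    setupMs / (setupMs + writeMsPerPartition)
}
```

For example, a 2000 ms batch interval with a 200 ms block interval gives 10 partitions per batch, so a per-partition producer would be created 10 times per batch. If setup and write each took 50 ms (assumed figures), half of the time would go to setup.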




[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by mkuthan <gi...@git.apache.org>.
Github user mkuthan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2994#discussion_r43876334
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala ---
    @@ -0,0 +1,126 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +
    +import scala.reflect.ClassTag
    +
    +import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * Import this object in this form:
    + * {{{
    + *   import org.apache.spark.streaming.kafka.KafkaWriter._
    + * }}}
    + *
    + * Once imported, the `writeToKafka` can be called on any [[DStream]] object in this form:
    + * {{{
    + *   dstream.writeToKafka(producerConfig, f)
    + * }}}
    + */
    +object KafkaWriter {
    +  import scala.language.implicitConversions
    +  /**
    +   * This implicit method allows the user to call dstream.writeToKafka(..)
    +   * @param dstream - DStream to write to Kafka
    +   * @tparam T - The type of the DStream
    +   * @tparam K - The type of the key to serialize to
    +   * @tparam V - The type of the value to serialize to
    +   * @return
    +   */
    +  implicit def createKafkaOutputWriter[T: ClassTag, K, V](dstream: DStream[T]): KafkaWriter[T] = {
    +    new KafkaWriter[T](dstream)
    +  }
    +}
    +
    +/**
    + *
    + * This class can be used to write data to Kafka from Spark Streaming. To write data to Kafka
    + * simply `import org.apache.spark.streaming.kafka.KafkaWriter._` in your application and call
    + * `dstream.writeToKafka(producerConf, func)`
    + *
    + * Here is an example:
    + * {{{
    + * // Adding this line allows the user to call dstream.writeDStreamToKafka(..)
    + * import org.apache.spark.streaming.kafka.KafkaWriter._
    + *
    + * class ExampleWriter {
    + *   val instream = ssc.queueStream(toBe)
    + *   val producerConf = new Properties()
    + *   producerConf.put("serializer.class", "kafka.serializer.DefaultEncoder")
    + *   producerConf.put("key.serializer.class", "kafka.serializer.StringEncoder")
    + *   producerConf.put("metadata.broker.list", "kafka.example.com:5545")
    + *   producerConf.put("request.required.acks", "1")
    + *   instream.writeToKafka(producerConf,
    + *    (x: String) => new KeyedMessage[String, String]("default", null, x))
    + *   ssc.start()
    + * }
    + *
    + * }}}
    + * @param dstream - The [[DStream]] to be written to Kafka
    + *
    + */
    +class KafkaWriter[T: ClassTag](@transient dstream: DStream[T]) extends Serializable with Logging {
    +
    +  /**
    +   * To write data from a DStream to Kafka, call this function after creating the DStream. Once
    +   * the DStream is passed into this function, all data coming from the DStream is written out to
    +   * Kafka. The properties instance takes the configuration required to connect to the Kafka
    +   * brokers in the standard Kafka format. The serializerFunc is a function that converts each
    +   * element of the RDD to a Kafka [[KeyedMessage]]. This closure should be serializable - so it
    +   * should use only instances of Serializables.
    +   * @param producerConfig The configuration that can be used to connect to Kafka
    +   * @param serializerFunc The function to convert the data from the stream into Kafka
    +   *                       [[KeyedMessage]]s.
    +   * @tparam K The type of the key
    +   * @tparam V The type of the value
    +   *
    +   */
    +  def writeToKafka[K, V](producerConfig: Properties,
    +    serializerFunc: T => KeyedMessage[K, V]): Unit = {
    +
    +    // Broadcast the producer to avoid sending it every time.
    +    val broadcastedConfig = dstream.ssc.sc.broadcast(producerConfig)
    +
    +    def func = (rdd: RDD[T]) => {
    +      rdd.foreachPartition(events => {
    +        // The ForEachDStream runs the function locally on the driver.
    +        // This code can alternatively use sc.runJob, but this approach seemed cleaner.
    +        val producer: Producer[K, V] =
    --- End diff --
    
    You could consider sharing a Kafka producer per JVM/executor, as long as the producer is thread-safe. I'm afraid that creating a new producer for every partition will be the main bottleneck in real-world scenarios.
    Please refer to this blog post: http://allegro.tech/2015/08/spark-kafka-integration.html
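
One way the per-JVM sharing suggested above could look is sketched below. This is only an illustration under assumptions: `MessageSink` and `SinkCache` are hypothetical stand-ins rather than Kafka or Spark classes, the configuration is simplified to an immutable `Map`, and the approach is only safe if the real producer is thread-safe.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a Kafka producer; not a real Kafka class.
trait MessageSink {
  def send(message: String): Unit
  def close(): Unit
}

// Hypothetical per-JVM cache: at most one sink per distinct configuration.
object SinkCache {
  private val sinks = mutable.HashMap.empty[Map[String, String], MessageSink]

  // Returns the cached sink for this config, creating it on first use.
  // The lock ensures concurrent tasks never create two sinks for one config.
  def getOrCreate(config: Map[String, String])(
      create: Map[String, String] => MessageSink): MessageSink =
    synchronized {
      sinks.getOrElseUpdate(config, create(config))
    }

  // Closes and forgets every cached sink, e.g. from a JVM shutdown hook.
  def closeAll(): Unit = synchronized {
    sinks.values.foreach(_.close())
    sinks.clear()
  }
}
```

Keying the cache on the configuration addresses the case where tasks in the same executor write with different settings; each partition task would call `SinkCache.getOrCreate` instead of constructing its own producer.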


[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-69281819
  
    Here is a design doc: https://docs.google.com/document/d/1jy160SNSJyrkck7rM9IFrFxa3OFQbPwv3S-pH0rGyjU/edit?usp=sharing


[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-108008961
  
    I will get around to this in a few days.


[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2994#discussion_r20174262
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala ---
    @@ -0,0 +1,126 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +
    +import scala.reflect.ClassTag
    +
    +import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * Import this object in this form:
    + * {{{
    + *   import org.apache.spark.streaming.kafka.KafkaWriter._
    + * }}}
    + *
    + * Once imported, the `writeToKafka` can be called on any [[DStream]] object in this form:
    + * {{{
    + *   dstream.writeToKafka(producerConfig, f)
    + * }}}
    + */
    +object KafkaWriter {
    +  import scala.language.implicitConversions
    +  /**
    +   * This implicit method allows the user to call dstream.writeToKafka(..)
    +   * @param dstream - DStream to write to Kafka
    +   * @tparam T - The type of the DStream
    +   * @tparam K - The type of the key to serialize to
    +   * @tparam V - The type of the value to serialize to
    +   * @return
    +   */
    +  implicit def createKafkaOutputWriter[T: ClassTag, K, V](dstream: DStream[T]): KafkaWriter[T] = {
    +    new KafkaWriter[T](dstream)
    +  }
    +}
    +
    +/**
    + *
    + * This class can be used to write data to Kafka from Spark Streaming. To write data to Kafka
    + * simply `import org.apache.spark.streaming.kafka.KafkaWriter._` in your application and call
    + * `dstream.writeToKafka(producerConf, func)`
    + *
    + * Here is an example:
    + * {{{
    + * // Adding this line allows the user to call dstream.writeDStreamToKafka(..)
    + * import org.apache.spark.streaming.kafka.KafkaWriter._
    + *
    + * class ExampleWriter {
    + *   val instream = ssc.queueStream(toBe)
    + *   val producerConf = new Properties()
    + *   producerConf.put("serializer.class", "kafka.serializer.DefaultEncoder")
    + *   producerConf.put("key.serializer.class", "kafka.serializer.StringEncoder")
    + *   producerConf.put("metadata.broker.list", "kafka.example.com:5545")
    + *   producerConf.put("request.required.acks", "1")
    + *   instream.writeToKafka(producerConf,
    + *    (x: String) => new KeyedMessage[String, String]("default", null, x))
    + *   ssc.start()
    + * }
    + *
    + * }}}
    + * @param dstream - The [[DStream]] to be written to Kafka
    + *
    + */
    +class KafkaWriter[T: ClassTag](@transient dstream: DStream[T]) extends Serializable with Logging {
    +
    +  /**
    +   * To write data from a DStream to Kafka, call this function after creating the DStream. Once
    +   * the DStream is passed into this function, all data coming from the DStream is written out to
    +   * Kafka. The properties instance takes the configuration required to connect to the Kafka
    +   * brokers in the standard Kafka format. The serializerFunc is a function that converts each
    +   * element of the RDD to a Kafka [[KeyedMessage]]. This closure should be serializable - so it
    +   * should use only instances of Serializables.
    +   * @param producerConfig The configuration that can be used to connect to Kafka
    +   * @param serializerFunc The function to convert the data from the stream into Kafka
    +   *                       [[KeyedMessage]]s.
    +   * @tparam K The type of the key
    +   * @tparam V The type of the value
    +   *
    +   */
    +  def writeToKafka[K, V](producerConfig: Properties,
    +    serializerFunc: T => KeyedMessage[K, V]): Unit = {
    +
    +    // Broadcast the producer to avoid sending it every time.
    +    val broadcastedConfig = dstream.ssc.sc.broadcast(producerConfig)
    +
    +    def func = (rdd: RDD[T]) => {
    +      rdd.foreachPartition(events => {
    +        // The ForEachDStream runs the function locally on the driver.
    +        // This code can alternatively use sc.runJob, but this approach seemed cleaner.
    +        val producer: Producer[K, V] =
    --- End diff --
    
    If there is a way to keep the Producer around that is not terribly complex, it would be awesome. As far as I could see, the ProducerCache in an earlier version of this patch was the best approach I could think of.
    
    I think we could extend this patch to also write an RDD to Kafka (it can use the same semantics). Do you have a repo which I can peek at?


[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-136950364
  
    Sure, if there is interest in this. I have a version of this on the Cloudera GitHub (which I pushed there, since I didn't see much interest in having this in Spark itself). I will port it over.


[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2994#discussion_r19523728
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala ---
    @@ -0,0 +1,126 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    --- End diff --
    
    An empty line after the Apache header :).


[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-73129328
  
    Done!


[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-60884103
  
      [Test build #22435 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22435/consoleFull) for   PR 2994 at commit [`372c749`](https://github.com/apache/spark/commit/372c749458e22ba1a9acd2badbfad51e4dda3968).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class KafkaWriter[T: ClassTag](@transient dstream: DStream[T]) `



[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2994#discussion_r19577422
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ProducerCache.scala ---
    @@ -0,0 +1,34 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.kafka
    +
    +object ProducerCache {
    +
    +  private var producerOpt: Option[Any] = None
    --- End diff --
    
    An executor can run multiple tasks at once, though. Do they always have the same config, and is the Producer thread-safe? I am not sure how strongly it is guaranteed that there is one executor per JVM. To me it feels safer to make a Producer per partition. They aren't that heavy to create, and there aren't that many partitions relative to data size.
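
For comparison, the producer-per-partition approach described above might be sketched as follows. `RecordSink` and `PartitionWriter` are hypothetical stand-ins rather than Kafka or Spark classes; in a real job the body of `writePartition` would presumably run inside `rdd.foreachPartition`.

```scala
// Hypothetical stand-in for a Kafka producer; not a real Kafka class.
trait RecordSink {
  def send(record: String): Unit
  def close(): Unit
}

object PartitionWriter {
  // Writes one partition's records through a sink created just for that
  // partition and returns how many records were sent. The sink is closed
  // even if a send fails, so nothing is shared across tasks or leaked.
  def writePartition(records: Iterator[String], newSink: () => RecordSink): Int = {
    val sink = newSink()
    try {
      var sent = 0
      records.foreach { record =>
        sink.send(record)
        sent += 1
      }
      sent
    } finally {
      sink.close()
    }
  }
}
```

Because each task owns its sink, this version does not depend on the producer being thread-safe or on all tasks sharing one configuration; the cost is one creation and one close per partition.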


[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/2994#issuecomment-61040608
  
    I talked to people working on Kafka, and they assure me it is thread-safe. Also see this: 
    https://github.com/apache/flume/blob/trunk/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
    
    There is a single producer that is written to by various threads. See the corresponding test, where it is written to from multiple threads. I have run it in a loop several times on Travis and have never seen a threading issue.
    
    By creating a Producer per partition, this issue is avoided anyway. For now, we can keep it simple by creating a producer per partition; if this turns out to be a problem, we can revert to the ProducerCache.


[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by helena <gi...@git.apache.org>.
Github user helena commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2994#discussion_r20107380
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala ---
    @@ -0,0 +1,126 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +
    +import scala.reflect.ClassTag
    +
    +import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * Import this object in this form:
    + * {{{
    + *   import org.apache.spark.streaming.kafka.KafkaWriter._
    + * }}}
    + *
    + * Once imported, the `writeToKafka` can be called on any [[DStream]] object in this form:
    + * {{{
    + *   dstream.writeToKafka(producerConfig, f)
    + * }}}
    + */
    +object KafkaWriter {
    +  import scala.language.implicitConversions
    +  /**
    +   * This implicit method allows the user to call dstream.writeToKafka(..)
    +   * @param dstream - DStream to write to Kafka
    +   * @tparam T - The type of the DStream
    +   * @tparam K - The type of the key to serialize to
    +   * @tparam V - The type of the value to serialize to
    +   * @return
    +   */
    +  implicit def createKafkaOutputWriter[T: ClassTag, K, V](dstream: DStream[T]): KafkaWriter[T] = {
    +    new KafkaWriter[T](dstream)
    +  }
    +}
    +
    +/**
    + *
    + * This class can be used to write data to Kafka from Spark Streaming. To write data to Kafka
    + * simply `import org.apache.spark.streaming.kafka.KafkaWriter._` in your application and call
    + * `dstream.writeToKafka(producerConf, func)`
    + *
    + * Here is an example:
    + * {{{
    + * // Adding this line allows the user to call dstream.writeDStreamToKafka(..)
    + * import org.apache.spark.streaming.kafka.KafkaWriter._
    + *
    + * class ExampleWriter {
    + *   val instream = ssc.queueStream(toBe)
    + *   val producerConf = new Properties()
    + *   producerConf.put("serializer.class", "kafka.serializer.DefaultEncoder")
    + *   producerConf.put("key.serializer.class", "kafka.serializer.StringEncoder")
    + *   producerConf.put("metadata.broker.list", "kafka.example.com:5545")
    + *   producerConf.put("request.required.acks", "1")
    + *   instream.writeToKafka(producerConf,
    + *    (x: String) => new KeyedMessage[String, String]("default", null, x))
    + *   ssc.start()
    + * }
    + *
    + * }}}
    + * @param dstream - The [[DStream]] to be written to Kafka
    + *
    + */
    +class KafkaWriter[T: ClassTag](@transient dstream: DStream[T]) extends Serializable with Logging {
    +
    +  /**
    +   * To write data from a DStream to Kafka, call this function after creating the DStream. Once
    +   * the DStream is passed into this function, all data coming from the DStream is written out to
    +   * Kafka. The properties instance takes the configuration required to connect to the Kafka
    +   * brokers in the standard Kafka format. The serializerFunc is a function that converts each
    +   * element of the RDD to a Kafka [[KeyedMessage]]. This closure should be serializable - so it
    +   * should use only instances of Serializables.
    +   * @param producerConfig The configuration that can be used to connect to Kafka
    +   * @param serializerFunc The function to convert the data from the stream into Kafka
    +   *                       [[KeyedMessage]]s.
    +   * @tparam K The type of the key
    +   * @tparam V The type of the value
    +   *
    +   */
    +  def writeToKafka[K, V](producerConfig: Properties,
    +    serializerFunc: T => KeyedMessage[K, V]): Unit = {
    +
    +    // Broadcast the producer to avoid sending it every time.
    +    val broadcastedConfig = dstream.ssc.sc.broadcast(producerConfig)
    +
    +    def func = (rdd: RDD[T]) => {
    +      rdd.foreachPartition(events => {
    +        // The ForEachDStream runs the function locally on the driver.
    +        // This code can alternatively use sc.runJob, but this approach seemed cleaner.
    +        val producer: Producer[K, V] =
    --- End diff --
    
    Noted, but it seems quite inefficient to both broadcast the config and create a new Producer for each partition.


[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2994#discussion_r19523356
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala ---
    @@ -0,0 +1,126 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +
    +import scala.reflect.ClassTag
    +
    +import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * Import this object in this form:
    + * {{{
    + *   import org.apache.spark.streaming.kafka.KafkaWriter._
    + * }}}
    + *
    + * Once imported, the `writeToKafka` can be called on any [[DStream]] object in this form:
    + * {{{
    + *   dstream.writeToKafka(producerConfig, f)
    + * }}}
    + */
    +object KafkaWriter {
    +  import scala.language.implicitConversions
    +  /**
    +   * This implicit method allows the user to call dstream.writeToKafka(..)
    +   * @param dstream - DStream to write to Kafka
    +   * @tparam T - The type of the DStream
    +   * @tparam K - The type of the key to serialize to
    +   * @tparam V - The type of the value to serialize to
    +   * @return
    +   */
    +  implicit def createKafkaOutputWriter[T: ClassTag, K, V](dstream: DStream[T]): KafkaWriter[T] = {
    +    new KafkaWriter[T](dstream)
    +  }
    +}
    +
    +/**
    + *
    + * This class can be used to write data to Kafka from Spark Streaming. To write data to Kafka
    + * simply `import org.apache.spark.streaming.kafka.KafkaWriter._` in your application and call
    + * `dstream.writeToKafka(producerConf, func)`
    + *
    + * Here is an example:
    + * {{{
    + * // Adding this line allows the user to call dstream.writeDStreamToKafka(..)
    + * import org.apache.spark.streaming.kafka.KafkaWriter._
    + *
    + * class ExampleWriter {
    + *   val instream = ssc.queueStream(toBe)
    + *   val producerConf = new Properties()
    + *   producerConf.put("serializer.class", "kafka.serializer.DefaultEncoder")
    + *   producerConf.put("key.serializer.class", "kafka.serializer.StringEncoder")
    + *   producerConf.put("metadata.broker.list", "kafka.example.com:5545")
    + *   producerConf.put("request.required.acks", "1")
    + *   instream.writeToKafka(producerConf,
    + *    (x: String) => new KeyedMessage[String, String]("default", null, x))
    + *   ssc.start()
    + * }
    + *
    + * }}}
    + * @param dstream - The [[DStream]] to be written to Kafka
    + *
    + */
    +class KafkaWriter[T: ClassTag](@transient dstream: DStream[T]) {
    +
    +  /**
    +   * To write data from a DStream to Kafka, call this function after creating the DStream. Once
    +   * the DStream is passed into this function, all data coming from the DStream is written out to
    +   * Kafka. The properties instance takes the configuration required to connect to the Kafka
    +   * brokers in the standard Kafka format. The serializerFunc is a function that converts each
    +   * element of the RDD to a Kafka [[KeyedMessage]]. This closure should be serializable - so it
    +   * should use only instances of Serializables.
    +   * @param producerConfig The configuration that can be used to connect to Kafka
    +   * @param serializerFunc The function to convert the data from the stream into Kafka
    +   *                       [[KeyedMessage]]s.
    +   * @tparam K The type of the key
    +   * @tparam V The type of the value
    +   *
    +   */
    +  def writeToKafka[K, V](producerConfig: Properties,
    +    serializerFunc: T => KeyedMessage[K, V]): Unit = {
    +
    +    // Broadcast the producer to avoid sending it every time.
    +    val broadcastedConfig = dstream.ssc.sc.broadcast(producerConfig)
    +
    +    def func = (rdd: RDD[T]) => {
    +      rdd.foreachPartition(events => {
    +        // The ForEachDStream runs the function locally on the driver. So the
    +        // ProducerCache from the driver is likely to get serialized and
    +        // sent, which is fine - because at that point the Producer itself is
    +        // not initialized, so a None is sent over the wire.
    +        // Get the producer from that local executor and write!
    +        val producer: Producer[K, V] = {
    +          if (ProducerCache.isCached) {
    --- End diff --
    
    Hi Hari, a simple question: will the `Producer` be shared between partitions? IIUC, your code seems to reuse the Producer once it has been created; is there any specific purpose in doing so?

