Posted to user@spark.apache.org by Dave Ariens <da...@blackberry.com> on 2015/10/21 20:50:39 UTC

Kafka Streaming and Filtering > 3000 partitions

Hey folks,

I have a very large number of Kafka topics (many thousands of partitions) that I want to consume, filter based on topic-specific filters, then produce back to filtered topics in Kafka.

Using the receiver-less approach with Spark 1.4.1 (described here <https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md>), I am able to use either KafkaUtils.createDirectStream or KafkaUtils.createRDD to consume from many topics and filter them all with the same filters, but I can't seem to wrap my head around how to apply topic-specific filters, or how to finally produce to topic-specific destination topics.

Another point: I will need to checkpoint the metadata after each successful batch and write the starting offsets per partition back to ZK.  I expect I can do that on the final RDDs after casting them accordingly, but if anyone has expertise or guidance on doing that and is willing to share, I'd be pretty grateful.

Re: Kafka Streaming and Filtering > 3000 partitions

Posted by varun sharma <va...@gmail.com>.
You can try something like this to filter by topic:

val kafkaStringStream = KafkaUtils.createDirectStream[.......]

// You might want to create the stream from offsets fetched from ZK.
// Note: this assumes each record is a (topic, data) pair, e.g. built by a
// custom messageHandler; by default the first element is the message key.

kafkaStringStream.foreachRDD { rdd =>
  val topics = rdd.map(_._1).distinct().collect()
  if (topics.length > 0) {
    // Logger and BaseSLog are my own logging helpers
    val rdd_value = rdd.take(10).mkString("\n.....\n")
    Logger.log(this.getClass, INFO, BaseSLog(s"Printing all feeds\n$rdd_value"))

    topics.foreach { topic =>
      val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
      // do anything with this filteredRdd, like saving to a data store
    }
    // update the offsets (ZookeeperManaager is my own helper, not shown)
    ZookeeperManaager.updateOffsetsinZk(rdd)
  }
}

Regards

Varun


On Thu, Oct 22, 2015 at 2:44 AM, Cody Koeninger <co...@koeninger.org> wrote:

> Yeah, that's the general idea.
>
> Regarding the question in your code comments ... The code inside of
> foreachPartition is what's running on the executor.  It wouldn't make any
> sense to try to get a partition ID before that block.


-- 
*VARUN SHARMA*
*Flipkart*
*Bangalore*

Re: Kafka Streaming and Filtering > 3000 partitions

Posted by Cody Koeninger <co...@koeninger.org>.
Yeah, that's the general idea.

Regarding the question in your code comments ... The code inside of
foreachPartition is what's running on the executor.  It wouldn't make any
sense to try to get a partition ID before that block.
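
A minimal sketch of why the partition ID is only meaningful inside that block, written in plain Scala with no Spark dependency. OffsetRangeStub, runBatch and countPerTopicPartition are hypothetical stand-ins invented for illustration, not Spark APIs: the array of ranges is known once per batch (the driver side), while the partition index only exists inside the per-partition call (the executor side).

```scala
// Plain-Scala model of the driver/executor split; no Spark required.
// OffsetRangeStub, runBatch and countPerTopicPartition are hypothetical stand-ins.
object PartitionScopeSketch {
  final case class OffsetRangeStub(topic: String, partition: Int,
                                   fromOffset: Long, untilOffset: Long)

  // Models one batch. `offsetRanges` is known up front (driver side);
  // `perPartition` is invoked once per partition (executor side).
  def runBatch[A](offsetRanges: Array[OffsetRangeStub], partitions: Seq[Seq[A]])(
      perPartition: (OffsetRangeStub, Iterator[A]) => Unit): Unit =
    partitions.zipWithIndex.foreach { case (records, index) =>
      // The index only exists here, inside the per-partition call.
      perPartition(offsetRanges(index), records.iterator)
    }

  // Records (topic, Kafka partition, record count) for each partition.
  def countPerTopicPartition(offsetRanges: Array[OffsetRangeStub],
                             partitions: Seq[Seq[String]]): List[(String, Int, Int)] = {
    val seen = scala.collection.mutable.ArrayBuffer.empty[(String, Int, Int)]
    runBatch(offsetRanges, partitions) { (osr, iter) =>
      seen += ((osr.topic, osr.partition, iter.size))
    }
    seen.toList
  }
}
```

In the real job the index would come from TaskContext.get.partitionId and the array from rdd.asInstanceOf[HasOffsetRanges].offsetRanges, as in the code quoted in this thread.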

On Wed, Oct 21, 2015 at 4:07 PM, Dave Ariens <da...@blackberry.com> wrote:

>       // Now we know the topic name, we can filter something
>
>       // Or could we have referenced the topic name from
>
>       // offsetRanges(TaskContext.get.partitionId) earlier
>
>       // before we entered into stream.foreachRDD...?
>

RE: Kafka Streaming and Filtering > 3000 partitions

Posted by Dave Ariens <da...@blackberry.com>.
Cody,

First off--thanks for your contributions and blog post, which I actually linked to in my original question. You'll have to forgive me as I've only been using Spark and writing Scala for a few days. I'm aware that the RDD partitions are 1:1 with Kafka topic partitions and you can get the offset ranges.  But my understanding is that the below code would need to be executed after the stream has been processed.

Let's say we're storing our filters in a key-value map where the key is the topic name, and the value is a string that a message within a partition of that topic must contain to match.

Is this the approach you're suggesting (using your example code)?

import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

// This would get built up on the driver, likely fetching the topics and filters from ZK
val topicFilters = Map("topic1" -> "this text must match", "topic2" -> "this other text must match")

val stream = KafkaUtils.createDirectStream(...)
...
stream.foreachRDD { rdd =>
  // Cast the rdd to an interface that lets us get an array of OffsetRange
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  rdd.foreachPartition { iter =>
    // Index to get the correct offset range for the rdd partition we're working on
    val osr: OffsetRange = offsetRanges(TaskContext.get.partitionId)

    // Get any needed data from the offset range
    val topic = osr.topic
    val kafkaPartitionId = osr.partition
    val begin = osr.fromOffset
    val end = osr.untilOffset

    // Now we know the topic name, we can filter something
    // Or could we have referenced the topic name from
    // offsetRanges(TaskContext.get.partitionId) earlier
    // before we entered into stream.foreachRDD...?
  }
}
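
A rough sketch of the filtering step itself, in plain Scala with no Spark or Kafka client dependency. filterPartition and destinationTopic are hypothetical helpers made up for illustration, and both the ".filtered" naming scheme and the choice to drop messages from topics without a registered filter are assumptions, since neither is specified above.

```scala
// Plain-Scala sketch of the per-partition filter; no Spark or Kafka client required.
// filterPartition and destinationTopic are hypothetical helpers, not library APIs.
object TopicFilterSketch {
  // Built on the driver; in practice this might be loaded from ZK.
  val topicFilters: Map[String, String] =
    Map("topic1" -> "this text must match", "topic2" -> "this other text must match")

  // Assumed naming scheme for the output topic.
  def destinationTopic(sourceTopic: String): String = s"$sourceTopic.filtered"

  // Keeps only the messages containing the filter text registered for `topic`.
  // A topic with no registered filter yields nothing (an assumption).
  def filterPartition(topic: String, messages: Iterator[String],
                      filters: Map[String, String]): Iterator[String] =
    filters.get(topic) match {
      case Some(needle) => messages.filter(_.contains(needle))
      case None         => Iterator.empty
    }
}
```

Inside rdd.foreachPartition this could be called as filterPartition(osr.topic, iter.map(_._2), topicFilters), assuming the stream carries (key, value) string pairs, with each surviving message sent to destinationTopic(osr.topic) by a producer created inside that block.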





From: Cody Koeninger [mailto:cody@koeninger.org]
Sent: Wednesday, October 21, 2015 3:01 PM
To: Dave Ariens
Cc: user@spark.apache.org
Subject: Re: Kafka Streaming and Filtering > 3000 partitions

The RDD partitions are 1:1 with Kafka topic partitions, so you can use offset ranges to figure out which topic a given RDD partition is for and proceed accordingly.  See the Kafka integration guide in the Spark Streaming docs for more details, or https://github.com/koeninger/kafka-exactly-once

As far as setting offsets in ZK, there's a private interface in the Spark codebase that would make it a little easier for you to do that.  You can see that code for reference, or there's an outstanding ticket for making it public: https://issues.apache.org/jira/browse/SPARK-10963



Re: Kafka Streaming and Filtering > 3000 partitions

Posted by Cody Koeninger <co...@koeninger.org>.
The RDD partitions are 1:1 with Kafka topic partitions, so you can use
offset ranges to figure out which topic a given RDD partition is for and
proceed accordingly.  See the Kafka integration guide in the Spark
Streaming docs for more details, or
https://github.com/koeninger/kafka-exactly-once

As far as setting offsets in ZK, there's a private interface in the Spark
codebase that would make it a little easier for you to do that.  You can
see that code for reference, or there's an outstanding ticket for making it
public: https://issues.apache.org/jira/browse/SPARK-10963
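
A minimal sketch of what writing offsets back to ZK might involve, in plain Scala. It is not the private Spark interface mentioned above. OffsetRangeStub, offsetPath, offsetsToCommit and commit are hypothetical names, and the actual ZooKeeper write is left to a caller-supplied function. It assumes the /consumers/<group>/offsets/<topic>/<partition> layout used by Kafka's ZooKeeper-based consumers, and stores untilOffset because that end of the range is exclusive.

```scala
// Plain-Scala sketch of building ZK offset entries; no ZooKeeper client required.
// All names here are hypothetical; the real write is delegated to `write`.
object ZkOffsetSketch {
  final case class OffsetRangeStub(topic: String, partition: Int,
                                   fromOffset: Long, untilOffset: Long)

  // Path layout used by Kafka's ZooKeeper-based consumers.
  def offsetPath(group: String, topic: String, partition: Int): String =
    s"/consumers/$group/offsets/$topic/$partition"

  // untilOffset is exclusive, so it is the offset the next batch should start from.
  def offsetsToCommit(group: String, ranges: Seq[OffsetRangeStub]): Seq[(String, String)] =
    ranges.map(r => offsetPath(group, r.topic, r.partition) -> r.untilOffset.toString)

  // `write` is supplied by the caller, e.g. backed by a ZooKeeper client.
  def commit(group: String, ranges: Seq[OffsetRangeStub])(
      write: (String, String) => Unit): Unit =
    offsetsToCommit(group, ranges).foreach { case (path, value) => write(path, value) }
}
```

Calling commit only after a batch's output has been produced successfully means a failed batch is re-read from its old starting offsets on restart.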
