You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Raju Bairishetti <ra...@apache.org> on 2016/01/22 10:03:23 UTC

[Streaming-Kafka] How to start from topic offset when streamcontext is using checkpoint

Hi,


   I am very new to spark & spark-streaming. I am planning to use spark
streaming for real time processing.

   I have created a streaming context and checkpointing to hdfs directory
for recovery purposes in case of executor failures & driver failures.

I am creating Dstream with offset map for getting the data from kafka. I am
simply ignoring the offsets to understand the behavior. Whenver I restart
application driver restored from checkpoint as expected but Dstream is not
getting started from the initial offsets. Dstream was created with the last
consumed offsets instead of startign from 0 offsets for each topic
partition as I am not storing the offsets any where.

def main : Unit = {

    var sparkStreamingContext =
StreamingContext.getOrCreate(SparkConstants.CHECKPOINT_DIR_LOCATION,
  () => creatingFunc())

    ...


}

def creatingFunc(): Unit = {

    ...

    var offsets:Map[TopicAndPartition, Long] =
Map(TopicAndPartition("sample_sample3_json",0) -> 0)

        KafkaUtils.createDirectStream[String,String, StringDecoder,
StringDecoder,
String](sparkStreamingContext, kafkaParams, offsets, messageHandler)

...
}

I want to get control over offset management at event level instead of RDD
level to make sure that at least once delivery to end system.

As per my understanding, every RDD or RDD partition will stored in hdfs as
a file If I choose to use HDFS as output. If I use 1sec as batch interval
then it will be ended up having huge number of small files in HDFS. Having
small files in HDFS will leads to lots of other issues.
Is there any way to write multiple RDDs into single file? Don't have muh
idea about *coalesce* usage. In the worst case, I can merge all small files
in HDFS in regular intervals.

Thanks...

------
Thanks
Raju Bairishetti
www.lazada.com

Re: [Streaming-Kafka] How to start from topic offset when streamcontext is using checkpoint

Posted by Yash Sharma <ya...@gmail.com>.
For specific offsets you can directly pass the offset ranges and use the
KafkaUtils. createRDD to get the events those were missed in the Dstream.

- Thanks, via mobile,  excuse brevity.
On Jan 25, 2016 3:33 PM, "Raju Bairishetti" <ra...@apache.org> wrote:

> Hi Yash,
>    Basically, my question is how to avoid storing the kafka offsets in
> spark checkpoint directory. Streaming context is getting build from
> checkpoint directory and proceeding with the offsets in checkpointed RDD.
>
> I want to consume data from kafka from specific offsets along with the
> spark checkpoints. Streaming context is getting prepared from the
> checkpoint directory and started consuming from the topic offsets which
> were stored in checkpoint directory.
>
>
> On Sat, Jan 23, 2016 at 3:44 PM, Yash Sharma <ya...@gmail.com> wrote:
>
>> Hi Raju,
>> Could you please explain your expected behavior with the DStream. The
>> DStream will have event only from the 'fromOffsets' that you provided in
>> the createDirectStream (which I think is the expected behavior).
>>
>> For the smaller files, you will have to deal with smaller files if you
>> intend to write it immediately. Alternately what we do sometimes is-
>>
>> 1.  Maintain couple of iterations for some 30-40 seconds in application
>> until we have substantial data and then we write them to disk.
>> 2. Push smaller data back to kafka, and a different job handles the save
>> to disk.
>>
>> On Sat, Jan 23, 2016 at 7:01 PM, Raju Bairishetti <ra...@apache.org>
>> wrote:
>>
>>> Thanks for quick reply.
>>> I am creating Kafka Dstream by passing offsets map. I have pasted code
>>> snippet in my earlier mail. Let me know am I missing something.
>>>
>>> I want to use spark checkpoint for hand ng only driver/executor
>>> failures.
>>> On Jan 22, 2016 10:08 PM, "Cody Koeninger" <co...@koeninger.org> wrote:
>>>
>>>> Offsets are stored in the checkpoint.  If you want to manage offsets
>>>> yourself, don't restart from the checkpoint, specify the starting offsets
>>>> when you create the stream.
>>>>
>>>> Have you read / watched the materials linked from
>>>>
>>>> https://github.com/koeninger/kafka-exactly-once
>>>>
>>>> Regarding the small files problem, either don't use HDFS, or use
>>>> something like filecrush for merging.
>>>>
>>>> On Fri, Jan 22, 2016 at 3:03 AM, Raju Bairishetti <ra...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>>
>>>>>    I am very new to spark & spark-streaming. I am planning to use
>>>>> spark streaming for real time processing.
>>>>>
>>>>>    I have created a streaming context and checkpointing to hdfs
>>>>> directory for recovery purposes in case of executor failures & driver
>>>>> failures.
>>>>>
>>>>> I am creating Dstream with offset map for getting the data from kafka.
>>>>> I am simply ignoring the offsets to understand the behavior. Whenver I
>>>>> restart application driver restored from checkpoint as expected but Dstream
>>>>> is not getting started from the initial offsets. Dstream was created with
>>>>> the last consumed offsets instead of startign from 0 offsets for each topic
>>>>> partition as I am not storing the offsets any where.
>>>>>
>>>>> def main : Unit = {
>>>>>
>>>>>     var sparkStreamingContext = StreamingContext.getOrCreate(SparkConstants.CHECKPOINT_DIR_LOCATION,
>>>>>   () => creatingFunc())
>>>>>
>>>>>     ...
>>>>>
>>>>>
>>>>> }
>>>>>
>>>>> def creatingFunc(): Unit = {
>>>>>
>>>>>     ...
>>>>>
>>>>>     var offsets:Map[TopicAndPartition, Long] = Map(TopicAndPartition("sample_sample3_json",0) -> 0)
>>>>>
>>>>>         KafkaUtils.createDirectStream[String,String, StringDecoder, StringDecoder,
>>>>> String](sparkStreamingContext, kafkaParams, offsets, messageHandler)
>>>>>
>>>>> ...
>>>>> }
>>>>>
>>>>> I want to get control over offset management at event level instead of
>>>>> RDD level to make sure that at least once delivery to end system.
>>>>>
>>>>> As per my understanding, every RDD or RDD partition will stored in
>>>>> hdfs as a file If I choose to use HDFS as output. If I use 1sec as batch
>>>>> interval then it will be ended up having huge number of small files in
>>>>> HDFS. Having small files in HDFS will leads to lots of other issues.
>>>>> Is there any way to write multiple RDDs into single file? Don't have
>>>>> muh idea about *coalesce* usage. In the worst case, I can merge all small
>>>>> files in HDFS in regular intervals.
>>>>>
>>>>> Thanks...
>>>>>
>>>>> ------
>>>>> Thanks
>>>>> Raju Bairishetti
>>>>> www.lazada.com
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>
>
>
> --
>
> ------
> Thanks
> Raju Bairishetti
> www.lazada.com
>

Re: [Streaming-Kafka] How to start from topic offset when streamcontext is using checkpoint

Posted by Raju Bairishetti <ra...@apache.org>.
Hi Yash,
   Basically, my question is how to avoid storing the kafka offsets in
spark checkpoint directory. Streaming context is getting build from
checkpoint directory and proceeding with the offsets in checkpointed RDD.

I want to consume data from kafka from specific offsets along with the
spark checkpoints. Streaming context is getting prepared from the
checkpoint directory and started consuming from the topic offsets which
were stored in checkpoint directory.


On Sat, Jan 23, 2016 at 3:44 PM, Yash Sharma <ya...@gmail.com> wrote:

> Hi Raju,
> Could you please explain your expected behavior with the DStream. The
> DStream will have event only from the 'fromOffsets' that you provided in
> the createDirectStream (which I think is the expected behavior).
>
> For the smaller files, you will have to deal with smaller files if you
> intend to write it immediately. Alternately what we do sometimes is-
>
> 1.  Maintain couple of iterations for some 30-40 seconds in application
> until we have substantial data and then we write them to disk.
> 2. Push smaller data back to kafka, and a different job handles the save
> to disk.
>
> On Sat, Jan 23, 2016 at 7:01 PM, Raju Bairishetti <ra...@apache.org> wrote:
>
>> Thanks for quick reply.
>> I am creating Kafka Dstream by passing offsets map. I have pasted code
>> snippet in my earlier mail. Let me know am I missing something.
>>
>> I want to use spark checkpoint for hand ng only driver/executor failures.
>> On Jan 22, 2016 10:08 PM, "Cody Koeninger" <co...@koeninger.org> wrote:
>>
>>> Offsets are stored in the checkpoint.  If you want to manage offsets
>>> yourself, don't restart from the checkpoint, specify the starting offsets
>>> when you create the stream.
>>>
>>> Have you read / watched the materials linked from
>>>
>>> https://github.com/koeninger/kafka-exactly-once
>>>
>>> Regarding the small files problem, either don't use HDFS, or use
>>> something like filecrush for merging.
>>>
>>> On Fri, Jan 22, 2016 at 3:03 AM, Raju Bairishetti <ra...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>>
>>>>    I am very new to spark & spark-streaming. I am planning to use spark
>>>> streaming for real time processing.
>>>>
>>>>    I have created a streaming context and checkpointing to hdfs
>>>> directory for recovery purposes in case of executor failures & driver
>>>> failures.
>>>>
>>>> I am creating Dstream with offset map for getting the data from kafka.
>>>> I am simply ignoring the offsets to understand the behavior. Whenver I
>>>> restart application driver restored from checkpoint as expected but Dstream
>>>> is not getting started from the initial offsets. Dstream was created with
>>>> the last consumed offsets instead of startign from 0 offsets for each topic
>>>> partition as I am not storing the offsets any where.
>>>>
>>>> def main : Unit = {
>>>>
>>>>     var sparkStreamingContext = StreamingContext.getOrCreate(SparkConstants.CHECKPOINT_DIR_LOCATION,
>>>>   () => creatingFunc())
>>>>
>>>>     ...
>>>>
>>>>
>>>> }
>>>>
>>>> def creatingFunc(): Unit = {
>>>>
>>>>     ...
>>>>
>>>>     var offsets:Map[TopicAndPartition, Long] = Map(TopicAndPartition("sample_sample3_json",0) -> 0)
>>>>
>>>>         KafkaUtils.createDirectStream[String,String, StringDecoder, StringDecoder,
>>>> String](sparkStreamingContext, kafkaParams, offsets, messageHandler)
>>>>
>>>> ...
>>>> }
>>>>
>>>> I want to get control over offset management at event level instead of
>>>> RDD level to make sure that at least once delivery to end system.
>>>>
>>>> As per my understanding, every RDD or RDD partition will stored in hdfs
>>>> as a file If I choose to use HDFS as output. If I use 1sec as batch
>>>> interval then it will be ended up having huge number of small files in
>>>> HDFS. Having small files in HDFS will leads to lots of other issues.
>>>> Is there any way to write multiple RDDs into single file? Don't have
>>>> muh idea about *coalesce* usage. In the worst case, I can merge all small
>>>> files in HDFS in regular intervals.
>>>>
>>>> Thanks...
>>>>
>>>> ------
>>>> Thanks
>>>> Raju Bairishetti
>>>> www.lazada.com
>>>>
>>>>
>>>>
>>>>
>>>
>


-- 

------
Thanks
Raju Bairishetti
www.lazada.com

Re: [Streaming-Kafka] How to start from topic offset when streamcontext is using checkpoint

Posted by Yash Sharma <ya...@gmail.com>.
Hi Raju,
Could you please explain your expected behavior with the DStream. The
DStream will have event only from the 'fromOffsets' that you provided in
the createDirectStream (which I think is the expected behavior).

For the smaller files, you will have to deal with smaller files if you
intend to write it immediately. Alternately what we do sometimes is-

1.  Maintain couple of iterations for some 30-40 seconds in application
until we have substantial data and then we write them to disk.
2. Push smaller data back to kafka, and a different job handles the save to
disk.

On Sat, Jan 23, 2016 at 7:01 PM, Raju Bairishetti <ra...@apache.org> wrote:

> Thanks for quick reply.
> I am creating Kafka Dstream by passing offsets map. I have pasted code
> snippet in my earlier mail. Let me know am I missing something.
>
> I want to use spark checkpoint for hand ng only driver/executor failures.
> On Jan 22, 2016 10:08 PM, "Cody Koeninger" <co...@koeninger.org> wrote:
>
>> Offsets are stored in the checkpoint.  If you want to manage offsets
>> yourself, don't restart from the checkpoint, specify the starting offsets
>> when you create the stream.
>>
>> Have you read / watched the materials linked from
>>
>> https://github.com/koeninger/kafka-exactly-once
>>
>> Regarding the small files problem, either don't use HDFS, or use
>> something like filecrush for merging.
>>
>> On Fri, Jan 22, 2016 at 3:03 AM, Raju Bairishetti <ra...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>>
>>>    I am very new to spark & spark-streaming. I am planning to use spark
>>> streaming for real time processing.
>>>
>>>    I have created a streaming context and checkpointing to hdfs
>>> directory for recovery purposes in case of executor failures & driver
>>> failures.
>>>
>>> I am creating Dstream with offset map for getting the data from kafka. I
>>> am simply ignoring the offsets to understand the behavior. Whenver I
>>> restart application driver restored from checkpoint as expected but Dstream
>>> is not getting started from the initial offsets. Dstream was created with
>>> the last consumed offsets instead of startign from 0 offsets for each topic
>>> partition as I am not storing the offsets any where.
>>>
>>> def main : Unit = {
>>>
>>>     var sparkStreamingContext = StreamingContext.getOrCreate(SparkConstants.CHECKPOINT_DIR_LOCATION,
>>>   () => creatingFunc())
>>>
>>>     ...
>>>
>>>
>>> }
>>>
>>> def creatingFunc(): Unit = {
>>>
>>>     ...
>>>
>>>     var offsets:Map[TopicAndPartition, Long] = Map(TopicAndPartition("sample_sample3_json",0) -> 0)
>>>
>>>         KafkaUtils.createDirectStream[String,String, StringDecoder, StringDecoder,
>>> String](sparkStreamingContext, kafkaParams, offsets, messageHandler)
>>>
>>> ...
>>> }
>>>
>>> I want to get control over offset management at event level instead of
>>> RDD level to make sure that at least once delivery to end system.
>>>
>>> As per my understanding, every RDD or RDD partition will stored in hdfs
>>> as a file If I choose to use HDFS as output. If I use 1sec as batch
>>> interval then it will be ended up having huge number of small files in
>>> HDFS. Having small files in HDFS will leads to lots of other issues.
>>> Is there any way to write multiple RDDs into single file? Don't have muh
>>> idea about *coalesce* usage. In the worst case, I can merge all small files
>>> in HDFS in regular intervals.
>>>
>>> Thanks...
>>>
>>> ------
>>> Thanks
>>> Raju Bairishetti
>>> www.lazada.com
>>>
>>>
>>>
>>>
>>

Re: [Streaming-Kafka] How to start from topic offset when streamcontext is using checkpoint

Posted by Raju Bairishetti <ra...@apache.org>.
Thanks for quick reply.
I am creating Kafka Dstream by passing offsets map. I have pasted code
snippet in my earlier mail. Let me know am I missing something.

I want to use spark checkpoint for hand ng only driver/executor failures.
On Jan 22, 2016 10:08 PM, "Cody Koeninger" <co...@koeninger.org> wrote:

> Offsets are stored in the checkpoint.  If you want to manage offsets
> yourself, don't restart from the checkpoint, specify the starting offsets
> when you create the stream.
>
> Have you read / watched the materials linked from
>
> https://github.com/koeninger/kafka-exactly-once
>
> Regarding the small files problem, either don't use HDFS, or use something
> like filecrush for merging.
>
> On Fri, Jan 22, 2016 at 3:03 AM, Raju Bairishetti <ra...@apache.org> wrote:
>
>> Hi,
>>
>>
>>    I am very new to spark & spark-streaming. I am planning to use spark
>> streaming for real time processing.
>>
>>    I have created a streaming context and checkpointing to hdfs directory
>> for recovery purposes in case of executor failures & driver failures.
>>
>> I am creating Dstream with offset map for getting the data from kafka. I
>> am simply ignoring the offsets to understand the behavior. Whenver I
>> restart application driver restored from checkpoint as expected but Dstream
>> is not getting started from the initial offsets. Dstream was created with
>> the last consumed offsets instead of startign from 0 offsets for each topic
>> partition as I am not storing the offsets any where.
>>
>> def main : Unit = {
>>
>>     var sparkStreamingContext = StreamingContext.getOrCreate(SparkConstants.CHECKPOINT_DIR_LOCATION,
>>   () => creatingFunc())
>>
>>     ...
>>
>>
>> }
>>
>> def creatingFunc(): Unit = {
>>
>>     ...
>>
>>     var offsets:Map[TopicAndPartition, Long] = Map(TopicAndPartition("sample_sample3_json",0) -> 0)
>>
>>         KafkaUtils.createDirectStream[String,String, StringDecoder, StringDecoder,
>> String](sparkStreamingContext, kafkaParams, offsets, messageHandler)
>>
>> ...
>> }
>>
>> I want to get control over offset management at event level instead of
>> RDD level to make sure that at least once delivery to end system.
>>
>> As per my understanding, every RDD or RDD partition will stored in hdfs
>> as a file If I choose to use HDFS as output. If I use 1sec as batch
>> interval then it will be ended up having huge number of small files in
>> HDFS. Having small files in HDFS will leads to lots of other issues.
>> Is there any way to write multiple RDDs into single file? Don't have muh
>> idea about *coalesce* usage. In the worst case, I can merge all small files
>> in HDFS in regular intervals.
>>
>> Thanks...
>>
>> ------
>> Thanks
>> Raju Bairishetti
>> www.lazada.com
>>
>>
>>
>>
>

Re: [Streaming-Kafka] How to start from topic offset when streamcontext is using checkpoint

Posted by Cody Koeninger <co...@koeninger.org>.
Offsets are stored in the checkpoint.  If you want to manage offsets
yourself, don't restart from the checkpoint, specify the starting offsets
when you create the stream.

Have you read / watched the materials linked from

https://github.com/koeninger/kafka-exactly-once

Regarding the small files problem, either don't use HDFS, or use something
like filecrush for merging.

On Fri, Jan 22, 2016 at 3:03 AM, Raju Bairishetti <ra...@apache.org> wrote:

> Hi,
>
>
>    I am very new to spark & spark-streaming. I am planning to use spark
> streaming for real time processing.
>
>    I have created a streaming context and checkpointing to hdfs directory
> for recovery purposes in case of executor failures & driver failures.
>
> I am creating Dstream with offset map for getting the data from kafka. I
> am simply ignoring the offsets to understand the behavior. Whenver I
> restart application driver restored from checkpoint as expected but Dstream
> is not getting started from the initial offsets. Dstream was created with
> the last consumed offsets instead of startign from 0 offsets for each topic
> partition as I am not storing the offsets any where.
>
> def main : Unit = {
>
>     var sparkStreamingContext = StreamingContext.getOrCreate(SparkConstants.CHECKPOINT_DIR_LOCATION,
>   () => creatingFunc())
>
>     ...
>
>
> }
>
> def creatingFunc(): Unit = {
>
>     ...
>
>     var offsets:Map[TopicAndPartition, Long] = Map(TopicAndPartition("sample_sample3_json",0) -> 0)
>
>         KafkaUtils.createDirectStream[String,String, StringDecoder, StringDecoder,
> String](sparkStreamingContext, kafkaParams, offsets, messageHandler)
>
> ...
> }
>
> I want to get control over offset management at event level instead of RDD
> level to make sure that at least once delivery to end system.
>
> As per my understanding, every RDD or RDD partition will stored in hdfs as
> a file If I choose to use HDFS as output. If I use 1sec as batch interval
> then it will be ended up having huge number of small files in HDFS. Having
> small files in HDFS will leads to lots of other issues.
> Is there any way to write multiple RDDs into single file? Don't have muh
> idea about *coalesce* usage. In the worst case, I can merge all small files
> in HDFS in regular intervals.
>
> Thanks...
>
> ------
> Thanks
> Raju Bairishetti
> www.lazada.com
>
>
>
>