Posted to user@spark.apache.org by Tigran Avanesov <ti...@olamobile.com> on 2016/05/02 12:54:56 UTC

kafka direct streaming python API fromOffsets

Hi,

I'm trying to start consuming messages from a Kafka topic (via a direct
stream) from a given offset.
The documentation of createDirectStream says:

:param fromOffsets: Per-topic/partition Kafka offsets defining the 
(inclusive) starting
point of the stream.

However, it seems to expect a dictionary keyed by topic objects (not names):
I tried to feed it something like {'topic': {0: 123, 1: 234}} and, of course,
got an exception.
How should I build this fromOffsets parameter?

The documentation does not say anything about it.
(In general, I think it would be better if the function accepted topic
names.)

Thank you!

Regards,
Tigran


---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: kafka direct streaming python API fromOffsets

Posted by Saisai Shao <sa...@gmail.com>.
I guess the problem is that py4j automatically translates a Python int
into a Java int or long depending on its value: if the value fits in a
32-bit int, it becomes a java.lang.Integer; otherwise it becomes a
java.lang.Long.

But on the Java side the parameter must be a Long, hence the exception
you got.
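To make the rule above concrete, here is a small pure-Python model of how py4j picks a Java boxed type for a Python int. It is a sketch of the behaviour described in this thread, not py4j's own code; the function name `py4j_boxed_type` and the constants are hypothetical, and it assumes Python 3, where there is a single int type.

```python
# Signed 32-bit range of java.lang.Integer.
JAVA_MIN_INT = -2**31
JAVA_MAX_INT = 2**31 - 1


def py4j_boxed_type(value):
    """Model which Java boxed type py4j would send for a Python number.

    Simplified sketch (hypothetical helper): only bool and int are handled.
    """
    # bool is a subclass of int in Python, so it has to be checked first.
    if isinstance(value, bool):
        return "java.lang.Boolean"
    if not isinstance(value, int):
        raise TypeError("this sketch only models bool and int values")
    # Values that fit in 32 bits become Integer; anything larger becomes Long.
    if JAVA_MIN_INT <= value <= JAVA_MAX_INT:
        return "java.lang.Integer"
    return "java.lang.Long"


# The small offsets from the original question both arrive as Integer,
# while a value just past the 32-bit range arrives as Long.
for offset in (123, 234, JAVA_MAX_INT, JAVA_MAX_INT + 1):
    print(offset, "->", py4j_boxed_type(offset))
```

Running it shows 123, 234 and 2147483647 mapping to java.lang.Integer and 2147483648 mapping to java.lang.Long, which matches the report that bigger offsets avoid the ClassCastException.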

AFAIK, if you're using Python 2, you can specify the long type, e.g. 123L or
long(123), so the value is explicitly translated into a Java long.
If you're using Python 3, which has no separate long type, I'm currently not
sure whether there's a workaround.
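A minimal sketch of the Python 2 workaround, assuming the offsets are already in a fromOffsets-style dict. The helper name `coerce_offsets` is hypothetical. On Python 3 the fallback simply returns the values unchanged, so it does not change what py4j sends.

```python
try:
    # Python 2: long() makes py4j send java.lang.Long even for small values.
    to_java_long = long  # noqa: F821 (this name only exists on Python 2)
except NameError:
    # Python 3 has a single int type, so there is nothing equivalent to force;
    # this fallback leaves the values unchanged.
    to_java_long = int


def coerce_offsets(from_offsets):
    """Return a copy of a fromOffsets-style dict with every offset passed through to_java_long."""
    return dict((key, to_java_long(offset)) for key, offset in from_offsets.items())
```

The keys are left untouched, so the same helper works whether they are TopicAndPartition objects or, as in a quick local test, plain tuples.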

You could refer to the Python Kafka unit tests to see how the Python
API is used.

Thanks
Saisai



On Tue, May 3, 2016 at 4:11 PM, Tigran Avanesov <
tigran.avanesov@olamobile.com> wrote:

> Thank you,
>
> But now I have this error:
>
> java.lang.ClassCastException: java.lang.Integer cannot be cast to
> java.lang.Long
>
> My offsets are actually small enough to fit in a Java int; with bigger
> values I don't get this exception.
> To me this looks like a bug.
>
> Any ideas for a workaround?
>
> Thanks!

Re: kafka direct streaming python API fromOffsets

Posted by Tigran Avanesov <ti...@olamobile.com>.
Thank you,

But now I have this error:

java.lang.ClassCastException: java.lang.Integer cannot be cast to 
java.lang.Long

My offsets are actually small enough to fit in a Java int; with bigger
values I don't get this exception.
To me this looks like a bug.

Any ideas for a workaround?

Thanks!

On 05/02/2016 06:57 PM, Cody Koeninger wrote:
> If you're confused about the type of an argument, you're probably
> better off looking at documentation that includes static types:
>
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$
>
> createDirectStream's fromOffsets parameter takes a map from
> TopicAndPartition to Long.
>
> There is documentation for a Python constructor for TopicAndPartition:
>
> http://spark.apache.org/docs/latest/api/python/_modules/pyspark/streaming/kafka.html#TopicAndPartition

-- 

Tigran Avanesov | IT Architect
phone: +352 261911 3562
email: tigran.avanesov@olamobile.com
skype: tigran.avanesov.corporate
post:  Olamobile S.à.r.l.
        2-4 rue Eugène Ruppert
        Bâtiment Vertigo-Polaris
        L-2453 Luxembourg
        Luxembourg
web:   www.olamobile.com




Re: kafka direct streaming python API fromOffsets

Posted by Cody Koeninger <co...@koeninger.org>.
If you're confused about the type of an argument, you're probably
better off looking at documentation that includes static types:

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$

createDirectStream's fromOffsets parameter takes a map from
TopicAndPartition to Long.

There is documentation for a Python constructor for TopicAndPartition:

http://spark.apache.org/docs/latest/api/python/_modules/pyspark/streaming/kafka.html#TopicAndPartition
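A minimal sketch of turning the {topic_name: {partition: offset}} shape from the original question into the map from TopicAndPartition to offset that fromOffsets expects. It assumes the Kafka 0.8 `pyspark.streaming.kafka` module from Spark 1.x/2.x, whose `TopicAndPartition(topic, partition)` constructor is linked above. The helper `build_from_offsets` and the `TopicPartitionKey` stand-in are hypothetical, used only so the sketch runs without Spark installed.

```python
from collections import namedtuple

# Hypothetical stand-in so this sketch runs without Spark installed. In a
# Spark 1.x job you would pass pyspark.streaming.kafka.TopicAndPartition instead.
TopicPartitionKey = namedtuple("TopicPartitionKey", ["topic", "partition"])


def build_from_offsets(offsets_by_topic, key_factory=TopicPartitionKey):
    """Flatten {topic_name: {partition: offset}} into {key(topic, partition): offset}.

    key_factory is called as key_factory(topic, partition), matching the
    TopicAndPartition(topic, partition) constructor.
    """
    from_offsets = {}
    for topic, partitions in offsets_by_topic.items():
        for partition, offset in partitions.items():
            from_offsets[key_factory(topic, int(partition))] = offset
    return from_offsets


# The dictionary shape from the original question.
offsets = build_from_offsets({"topic": {0: 123, 1: 234}})
```

In a Spark 1.x job this would be used roughly as `KafkaUtils.createDirectStream(ssc, ["topic"], {"metadata.broker.list": "broker:9092"}, fromOffsets=build_from_offsets(offsets_by_topic, TopicAndPartition))`, where `ssc`, `offsets_by_topic` and the broker address are placeholders. The offset values themselves are still subject to the py4j Integer/Long conversion discussed in this thread.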

