Posted to users@apex.apache.org by Chaitanya Chebolu <ch...@datatorrent.com> on 2016/06/13 08:06:03 UTC

Re: Kafka input operator

Hi Raja,

   I think you are using the 0.8 version of the Kafka operator. There is no
such operator in Malhar. To meet your requirement, please do the following:

  Create a new class that extends AbstractKafkaInputOperator. Override
the API "void emitTuples()" and create an output port of type
MutablePair<Message,MutablePair<Long,Integer>>

Copy emitTuples() from AbstractKafkaInputOperator and change the line
emitTuple(message.msg) to
outputPort.emit(new MutablePair<>(message.getMsg(),new
MutablePair<>(message.getOffset(),message.getKafkaPart().getPartitionId())));
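
Note that Java generics accept only reference types, so the offset and partition ID travel as Long and Integer rather than long and int. A minimal sketch of the nested-pair shape follows. It is not Malhar code: java.util.AbstractMap.SimpleEntry stands in for MutablePair and a String stands in for the Kafka message, both substitutions being assumptions made so the example runs on its own. The class and method names are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class PairShapeSketch {

    // Builds (payload, (offset, partitionId)).
    // SimpleEntry is a standard-library stand-in for MutablePair (assumption).
    public static Map.Entry<String, Map.Entry<Long, Integer>> wrap(
            String payload, long offset, int partitionId) {
        // The primitive long/int arguments are boxed to Long/Integer here.
        Map.Entry<Long, Integer> position =
                new SimpleEntry<Long, Integer>(offset, partitionId);
        return new SimpleEntry<String, Map.Entry<Long, Integer>>(payload, position);
    }

    public static void main(String[] args) {
        Map.Entry<String, Map.Entry<Long, Integer>> tuple = wrap("hello", 42L, 3);
        // prints: hello offset=42 partition=3
        System.out.println(tuple.getKey()
                + " offset=" + tuple.getValue().getKey()
                + " partition=" + tuple.getValue().getValue());
    }
}
```

A downstream operator would read the offset and partition ID back from the inner pair in the same way.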

Regards,
Chaitanya


On Sat, Jun 11, 2016 at 7:56 PM, Raja.Aravapalli <Raja.Aravapalli@target.com
> wrote:

>
> Hi
>
> Does anyone know whether any of the existing Kafka input operators can
> provide the Kafka partition ID & offset that a particular message came
> from, along with the message?
>
>
> Thanks a lot in advance.
>
>
> Regards,
> Raja.
>

Re: Kafka input operator

Posted by "Raja.Aravapalli" <Ra...@target.com>.
Thanks Priyanka.

Thanks all for sharing your help. For now I was able to fix my serialization issues. :)

Regards,
Raja.

From: Priyanka Gugale <pr...@datatorrent.com>
Reply-To: "users@apex.apache.org" <us...@apex.apache.org>
Date: Monday, June 20, 2016 at 3:28 AM
To: "users@apex.apache.org" <us...@apex.apache.org>
Subject: Re: Kafka input operator

Hi Raja,

I have opened a pull request <https://github.com/apache/apex-malhar/pull/323/files> in Malhar to add an emitTuple method to AbstractKafkaInputOperator that takes a KafkaMessage parameter. KafkaMessage carries both the partitionId and the offset along with the message. Once this is merged, you can override only the emitTuple method in your subclass and convert the "Message" into a String or a byte array. That way you avoid both the variable access problem and the serialization issues.

-Priyanka

On Mon, Jun 20, 2016 at 3:59 AM, Thomas Weise <th...@gmail.com> wrote:
kafka.message.Message is the problem; MutablePair has a no-arg constructor and should be serializable with Kryo.


On Sun, Jun 19, 2016 at 3:10 PM, <hs...@gmail.com> wrote:
The Pair classes in Apache Commons are not Kryo serializable. You can use another pair data structure, for example KeyValuePair in the Malhar library.

Siyuan

Sent from my iPhone

On Jun 19, 2016, at 14:58, Raja.Aravapalli <Ra...@target.com> wrote:


Hi Priyanka,

I am trying to read the messages in the next operator, with the input port defined as below:


public transient DefaultInputPort<MutablePair<Message, MutablePair<Long, Integer>>> input = new DefaultInputPort<MutablePair<Message, MutablePair<Long, Integer>>>()

The application is failing with the exception below:


2016-06-19 16:54:45,498 ERROR codec.DefaultStatefulStreamCodec (DefaultStatefulStreamCodec.java:fromDataStatePair(98)) - Catastrophic Error: Execution halted due to Kryo exception!
com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): kafka.message.Message
Serialization trace:
left (org.apache.commons.lang3.tuple.MutablePair)
        at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
        at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
        at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)


Any help please.

Regards,
Raja.

From: "Raja.Aravapalli" <Ra...@target.com>
Reply-To: "users@apex.apache.org" <us...@apex.apache.org>
Date: Sunday, June 19, 2016 at 12:22 AM
To: "users@apex.apache.org" <us...@apex.apache.org>
Subject: Re: Kafka input operator


Thanks for the response, Priyanka…

But when I put the class in my own package, some of the protected variables are not accessible!


Regards,
Raja.

From: Priyanka Gugale <pr...@datatorrent.com>
Reply-To: "users@apex.apache.org" <us...@apex.apache.org>
Date: Saturday, June 18, 2016 at 10:29 AM
To: "users@apex.apache.org" <us...@apex.apache.org>
Subject: Re: Kafka input operator


Hi,

Yes, sure, you can use any package name you want. In fact, it is better to put this class outside the Malhar jar. Just keep the Malhar jar on your classpath.

-Priyanka

On Jun 17, 2016 8:03 PM, "Raja.Aravapalli" <Ra...@target.com> wrote:

Hi Priyanka,

Can this be done from a class outside the package "com.datatorrent.contrib.kafka"?

I don’t want to disturb the source :(



Regards,
Raja.

From: "Raja.Aravapalli" <Ra...@target.com>
Date: Friday, June 17, 2016 at 5:38 AM
To: "users@apex.apache.org" <us...@apex.apache.org>
Subject: Re: Kafka input operator


Hi Priyanka,

I am using Kafka version 0.8.x.

Awesome. Yes, this is what I want. I shall test this and share my updates. Having a Kafka operator like this in Malhar would be very good. I don't see such a capability in Storm either!



Regards,
Raja.

From: Priyanka Gugale <pr...@datatorrent.com>
Reply-To: "users@apex.apache.org" <us...@apex.apache.org>
Date: Friday, June 17, 2016 at 2:05 AM
To: "users@apex.apache.org" <us...@apex.apache.org>
Subject: Re: Kafka input operator

Hi Raja,

I have quickly written an operator to fulfill your requirement. The code is available here <https://github.com/apache/apex-malhar/compare/master...DT-Priyanka:Kafka-input-updates>. Let me know if this addresses your use case.

-Priyanka

On Fri, Jun 17, 2016 at 11:32 AM, Priyanka Gugale <pr...@datatorrent.com> wrote:
Hi Raja,

You will need to update other places as well (I guess replay, in addition to emitTuples). But I think it is not feasible to replicate the emitTuples code in a subclass, as many of the parent class variables are private. I will try to figure out if there is any other way.

Can you please confirm which Kafka version you are using?

-Priyanka

On Thu, Jun 16, 2016 at 8:39 PM, Raja.Aravapalli <Ra...@target.com> wrote:

Hi Chaitanya,

Would the changes you proposed below be enough to retrieve the partition & offset?

I see emitTuple(Message msg) is being called at various places in the code… please advise. Thank you.


Regards,
Raja.

From: "Raja.Aravapalli" <Ra...@target.com>
Date: Tuesday, June 14, 2016 at 9:50 PM
To: "users@apex.apache.org" <us...@apex.apache.org>
Subject: Re: Kafka input operator


Thanks for the response Chaitanya. I will follow the suggestions to retrieve Kafka partitionId & offset!!


Regards,
Raja.

From: Chaitanya Chebolu <ch...@datatorrent.com>
Reply-To: "users@apex.apache.org" <us...@apex.apache.org>
Date: Monday, June 13, 2016 at 3:06 AM
To: "users@apex.apache.org" <us...@apex.apache.org>
Subject: Re: Kafka input operator

Hi Raja,

   I think you are using the 0.8 version of the Kafka operator. There is no such operator in Malhar. To meet your requirement, please do the following:

  Create a new class that extends AbstractKafkaInputOperator. Override the API "void emitTuples()" and create an output port of type MutablePair<Message,MutablePair<Long,Integer>>

Copy emitTuples() from AbstractKafkaInputOperator and change the line
emitTuple(message.msg) to
outputPort.emit(new MutablePair<>(message.getMsg(),new MutablePair<>(message.getOffset(),message.getKafkaPart().getPartitionId())));

Regards,
Chaitanya


On Sat, Jun 11, 2016 at 7:56 PM, Raja.Aravapalli <Ra...@target.com> wrote:

Hi

Does anyone know whether any of the existing Kafka input operators can provide the Kafka partition ID & offset that a particular message came from, along with the message?


Thanks a lot in advance.


Regards,
Raja.






Re: Kafka input operator

Posted by Priyanka Gugale <pr...@datatorrent.com>.
Hi Raja,

I have opened a pull request
<https://github.com/apache/apex-malhar/pull/323/files> in Malhar to add an
emitTuple method to AbstractKafkaInputOperator that takes a KafkaMessage
parameter. KafkaMessage carries both the partitionId and the offset along
with the message. Once this is merged, you can override only the emitTuple
method in your subclass and convert the "Message" into a String or a byte
array. That way you avoid both the variable access problem and the
serialization issues.
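
To illustrate the conversion step, here is a minimal sketch of a tuple type that carries the payload as a byte array together with the offset and partition ID. It assumes the Kafka 0.8 message payload is available as a java.nio.ByteBuffer, which should be checked against your Kafka version. KafkaTuple, toBytes and the other names are hypothetical, and the sketch deliberately leaves out the Malhar emitTuple override, since its exact signature depends on the pull request.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class KafkaTupleSketch {

    /** Hypothetical tuple type: plain fields plus a no-arg constructor. */
    public static class KafkaTuple {
        private byte[] payload;
        private long offset;
        private int partitionId;

        // No-arg constructor, so a serializer that instantiates by
        // calling it has something to call.
        public KafkaTuple() {
        }

        public KafkaTuple(byte[] payload, long offset, int partitionId) {
            this.payload = payload;
            this.offset = offset;
            this.partitionId = partitionId;
        }

        public byte[] getPayload() { return payload; }
        public long getOffset() { return offset; }
        public int getPartitionId() { return partitionId; }

        public String payloadAsString() {
            return new String(payload, StandardCharsets.UTF_8);
        }
    }

    /** Copies the readable bytes without moving the caller's buffer position. */
    public static byte[] toBytes(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }

    public static void main(String[] args) {
        // Stand-in for a message payload (assumption: exposed as a ByteBuffer).
        ByteBuffer raw = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
        KafkaTuple tuple = new KafkaTuple(toBytes(raw), 42L, 3);
        // prints: hello offset=42 partition=3
        System.out.println(tuple.payloadAsString()
                + " offset=" + tuple.getOffset()
                + " partition=" + tuple.getPartitionId());
    }
}
```

Because the tuple holds only a byte array and two numbers, nothing from kafka.message needs to cross the stream between operators.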

-Priyanka


Re: Kafka input operator

Posted by Thomas Weise <th...@gmail.com>.
kafka.message.Message is the problem; MutablePair has a no-arg constructor
and should be serializable with Kryo.
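
The "missing no-arg constructor" condition in the exception can be illustrated with plain reflection. The sketch below is not Kryo's code. It only shows the kind of check the error message implies, using two hypothetical classes, one with a no-arg constructor and one without.

```java
public class NoArgConstructorCheck {

    /** Hypothetical class that can be created without arguments. */
    public static class WithNoArg {
        public WithNoArg() {
        }
    }

    /** Hypothetical class whose only constructor requires an argument. */
    public static class WithoutNoArg {
        private final int value;

        public WithoutNoArg(int value) {
            this.value = value;
        }

        public int getValue() { return value; }
    }

    /** Returns true if the class declares a constructor taking no parameters. */
    public static boolean hasNoArgConstructor(Class<?> type) {
        try {
            type.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasNoArgConstructor(WithNoArg.class));    // prints: true
        System.out.println(hasNoArgConstructor(WithoutNoArg.class)); // prints: false
    }
}
```

A class in the second category, held as the left element of a pair, would fit the serialization trace in the report.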



Re: Kafka input operator

Posted by hs...@gmail.com.
The Pair classes in Apache Commons are not Kryo serializable. You can use another pair data structure, for example KeyValuePair in the Malhar library.

Siyuan

Sent from my iPhone


Re: Kafka input operator

Posted by "Raja.Aravapalli" <Ra...@target.com>.
Hi Priyanka,

I am trying to read the messages in the next operator, with the input port defined as below:


public transient DefaultInputPort<MutablePair<Message, MutablePair<Long, Integer>>> input = new DefaultInputPort<MutablePair<Message, MutablePair<Long, Integer>>>()

The application is failing with the exception below:


2016-06-19 16:54:45,498 ERROR codec.DefaultStatefulStreamCodec (DefaultStatefulStreamCodec.java:fromDataStatePair(98)) - Catastrophic Error: Execution halted due to Kryo exception!
com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): kafka.message.Message
Serialization trace:
left (org.apache.commons.lang3.tuple.MutablePair)
        at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
        at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
        at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)


Any help please.

Regards,
Raja.





Re: Kafka input operator

Posted by "Raja.Aravapalli" <Ra...@target.com>.
Thanks for the response, Priyanka.

But when I try to put the class in my own package, some of the protected variables are not accessible!
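
A possible explanation, offered as an assumption rather than a diagnosis of the Malhar source: in Java a protected member is reachable from a subclass in another package only through "this" (or a reference of the subclass's own type), and a field declared with no modifier is package-private and not reachable from another package at all. The sketch below uses reflection to report each field's declared access level so the two cases can be told apart. Parent and its fields are hypothetical stand-ins, not Malhar classes.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class AccessLevelSketch {
    /** Hypothetical parent class; NOT a Malhar class. */
    static class Parent {
        protected int protectedField;   // reachable from subclasses in any package (via this)
        int packagePrivateField;        // no modifier: same package only
        private int privateField;       // declaring class only
    }

    /** Returns the declared access level of a field as a readable string. */
    static String level(Field f) {
        int m = f.getModifiers();
        if (Modifier.isPublic(m)) return "public";
        if (Modifier.isProtected(m)) return "protected";
        if (Modifier.isPrivate(m)) return "private";
        return "package-private";
    }

    /** Looks up a declared field by name and reports its access level. */
    static String levelOf(Class<?> type, String fieldName) {
        try {
            return level(type.getDeclaredField(fieldName));
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("no such field: " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        // Print every declared field with its access level.
        for (Field f : Parent.class.getDeclaredFields()) {
            System.out.println(f.getName() + ": " + level(f));
        }
    }
}
```

Pointing levelOf at the real parent class would show which of its fields a subclass in another package is allowed to touch.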


Regards,
Raja.



Re: Kafka input operator

Posted by Priyanka Gugale <pr...@datatorrent.com>.
Hi,

Yes, sure, you can use any package name you want. In fact, it is better to put
this class outside the Malhar jar. Just keep the Malhar jar on your classpath.

-Priyanka

Re: Kafka input operator

Posted by "Raja.Aravapalli" <Ra...@target.com>.
Hi Priyanka,

Can this be done from a class outside the package “com.datatorrent.contrib.kafka”?

I don’t want to disturb the source :(



Regards,
Raja.



Re: Kafka input operator

Posted by "Raja.Aravapalli" <Ra...@target.com>.
Hi Priyanka,

I am using Kafka version 0.8.x.

Awesome. Yes, this is what I want. I shall test this and share my updates. Having a Kafka operator like this in Malhar would be very good. I don’t see anything like it in Storm either!



Regards,
Raja.



Re: Kafka input operator

Posted by Priyanka Gugale <pr...@datatorrent.com>.
Hi Raja,

I have quickly written an operator to fulfill your requirement. The code is
available here
<https://github.com/apache/apex-malhar/compare/master...DT-Priyanka:Kafka-input-updates>.
Let me know if this addresses your use case.

-Priyanka


Re: Kafka input operator

Posted by Priyanka Gugale <pr...@datatorrent.com>.
Hi Raja,

You will need to update other places as well (I think replay is the other
one besides emitTuples). But I don't think it is feasible to replicate the
emitTuples code in a subclass, as many of the parent class variables are
private. I will try to figure out whether there is another way.
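
One common way around private parent state, sketched here with hypothetical stand-in classes (BaseInput, Msg and WithOrigin are illustrative names, not Malhar's API): keep the emit loop in the parent and hand each full message, including its partition and offset, to a single overridable hook. A subclass then overrides only the hook and never needs the private fields.

```java
import java.util.ArrayList;
import java.util.List;

public class EmitHookSketch {
    /** Hypothetical message wrapper: payload plus where it came from. */
    static final class Msg {
        final String payload;
        final int partitionId;
        final long offset;

        Msg(String payload, int partitionId, long offset) {
            this.payload = payload;
            this.partitionId = partitionId;
            this.offset = offset;
        }
    }

    /** Stand-in for a base operator whose bookkeeping is private. */
    abstract static class BaseInput {
        // Private on purpose: a subclass in another file or package cannot reach these.
        private final List<Msg> buffer = new ArrayList<>();
        private long emitCount;

        void receive(Msg m) {
            buffer.add(m);
        }

        /** The loop stays in the parent, so subclasses never have to copy it. */
        final void emitTuples() {
            for (Msg m : buffer) {
                emitTuple(m);
                emitCount++;
            }
            buffer.clear();
        }

        long getEmitCount() {
            return emitCount;
        }

        /** Hook: the only method a subclass has to override. */
        protected abstract void emitTuple(Msg m);
    }

    /** Subclass that keeps partition and offset alongside the payload. */
    static final class WithOrigin extends BaseInput {
        final List<String> out = new ArrayList<>();

        @Override
        protected void emitTuple(Msg m) {
            out.add(m.payload + "@" + m.partitionId + ":" + m.offset);
        }
    }

    public static void main(String[] args) {
        WithOrigin op = new WithOrigin();
        op.receive(new Msg("a", 0, 10L));
        op.receive(new Msg("b", 1, 7L));
        op.emitTuples();
        System.out.println(op.out); // prints [a@0:10, b@1:7]
    }
}
```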

Can you please confirm which Kafka version you are using?

-Priyanka


Re: Kafka input operator

Posted by "Raja.Aravapalli" <Ra...@target.com>.
Hi Chaitanya,

Would the changes you proposed below be enough to retrieve the partition & offset?

I see emitTuple(Message msg) is being called at various places in the code. Please advise. Thank you.


Regards,
Raja.



Re: Kafka input operator

Posted by "Raja.Aravapalli" <Ra...@target.com>.
Thanks for the response, Chaitanya. I will follow the suggestions to retrieve the Kafka partitionId & offset!


Regards,
Raja.

From: Chaitanya Chebolu <ch...@datatorrent.com>
Reply-To: "users@apex.apache.org" <us...@apex.apache.org>
Date: Monday, June 13, 2016 at 3:06 AM
To: "users@apex.apache.org" <us...@apex.apache.org>
Subject: Re: Kafka input operator

Hi Raja,

I think you are using the 0.8 version of the Kafka operator. There is no such operator in Malhar. To meet your requirement, please do the following:

Create a new class which extends AbstractKafkaInputOperator. Override the method "void emitTuples()" and create an output port of type MutablePair<Message,MutablePair<Long,Integer>>.

Copy emitTuples() from AbstractKafkaInputOperator and change the line
emitTuple(message.msg) to
outputPort.emit(new MutablePair<>(message.getMsg(),new MutablePair<>(message.getOffset(),message.getKafkaPart().getPartitionId())));
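
The shape of the tuple described above can be sketched without any Malhar or commons-lang dependency. This is only an illustration under stated assumptions: Pair is a minimal stand-in for MutablePair, FakeKafkaMessage is a hypothetical wrapper, and the payload is a plain byte array rather than Kafka's Message. Note that Java generics need the boxed types Long and Integer, not long and int.

```java
import java.nio.charset.StandardCharsets;

public class OffsetTupleSketch {
    /** Minimal stand-in for commons-lang3 MutablePair, so the sketch has no dependencies. */
    static final class Pair<L, R> {
        final L left;
        final R right;

        Pair(L left, R right) {
            this.left = left;
            this.right = right;
        }
    }

    /** Hypothetical message wrapper: payload plus the offset and partition it came from. */
    static final class FakeKafkaMessage {
        final byte[] payload;
        final long offset;
        final int partitionId;

        FakeKafkaMessage(byte[] payload, long offset, int partitionId) {
            this.payload = payload;
            this.offset = offset;
            this.partitionId = partitionId;
        }
    }

    /** Builds the nested tuple (payload, (offset, partitionId)). */
    static Pair<byte[], Pair<Long, Integer>> toTuple(FakeKafkaMessage m) {
        // long and int are auto-boxed to Long and Integer here.
        Pair<Long, Integer> origin = new Pair<>(m.offset, m.partitionId);
        return new Pair<>(m.payload, origin);
    }

    public static void main(String[] args) {
        FakeKafkaMessage m =
            new FakeKafkaMessage("hello".getBytes(StandardCharsets.UTF_8), 42L, 3);
        Pair<byte[], Pair<Long, Integer>> t = toTuple(m);
        // prints: hello offset=42 partition=3
        System.out.println(new String(t.left, StandardCharsets.UTF_8)
            + " offset=" + t.right.left + " partition=" + t.right.right);
    }
}
```

A downstream operator would read the offset from t.right.left and the partition ID from t.right.right.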

Regards,
Chaitanya


On Sat, Jun 11, 2016 at 7:56 PM, Raja.Aravapalli <Ra...@target.com> wrote:

Hi

Does anyone know whether any of the existing Kafka input operators can retrieve the Kafka partition ID & offset that a particular message came from, along with the message itself?


Thanks a lot in advance.


Regards,
Raja.