Posted to dev@samza.apache.org by Massimiliano Tomassi <ma...@gmail.com> on 2014/09/04 23:58:41 UTC

Implementing a custom MessageCollector

Hello all,
I was trying to figure out how to implement and use a custom
MessageCollector. Let's say I want to send messages to a system other
than Kafka. How should I do that? Is there a tutorial that explains this?

I was also thinking about the following use case. I'm not sure it makes sense
at all, but here it is: let's say we receive messages, process them somehow
and then want to store the results in a remote DB, Cassandra for
example. Does it make sense to create an implementation of MessageCollector
that stores to Cassandra? Also, given that performing a write for every
single message may not be very efficient, would it be possible to collect
some data and then write it to Cassandra as a single batch operation?

I hope I have explained myself clearly, and I hope to receive some
suggestions.

All the best.
Max


-- 
------------------------------------------------
Massimiliano Tomassi
------------------------------------------------
e-mail: max.tomassi@gmail.com
------------------------------------------------

Re: Implementing a custom MessageCollector

Posted by Massimiliano Tomassi <ma...@gmail.com>.
Thanks Jakob,
that's exactly the kind of solution I had in mind. I'll have a look at the
KafkaSystemProducer then and try to implement new producers. I'll keep
you updated; thanks for this.

Max


On 5 September 2014 18:23, Jakob Homan <jg...@gmail.com> wrote:




-- 
------------------------------------------------
Massimiliano Tomassi
------------------------------------------------
web: http://about.me/maxtomassi
e-mail: max.tomassi@gmail.com
mobile: +447751193667
------------------------------------------------

Re: Implementing a custom MessageCollector

Posted by Jakob Homan <jg...@gmail.com>.
A more general solution would be to implement a SystemProducer for the
particular destination you have in mind, in the same way that we have a
KafkaSystemProducer that writes messages to Kafka.  The MessageCollector's
purpose is to collect messages and deliver them to the appropriate
SystemProducer.  Batching can then be handled by the SystemProducer as
appropriate, particularly as it will be receiving messages from all the
TaskNames that particular SamzaContainer is responsible for.  A
CassandraSystemProducer or ActiveMQSystemProducer would be a most welcome
addition, if you're interested.
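A rough sketch of what a batching producer along these lines could look like. The SystemProducer interface below mirrors the method set of Samza's org.apache.samza.system.SystemProducer (start, stop, register, send, flush), but OutgoingMessageEnvelope is a simplified stand-in and BatchClient is a hypothetical client for the destination (Cassandra, ActiveMQ, ...); this is an illustration under those assumptions, not how KafkaSystemProducer is actually written. Messages are buffered per registered source and written once a batch fills up or flush(source) is called.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for org.apache.samza.system.OutgoingMessageEnvelope
// (the real class also carries the target SystemStream, key, etc.).
final class OutgoingMessageEnvelope {
    private final Object message;
    OutgoingMessageEnvelope(Object message) { this.message = message; }
    Object getMessage() { return message; }
}

// Mirrors the method set of org.apache.samza.system.SystemProducer.
interface SystemProducer {
    void start();
    void stop();
    void register(String source);
    void send(String source, OutgoingMessageEnvelope envelope);
    void flush(String source);
}

// Hypothetical batch-capable client for the destination system.
interface BatchClient {
    void writeBatch(List<Object> messages);
}

class BatchingSystemProducer implements SystemProducer {
    private final BatchClient client;
    private final int maxBatchSize;
    // One buffer per registered source (e.g. one per task in the container).
    private final Map<String, List<Object>> buffers = new HashMap<>();

    BatchingSystemProducer(BatchClient client, int maxBatchSize) {
        this.client = client;
        this.maxBatchSize = maxBatchSize;
    }

    @Override public void start() {
        // Open connections to the destination here.
    }

    @Override public void stop() {
        // Write out anything still buffered before shutting down.
        for (String source : buffers.keySet()) {
            flush(source);
        }
    }

    @Override public void register(String source) {
        buffers.putIfAbsent(source, new ArrayList<>());
    }

    @Override public void send(String source, OutgoingMessageEnvelope envelope) {
        List<Object> buffer = buffers.get(source);
        if (buffer == null) {
            throw new IllegalStateException("unregistered source: " + source);
        }
        buffer.add(envelope.getMessage());
        if (buffer.size() >= maxBatchSize) {
            flush(source); // batch is full: write it in one call
        }
    }

    @Override public void flush(String source) {
        List<Object> buffer = buffers.get(source);
        if (buffer == null || buffer.isEmpty()) {
            return;
        }
        client.writeBatch(new ArrayList<>(buffer)); // hand the client a copy
        buffer.clear();
    }
}
```

In a real job, as far as I understand it, the producer would be returned from a SystemFactory's getProducer(systemName, config, registry) method, the system would be registered in the job config via systems.<name>.samza.factory, and tasks would keep calling collector.send(...) with a SystemStream naming that system; the MessageCollector then routes those envelopes to this producer. My understanding is also that Samza calls flush(source) when a task commits, so buffered messages get written before offsets are checkpointed.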

-Jakob



On Fri, Sep 5, 2014 at 1:59 AM, Massimiliano Tomassi <ma...@gmail.com>
wrote:


Re: Implementing a custom MessageCollector

Posted by Massimiliano Tomassi <ma...@gmail.com>.
That makes a lot of sense to me, thanks.

Could you also point me to how to implement a custom MessageCollector, for
example if I want to send messages to ActiveMQ instead of Kafka?

Thanks for your help
Max
On 5 Sep 2014 09:52, "Yan Fang" <ya...@gmail.com> wrote:


Re: Implementing a custom MessageCollector

Posted by Yan Fang <ya...@gmail.com>.
Hi Massimiliano,

From my understanding, what you want to do is process the messages and
then store them in, say, Cassandra. To do this, it's not necessary
to write a custom MessageCollector. All you need to do is put the writing
logic in the process method; see the API doc
<https://samza.incubator.apache.org/learn/documentation/latest/api/overview.html>.
The method is called for every message, so you can process the message and
store it in the remote DB right there.
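For illustration, a minimal sketch of the write-per-message approach. To keep it runnable without the Samza jars, the envelope, collector, coordinator and StreamTask types are simplified stand-ins that only mirror the shape of Samza's StreamTask.process() callback; RemoteStore is a hypothetical client for the destination DB, not a real Cassandra driver API.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the Samza types passed to process();
// the real ones live in org.apache.samza.system and org.apache.samza.task.
interface MessageCollector { }
interface TaskCoordinator { }

final class IncomingMessageEnvelope {
    private final Object message;
    IncomingMessageEnvelope(Object message) { this.message = message; }
    Object getMessage() { return message; }
}

interface StreamTask {
    void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                 TaskCoordinator coordinator) throws Exception;
}

// Hypothetical client for the remote DB (e.g. a thin wrapper around a Cassandra driver).
interface RemoteStore {
    void write(String row);
}

class WriteThroughTask implements StreamTask {
    private final RemoteStore store;

    WriteThroughTask(RemoteStore store) { this.store = store; }

    @Override
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                        TaskCoordinator coordinator) {
        String result = transform((String) envelope.getMessage());
        store.write(result); // one remote write per incoming message
    }

    // Placeholder for whatever processing the job actually does.
    static String transform(String message) {
        return message.toUpperCase();
    }
}
```

The collector is not used here at all: the task talks to the DB directly, which is the point of this suggestion.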

Assuming you have already tried the hello-samza
<https://samza.incubator.apache.org/startup/hello-samza/latest/> project,
you can have a look at the WikipediaFeedStreamTask
<https://github.com/apache/incubator-samza-hello-samza/blob/master/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java>
and the WikipediaParserStreamTask
<https://github.com/apache/incubator-samza-hello-samza/blob/master/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java>.
You can put your logic in the process method.

In terms of writing a batch to the DB for better performance, you can
keep a batch variable (such as a List or Map) in the StreamTask to store the
processed results, and then write the contents of that variable to the DB after
a certain number of messages or after a certain amount of time (by implementing
the WindowableTask interface; see
<https://samza.incubator.apache.org/learn/documentation/latest/container/windowing.html>).
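A minimal sketch of that buffering idea, again using simplified stand-ins rather than the real Samza types: processed results are collected in a List, written as one batch once a size threshold is reached, and any partial batch is written when window() fires (in a real job, WindowableTask.window() is invoked at the interval configured by task.window.ms). BatchStore and its writeBatch method are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the Samza types passed to process()/window();
// the real ones live in org.apache.samza.system and org.apache.samza.task.
interface MessageCollector { }
interface TaskCoordinator { }

final class IncomingMessageEnvelope {
    private final Object message;
    IncomingMessageEnvelope(Object message) { this.message = message; }
    Object getMessage() { return message; }
}

// Hypothetical batch-capable client for the remote DB.
interface BatchStore {
    void writeBatch(List<String> rows);
}

class BatchingTask {
    private final BatchStore store;
    private final int maxBatchSize;
    private final List<String> buffer = new ArrayList<>();

    BatchingTask(BatchStore store, int maxBatchSize) {
        this.store = store;
        this.maxBatchSize = maxBatchSize;
    }

    // Mirrors StreamTask.process(): buffer the processed result and
    // write a batch as soon as the buffer is full.
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                        TaskCoordinator coordinator) {
        buffer.add(String.valueOf(envelope.getMessage())); // real processing would go here
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    // Mirrors WindowableTask.window(): called periodically, writes
    // whatever partial batch has accumulated since the last flush.
    public void window(MessageCollector collector, TaskCoordinator coordinator) {
        flush();
    }

    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        store.writeBatch(new ArrayList<>(buffer)); // hand the store a copy, then reuse the buffer
        buffer.clear();
    }
}
```

One thing worth checking before relying on this: results still sitting in the task's buffer are invisible to Samza's checkpointing, so if the container fails after offsets have been committed but before the next flush, those results may be lost.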

Hope that helps.

Thanks,

Fang, Yan
yanfang724@gmail.com
+1 (206) 849-4108


On Thu, Sep 4, 2014 at 2:58 PM, Massimiliano Tomassi <ma...@gmail.com>
wrote:
