Posted to dev@samza.apache.org by Malcolm McFarland <mm...@cavulus.com> on 2019/10/07 20:28:51 UTC

Re: Using Kafka's ProducerInterceptor with Samza

Hey y'all, just wanted to follow up on this: does anybody have a working
example of using Kafka's ProducerInterceptor interface under YARN that I
could look at, or any guidance to offer?

More generally, we're trying to keep an eye on Samza SSP
(SystemStreamPartition) instances that occasionally seem to go "zombie";
i.e., the YARN ApplicationMaster (AM) doesn't see any problems and no
errors are surfaced, but the instances have simply stopped listening for
messages. My idea is to record the topic/partition/offset of each message
when it is delivered, and then record the topic/partition/offset again
when Samza picks up that message for processing. Does anyone have ideas
about how to do this?
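
A minimal sketch of that bookkeeping in plain Java, assuming both sides
can report into one place. PartitionProgressTracker and its methods are
hypothetical, not Samza or Kafka APIs. On the producing side,
recordDelivered could be fed from a ProducerInterceptor's
onAcknowledgement(RecordMetadata, Exception) using metadata.topic(),
partition() and offset() when the exception is null; on the consuming
side, recordProcessed could be fed from StreamTask.process() using the
envelope's SystemStreamPartition and Long.parseLong(envelope.getOffset()).
Because producer and consumer usually run in different containers, in
practice these numbers would need to be published somewhere shared
(metrics, for example) and compared there.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper (not a Samza or Kafka API). Per topic-partition, it keeps
 * the highest offset seen at delivery time, the highest offset seen at
 * processing time, and when processing last advanced.
 */
final class PartitionProgressTracker {
    private static final class Progress {
        long delivered = -1;      // highest offset acknowledged on the producing side
        long processed = -1;      // highest offset picked up by the consuming task
        long lastAdvancedMs = -1; // wall-clock time 'processed' last moved forward
    }

    private final Map<String, Progress> byPartition = new HashMap<>();

    private static String key(String topic, int partition) {
        return topic + "/" + partition;
    }

    private Progress getOrCreate(String topic, int partition) {
        return byPartition.computeIfAbsent(key(topic, partition), k -> new Progress());
    }

    /** Record that a message landed on topic/partition at this offset. */
    synchronized void recordDelivered(String topic, int partition, long offset) {
        Progress p = getOrCreate(topic, partition);
        p.delivered = Math.max(p.delivered, offset);
    }

    /** Record that the consuming task picked up the message at this offset. */
    synchronized void recordProcessed(String topic, int partition, long offset, long nowMs) {
        Progress p = getOrCreate(topic, partition);
        if (offset > p.processed) {
            p.processed = offset;
            p.lastAdvancedMs = nowMs;
        }
    }

    /** Offsets delivered but not yet processed; 0 for partitions never seen. */
    synchronized long lag(String topic, int partition) {
        Progress p = byPartition.get(key(topic, partition));
        return p == null ? 0 : Math.max(0, p.delivered - p.processed);
    }

    /**
     * "Zombie" heuristic: there is unprocessed input, processing has advanced
     * at least once, and it has not advanced for more than stallMs.
     */
    synchronized boolean looksStalled(String topic, int partition, long nowMs, long stallMs) {
        Progress p = byPartition.get(key(topic, partition));
        return p != null
                && p.delivered > p.processed
                && p.lastAdvancedMs >= 0
                && nowMs - p.lastAdvancedMs > stallMs;
    }
}
```

A partition is only flagged once it has processed something and then
stopped advancing while input is waiting; a partition that never
processes anything at all would need a separate check.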

Cheers,
Malcolm McFarland
Cavulus


This correspondence is from HealthPlanCRM, LLC, d/b/a Cavulus. Any
unauthorized or improper disclosure, copying, distribution, or use of the
contents of this message is prohibited. The information contained in this
message is intended only for the personal and confidential use of the
recipient(s) named above. If you have received this message in error,
please notify the sender immediately and delete the original message.


On Tue, Sep 10, 2019 at 3:13 PM Malcolm McFarland <mm...@cavulus.com>
wrote:

> Hey folks,
>
> I'm trying to intercept outgoing Kafka messages before they're sent, to
> do some basic tracking (i.e., to get the actual assigned partition
> number, which isn't available in the StreamTask or MessageCollector
> classes). I've written a class that conforms to Kafka's
> ProducerInterceptor interface [0] and have added the following line to
> my StreamTask properties file:
>
> systems.kafka.producer.interceptor.classes=com.cavulus.kafka.CavulusKafkaProducerInterceptor
>
> Here's the class's package and signature:
>
> package com.cavulus.kafka;
> import org.apache.kafka.clients.producer.ProducerInterceptor;
> ...
> public class CavulusKafkaProducerInterceptor implements ProducerInterceptor { .. }
>
> The class compiles fine, so I assume that I'm implementing the interface
> correctly, but when I try to start up the task in YARN, it stalls: no
> logging, no errors, just nothing. I have no problem starting this
> StreamTask when I omit the above line from my properties file.
>
> KIP-42 [1] explicitly references Samza as an inspiration for adding
> the ProducerInterceptor to Kafka 0.10, so I've been assuming some sort
> of compatibility (but I could be mistaken).
>
> Has anybody attempted this sort of combination?
>
> Cheers,
> Malcolm McFarland
> Cavulus
>
> [0] https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html
> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors

Re: Using Kafka's ProducerInterceptor with Samza

Posted by Cameron Lee <ca...@gmail.com>.
That it works fine when using ThreadJobFactory is another interesting
data point. I'm still not sure why it's not working in YARN, though.
If you want, I have a couple of suggestions you can try:
1) Implement a no-op interceptor and add only the dependencies it needs.
This could help determine whether any dependencies or logic in your
custom interceptor are causing the problem.
2) Check whether the task process is actually dead (e.g., use "ps" on
your YARN hosts) or alive but stuck. If it is dead, I would hope that the
AM notices, so you may find something in the AM logs about the task
process. If it is still alive, you could take a thread dump to see where
it is stuck.
3) Still double-check that the version of Kafka pulled in by Samza 0.14.1
is the same version you depend on when implementing the interceptor
interface.
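
For suggestion 2, the usual route is running jstack <pid> on the YARN
host against the container's JVM. If that isn't practical, the standard
java.lang.management API can produce a comparable dump from inside the
JVM. The helper below is a hypothetical sketch, not part of Samza, and
would have to be triggered from a thread that isn't itself stuck (a
daemon timer, for instance).

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

/** Hypothetical diagnostic helper (not part of Samza): renders a dump of all live threads. */
final class ThreadDumps {
    private ThreadDumps() {}

    static String dumpAllThreads() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        // Ask for lock details only where the JVM supports them; otherwise dumpAllThreads throws.
        ThreadInfo[] infos = mx.dumpAllThreads(
                mx.isObjectMonitorUsageSupported(), mx.isSynchronizerUsageSupported());
        StringBuilder sb = new StringBuilder();
        for (ThreadInfo info : infos) {
            sb.append('"').append(info.getThreadName()).append("\" ")
              .append(info.getThreadState()).append('\n');
            // Show which lock a blocked/waiting thread is on, and who holds it.
            if (info.getLockName() != null) {
                sb.append("    waiting on ").append(info.getLockName());
                if (info.getLockOwnerName() != null) {
                    sb.append(" held by \"").append(info.getLockOwnerName()).append('"');
                }
                sb.append('\n');
            }
            // Print every frame ourselves; ThreadInfo.toString() may truncate long stacks.
            for (StackTraceElement frame : info.getStackTrace()) {
                sb.append("    at ").append(frame).append('\n');
            }
            sb.append('\n');
        }
        return sb.toString();
    }
}
```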

All that said, I believe it would be simpler to use metrics to do what
you need, so if that works for you, you may not need to worry about the
interceptor anyway.

On Mon, Oct 7, 2019 at 3:17 PM Malcolm McFarland <mm...@cavulus.com>
wrote:

> Thanks for the advice. I'm pretty sure that I referenced the Kafka 0.11
> docs when implementing this (we're still on Samza 0.14.1), but I'll
> double-check. The really strange thing is that the processors have no
> problems starting when I use a ThreadJobFactory; it's only on a YARN
> deploy that we get radio silence (we've had no other code-related issues
> transitioning to YARN).
>
> I'll take a look through the metrics and see if any of them could fill
> this role. Right now we're looking at per-partition consumption, and
> maybe "process-calls" will help with that.
>
> Cheers,
> Malcolm McFarland
> Cavulus

Re: Using Kafka's ProducerInterceptor with Samza

Posted by Malcolm McFarland <mm...@cavulus.com>.
Thanks for the advice. I'm pretty sure that I referenced the Kafka 0.11
docs when implementing this (we're still on Samza 0.14.1), but I'll
double-check. The really strange thing is that the processors have no
problems starting when I use a ThreadJobFactory; it's only on a YARN
deploy that we get radio silence (we've had no other code-related issues
transitioning to YARN).

I'll take a look through the metrics and see if any of them could fill
this role. Right now we're looking at per-partition consumption, and
maybe "process-calls" will help with that.

Cheers,
Malcolm McFarland
Cavulus


On Mon, Oct 7, 2019 at 2:32 PM Cameron Lee <ca...@gmail.com> wrote:

> Hey Malcolm,
>
> Sorry for not responding to this earlier. Unfortunately, we don't have an
> example of ProducerInterceptor being used in a Samza application. It's
> odd that the whole task fails when you try to use your interceptor. The
> only explanation I can think of is that the Kafka producer fails to
> initialize properly, and that causes the task to fail. Even so, I would
> still expect to see some logs pointing to where the error was. One thing
> you might want to double-check is that the version of Kafka pulled in
> transitively by Samza matches the version of Kafka you implemented your
> ProducerInterceptor against. The interface may have changed between Kafka
> versions, in which case your compile-time Kafka version might not match
> your runtime Kafka version.
>
> That being said, if you are just looking to track partitions that stop
> listening for messages, you might not need a Kafka interceptor at all.
> Samza already has metrics for consumption and production of messages, so
> you can use them to see whether the consumption or production rate drops
> to zero for any partition or container. TaskInstanceMetrics has metrics
> such as "process-calls" and "send-calls", and there are also
> KafkaSystemProducerMetrics and KafkaSystemConsumerMetrics (although I
> think the Kafka metrics are aggregated across the whole container). Do
> you think these metrics are sufficient for your use case?
>
> Cameron

Re: Using Kafka's ProducerInterceptor with Samza

Posted by Cameron Lee <ca...@gmail.com>.
Hey Malcolm,

Sorry for not responding to this earlier. Unfortunately, we don't have an
example of ProducerInterceptor being used in a Samza application. It's
odd that the whole task fails when you try to use your interceptor. The
only explanation I can think of is that the Kafka producer fails to
initialize properly, and that causes the task to fail. Even so, I would
still expect to see some logs pointing to where the error was. One thing
you might want to double-check is that the version of Kafka pulled in
transitively by Samza matches the version of Kafka you implemented your
ProducerInterceptor against. The interface may have changed between Kafka
versions, in which case your compile-time Kafka version might not match
your runtime Kafka version.
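
One way to test the compile-time vs. runtime theory is to ask the running
container where it actually loaded the interceptor interface from, e.g.
by logging
ClassOrigin.describe("org.apache.kafka.clients.producer.ProducerInterceptor")
at startup. ClassOrigin is a hypothetical helper built only on standard
reflection. On the build side, mvn dependency:tree (or gradle
dependencies) shows which Kafka artifacts and versions Samza pulls in
transitively.

```java
import java.security.CodeSource;

/** Hypothetical diagnostic helper: reports where a class was actually loaded from. */
final class ClassOrigin {
    private ClassOrigin() {}

    static String describe(String className) {
        final Class<?> cls;
        try {
            // initialize = false: only locate the class; don't run its static initializers.
            cls = Class.forName(className, false, ClassOrigin.class.getClassLoader());
        } catch (ClassNotFoundException e) {
            return className + ": not on the classpath";
        } catch (LinkageError e) {
            // e.g. NoClassDefFoundError when a class it depends on is missing or incompatible
            return className + ": could not be loaded (" + e + ")";
        }
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        // JDK classes loaded by the bootstrap loader have no CodeSource.
        String where = (source == null || source.getLocation() == null)
                ? "bootstrap/JDK"
                : source.getLocation().toString();
        // Many jar manifests omit Implementation-Version, so the jar path is often the better clue.
        Package pkg = cls.getPackage();
        String version = (pkg == null || pkg.getImplementationVersion() == null)
                ? "unknown"
                : pkg.getImplementationVersion();
        return className + " loaded from " + where + " (Implementation-Version: " + version + ")";
    }
}
```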

That being said, if you are just looking to track partitions that stop
listening for messages, you might not need a Kafka interceptor at all.
Samza already has metrics for consumption and production of messages, so
you can use them to see whether the consumption or production rate drops
to zero for any partition or container. TaskInstanceMetrics has metrics
such as "process-calls" and "send-calls", and there are also
KafkaSystemProducerMetrics and KafkaSystemConsumerMetrics (although I
think the Kafka metrics are aggregated across the whole container). Do
you think these metrics are sufficient for your use case?
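
As a rough sketch of the "rate drops to zero" check: sample a counter
such as "process-calls" for each task at a fixed interval and flag any
task whose value stops changing. How the snapshots are collected (a
metrics reporter, JMX, etc.) is left out, and CounterStallDetector is a
hypothetical name. An input partition that simply has no new messages
looks exactly the same, so this is best paired with some measure of
input backlog.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: flags sources whose ever-increasing counter has stopped moving. */
final class CounterStallDetector {
    private final int flatSamplesBeforeAlert;
    private final Map<String, Long> lastValue = new HashMap<>();
    private final Map<String, Integer> flatSamples = new HashMap<>();

    CounterStallDetector(int flatSamplesBeforeAlert) {
        this.flatSamplesBeforeAlert = flatSamplesBeforeAlert;
    }

    /**
     * Feed one snapshot (source name -> counter value, e.g. a task's "process-calls")
     * and return, sorted, the sources whose counter has not changed for
     * flatSamplesBeforeAlert consecutive snapshots.
     */
    List<String> observe(Map<String, Long> snapshot) {
        List<String> stalled = new ArrayList<>();
        for (Map.Entry<String, Long> e : snapshot.entrySet()) {
            String source = e.getKey();
            long value = e.getValue();
            Long previous = lastValue.put(source, value);
            // Unchanged means flat; any change (including a drop after a restart) resets the streak.
            int flat = (previous != null && previous == value)
                    ? flatSamples.getOrDefault(source, 0) + 1
                    : 0;
            flatSamples.put(source, flat);
            if (flat >= flatSamplesBeforeAlert) {
                stalled.add(source);
            }
        }
        Collections.sort(stalled);
        return stalled;
    }
}
```

With one snapshot per minute and a threshold of 5, for example, a task is
flagged after roughly five minutes without a single process() call.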

Cameron

On Mon, Oct 7, 2019 at 1:29 PM Malcolm McFarland <mm...@cavulus.com>
wrote:

> Hey y'all, just wanted to follow up on this: does anybody have a working
> example of using Kafka's ProducerInterceptor interface under YARN that I
> could look at, or any guidance to offer?
>
> More generally, we're trying to keep an eye on Samza SSP
> (SystemStreamPartition) instances that occasionally seem to go "zombie";
> i.e., the YARN ApplicationMaster (AM) doesn't see any problems and no
> errors are surfaced, but the instances have simply stopped listening for
> messages. My idea is to record the topic/partition/offset of each message
> when it is delivered, and then record the topic/partition/offset again
> when Samza picks up that message for processing. Does anyone have ideas
> about how to do this?
>
> Cheers,
> Malcolm McFarland
> Cavulus