Posted to users@kafka.apache.org by Paolo Patierno <pp...@live.com> on 2016/03/31 09:02:46 UTC

About AMQP connector and Kafka Connect framework

Hi all,

after the following Twitter conversation ...

https://twitter.com/jfield/status/715299287479877632

I'd like to explain in more detail my concerns about using Kafka Connect for an AMQP connector.
I started developing one almost a month ago (source side only, https://github.com/ppatierno/kafka-connect-amqp), but then switched to writing a bridge myself, without Kafka Connect, because some AMQP needs are not well supported by the Connect framework.
Here are the first problems I noticed ...

In my implementation, the so-called "source connector" creates a ProtonServer to accept AMQP connections from sender clients. The related "source task" receives messages on an AMQP connection, extracts the address and partition (which should map to the Kafka topic and its partition) and the body, and finally returns structured data to be injected into Kafka.
The first big problem is that if the injected message has a problem (for example, the specified partition doesn't exist for the topic), my application logic doesn't receive any exception or callback with that information to handle it. This could work fine for pre-settled messages (at-most-once) but not for at-least-once delivery: I need to return a "disposition" to the client with a delivery state of accepted or rejected.

If I want to use the same framework to receive from Kafka over an AMQP connection as well, I have a bigger problem.
My current implementation doesn't have the receiver side yet, but it should work as follows: the "sink connector" has to create a ProtonServer to accept AMQP connections and start reading messages from the Kafka topic that corresponds to the AMQP address (node) specified by the remote AMQP receiver.
The big problem is that in the Kafka Connect framework architecture, the topic to read from MUST be defined statically in a configuration file that the framework reads on startup. In my scenario, I need to start reading from a Kafka topic defined dynamically, because it depends on the address/node specified by the remote AMQP receiver. The receiver attaches to a link on an address that should be the Kafka topic to consume; the address is defined at runtime, so the pattern proposal (https://issues.apache.org/jira/browse/KAFKA-3073) isn't enough.

Paolo

Paolo Patierno
Senior Software Engineer
Windows Embedded & IoT
Microsoft Azure Advisor
Twitter : @ppatierno
Linkedin : paolopatierno
Blog : DevExperience
Blog : Embedded101

Re: About AMQP connector and Kafka Connect framework

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
On Fri, Apr 1, 2016 at 12:23 AM, Paolo Patierno <pp...@live.com> wrote:

> Hi Ewen,
>
> thanks for your reply.
>
> My objective here is to access Kafka through the AMQP protocol (I'm now
> working on a bridge from scratch, without using Kafka Connect).
>
> Consider the following scenario ...
>
> Producer side :
>
> An AMQP client connects to the SourceConnector, which is listening on the
> AMQP port (5672) and accepts connections. The connection is opened and the
> client executes an AMQP "attach" performative on a specific address: this
> address is exactly the corresponding Kafka topic (e.g. /my_topic).
> From then on, the AMQP client sends messages ("transfer" frames in AMQP
> jargon), which are translated into SourceRecords returned by poll()
> (inside my SourceTask). Besides the body, the AMQP message can specify
> the partition, or the key to use for determining the destination
> partition.
> Now ... as the connector is a sort of bridge between AMQP and Kafka, it
> has to give feedback to the client in an AMQP fashion: that means sending
> a "disposition" frame acknowledging the message injected into Kafka. The
> problem with the connector is that after returning a SourceRecord (from
> the poll() method) we get no feedback from the Connect framework on
> whether that record was injected correctly into Kafka (e.g. wrong
> partition and so on), so we can't send the correct "disposition" to the
> client.
> It would be useful to have the same callback as the KafkaProducer client,
> where you learn the result of sending a message to Kafka.
>

Actually, there is already an API for getting feedback. There was already a
commit() method that SourceTasks could override to get "bulk" feedback. The
use case here was more for APIs where you might just record or ack to an
offset and allow the source system to clean up anything up to that point.
However, you could still use it to ack individual messages by keeping track
of which messages were outstanding. See
https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java#L56

We also have a new API, commitRecord, in the next version to handle
single-message acks, which I think was originally motivated by supporting a
JMS connector. See
https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java#L83
for this new API. In both cases, they should not be invoked until we're
sure the data has been flushed to Kafka.
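
The bookkeeping suggested above (keep track of outstanding messages, ack them once the framework reports them as written) might look roughly like the sketch below. It is only an illustration under assumptions: it does not import Kafka Connect, and `PendingDeliveries`, `onPolled`, and `onRecordCommitted` are hypothetical names. `onRecordCommitted` stands in for whatever a SourceTask would do inside its per-record commit callback, and the record id is assumed to be something the task attaches to each record it returns from poll().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of Kafka Connect: remembers which AMQP
// deliveries were handed to the framework but are not yet acknowledged.
class PendingDeliveries {
    // Key: an id the task attaches to each record returned from poll().
    // Value: the AMQP delivery tag that is still unsettled.
    private final Map<Long, String> outstanding = new ConcurrentHashMap<>();
    private final List<String> accepted = new ArrayList<>();

    // Call when a record is returned from poll(): the delivery stays unsettled.
    void onPolled(long recordId, String deliveryTag) {
        outstanding.put(recordId, deliveryTag);
    }

    // Call from the per-record commit callback: the record is in Kafka, so the
    // delivery can now be settled with an "accepted" disposition.
    // Returns false if the id is unknown or was already settled.
    boolean onRecordCommitted(long recordId) {
        String tag = outstanding.remove(recordId);
        if (tag == null) {
            return false;
        }
        synchronized (accepted) {
            accepted.add(tag);
        }
        return true;
    }

    int outstandingCount() {
        return outstanding.size();
    }

    // Snapshot of the delivery tags that have been accepted so far.
    List<String> acceptedTags() {
        synchronized (accepted) {
            return new ArrayList<>(accepted);
        }
    }
}
```

In a real task the "accepted" list would be replaced by sending the disposition frame on the AMQP link; a list is used here only so the behaviour can be observed.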

It's probably worth pointing out here that this kind of setup may only work
well in standalone mode. In distributed mode, your tasks can bounce around
workers, so unless you dynamically register them somehow you wouldn't know
what host + port clients should communicate with. But since you're
expecting a single fixed port, I'm guessing this was the mode you planned
to deploy in anyway?


>
> Consumer side :
>
> An AMQP client connects to the SinkConnector, which is listening on the
> AMQP port (5672) and accepts connections. The connection is opened and the
> client executes an AMQP "attach" performative on a specific address: this
> address is exactly the corresponding Kafka topic (e.g. /my_topic).
> This operation isn't possible at runtime with Connect, because the
> SinkConnector must be configured statically, using the configuration file
> to specify the list of topics to read.
> With AMQP, clients connect and disconnect dynamically, so we know the
> topic to read from only at runtime.
>

I see, this is definitely true currently. Initially I was thinking that
another method on ConnectorContext or SinkTaskContext could address this
(along with removal of some checks on non-empty subscriptions). This might
be harder to make work generally though -- it's not strictly required, but
normally we'd expect all the consumers (and thus SinkTasks for a connector)
to make the same subscriptions. This would mean we'd either need some
additional coordination between them or we'd have to accept that only one
of them would make the subscription for dynamically registered topics. I
think that might work for your use case (which, again, sounds like it'll be
a standalone-mode single-task connector), but has tradeoffs that we'd need
to think through for distributed connectors.

I think there's a second complicating factor as well, which is that any
dynamic subscriptions (that cannot be expressed statically, i.e. as
regexes) would need some state storage for those subscriptions in
distributed mode to provide fault tolerance.
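
For the standalone, single-task case, the dynamic part could be tracked with something like the sketch below. It is an illustration only: `DynamicSubscriptions` is a hypothetical class, the "/my_topic" to "my_topic" mapping is an assumption taken from the example address in this thread, and the state lives in memory, so it does nothing for the fault-tolerance concern in distributed mode.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical registry: counts attached AMQP receiver links per Kafka topic
// so the set of topics to consume can follow attach/detach at runtime.
class DynamicSubscriptions {
    private final Map<String, Integer> linksPerTopic = new HashMap<>();

    // Assumed mapping: AMQP address "/my_topic" -> Kafka topic "my_topic".
    static String topicFor(String address) {
        return address.startsWith("/") ? address.substring(1) : address;
    }

    // Returns true when the topic set changed, i.e. this is the first link
    // attached for that topic and a new subscription is needed.
    synchronized boolean attach(String address) {
        int count = linksPerTopic.merge(topicFor(address), 1, Integer::sum);
        return count == 1;
    }

    // Returns true when the last link for the topic went away, i.e. the
    // topic can be dropped from the subscription.
    synchronized boolean detach(String address) {
        String topic = topicFor(address);
        Integer count = linksPerTopic.get(topic);
        if (count == null) {
            return false; // nothing was attached for this address
        }
        if (count == 1) {
            linksPerTopic.remove(topic);
            return true;
        }
        linksPerTopic.put(topic, count - 1);
        return false;
    }

    // Sorted snapshot of the topics that currently have at least one link.
    synchronized Set<String> topics() {
        return new TreeSet<>(linksPerTopic.keySet());
    }
}
```

In a bridge built directly on the consumer client, the result of `topics()` is the kind of collection that would be handed to the consumer's subscribe call whenever `attach` or `detach` returns true.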

I think you can actually make the source side work just fine w/ features
available today, but the sink side with completely dynamic topics is
definitely more complicated and any new features added to support it would
need to be thoroughly thought through wrt how they impact both standalone
and distributed modes.


>
> After starting to use Connect and facing the problems above, I stopped
> using it and decided to write the bridge from scratch.
> I don't know whether there could be other problems with adopting Connect
> for AMQP.
> Of course, with Connect I see the need for two AMQP servers, one on each
> side: source and sink connector. With my current implementation I have
> only one, and then use the KafkaConsumer and KafkaProducer client
> libraries.
>

I'll be the first to admit that Connect is not the be-all and end-all for
getting data into/out of Kafka -- we want it to cover a lot of use cases,
especially ones that need scalability and fault tolerance. We want Connect
to help people avoid reimplementing both common and error-prone code. We
explicitly did not try to optimize for connectors that, e.g., are push
based sources & listen on a single host + single port and therefore won't
be able to leverage distributed mode. In some cases (specifically
single-node push-based sources and pull-based sinks), it is possible it
will be easier to use the low level client libraries directly.

Something you might consider is whether you can reuse some of Kafka
Connect's public classes in your implementation. You'd need to be careful
about using internal implementation details that are subject to change, but
I'd at least recommend reusing things like the Converter interface, which
will help you abstract away details around serialization and make your
connector usable by as many folks as possible even if it's not using Connect
fully.

-Ewen

-- 
Thanks,
Ewen

RE: About AMQP connector and Kafka Connect framework

Posted by Paolo Patierno <pp...@live.com>.
Hi Ewen,

thanks for your reply. 

My objective here is to access Kafka through the AMQP protocol (I'm now working on a bridge from scratch, without using Kafka Connect).

Consider the following scenario ...

Producer side :

An AMQP client connects to the SourceConnector, which is listening on the AMQP port (5672) and accepts connections. The connection is opened and the client executes an AMQP "attach" performative on a specific address: this address is exactly the corresponding Kafka topic (e.g. /my_topic).
From then on, the AMQP client sends messages ("transfer" frames in AMQP jargon), which are translated into SourceRecords returned by poll() (inside my SourceTask). Besides the body, the AMQP message can specify the partition, or the key to use for determining the destination partition.
Now ... as the connector is a sort of bridge between AMQP and Kafka, it has to give feedback to the client in an AMQP fashion: that means sending a "disposition" frame acknowledging the message injected into Kafka. The problem with the connector is that after returning a SourceRecord (from the poll() method) we get no feedback from the Connect framework on whether that record was injected correctly into Kafka (e.g. wrong partition and so on), so we can't send the correct "disposition" to the client.
It would be useful to have the same callback as the KafkaProducer client, where you learn the result of sending a message to Kafka.
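
To make the requested feedback concrete: the KafkaProducer send callback is invoked with either the record metadata or an exception, and a bridge could translate that into the AMQP delivery state. The sketch below models only that translation, plus an early check for the wrong-partition example. `Disposition` and `DispositionMapper` are hypothetical names, and no Kafka or AMQP library is imported.

```java
// Hypothetical types for illustration; neither is part of Kafka or of an
// AMQP library.
enum Disposition { ACCEPTED, REJECTED }

class DispositionMapper {
    // Mirrors the shape of a producer send callback, which reports either
    // success (no exception) or the exception that made the send fail.
    static Disposition fromSendResult(Exception sendError) {
        return sendError == null ? Disposition.ACCEPTED : Disposition.REJECTED;
    }

    // Optional early check for the example in this thread: a partition the
    // topic does not have. A null partition means "let Kafka pick by key".
    static Disposition checkPartition(Integer requestedPartition, int partitionCount) {
        if (requestedPartition == null) {
            return Disposition.ACCEPTED;
        }
        boolean exists = requestedPartition >= 0 && requestedPartition < partitionCount;
        return exists ? Disposition.ACCEPTED : Disposition.REJECTED;
    }
}
```

Whether every failure should map to "rejected" (rather than, say, leaving the delivery unsettled so the client can retry) is a design choice this sketch does not try to settle.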

Consumer side :

An AMQP client connects to the SinkConnector, which is listening on the AMQP port (5672) and accepts connections. The connection is opened and the client executes an AMQP "attach" performative on a specific address: this address is exactly the corresponding Kafka topic (e.g. /my_topic).
This operation isn't possible at runtime with Connect, because the SinkConnector must be configured statically, using the configuration file to specify the list of topics to read.
With AMQP, clients connect and disconnect dynamically, so we know the topic to read from only at runtime.

After starting to use Connect and facing the problems above, I stopped using it and decided to write the bridge from scratch.
I don't know whether there could be other problems with adopting Connect for AMQP.
Of course, with Connect I see the need for two AMQP servers, one on each side: source and sink connector. With my current implementation I have only one, and then use the KafkaConsumer and KafkaProducer client libraries.

Paolo.

Paolo Patierno
Senior Software Engineer @Red Hat
Windows Embedded & IoT
Microsoft Azure Advisor
Twitter : @ppatierno
Linkedin : paolopatierno
Blog : DevExperience


Re: About AMQP connector and Kafka Connect framework

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
On Thu, Mar 31, 2016 at 12:02 AM, Paolo Patierno <pp...@live.com> wrote:

> Hi all,
>
> after the following Twitter conversation ...
>
> https://twitter.com/jfield/status/715299287479877632
>
> I'd like to explain in more detail my concerns about using Kafka Connect
> for an AMQP connector.
> I started developing one almost a month ago (source side only,
> https://github.com/ppatierno/kafka-connect-amqp), but then switched to
> writing a bridge myself, without Kafka Connect, because some AMQP needs
> are not well supported by the Connect framework.
> Here are the first problems I noticed ...
>
> In my implementation, the so-called "source connector" creates a
> ProtonServer to accept AMQP connections from sender clients. The related
> "source task" receives messages on an AMQP connection, extracts the
> address and partition (which should map to the Kafka topic and its
> partition) and the body, and finally returns structured data to be
> injected into Kafka.
> The first big problem is that if the injected message has a problem (for
> example, the specified partition doesn't exist for the topic), my
> application logic doesn't receive any exception or callback with that
> information to handle it. This could work fine for pre-settled messages
> (at-most-once) but not for at-least-once delivery: I need to return a
> "disposition" to the client with a delivery state of accepted or
> rejected.
>

This is a really good point and something that is not handled well today.
Most connectors wouldn't include partition info in SourceRecords -- they
would simply specify a key (or have no keys at all) and not worry about the
partitioning in Kafka. It sounds like the way you are mapping things
actually requires matching partitions. When producing data, you could
potentially have two types of errors. The first I'd refer to as something
like "functional" errors -- if Kafka is down, brokers fail, networks
between connect and brokers fail, etc, you simply can't get the data into
Kafka. The second we might refer to as semantic or mapping errors. The
example you're giving (mismatched number of partitions) is one case.
Another case I can think of is when the Converter you are using simply
can't support the format of data provided (which we try to avoid by only
supporting common formats, but inevitably, some serialization format won't
work).

You are right that feedback about either type of error is not well
supported today. Functional errors we actually just try to address in the
framework and don't really want the connector to be aware of. For the most
part, retries and pausing consumption of new data will address this; you
want it exposed via metrics so someone can discover the problem, but the
connector probably can't do anything about it anyway, so providing that
info to the connector is not helpful.

For the semantic errors, we may need some more support. I'm curious what
type of feedback you'd like and at what granularity (per record? per set of
poll() records? something else?) and immediacy (sync? async?). If you
encounter one of these errors, what would your connector do in response?
Something more than logging an error? Adding something to Connect might
make sense here, but it's worth thinking through the entire subsequent
sequence of events to understand exactly what type of feedback is needed.
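
The split between the two error types could be pictured as below. This is a sketch under assumptions, not how Connect classifies errors: the enums and `SendErrorPolicy` are hypothetical, I/O and timeout exceptions are used merely as stand-ins for "can't reach Kafka", and the `REPORT_TO_CONNECTOR` action is exactly the piece of feedback that is still an open question in this thread.

```java
import java.io.IOException;
import java.util.concurrent.TimeoutException;

// Hypothetical names for the two error types discussed above.
enum ErrorKind { FUNCTIONAL, SEMANTIC }

// Hypothetical reactions: the framework retries and pauses on its own, or the
// connector is told so it can respond (for AMQP, with a rejected disposition).
enum ErrorAction { RETRY_AND_PAUSE, REPORT_TO_CONNECTOR }

class SendErrorPolicy {
    // Assumption: I/O and timeout failures stand in for "Kafka is down or
    // unreachable"; anything else is treated as a semantic/mapping error.
    static ErrorKind classify(Exception e) {
        if (e instanceof IOException || e instanceof TimeoutException) {
            return ErrorKind.FUNCTIONAL;
        }
        return ErrorKind.SEMANTIC;
    }

    // Functional errors stay inside the framework; semantic errors are the
    // ones a connector could actually act on.
    static ErrorAction actionFor(ErrorKind kind) {
        return kind == ErrorKind.FUNCTIONAL
                ? ErrorAction.RETRY_AND_PAUSE
                : ErrorAction.REPORT_TO_CONNECTOR;
    }
}
```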


>
> If I want to use the same framework to receive from Kafka over an AMQP
> connection as well, I have a bigger problem.
> My current implementation doesn't have the receiver side yet, but it
> should work as follows: the "sink connector" has to create a ProtonServer
> to accept AMQP connections and start reading messages from the Kafka topic
> that corresponds to the AMQP address (node) specified by the remote AMQP
> receiver.
> The big problem is that in the Kafka Connect framework architecture, the
> topic to read from MUST be defined statically in a configuration file that
> the framework reads on startup. In my scenario, I need to start reading
> from a Kafka topic defined dynamically, because it depends on the
> address/node specified by the remote AMQP receiver. The receiver attaches
> to a link on an address that should be the Kafka topic to consume; the
> address is defined at runtime, so the pattern proposal
> (https://issues.apache.org/jira/browse/KAFKA-3073) isn't enough.
>

I think this is the piece that I was confused by in the Twitter discussion,
but admittedly 140 chars isn't great for explaining intent. Can we maybe
take a step back from Kafka Connect and just describe the mapping you're
looking for between Kafka and AMQP? How do you expect messages to be
routed? What is the translation that you expect to happen? I think if we
get rid of any assumptions Connect currently makes, we can get at what
you're really trying to achieve and then figure out whether that's
something Connect can currently support or if it's a gap in the current
functionality that we should try to address.

Thanks,
Ewen


