Posted to dev@kafka.apache.org by Matt Farmer <ma...@frmr.me> on 2018/03/19 22:02:19 UTC

Seeking Feedback on Kafka Connect Issues

Hi everyone,

We’ve been experimenting recently with some limited use of Kafka Connect and are hoping to expand to wider use cases soon. However, we had some internal issues that gave us a well-timed preview of error handling behavior in Kafka Connect. I think the fixes for this will require at least three different KIPs, but I want to share some thoughts to get the initial reaction from folks in the dev community. If these ideas seem reasonable, I can go ahead and create the required KIPs.

Here are the three things specifically we ran into…

-----------

(1) Kafka Connect only retries tasks when certain exceptions are thrown
Currently, Kafka Connect only retries tasks when certain exceptions are thrown - I believe the logic checks to see if the exception is specifically marked as “retryable” and, if not, fails the task. We’d like to bypass this behavior and implement a configurable exponential backoff for tasks regardless of the failure reason. This is probably two changes: one to implement exponential backoff retries for tasks if they don’t already exist, and a change to implement a RetryPolicy interface that evaluates the Exception to determine whether or not to retry.
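
To make that concrete, here is the rough shape I have in mind. This is purely a sketch - the interface and the backoff numbers are hypothetical, and nothing like this exists in Connect today:

    // Hypothetical sketch; none of these names exist in Kafka Connect today.
    public interface RetryPolicy {

        /** Decide whether a failed task should be retried at all. */
        boolean shouldRetry(Throwable error, int previousAttempts);

        /** Capped exponential backoff: 1s, 2s, 4s, ... up to 60s. */
        default long backoffMs(int previousAttempts) {
            long baseMs = 1_000L;
            long maxMs = 60_000L;
            return Math.min(maxMs, baseMs << Math.min(previousAttempts, 16));
        }

        /** A policy that retries every failure, not just RetriableException. */
        RetryPolicy ALWAYS_RETRY = (error, previousAttempts) -> true;
    }

The worker would consult shouldRetry() whenever a task throws, instead of only restarting when the exception happens to be marked retryable.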

(2) Kafka Connect doesn’t permit Connectors to smartly reposition after rebalance
We’re using the S3 connector to dump files with a large number of records into an S3 bucket - about 100,000 records per file. Unfortunately, every time a task fails, the consumer rebalance causes all partitions to get re-shuffled amongst the various tasks. From what I can tell from the logs, the connector gets stopped and restarted to compensate, and then picks up from the last consumer position that was committed to the brokers.

This doesn’t work well if you’re batching records into large files for archival.

For the S3 connector, for example: Let’s say I have two partitions and the connector has two tasks, one processing each of them. Task 0 is at 5,000 records read from the last commit and Task 1 is at 70,000 records read from the last commit. Then, boom, something goes wrong with Task 0 and it falls over. This triggers a rebalance and Task 1 has to take over the workload. Task 1 will, at this point, discard the 70,000 records in its buffer and start from the last commit point. This failure mode is brutal for the archival system we’re building.

There are two solutions that I can think of to this:

(A) Provide an interface for connectors to define their own rebalance listener. This listener could compare the newly assigned list of partitions with a previously assigned list. For all partitions that this connector was already working on prior to the rebalance, it could manually seek to the last position it locally processed before resuming. So, in the scenario above, Task 1 could keep an accounting file locally and seek over the first 70,000 records without reprocessing them. It would then wait until after it confirms the S3 upload to commit those offsets back to Kafka. This ensures that if the machine running Task 1 dies, a new consumer can take its place, but we’ll still benefit from a local cache if one is present. (A rough sketch of this hook follows below.)

(B) Have Connect manually round-robin partitions on a topic to tasks and never rebalance them automatically. If this were combined with better task retry semantics, I think this solution would be simpler.
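
Here is the rough shape I have in mind for the hook described in (A). Everything below is hypothetical; no interface like this exists in Connect today:

    import java.util.Collection;
    import java.util.Map;

    import org.apache.kafka.common.TopicPartition;

    // Hypothetical hook a connector task could implement.
    public interface TaskRebalanceListener {

        /**
         * Called after a rebalance with the previous and new assignments. For
         * any partition the task kept, it can return the offset it has already
         * processed locally so the framework seeks there instead of rewinding
         * to the last committed offset.
         */
        Map<TopicPartition, Long> onAssignmentChanged(
                Collection<TopicPartition> previouslyAssigned,
                Collection<TopicPartition> newlyAssigned);
    }

In the scenario above, Task 1 would return offsets covering the 70,000 records it already has buffered, and would still only commit offsets back to Kafka after the S3 upload is confirmed.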

(3) As far as I can tell, JMX metrics aren’t reporting the number of active tasks
This one is arguably the simplest issue to resolve, but we’d like to alert if the number of active tasks isn’t what we expect it to be so that we can have a human investigate.
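
For context, the check we want to automate looks roughly like the following. I’m assuming the worker’s connect-worker-metrics MBean and its task-count attribute here (the exact names may differ by version); the point is that task-count alone doesn’t tell us whether those tasks are actually running.

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class ConnectTaskCountCheck {
        public static void main(String[] args) throws Exception {
            // Assumes the worker was started with remote JMX enabled on port 9999.
            JMXServiceURL url = new JMXServiceURL(
                    "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
            JMXConnector connector = JMXConnectorFactory.connect(url);
            try {
                MBeanServerConnection mbeans = connector.getMBeanServerConnection();
                // Assumed MBean/attribute names; adjust to what your worker exposes.
                ObjectName workerMetrics =
                        new ObjectName("kafka.connect:type=connect-worker-metrics");
                Object taskCount = mbeans.getAttribute(workerMetrics, "task-count");
                System.out.println("task-count = " + taskCount);
            } finally {
                connector.close();
            }
        }
    }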

-----------

I would love thoughts on all of the above from anyone on this list.

Thanks,

Matt Farmer

Re: Seeking Feedback on Kafka Connect Issues

Posted by Matt Farmer <ma...@frmr.me>.
Howdy,

I'll keep an eye on that ticket. KIP-275 came out of some work we've done
internally on our private forks of Kafka and the Confluent Cloud Storage
connector. Essentially, with that extra API we've tweaked the S3 connector
to check the value of isClosing in preCommit and immediately attempt to
commit files to S3 regardless of whether or not we've reached a size or
time limit.
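
For anyone curious, the override in our fork of the S3 sink task looks
roughly like this (method only, simplified; commitFilesToS3() and
normalRotationDue() stand in for the connector's existing rotation logic,
and isClosing() for whatever accessor the KIP ends up exposing):

    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(
            Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // If this task is about to give up its partitions, flush the buffered
        // records to S3 now instead of throwing them away.
        if (context.isClosing() || normalRotationDue()) {
            // Returns the offsets that are now safely persisted in S3 and can
            // be committed back to Kafka.
            return commitFilesToS3();
        }
        // Nothing new persisted yet, so don't advance the committed offsets.
        return java.util.Collections.emptyMap();
    }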

We've been using this internally for a few days and it's been working well
for our needs. Whenever it gets approved and merged, I'll be able to open
PRs against the Confluent repos for the changes we made pretty quickly.

We are, however, still interested in some better error handling for
Connect. I think in the interim we're going to have to build a sidecar
service that monitors the Connect API for failed tasks and restarts them
for us. :(
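
For anyone who ends up needing the same thing: the sidecar really only
needs the status and restart endpoints of the Connect REST API. A
bare-bones sketch (no real JSON parsing, scheduling, or error handling;
tasks.max is passed in so we know which task ids to poll):

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class RestartFailedTasks {
        private static final HttpClient HTTP = HttpClient.newHttpClient();

        public static void main(String[] args) throws Exception {
            String connectUrl = "http://localhost:8083"; // worker REST endpoint
            String connector = args[0];                  // connector name to watch
            int maxTasks = Integer.parseInt(args[1]);    // tasks.max for that connector

            for (int taskId = 0; taskId < maxTasks; taskId++) {
                String base = connectUrl + "/connectors/" + connector + "/tasks/" + taskId;
                String status = send(HttpRequest.newBuilder(URI.create(base + "/status"))
                        .GET().build());

                // Crude check; a real sidecar would parse the JSON and look at
                // the "state" field for each task.
                if (status.contains("FAILED")) {
                    send(HttpRequest.newBuilder(URI.create(base + "/restart"))
                            .POST(HttpRequest.BodyPublishers.noBody()).build());
                    System.out.println("Restarted task " + taskId + " of " + connector);
                }
            }
        }

        private static String send(HttpRequest request) throws Exception {
            return HTTP.send(request, HttpResponse.BodyHandlers.ofString()).body();
        }
    }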

Happy to provide whatever help I can toward making that sidecar service not
needed.

On Mon, Apr 2, 2018 at 11:18 AM, Randall Hauch <rh...@gmail.com> wrote:

> Yes, Confluent would be interested in improvements to the S3 connector.
> Feel free to create an issue/PR in
> https://github.com/confluentinc/kafka-connect-storage-cloud/.
>
> I just created https://issues.apache.org/jira/browse/KAFKA-6738 to deal
> with the bad data handling issue, and we can use that to track all of the
> comments, discussions, and work. I know that Konstantine K has already
> thought a fair amount about this, and so I've assigned it (at least
> initially) to him. This is something we'd like to get into the next AK
> release (2.0?), but would certainly appreciate any help from you or any
> other members of the community. If you're willing to help, I'd ask that you
> please coordinate with him on
> https://issues.apache.org/jira/browse/KAFKA-6738.
>
> As a side note, the KIP freeze for each release is often a good month
> before the planned release, and feature freeze usually only a week after
> that. This means that KIP that fails to be approved before this deadline
> will be pushed to the next release - all the more reason to work on the KIP
> and the implementation well before deadline.
>
> Randall
>
> On Tue, Mar 20, 2018 at 9:49 AM, Matt Farmer <ma...@frmr.me> wrote:
>
> > Hi Ewen,
> >
> > Thanks for the thoughtful response. I’m happy to take some time to write
> > up a KIP and do some implementation work here.
> > I did KIP-210 previously, so I’ve been through the process before. We
> also
> > have some organizational interest for improving
> > Kafka Connect. Our concern internally is that we don’t want to wait on
> the
> > KIP cycle to fully complete before rolling out
> > something. It was many months for KIP-210 to go from draft to merge.
> >
> > It might be sufficient for us in the interim to:
> >
> > (1) Improve the S3 connector using close/open to be smarter about what
> > multipart uploads it cancels during rebalance.
> >
> > (2) Implement a sidecar service that monitors the connect API and restart
> > tasks that fail
> >
> > … and in parallel work on the KIPs required to provide a less patchwork
> > solution in the framework itself.
> >
> > Is such a contribution to the S3 connector something that Confluent would
> > be open to?
> >
> > > On Mar 19, 2018, at 10:00 PM, Ewen Cheslack-Postava <ewen@confluent.io
> >
> > wrote:
> > >
> > > Responses inline.
> > >
> > > On Mon, Mar 19, 2018 at 3:02 PM, Matt Farmer <matt@frmr.me <mailto:
> > matt@frmr.me>> wrote:
> > >
> > >> Hi everyone,
> > >>
> > >> We’ve been experimenting recently with some limited use of Kafka
> Connect
> > >> and are hoping to expand to wider use cases soon. However, we had some
> > >> internal issues that gave us a well-timed preview of error handling
> > >> behavior in Kafka Connect. I think the fixes for this will require at
> > least
> > >> three different KIPs, but I want to share some thoughts to get the
> > initial
> > >> reaction from folks in the dev community. If these ideas seem
> > reasonable, I
> > >> can go ahead and create the required KIPs.
> > >>
> > >> Here are the three things specifically we ran into…
> > >>
> > >> -----------
> > >>
> > >> (1) Kafka Connect only retries tasks when certain exceptions are
> thrown
> > >> Currently, Kafka Connect only retries tasks when certain exceptions
> are
> > >> thrown - I believe the logic checks to see if the exception is
> > specifically
> > >> marked as “retryable” and if not, fails. We’d like to bypass this
> > behavior
> > >> and implement a configurable exponential backoff for tasks regardless
> of
> > >> the failure reason. This is probably two changes: one to implement
> > >> exponential backoff retries for tasks if they don’t already exist and
> a
> > >> chance to implement a RetryPolicy interface that evaluates the
> > Exception to
> > >> determine whether or not to retry.
> > >>
> > >
> > > This has definitely come up before. The likely "fix" for this is to
> > provide
> > > general "bad data handling" options within the framework itself. The
> > > obvious set would be
> > >
> > > 1. fail fast, which is what we do today (assuming connector actually
> > fails
> > > and doesn't eat errors)
> > > 2. retry (possibly with configs to limit)
> > > 3. drop data and move on
> > > 4. dead letter queue
> > >
> > > This needs to be addressed in a way that handles errors from:
> > >
> > > 1. The connector itself (e.g. connectivity issues to the other system)
> > > 2. Converters/serializers (bad data, unexpected format, etc)
> > > 3. SMTs
> > > 4. Ideally the fmwk as well (though I don't think we have any known
> bugs
> > > where this would be a problem, and we'd be inclined to just fix them
> > > anyway).
> > >
> > > I think we understand the space of problems and how to address them
> > pretty
> > > well already, this issue is really just a matter of someone finding the
> > > time to KIP, implement, and review/implement. (And that review/commit
> one
> > > realistically means we need multiple people's time). Happy to guide
> > anyone
> > > interested on next steps. If not addressed by general community,
> > Confluent
> > > will get to this at some point, but I couldn't say when that would be
> --
> > > Randall might know better than I would.
> > >
> > >
> > >> (2) Kafka Connect doesn’t permit Connectors to smartly reposition
> after
> > >> rebalance
> > >> We’re using the S3 connector to dump files with a large number of
> > records
> > >> into an S3 bucket. About 100,000 records per file. Unfortunately,
> every
> > >> time a task fails, the consumer rebalance causes all partitions to get
> > >> re-shuffled amongst the various partitions. To compensate for this,
> the
> > >> connector gets stopped and started from what I can tell from the logs?
> > And
> > >> then picks up from the last consumer position that was committed to
> the
> > >> brokers.
> > >>
> > >> This doesn’t work great if you’re batching things into large numbers
> for
> > >> archival.
> > >>
> > >> For the S3 connector, for example: Let’s say I have two partitions and
> > the
> > >> connector has two tasks to process each of those. Task 0 is at 5,000
> > >> records read from the last commit and Task 1 is at 70,000 records read
> > from
> > >> the last commit. Then, boom, something goes wrong with Task 0 and it
> > falls
> > >> over. This triggers a rebalance and Task 1 has to take over the
> > workload.
> > >> Task 1 will, at this point, discard the 70,000 records in its buffer
> and
> > >> start from the last commit point. This failure mode is brutal for the
> > >> archival system we’re building.
> > >>
> > >>
> > > Yes, this is a known pain point. Usually it shows up as more of an
> issue
> > > for running a lot of connectors (where you don't want a tasks failure
> to
> > > unnecessarily affect unrelated work), but the concern for connectors
> > which
> > > do relatively infrequent commits is valid as well. I'll make a point on
> > the
> > > first solution then see below for more complete answer.
> > >
> > >
> > >> There are two solutions that I can think of to this:
> > >>
> > >> (A) Provide an interface for connectors to define their own rebalance
> > >> listener. This listener could compare the newly assigned list of
> > partitions
> > >> with a previously assigned list. For all partitions that this
> connector
> > was
> > >> already working on prior to the rebalance, it could manually seek to
> the
> > >> last position it locally processed before resuming. So, in the
> scenario
> > >> above Task 1 could keep an accounting file locally and seek over the
> > first
> > >> 70,000 records without reprocessing them. It would then wait until
> > after it
> > >> confirms the S3 upload to commit those offsets back to Kafka. This
> > ensures
> > >> that if the machine running Task 1 dies a new consumer can take its
> > place,
> > >> but we’ll still benefit from a local cache if one is present.
> > >>
> > >
> > > For sink tasks, this actually already exists -- see
> > > http://kafka.apache.org/10/javadoc/org/apache/kafka/
> > connect/sink/SinkTask.html#open-java.util.Collection- <
> > http://kafka.apache.org/10/javadoc/org/apache/kafka/
> > connect/sink/SinkTask.html#open-java.util.Collection->
> > > (open() and close() are the relevant APIs in there). It's not
> necessarily
> > > meant for this purpose originally, but you could take advantage to
> > benefit
> > > from any coincidental overlap in previous and new partition sets.
> > >
> > >
> > >>
> > >> (B) Have connect manually round robin partitions on a topic to tasks
> and
> > >> never rebalance them automatically. If this were combined with better
> > task
> > >> retry semantics, I think this solution would be simpler.
> > >>
> > >
> > > The real answer here is to get rid of the global rebalance and/or try
> to
> > > achieve some level of stickiness for tasks. The global rebalance was
> > > something we knew could become a scalability issue and when we extended
> > the
> > > group protocol to support connect in addition to consumers (or really
> any
> > > new layer you want to add), we specifically made sure we had a plan to
> > > extend and upgrade that protocol, and that we'd be able to implement it
> > > such that you could do that upgrade with no downtime.
> > >
> > > I've had proposals for doing this floating around my head since the
> first
> > > version of Connect, and I've got a draft of some options that I wrote
> up
> > > internally. This particular issue is actually a lot more general than
> > just
> > > for Connect. It makes things more complicated (a good reason to be
> > > conservative in adding a solution!), but Kafka Streams for nodes that
> > have
> > > large state stores can also benefit from a better rebalance protocol,
> as
> > > can any consumer that has to manage some significant local state.
> > >
> > > The good news is that the implementations here that get you the biggest
> > > bang for your implementation-cost buck aren't that complicated. The bad
> > > news is that we have to do it separately for the normal consumer and
> > > Connect.
> > >
> > > Again, this mostly boils down to finding time to go KIP and implement,
> > but
> > > the basic ideas for partial, sticky, incremental, and deferred
> > rebalancing
> > > and their implications are pretty well understood by the core team of
> > > Connect developers now.
> > >
> > > Let me see if I can get some copies of some wikis that got written up
> > > internally at Confluent ported to the Kafka wiki. No real reason we
> > haven't
> > > put it there, just happened to create it internally and didn't think to
> > > copy it over. Probably both would be useful if anyone in the community
> > > wants to tackle these problems.
> > >
> > >
> > >>
> > >> (3) As far as I can tell, JMX metrics aren’t reporting the number of
> > >> active tasks
> > >> This one is arguably the simplest issue to resolve, but we’d like to
> > alert
> > >> if the number of active tasks isn’t what we expect it to be so that we
> > can
> > >> have a human investigate.
> > >>
> > >
> > > Interesting. I think you're right that we're probably just reporting
> > > *assigned* tasks with the task-count metric rather than active tasks. I
> > > think an active tasks metric would be reasonable, though since you
> really
> > > need to look at the aggregate across workers, I'm not sure it's the
> best
> > > for alerting.
> > >
> > > Maybe an unhealthy/dead tasks count metric would be better? You can
> alert
> > > on that directly without having to aggregate across workers.
> > >
> > > -Ewen
> > >
> > >
> > >>
> > >> -----------
> > >>
> > >> I would love thoughts on all of the above from anyone on this list.
> > >>
> > >> Thanks,
> > >>
> > >> Matt Farmer
> >
> >
>

Re: Seeking Feedback on Kafka Connect Issues

Posted by Randall Hauch <rh...@gmail.com>.
Yes, Confluent would be interested in improvements to the S3 connector.
Feel free to create an issue/PR in
https://github.com/confluentinc/kafka-connect-storage-cloud/.

I just created https://issues.apache.org/jira/browse/KAFKA-6738 to deal
with the bad data handling issue, and we can use that to track all of the
comments, discussions, and work. I know that Konstantine K has already
thought a fair amount about this, and so I've assigned it (at least
initially) to him. This is something we'd like to get into the next AK
release (2.0?), but would certainly appreciate any help from you or any
other members of the community. If you're willing to help, I'd ask that you
please coordinate with him on
https://issues.apache.org/jira/browse/KAFKA-6738.

As a side note, the KIP freeze for each release is often a good month
before the planned release, and feature freeze is usually only a week after
that. This means that a KIP that fails to be approved before this deadline
will be pushed to the next release - all the more reason to work on the KIP
and the implementation well before the deadline.

Randall

On Tue, Mar 20, 2018 at 9:49 AM, Matt Farmer <ma...@frmr.me> wrote:

> Hi Ewen,
>
> Thanks for the thoughtful response. I’m happy to take some time to write
> up a KIP and do some implementation work here.
> I did KIP-210 previously, so I’ve been through the process before. We also
> have some organizational interest for improving
> Kafka Connect. Our concern internally is that we don’t want to wait on the
> KIP cycle to fully complete before rolling out
> something. It was many months for KIP-210 to go from draft to merge.
>
> It might be sufficient for us in the interim to:
>
> (1) Improve the S3 connector using close/open to be smarter about what
> multipart uploads it cancels during rebalance.
>
> (2) Implement a sidecar service that monitors the connect API and restart
> tasks that fail
>
> … and in parallel work on the KIPs required to provide a less patchwork
> solution in the framework itself.
>
> Is such a contribution to the S3 connector something that Confluent would
> be open to?
>
> > On Mar 19, 2018, at 10:00 PM, Ewen Cheslack-Postava <ew...@confluent.io>
> wrote:
> >
> > Responses inline.
> >
> > On Mon, Mar 19, 2018 at 3:02 PM, Matt Farmer <matt@frmr.me <mailto:
> matt@frmr.me>> wrote:
> >
> >> Hi everyone,
> >>
> >> We’ve been experimenting recently with some limited use of Kafka Connect
> >> and are hoping to expand to wider use cases soon. However, we had some
> >> internal issues that gave us a well-timed preview of error handling
> >> behavior in Kafka Connect. I think the fixes for this will require at
> least
> >> three different KIPs, but I want to share some thoughts to get the
> initial
> >> reaction from folks in the dev community. If these ideas seem
> reasonable, I
> >> can go ahead and create the required KIPs.
> >>
> >> Here are the three things specifically we ran into…
> >>
> >> -----------
> >>
> >> (1) Kafka Connect only retries tasks when certain exceptions are thrown
> >> Currently, Kafka Connect only retries tasks when certain exceptions are
> >> thrown - I believe the logic checks to see if the exception is
> specifically
> >> marked as “retryable” and if not, fails. We’d like to bypass this
> behavior
> >> and implement a configurable exponential backoff for tasks regardless of
> >> the failure reason. This is probably two changes: one to implement
> >> exponential backoff retries for tasks if they don’t already exist and a
> >> chance to implement a RetryPolicy interface that evaluates the
> Exception to
> >> determine whether or not to retry.
> >>
> >
> > This has definitely come up before. The likely "fix" for this is to
> provide
> > general "bad data handling" options within the framework itself. The
> > obvious set would be
> >
> > 1. fail fast, which is what we do today (assuming connector actually
> fails
> > and doesn't eat errors)
> > 2. retry (possibly with configs to limit)
> > 3. drop data and move on
> > 4. dead letter queue
> >
> > This needs to be addressed in a way that handles errors from:
> >
> > 1. The connector itself (e.g. connectivity issues to the other system)
> > 2. Converters/serializers (bad data, unexpected format, etc)
> > 3. SMTs
> > 4. Ideally the fmwk as well (though I don't think we have any known bugs
> > where this would be a problem, and we'd be inclined to just fix them
> > anyway).
> >
> > I think we understand the space of problems and how to address them
> pretty
> > well already, this issue is really just a matter of someone finding the
> > time to KIP, implement, and review/implement. (And that review/commit one
> > realistically means we need multiple people's time). Happy to guide
> anyone
> > interested on next steps. If not addressed by general community,
> Confluent
> > will get to this at some point, but I couldn't say when that would be --
> > Randall might know better than I would.
> >
> >
> >> (2) Kafka Connect doesn’t permit Connectors to smartly reposition after
> >> rebalance
> >> We’re using the S3 connector to dump files with a large number of
> records
> >> into an S3 bucket. About 100,000 records per file. Unfortunately, every
> >> time a task fails, the consumer rebalance causes all partitions to get
> >> re-shuffled amongst the various partitions. To compensate for this, the
> >> connector gets stopped and started from what I can tell from the logs?
> And
> >> then picks up from the last consumer position that was committed to the
> >> brokers.
> >>
> >> This doesn’t work great if you’re batching things into large numbers for
> >> archival.
> >>
> >> For the S3 connector, for example: Let’s say I have two partitions and
> the
> >> connector has two tasks to process each of those. Task 0 is at 5,000
> >> records read from the last commit and Task 1 is at 70,000 records read
> from
> >> the last commit. Then, boom, something goes wrong with Task 0 and it
> falls
> >> over. This triggers a rebalance and Task 1 has to take over the
> workload.
> >> Task 1 will, at this point, discard the 70,000 records in its buffer and
> >> start from the last commit point. This failure mode is brutal for the
> >> archival system we’re building.
> >>
> >>
> > Yes, this is a known pain point. Usually it shows up as more of an issue
> > for running a lot of connectors (where you don't want a tasks failure to
> > unnecessarily affect unrelated work), but the concern for connectors
> which
> > do relatively infrequent commits is valid as well. I'll make a point on
> the
> > first solution then see below for more complete answer.
> >
> >
> >> There are two solutions that I can think of to this:
> >>
> >> (A) Provide an interface for connectors to define their own rebalance
> >> listener. This listener could compare the newly assigned list of
> partitions
> >> with a previously assigned list. For all partitions that this connector
> was
> >> already working on prior to the rebalance, it could manually seek to the
> >> last position it locally processed before resuming. So, in the scenario
> >> above Task 1 could keep an accounting file locally and seek over the
> first
> >> 70,000 records without reprocessing them. It would then wait until
> after it
> >> confirms the S3 upload to commit those offsets back to Kafka. This
> ensures
> >> that if the machine running Task 1 dies a new consumer can take its
> place,
> >> but we’ll still benefit from a local cache if one is present.
> >>
> >
> > For sink tasks, this actually already exists -- see
> > http://kafka.apache.org/10/javadoc/org/apache/kafka/
> connect/sink/SinkTask.html#open-java.util.Collection- <
> http://kafka.apache.org/10/javadoc/org/apache/kafka/
> connect/sink/SinkTask.html#open-java.util.Collection->
> > (open() and close() are the relevant APIs in there). It's not necessarily
> > meant for this purpose originally, but you could take advantage to
> benefit
> > from any coincidental overlap in previous and new partition sets.
> >
> >
> >>
> >> (B) Have connect manually round robin partitions on a topic to tasks and
> >> never rebalance them automatically. If this were combined with better
> task
> >> retry semantics, I think this solution would be simpler.
> >>
> >
> > The real answer here is to get rid of the global rebalance and/or try to
> > achieve some level of stickiness for tasks. The global rebalance was
> > something we knew could become a scalability issue and when we extended
> the
> > group protocol to support connect in addition to consumers (or really any
> > new layer you want to add), we specifically made sure we had a plan to
> > extend and upgrade that protocol, and that we'd be able to implement it
> > such that you could do that upgrade with no downtime.
> >
> > I've had proposals for doing this floating around my head since the first
> > version of Connect, and I've got a draft of some options that I wrote up
> > internally. This particular issue is actually a lot more general than
> just
> > for Connect. It makes things more complicated (a good reason to be
> > conservative in adding a solution!), but Kafka Streams for nodes that
> have
> > large state stores can also benefit from a better rebalance protocol, as
> > can any consumer that has to manage some significant local state.
> >
> > The good news is that the implementations here that get you the biggest
> > bang for your implementation-cost buck aren't that complicated. The bad
> > news is that we have to do it separately for the normal consumer and
> > Connect.
> >
> > Again, this mostly boils down to finding time to go KIP and implement,
> but
> > the basic ideas for partial, sticky, incremental, and deferred
> rebalancing
> > and their implications are pretty well understood by the core team of
> > Connect developers now.
> >
> > Let me see if I can get some copies of some wikis that got written up
> > internally at Confluent ported to the Kafka wiki. No real reason we
> haven't
> > put it there, just happened to create it internally and didn't think to
> > copy it over. Probably both would be useful if anyone in the community
> > wants to tackle these problems.
> >
> >
> >>
> >> (3) As far as I can tell, JMX metrics aren’t reporting the number of
> >> active tasks
> >> This one is arguably the simplest issue to resolve, but we’d like to
> alert
> >> if the number of active tasks isn’t what we expect it to be so that we
> can
> >> have a human investigate.
> >>
> >
> > Interesting. I think you're right that we're probably just reporting
> > *assigned* tasks with the task-count metric rather than active tasks. I
> > think an active tasks metric would be reasonable, though since you really
> > need to look at the aggregate across workers, I'm not sure it's the best
> > for alerting.
> >
> > Maybe an unhealthy/dead tasks count metric would be better? You can alert
> > on that directly without having to aggregate across workers.
> >
> > -Ewen
> >
> >
> >>
> >> -----------
> >>
> >> I would love thoughts on all of the above from anyone on this list.
> >>
> >> Thanks,
> >>
> >> Matt Farmer
>
>

Re: Seeking Feedback on Kafka Connect Issues

Posted by Matt Farmer <ma...@frmr.me>.
Hi Ewen,

Thanks for the thoughtful response. I’m happy to take some time to write up a KIP and do some implementation work here.
I did KIP-210 previously, so I’ve been through the process before. We also have some organizational interest in improving
Kafka Connect. Our concern internally is that we don’t want to wait on the KIP cycle to fully complete before rolling
something out. It took many months for KIP-210 to go from draft to merge.

It might be sufficient for us in the interim to:

(1) Improve the S3 connector using close/open to be smarter about what multipart uploads it cancels during rebalance.

(2) Implement a sidecar service that monitors the Connect API and restarts tasks that fail

… and in parallel work on the KIPs required to provide a less patchwork solution in the framework itself.

Is such a contribution to the S3 connector something that Confluent would be open to?

> On Mar 19, 2018, at 10:00 PM, Ewen Cheslack-Postava <ew...@confluent.io> wrote:
> 
> Responses inline.
> 
> On Mon, Mar 19, 2018 at 3:02 PM, Matt Farmer <matt@frmr.me <ma...@frmr.me>> wrote:
> 
>> Hi everyone,
>> 
>> We’ve been experimenting recently with some limited use of Kafka Connect
>> and are hoping to expand to wider use cases soon. However, we had some
>> internal issues that gave us a well-timed preview of error handling
>> behavior in Kafka Connect. I think the fixes for this will require at least
>> three different KIPs, but I want to share some thoughts to get the initial
>> reaction from folks in the dev community. If these ideas seem reasonable, I
>> can go ahead and create the required KIPs.
>> 
>> Here are the three things specifically we ran into…
>> 
>> -----------
>> 
>> (1) Kafka Connect only retries tasks when certain exceptions are thrown
>> Currently, Kafka Connect only retries tasks when certain exceptions are
>> thrown - I believe the logic checks to see if the exception is specifically
>> marked as “retryable” and if not, fails. We’d like to bypass this behavior
>> and implement a configurable exponential backoff for tasks regardless of
>> the failure reason. This is probably two changes: one to implement
>> exponential backoff retries for tasks if they don’t already exist and a
>> chance to implement a RetryPolicy interface that evaluates the Exception to
>> determine whether or not to retry.
>> 
> 
> This has definitely come up before. The likely "fix" for this is to provide
> general "bad data handling" options within the framework itself. The
> obvious set would be
> 
> 1. fail fast, which is what we do today (assuming connector actually fails
> and doesn't eat errors)
> 2. retry (possibly with configs to limit)
> 3. drop data and move on
> 4. dead letter queue
> 
> This needs to be addressed in a way that handles errors from:
> 
> 1. The connector itself (e.g. connectivity issues to the other system)
> 2. Converters/serializers (bad data, unexpected format, etc)
> 3. SMTs
> 4. Ideally the fmwk as well (though I don't think we have any known bugs
> where this would be a problem, and we'd be inclined to just fix them
> anyway).
> 
> I think we understand the space of problems and how to address them pretty
> well already, this issue is really just a matter of someone finding the
> time to KIP, implement, and review/implement. (And that review/commit one
> realistically means we need multiple people's time). Happy to guide anyone
> interested on next steps. If not addressed by general community, Confluent
> will get to this at some point, but I couldn't say when that would be --
> Randall might know better than I would.
> 
> 
>> (2) Kafka Connect doesn’t permit Connectors to smartly reposition after
>> rebalance
>> We’re using the S3 connector to dump files with a large number of records
>> into an S3 bucket. About 100,000 records per file. Unfortunately, every
>> time a task fails, the consumer rebalance causes all partitions to get
>> re-shuffled amongst the various partitions. To compensate for this, the
>> connector gets stopped and started from what I can tell from the logs? And
>> then picks up from the last consumer position that was committed to the
>> brokers.
>> 
>> This doesn’t work great if you’re batching things into large numbers for
>> archival.
>> 
>> For the S3 connector, for example: Let’s say I have two partitions and the
>> connector has two tasks to process each of those. Task 0 is at 5,000
>> records read from the last commit and Task 1 is at 70,000 records read from
>> the last commit. Then, boom, something goes wrong with Task 0 and it falls
>> over. This triggers a rebalance and Task 1 has to take over the workload.
>> Task 1 will, at this point, discard the 70,000 records in its buffer and
>> start from the last commit point. This failure mode is brutal for the
>> archival system we’re building.
>> 
>> 
> Yes, this is a known pain point. Usually it shows up as more of an issue
> for running a lot of connectors (where you don't want a tasks failure to
> unnecessarily affect unrelated work), but the concern for connectors which
> do relatively infrequent commits is valid as well. I'll make a point on the
> first solution then see below for more complete answer.
> 
> 
>> There are two solutions that I can think of to this:
>> 
>> (A) Provide an interface for connectors to define their own rebalance
>> listener. This listener could compare the newly assigned list of partitions
>> with a previously assigned list. For all partitions that this connector was
>> already working on prior to the rebalance, it could manually seek to the
>> last position it locally processed before resuming. So, in the scenario
>> above Task 1 could keep an accounting file locally and seek over the first
>> 70,000 records without reprocessing them. It would then wait until after it
>> confirms the S3 upload to commit those offsets back to Kafka. This ensures
>> that if the machine running Task 1 dies a new consumer can take its place,
>> but we’ll still benefit from a local cache if one is present.
>> 
> 
> For sink tasks, this actually already exists -- see
> http://kafka.apache.org/10/javadoc/org/apache/kafka/connect/sink/SinkTask.html#open-java.util.Collection- <http://kafka.apache.org/10/javadoc/org/apache/kafka/connect/sink/SinkTask.html#open-java.util.Collection->
> (open() and close() are the relevant APIs in there). It's not necessarily
> meant for this purpose originally, but you could take advantage to benefit
> from any coincidental overlap in previous and new partition sets.
> 
> 
>> 
>> (B) Have connect manually round robin partitions on a topic to tasks and
>> never rebalance them automatically. If this were combined with better task
>> retry semantics, I think this solution would be simpler.
>> 
> 
> The real answer here is to get rid of the global rebalance and/or try to
> achieve some level of stickiness for tasks. The global rebalance was
> something we knew could become a scalability issue and when we extended the
> group protocol to support connect in addition to consumers (or really any
> new layer you want to add), we specifically made sure we had a plan to
> extend and upgrade that protocol, and that we'd be able to implement it
> such that you could do that upgrade with no downtime.
> 
> I've had proposals for doing this floating around my head since the first
> version of Connect, and I've got a draft of some options that I wrote up
> internally. This particular issue is actually a lot more general than just
> for Connect. It makes things more complicated (a good reason to be
> conservative in adding a solution!), but Kafka Streams for nodes that have
> large state stores can also benefit from a better rebalance protocol, as
> can any consumer that has to manage some significant local state.
> 
> The good news is that the implementations here that get you the biggest
> bang for your implementation-cost buck aren't that complicated. The bad
> news is that we have to do it separately for the normal consumer and
> Connect.
> 
> Again, this mostly boils down to finding time to go KIP and implement, but
> the basic ideas for partial, sticky, incremental, and deferred rebalancing
> and their implications are pretty well understood by the core team of
> Connect developers now.
> 
> Let me see if I can get some copies of some wikis that got written up
> internally at Confluent ported to the Kafka wiki. No real reason we haven't
> put it there, just happened to create it internally and didn't think to
> copy it over. Probably both would be useful if anyone in the community
> wants to tackle these problems.
> 
> 
>> 
>> (3) As far as I can tell, JMX metrics aren’t reporting the number of
>> active tasks
>> This one is arguably the simplest issue to resolve, but we’d like to alert
>> if the number of active tasks isn’t what we expect it to be so that we can
>> have a human investigate.
>> 
> 
> Interesting. I think you're right that we're probably just reporting
> *assigned* tasks with the task-count metric rather than active tasks. I
> think an active tasks metric would be reasonable, though since you really
> need to look at the aggregate across workers, I'm not sure it's the best
> for alerting.
> 
> Maybe an unhealthy/dead tasks count metric would be better? You can alert
> on that directly without having to aggregate across workers.
> 
> -Ewen
> 
> 
>> 
>> -----------
>> 
>> I would love thoughts on all of the above from anyone on this list.
>> 
>> Thanks,
>> 
>> Matt Farmer


Re: Seeking Feedback on Kafka Connect Issues

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
Responses inline.

On Mon, Mar 19, 2018 at 3:02 PM, Matt Farmer <ma...@frmr.me> wrote:

> Hi everyone,
>
> We’ve been experimenting recently with some limited use of Kafka Connect
> and are hoping to expand to wider use cases soon. However, we had some
> internal issues that gave us a well-timed preview of error handling
> behavior in Kafka Connect. I think the fixes for this will require at least
> three different KIPs, but I want to share some thoughts to get the initial
> reaction from folks in the dev community. If these ideas seem reasonable, I
> can go ahead and create the required KIPs.
>
> Here are the three things specifically we ran into…
>
> -----------
>
> (1) Kafka Connect only retries tasks when certain exceptions are thrown
> Currently, Kafka Connect only retries tasks when certain exceptions are
> thrown - I believe the logic checks to see if the exception is specifically
> marked as “retryable” and if not, fails. We’d like to bypass this behavior
> and implement a configurable exponential backoff for tasks regardless of
> the failure reason. This is probably two changes: one to implement
> exponential backoff retries for tasks if they don’t already exist and a
> chance to implement a RetryPolicy interface that evaluates the Exception to
> determine whether or not to retry.
>

This has definitely come up before. The likely "fix" for this is to provide
general "bad data handling" options within the framework itself. The
obvious set would be

1. fail fast, which is what we do today (assuming the connector actually
fails and doesn't eat errors)
2. retry (possibly with configs to limit)
3. drop data and move on
4. dead letter queue

This needs to be addressed in a way that handles errors from:

1. The connector itself (e.g. connectivity issues to the other system)
2. Converters/serializers (bad data, unexpected format, etc)
3. SMTs
4. Ideally the framework as well (though I don't think we have any known
bugs where this would be a problem, and we'd be inclined to just fix them
anyway).
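
To make that option space concrete, the rough shape might be something
like the following (purely a sketch; none of these names are real Connect
configs or classes):

    import java.util.function.Consumer;
    import java.util.function.Supplier;

    public final class ErrorHandlingSketch {

        // Hypothetical tolerance setting mirroring the four options above.
        public enum ErrorTolerance { FAIL_FAST, RETRY, SKIP, DEAD_LETTER_QUEUE }

        /**
         * How the framework might wrap a failure-prone stage (converter, SMT,
         * or the connector itself) once a tolerance setting exists.
         */
        static <T> T apply(ErrorTolerance tolerance, Supplier<T> stage,
                           Consumer<Throwable> deadLetter) {
            try {
                return stage.get();
            } catch (RuntimeException e) {
                switch (tolerance) {
                    case RETRY:             return stage.get(); // real version: bounded retries + backoff
                    case SKIP:              return null;        // drop the record and move on
                    case DEAD_LETTER_QUEUE: deadLetter.accept(e); return null;
                    case FAIL_FAST:
                    default:                throw e;            // today's behavior
                }
            }
        }
    }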

I think we understand the space of problems and how to address them pretty
well already; this issue is really just a matter of someone finding the
time to KIP, implement, and review/commit. (And that review/commit step
realistically means we need multiple people's time.) Happy to guide anyone
interested on next steps. If not addressed by the general community, Confluent
will get to this at some point, but I couldn't say when that would be --
Randall might know better than I would.


> (2) Kafka Connect doesn’t permit Connectors to smartly reposition after
> rebalance
> We’re using the S3 connector to dump files with a large number of records
> into an S3 bucket. About 100,000 records per file. Unfortunately, every
> time a task fails, the consumer rebalance causes all partitions to get
> re-shuffled amongst the various partitions. To compensate for this, the
> connector gets stopped and started from what I can tell from the logs? And
> then picks up from the last consumer position that was committed to the
> brokers.
>
> This doesn’t work great if you’re batching things into large numbers for
> archival.
>
> For the S3 connector, for example: Let’s say I have two partitions and the
> connector has two tasks to process each of those. Task 0 is at 5,000
> records read from the last commit and Task 1 is at 70,000 records read from
> the last commit. Then, boom, something goes wrong with Task 0 and it falls
> over. This triggers a rebalance and Task 1 has to take over the workload.
> Task 1 will, at this point, discard the 70,000 records in its buffer and
> start from the last commit point. This failure mode is brutal for the
> archival system we’re building.
>
>
Yes, this is a known pain point. Usually it shows up as more of an issue
for running a lot of connectors (where you don't want a task's failure to
unnecessarily affect unrelated work), but the concern for connectors which
do relatively infrequent commits is valid as well. I'll make a point on the
first solution, then see below for a more complete answer.


> There are two solutions that I can think of to this:
>
> (A) Provide an interface for connectors to define their own rebalance
> listener. This listener could compare the newly assigned list of partitions
> with a previously assigned list. For all partitions that this connector was
> already working on prior to the rebalance, it could manually seek to the
> last position it locally processed before resuming. So, in the scenario
> above Task 1 could keep an accounting file locally and seek over the first
> 70,000 records without reprocessing them. It would then wait until after it
> confirms the S3 upload to commit those offsets back to Kafka. This ensures
> that if the machine running Task 1 dies a new consumer can take its place,
> but we’ll still benefit from a local cache if one is present.
>

For sink tasks, this actually already exists -- see
http://kafka.apache.org/10/javadoc/org/apache/kafka/connect/sink/SinkTask.html#open-java.util.Collection-
(open() and close() are the relevant APIs in there). It wasn't necessarily
meant for this purpose originally, but you could take advantage of it to
benefit from any coincidental overlap between previous and new partition sets.
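
A rough sketch of how a sink task could use those hooks (the
lastLocallyProcessed map stands in for whatever local accounting the
connector keeps; here it's in-memory only, so it only helps when the same
worker keeps a partition across the rebalance):

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.sink.SinkTask;

    public abstract class RepositioningSinkTask extends SinkTask {

        private final Map<TopicPartition, Long> lastLocallyProcessed = new HashMap<>();

        @Override
        public void open(Collection<TopicPartition> partitions) {
            for (TopicPartition tp : partitions) {
                Long last = lastLocallyProcessed.get(tp);
                if (last != null) {
                    // These records are already buffered locally; skip re-reading them.
                    context.offset(tp, last + 1);
                }
            }
        }

        @Override
        public void close(Collection<TopicPartition> partitions) {
            // Keep the accounting in case we get the partition back; a durable
            // version would persist it to local disk here.
        }

        @Override
        public void put(Collection<SinkRecord> records) {
            for (SinkRecord record : records) {
                buffer(record); // connector-specific buffering (e.g. building an S3 file)
                lastLocallyProcessed.put(
                        new TopicPartition(record.topic(), record.kafkaPartition()),
                        record.kafkaOffset());
            }
        }

        /** Connector-specific local buffering of a record. */
        protected abstract void buffer(SinkRecord record);
    }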


>
> (B) Have connect manually round robin partitions on a topic to tasks and
> never rebalance them automatically. If this were combined with better task
> retry semantics, I think this solution would be simpler.
>

The real answer here is to get rid of the global rebalance and/or try to
achieve some level of stickiness for tasks. The global rebalance was
something we knew could become a scalability issue, and when we extended the
group protocol to support Connect in addition to consumers (or really any
new layer you want to add), we specifically made sure we had a plan to
extend and upgrade that protocol, and that we'd be able to implement it
such that you could do that upgrade with no downtime.

I've had proposals for doing this floating around my head since the first
version of Connect, and I've got a draft of some options that I wrote up
internally. This particular issue is actually a lot more general than just
for Connect. It makes things more complicated (a good reason to be
conservative in adding a solution!), but Kafka Streams for nodes that have
large state stores can also benefit from a better rebalance protocol, as
can any consumer that has to manage some significant local state.

The good news is that the implementations here that get you the biggest
bang for your implementation-cost buck aren't that complicated. The bad
news is that we have to do it separately for the normal consumer and
Connect.

Again, this mostly boils down to finding time to go KIP and implement, but
the basic ideas for partial, sticky, incremental, and deferred rebalancing
and their implications are pretty well understood by the core team of
Connect developers now.

Let me see if I can get some copies of some wikis that got written up
internally at Confluent ported to the Kafka wiki. No real reason we haven't
put it there, just happened to create it internally and didn't think to
copy it over. Probably both would be useful if anyone in the community
wants to tackle these problems.


>
> (3) As far as I can tell, JMX metrics aren’t reporting the number of
> active tasks
> This one is arguably the simplest issue to resolve, but we’d like to alert
> if the number of active tasks isn’t what we expect it to be so that we can
> have a human investigate.
>

Interesting. I think you're right that we're probably just reporting
*assigned* tasks with the task-count metric rather than active tasks. I
think an active tasks metric would be reasonable, though since you really
need to look at the aggregate across workers, I'm not sure it's the best
for alerting.

Maybe an unhealthy/dead tasks count metric would be better? You can alert
on that directly without having to aggregate across workers.

-Ewen


>
> -----------
>
> I would love thoughts on all of the above from anyone on this list.
>
> Thanks,
>
> Matt Farmer