Posted to dev@kafka.apache.org by Randall Hauch <rh...@gmail.com> on 2018/04/02 15:18:16 UTC

Re: Seeking Feedback on Kafka Connect Issues

Yes, Confluent would be interested in improvements to the S3 connector.
Feel free to create an issue/PR in
https://github.com/confluentinc/kafka-connect-storage-cloud/.

I just created https://issues.apache.org/jira/browse/KAFKA-6738 to deal
with the bad data handling issue, and we can use that to track all of the
comments, discussions, and work. I know that Konstantine K has already
thought a fair amount about this, and so I've assigned it (at least
initially) to him. This is something we'd like to get into the next AK
release (2.0?), but would certainly appreciate any help from you or any
other members of the community. If you're willing to help, I'd ask that you
please coordinate with him on
https://issues.apache.org/jira/browse/KAFKA-6738.

As a side note, the KIP freeze for each release is often a good month
before the planned release, and the feature freeze is usually only a week
after that. This means that a KIP that is not approved before this deadline
will be pushed to the next release - all the more reason to work on the KIP
and the implementation well before the deadline.

Randall

On Tue, Mar 20, 2018 at 9:49 AM, Matt Farmer <ma...@frmr.me> wrote:

> Hi Ewen,
>
> Thanks for the thoughtful response. I’m happy to take some time to write
> up a KIP and do some implementation work here.
> I did KIP-210 previously, so I’ve been through the process before. We also
> have some organizational interest in improving Kafka Connect. Our concern
> internally is that we don’t want to wait on the KIP cycle to fully complete
> before rolling out something. It took many months for KIP-210 to go from
> draft to merge.
>
> It might be sufficient for us in the interim to:
>
> (1) Improve the S3 connector using close/open to be smarter about what
> multipart uploads it cancels during rebalance.
>
> (2) Implement a sidecar service that monitors the Connect API and restarts
> tasks that fail
>
> … and in parallel work on the KIPs required to provide a less patchwork
> solution in the framework itself.
>
> Is such a contribution to the S3 connector something that Confluent would
> be open to?
>
> > On Mar 19, 2018, at 10:00 PM, Ewen Cheslack-Postava <ew...@confluent.io>
> wrote:
> >
> > Responses inline.
> >
> > On Mon, Mar 19, 2018 at 3:02 PM, Matt Farmer <matt@frmr.me> wrote:
> >
> >> Hi everyone,
> >>
> >> We’ve been experimenting recently with some limited use of Kafka Connect
> >> and are hoping to expand to wider use cases soon. However, we had some
> >> internal issues that gave us a well-timed preview of error handling
> >> behavior in Kafka Connect. I think the fixes for this will require at
> least
> >> three different KIPs, but I want to share some thoughts to get the
> initial
> >> reaction from folks in the dev community. If these ideas seem
> reasonable, I
> >> can go ahead and create the required KIPs.
> >>
> >> Here are the three things specifically we ran into…
> >>
> >> -----------
> >>
> >> (1) Kafka Connect only retries tasks when certain exceptions are thrown
> >> Currently, Kafka Connect only retries tasks when certain exceptions are
> >> thrown - I believe the logic checks to see if the exception is
> >> specifically marked as “retryable” and, if not, fails. We’d like to
> >> bypass this behavior and implement a configurable exponential backoff
> >> for tasks regardless of the failure reason. This is probably two
> >> changes: one to implement exponential backoff retries for tasks if they
> >> don’t already exist, and another to implement a RetryPolicy interface
> >> that evaluates the Exception to determine whether or not to retry.
> >>
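> >> To make that concrete, the RetryPolicy interface I have in mind is only a
> >> sketch at this point - none of the names below exist in Connect today -
> >> but it would look roughly like this:
> >>
> >> public interface RetryPolicy {
> >>     /** Decide whether a task that failed with this error should be retried. */
> >>     boolean shouldRetry(Throwable error, int previousAttempts);
> >>
> >>     /** How long to wait before the next attempt. */
> >>     long backoffMs(int previousAttempts);
> >> }
> >>
> >> /** Example policy: retry every failure with capped exponential backoff. */
> >> public class ExponentialBackoffRetryPolicy implements RetryPolicy {
> >>     private final int maxAttempts;
> >>     private final long initialBackoffMs;
> >>     private final long maxBackoffMs;
> >>
> >>     public ExponentialBackoffRetryPolicy(int maxAttempts, long initialBackoffMs,
> >>                                          long maxBackoffMs) {
> >>         this.maxAttempts = maxAttempts;
> >>         this.initialBackoffMs = initialBackoffMs;
> >>         this.maxBackoffMs = maxBackoffMs;
> >>     }
> >>
> >>     @Override
> >>     public boolean shouldRetry(Throwable error, int previousAttempts) {
> >>         return previousAttempts < maxAttempts;
> >>     }
> >>
> >>     @Override
> >>     public long backoffMs(int previousAttempts) {
> >>         // Double the wait on each attempt, but never exceed maxBackoffMs.
> >>         long backoff = initialBackoffMs << Math.min(previousAttempts, 30);
> >>         return Math.min(backoff, maxBackoffMs);
> >>     }
> >> }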
> >
> > This has definitely come up before. The likely "fix" for this is to
> provide
> > general "bad data handling" options within the framework itself. The
> > obvious set would be
> >
> > 1. fail fast, which is what we do today (assuming connector actually
> fails
> > and doesn't eat errors)
> > 2. retry (possibly with configs to limit)
> > 3. drop data and move on
> > 4. dead letter queue
> >
> > This needs to be addressed in a way that handles errors from:
> >
> > 1. The connector itself (e.g. connectivity issues to the other system)
> > 2. Converters/serializers (bad data, unexpected format, etc)
> > 3. SMTs
> > 4. Ideally the framework as well (though I don't think we have any known
> > bugs where this would be a problem, and we'd be inclined to just fix them
> > anyway).
> >
> > I think we understand the space of problems and how to address them pretty
> > well already; this issue is really just a matter of someone finding the
> > time to write the KIP, implement it, and review/commit it. (And the
> > review/commit step realistically means we need multiple people's time.)
> > Happy to guide anyone interested on next steps. If not addressed by the
> > general community, Confluent will get to this at some point, but I
> > couldn't say when that would be -- Randall might know better than I would.
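> >
> > In the meantime, a sink connector can approximate option 4 for failures in
> > its own processing (this won't help with converter or deserialization
> > errors, which happen before the task ever sees the record). A rough sketch
> > of that workaround - the config names and writeToSink() are just stand-ins:
> >
> > import java.util.Collection;
> > import java.util.Map;
> > import java.util.Properties;
> > import org.apache.kafka.clients.producer.KafkaProducer;
> > import org.apache.kafka.clients.producer.ProducerRecord;
> > import org.apache.kafka.common.serialization.StringSerializer;
> > import org.apache.kafka.connect.sink.SinkRecord;
> > import org.apache.kafka.connect.sink.SinkTask;
> >
> > public class DeadLetterForwardingSinkTask extends SinkTask {
> >     private KafkaProducer<String, String> dlqProducer;
> >     private String dlqTopic;
> >
> >     @Override
> >     public void start(Map<String, String> props) {
> >         dlqTopic = props.get("dlq.topic");  // made-up config name
> >         Properties producerProps = new Properties();
> >         producerProps.put("bootstrap.servers", props.get("dlq.bootstrap.servers"));
> >         producerProps.put("key.serializer", StringSerializer.class.getName());
> >         producerProps.put("value.serializer", StringSerializer.class.getName());
> >         dlqProducer = new KafkaProducer<>(producerProps);
> >     }
> >
> >     @Override
> >     public void put(Collection<SinkRecord> records) {
> >         for (SinkRecord record : records) {
> >             try {
> >                 writeToSink(record);
> >             } catch (Exception e) {
> >                 // Route the bad record to the dead letter topic instead of
> >                 // failing the task. toString() is a simplification; a real
> >                 // implementation would re-serialize the original key/value.
> >                 dlqProducer.send(new ProducerRecord<>(dlqTopic, String.valueOf(record.value())));
> >             }
> >         }
> >     }
> >
> >     private void writeToSink(SinkRecord record) {
> >         // Delivery to the external system goes here.
> >     }
> >
> >     @Override
> >     public void stop() {
> >         if (dlqProducer != null)
> >             dlqProducer.close();
> >     }
> >
> >     @Override
> >     public String version() {
> >         return "0.0.1";
> >     }
> > }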
> >
> >
> >> (2) Kafka Connect doesn’t permit Connectors to smartly reposition after
> >> rebalance
> >> We’re using the S3 connector to dump files with a large number of
> >> records into an S3 bucket - about 100,000 records per file.
> >> Unfortunately, every time a task fails, the consumer rebalance causes all
> >> partitions to get re-shuffled amongst the various tasks. To compensate
> >> for this, from what I can tell from the logs, the connector gets stopped
> >> and restarted, and then picks up from the last consumer position that was
> >> committed to the brokers.
> >>
> >> This doesn’t work great if you’re batching things into large numbers for
> >> archival.
> >>
> >> For the S3 connector, for example: Let’s say I have two partitions and
> the
> >> connector has two tasks to process each of those. Task 0 is at 5,000
> >> records read from the last commit and Task 1 is at 70,000 records read
> from
> >> the last commit. Then, boom, something goes wrong with Task 0 and it
> falls
> >> over. This triggers a rebalance and Task 1 has to take over the
> workload.
> >> Task 1 will, at this point, discard the 70,000 records in its buffer and
> >> start from the last commit point. This failure mode is brutal for the
> >> archival system we’re building.
> >>
> >>
> > Yes, this is a known pain point. Usually it shows up as more of an issue
> > for running a lot of connectors (where you don't want a task's failure to
> > unnecessarily affect unrelated work), but the concern for connectors which
> > do relatively infrequent commits is valid as well. I'll make a point on
> > the first solution, then see below for a more complete answer.
> >
> >
> >> There are two solutions that I can think of to this:
> >>
> >> (A) Provide an interface for connectors to define their own rebalance
> >> listener. This listener could compare the newly assigned list of
> partitions
> >> with a previously assigned list. For all partitions that this connector
> was
> >> already working on prior to the rebalance, it could manually seek to the
> >> last position it locally processed before resuming. So, in the scenario
> >> above Task 1 could keep an accounting file locally and seek over the
> first
> >> 70,000 records without reprocessing them. It would then wait until
> after it
> >> confirms the S3 upload to commit those offsets back to Kafka. This
> ensures
> >> that if the machine running Task 1 dies a new consumer can take its
> place,
> >> but we’ll still benefit from a local cache if one is present.
> >>
> >
> > For sink tasks, this actually already exists -- see
> > http://kafka.apache.org/10/javadoc/org/apache/kafka/connect/sink/SinkTask.html#open-java.util.Collection-
> > (open() and close() are the relevant APIs in there). It wasn't necessarily
> > meant for this purpose originally, but you could take advantage of any
> > coincidental overlap between the previous and new partition sets.
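> >
> > As a very rough sketch of what I mean - open(), close(), and
> > context.offset() are the real Connect APIs here, while LocalOffsetCache is
> > a hypothetical stand-in for whatever local bookkeeping the task keeps:
> >
> > import java.util.Collection;
> > import java.util.HashMap;
> > import java.util.Map;
> > import org.apache.kafka.common.TopicPartition;
> > import org.apache.kafka.connect.sink.SinkTask;
> >
> > public abstract class LocallyCachingSinkTask extends SinkTask {
> >     private final LocalOffsetCache cache = new LocalOffsetCache();
> >
> >     @Override
> >     public void open(Collection<TopicPartition> partitions) {
> >         for (TopicPartition tp : partitions) {
> >             Long lastProcessed = cache.lastProcessedOffset(tp);
> >             if (lastProcessed != null) {
> >                 // Seek past records we already have locally instead of
> >                 // re-reading everything since the last committed offset.
> >                 context.offset(tp, lastProcessed + 1);
> >             }
> >         }
> >     }
> >
> >     @Override
> >     public void close(Collection<TopicPartition> partitions) {
> >         // Persist local positions before the partitions are revoked.
> >         for (TopicPartition tp : partitions)
> >             cache.flush(tp);
> >     }
> >
> >     // Minimal stand-in for a local offset store; a real one would persist
> >     // its contents to local disk alongside the buffered data.
> >     static class LocalOffsetCache {
> >         private final Map<TopicPartition, Long> offsets = new HashMap<>();
> >         Long lastProcessedOffset(TopicPartition tp) { return offsets.get(tp); }
> >         void record(TopicPartition tp, long offset) { offsets.put(tp, offset); }
> >         void flush(TopicPartition tp) { /* write offsets.get(tp) somewhere durable */ }
> >     }
> > }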
> >
> >
> >>
> >> (B) Have Connect manually round-robin partitions on a topic to tasks and
> >> never rebalance them automatically. If this were combined with better
> >> task retry semantics, I think this solution would be simpler.
> >>
> >
> > The real answer here is to get rid of the global rebalance and/or try to
> > achieve some level of stickiness for tasks. The global rebalance was
> > something we knew could become a scalability issue, and when we extended
> > the group protocol to support Connect in addition to consumers (or really
> > any new layer you want to add), we specifically made sure we had a plan to
> > extend and upgrade that protocol, and that we'd be able to implement it
> > such that you could do that upgrade with no downtime.
> >
> > I've had proposals for doing this floating around my head since the first
> > version of Connect, and I've got a draft of some options that I wrote up
> > internally. This particular issue is actually a lot more general than just
> > Connect. It makes things more complicated (a good reason to be
> > conservative in adding a solution!), but Kafka Streams nodes that have
> > large state stores can also benefit from a better rebalance protocol, as
> > can any consumer that has to manage some significant local state.
> >
> > The good news is that the implementations here that get you the biggest
> > bang for your implementation-cost buck aren't that complicated. The bad
> > news is that we have to do it separately for the normal consumer and
> > Connect.
> >
> > Again, this mostly boils down to finding the time to write the KIP and
> > implement it, but the basic ideas for partial, sticky, incremental, and
> > deferred rebalancing and their implications are pretty well understood by
> > the core team of Connect developers now.
> >
> > Let me see if I can get copies of some wikis that got written up
> > internally at Confluent ported to the Kafka wiki. There's no real reason
> > we haven't put them there; we just happened to create them internally and
> > didn't think to copy them over. Both would probably be useful if anyone in
> > the community wants to tackle these problems.
> >
> >
> >>
> >> (3) As far as I can tell, JMX metrics aren’t reporting the number of
> >> active tasks
> >> This one is arguably the simplest issue to resolve, but we’d like to
> alert
> >> if the number of active tasks isn’t what we expect it to be so that we
> can
> >> have a human investigate.
> >>
> >
> > Interesting. I think you're right that we're probably just reporting
> > *assigned* tasks with the task-count metric rather than active tasks. I
> > think an active tasks metric would be reasonable, though since you really
> > need to look at the aggregate across workers, I'm not sure it's the best
> > for alerting.
> >
> > Maybe an unhealthy/dead tasks count metric would be better? You can alert
> > on that directly without having to aggregate across workers.
> >
> > -Ewen
> >
> >
> >>
> >> -----------
> >>
> >> I would love thoughts on all of the above from anyone on this list.
> >>
> >> Thanks,
> >>
> >> Matt Farmer
>
>

Re: Seeking Feedback on Kafka Connect Issues

Posted by Matt Farmer <ma...@frmr.me>.
Howdy,

I'll keep an eye on that ticket. KIP-275 came out of some work we've done
internally on our
private forks of Kafka and the Confluent Cloud Storage connector.
Essentially, with that extra
API we've tweaked the S3 connector to check the value of isClosing in
preCommit and immediately
attempt to commit files to S3 regardless of whether or not we've reached a
size or time limit.
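
For anyone curious, the tweak is roughly the shape below. The KIP is about
exposing isClosing to preCommit; I'm showing it as an extra argument here
purely for illustration (the final API shape may differ), and
commitBufferedFilesToS3() is a stand-in for the connector's existing upload
path:

// Hypothetical signature - the real API depends on how KIP-275 lands.
@Override
public Map<TopicPartition, OffsetAndMetadata> preCommit(
        Map<TopicPartition, OffsetAndMetadata> currentOffsets, boolean isClosing) {
    if (isClosing) {
        // We're about to give up these partitions, so push whatever is
        // buffered to S3 now instead of waiting for the size/time limit.
        commitBufferedFilesToS3();
    }
    return currentOffsets;
}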

We've been using this internally for a few days and it's been working well
for our needs. Once the KIP is approved and merged, I'll be able to open PRs
against the Confluent repos for the changes we made pretty quickly.

We are, however, still interested in some better error handling for
Connect. I think in the interim
we're going to have to build a sidecar service that monitors the Connect
API for failed tasks
and restarts them for us. :(
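
The core of that sidecar is small - roughly the loop below, using the status
and task-restart endpoints of the Connect REST API. The host, polling
interval, and error handling are simplified, and it assumes Jackson is on
the classpath for JSON parsing:

import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ConnectTaskRestarter {
    private static final String CONNECT_URL = "http://localhost:8083";
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static void main(String[] args) throws Exception {
        while (true) {
            for (JsonNode connector : MAPPER.readTree(get("/connectors"))) {
                String name = connector.asText();
                JsonNode status = MAPPER.readTree(get("/connectors/" + name + "/status"));
                for (JsonNode task : status.get("tasks")) {
                    if ("FAILED".equals(task.get("state").asText())) {
                        int id = task.get("id").asInt();
                        post("/connectors/" + name + "/tasks/" + id + "/restart");
                    }
                }
            }
            Thread.sleep(30_000);  // poll every 30 seconds
        }
    }

    private static InputStream get(String path) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(CONNECT_URL + path).openConnection();
        conn.setRequestMethod("GET");
        return conn.getInputStream();
    }

    private static void post(String path) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(CONNECT_URL + path).openConnection();
        conn.setRequestMethod("POST");
        conn.getResponseCode();  // fire the request; ignore the response body
        conn.disconnect();
    }
}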

Happy to provide whatever help I can toward making that sidecar service not
needed.

On Mon, Apr 2, 2018 at 11:18 AM, Randall Hauch <rh...@gmail.com> wrote:

> Yes, Confluent would be interested in improvements to the S3 connector.
> Feel free to create an issue/PR in
> https://github.com/confluentinc/kafka-connect-storage-cloud/.