You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Chris Jansen <ch...@permutive.com> on 2022/03/16 12:11:40 UTC

Transactions and `endOffsets` Java client consumer method

Hello all,

I have the need to query end offsets for a particular topic using a
consumer that has been configured with the "READ_COMMITTED" isolation
level. The response I get via the Java client
includes offsets at the end of the log that have not yet been committed and
therefore can't be consumed.

After digging around in the Java client code, it looks like the isolation
level is being transmitted in the API request to the broker. So my question
is does the broker use this information when crafting a response and, if it
is taken into account, why does it return log end offsets for transactions
that have not yet been committed?

For my own future reference I was struggling to find any details on the
broker API anywhere, if someone could point me in the right direction, I'd
really appreciate it.

Thanks,


Chris Jansen

Re: Transactions and `endOffsets` Java client consumer method

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Chris,

Since you are using read_committed mode, the txn marker from the
`endOffsets()` should indeed be skipped, no matter for committed of aborted
txns. For example if the log looks like this:

offsets:   0, 1, 2, 3, t_c(4) // or t_a(4) which means "abort marker"

then the endOffset should return "5".


That's why I'm unclear why you're seeing this issue.


Guozhang

On Tue, Mar 22, 2022 at 6:19 AM Chris Jansen <ch...@permutive.com>
wrote:

> Thanks Luke,
>
> If the transaction marker should be hidden, does it follow that aborted
> transaction at the end of the log should also be hidden for clients that
> are in read committed mode?
>
> Happy to do a KIP/PR.
>
> Thanks again,
>
> Chris
>
> On Tue, Mar 22, 2022 at 10:21 AM Luke Chen <sh...@gmail.com> wrote:
>
> > Hi Chris,
> >
> > Yes, the transaction marker should be hidden to clients.
> > There is similar issues reported:
> > https://issues.apache.org/jira/browse/KAFKA-10683
> > Welcome to submit KIP/PR to improve it.
> >
> > Thank you.
> > Luke
> >
> >
> > On Tue, Mar 22, 2022 at 5:16 PM Chris Jansen <chris.jansen@permutive.com
> >
> > wrote:
> >
> > > Hi Guozhang,
> > >
> > > Sorry, I should have been more clear. By "an unreadable end of the
> log",
> > I
> > > mean the `endOffsets` method returns an offset for a record that is
> never
> > > surfaced to the caller of `poll`.
> > >
> > > I've done some more digging and I think I understand why that is now.
> The
> > > API `endOffsets` calls tells the client, at a low level, what offset it
> > can
> > > read up to in the log. This may include a transaction marker or an
> > aborted
> > > transaction that gets delivered, but the client skips over when
> returning
> > > records via the `poll` method. I understand why the client must be told
> > > where to read up to, as the decision to skip chunks of the log must be
> > made
> > > by the client.
> > >
> > > What I was looking for when calling `endOffsets` was the last offset
> that
> > > would be surfaced via the `poll` method rather than where the low level
> > > client should read up to. If this is at all possible, it seems like
> this
> > > might require a broker change.
> > >
> > > Thanks
> > >
> > > Chris
> > >
> > > On Mon, Mar 21, 2022 at 5:38 PM Guozhang Wang <wa...@gmail.com>
> > wrote:
> > >
> > > > Hi Chris,
> > > >
> > > > What do you mean by "an unreadable end of the log"? Could you
> > elaborate?
> > > >
> > > > The version you used is recent enough, and the configs seems okay.
> So I
> > > > think there are some issues elsewhere.
> > > >
> > > > On Mon, Mar 21, 2022 at 5:51 AM Chris Jansen <
> > chris.jansen@permutive.com
> > > >
> > > > wrote:
> > > >
> > > > > So an update on this, it seems that the consumer reports an
> > unreadable
> > > > end
> > > > > of the log if the transaction has been aborted. Is this the
> intended
> > > > > behaviour?
> > > > >
> > > > > Thanks again,
> > > > >
> > > > > Chris
> > > > >
> > > > > On Mon, Mar 21, 2022 at 12:25 PM Chris Jansen <
> > > > chris.jansen@permutive.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Guozhang,
> > > > > >
> > > > > > Thanks for getting back to me. I'm using Confluent's distribution
> > > > version
> > > > > > 6.2.2, so Kafka 2.8.1. Could there be some consumer or broker
> > > > > configuration
> > > > > > I'm missing that would change this behaviour?
> > > > > >
> > > > > > Just to confirm, here are my consumer settings:
> > > > > >
> > > > > > auto.offset.reset -> earliest, isolation.level -> read_committed,
> > > > > > group.instance.id -> <redacted>, group.id ->
> > > > > > ephemeral-6c5f8338-3d94-4296-b64a-d3a5a331117e, bootstrap.servers
> > ->
> > > > > > PLAINTEXT://localhost:58300, enable.auto.commit -> false
> > > > > >
> > > > > > Thanks again,
> > > > > >
> > > > > >
> > > > > > Chris
> > > > > >
> > > > > >
> > > > > > On Sun, Mar 20, 2022 at 12:46 AM Guozhang Wang <
> wangguoz@gmail.com
> > >
> > > > > wrote:
> > > > > >
> > > > > >> Hi Chris,
> > > > > >>
> > > > > >> The broker does take the isolation level in the ListOffset API
> > > (which
> > > > is
> > > > > >> used for the endoffset call) in to consideration:
> > > > > >>
> > > > > >> val lastFetchableOffset = isolationLevel match {
> > > > > >>   case Some(IsolationLevel.READ_COMMITTED) =>
> > > > localLog.lastStableOffset
> > > > > >>   case Some(IsolationLevel.READ_UNCOMMITTED) =>
> > > localLog.highWatermark
> > > > > >>   case None => localLog.logEndOffset
> > > > > >> }
> > > > > >>
> > > > > >>
> > > > > >> If it is READ_COMMITTED, it would only return the last stable
> > > offset.
> > > > > >>
> > > > > >> But this requires the broker version to be newer enough to
> > actually
> > > > > >> know the new format of the request, is your broker on the newer
> > > > > >> version? Other than that, I cannot think of a reason explaining
> > what
> > > > > >> you saw.
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> On Wed, Mar 16, 2022 at 5:12 AM Chris Jansen <
> > > > > chris.jansen@permutive.com>
> > > > > >> wrote:
> > > > > >>
> > > > > >> > Hello all,
> > > > > >> >
> > > > > >> > I have the need to query end offsets for a particular topic
> > using
> > > a
> > > > > >> > consumer that has been configured with the "READ_COMMITTED"
> > > > isolation
> > > > > >> > level. The response I get via the Java client
> > > > > >> > includes offsets at the end of the log that have not yet been
> > > > > committed
> > > > > >> and
> > > > > >> > therefore can't be consumed.
> > > > > >> >
> > > > > >> > After digging around in the Java client code, it looks like
> the
> > > > > >> isolation
> > > > > >> > level is being transmitted in the API request to the broker.
> So
> > my
> > > > > >> question
> > > > > >> > is does the broker use this information when crafting a
> response
> > > > and,
> > > > > >> if it
> > > > > >> > is taken into account, why does it return log end offsets for
> > > > > >> transactions
> > > > > >> > that have not yet been committed?
> > > > > >> >
> > > > > >> > For my own future reference I was struggling to find any
> details
> > > on
> > > > > the
> > > > > >> > broker API anywhere, if someone could point me in the right
> > > > direction,
> > > > > >> I'd
> > > > > >> > really appreciate it.
> > > > > >> >
> > > > > >> > Thanks,
> > > > > >> >
> > > > > >> >
> > > > > >> > Chris Jansen
> > > > > >> >
> > > > > >>
> > > > > >>
> > > > > >> --
> > > > > >> -- Guozhang
> > > > > >>
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
>


-- 
-- Guozhang

Re: Transactions and `endOffsets` Java client consumer method

Posted by Chris Jansen <ch...@permutive.com>.
Thanks Luke,

If the transaction marker should be hidden, does it follow that aborted
transaction at the end of the log should also be hidden for clients that
are in read committed mode?

Happy to do a KIP/PR.

Thanks again,

Chris

On Tue, Mar 22, 2022 at 10:21 AM Luke Chen <sh...@gmail.com> wrote:

> Hi Chris,
>
> Yes, the transaction marker should be hidden to clients.
> There is similar issues reported:
> https://issues.apache.org/jira/browse/KAFKA-10683
> Welcome to submit KIP/PR to improve it.
>
> Thank you.
> Luke
>
>
> On Tue, Mar 22, 2022 at 5:16 PM Chris Jansen <ch...@permutive.com>
> wrote:
>
> > Hi Guozhang,
> >
> > Sorry, I should have been more clear. By "an unreadable end of the log",
> I
> > mean the `endOffsets` method returns an offset for a record that is never
> > surfaced to the caller of `poll`.
> >
> > I've done some more digging and I think I understand why that is now. The
> > API `endOffsets` calls tells the client, at a low level, what offset it
> can
> > read up to in the log. This may include a transaction marker or an
> aborted
> > transaction that gets delivered, but the client skips over when returning
> > records via the `poll` method. I understand why the client must be told
> > where to read up to, as the decision to skip chunks of the log must be
> made
> > by the client.
> >
> > What I was looking for when calling `endOffsets` was the last offset that
> > would be surfaced via the `poll` method rather than where the low level
> > client should read up to. If this is at all possible, it seems like this
> > might require a broker change.
> >
> > Thanks
> >
> > Chris
> >
> > On Mon, Mar 21, 2022 at 5:38 PM Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> > > Hi Chris,
> > >
> > > What do you mean by "an unreadable end of the log"? Could you
> elaborate?
> > >
> > > The version you used is recent enough, and the configs seems okay. So I
> > > think there are some issues elsewhere.
> > >
> > > On Mon, Mar 21, 2022 at 5:51 AM Chris Jansen <
> chris.jansen@permutive.com
> > >
> > > wrote:
> > >
> > > > So an update on this, it seems that the consumer reports an
> unreadable
> > > end
> > > > of the log if the transaction has been aborted. Is this the intended
> > > > behaviour?
> > > >
> > > > Thanks again,
> > > >
> > > > Chris
> > > >
> > > > On Mon, Mar 21, 2022 at 12:25 PM Chris Jansen <
> > > chris.jansen@permutive.com>
> > > > wrote:
> > > >
> > > > > Hi Guozhang,
> > > > >
> > > > > Thanks for getting back to me. I'm using Confluent's distribution
> > > version
> > > > > 6.2.2, so Kafka 2.8.1. Could there be some consumer or broker
> > > > configuration
> > > > > I'm missing that would change this behaviour?
> > > > >
> > > > > Just to confirm, here are my consumer settings:
> > > > >
> > > > > auto.offset.reset -> earliest, isolation.level -> read_committed,
> > > > > group.instance.id -> <redacted>, group.id ->
> > > > > ephemeral-6c5f8338-3d94-4296-b64a-d3a5a331117e, bootstrap.servers
> ->
> > > > > PLAINTEXT://localhost:58300, enable.auto.commit -> false
> > > > >
> > > > > Thanks again,
> > > > >
> > > > >
> > > > > Chris
> > > > >
> > > > >
> > > > > On Sun, Mar 20, 2022 at 12:46 AM Guozhang Wang <wangguoz@gmail.com
> >
> > > > wrote:
> > > > >
> > > > >> Hi Chris,
> > > > >>
> > > > >> The broker does take the isolation level in the ListOffset API
> > (which
> > > is
> > > > >> used for the endoffset call) in to consideration:
> > > > >>
> > > > >> val lastFetchableOffset = isolationLevel match {
> > > > >>   case Some(IsolationLevel.READ_COMMITTED) =>
> > > localLog.lastStableOffset
> > > > >>   case Some(IsolationLevel.READ_UNCOMMITTED) =>
> > localLog.highWatermark
> > > > >>   case None => localLog.logEndOffset
> > > > >> }
> > > > >>
> > > > >>
> > > > >> If it is READ_COMMITTED, it would only return the last stable
> > offset.
> > > > >>
> > > > >> But this requires the broker version to be newer enough to
> actually
> > > > >> know the new format of the request, is your broker on the newer
> > > > >> version? Other than that, I cannot think of a reason explaining
> what
> > > > >> you saw.
> > > > >>
> > > > >>
> > > > >>
> > > > >> On Wed, Mar 16, 2022 at 5:12 AM Chris Jansen <
> > > > chris.jansen@permutive.com>
> > > > >> wrote:
> > > > >>
> > > > >> > Hello all,
> > > > >> >
> > > > >> > I have the need to query end offsets for a particular topic
> using
> > a
> > > > >> > consumer that has been configured with the "READ_COMMITTED"
> > > isolation
> > > > >> > level. The response I get via the Java client
> > > > >> > includes offsets at the end of the log that have not yet been
> > > > committed
> > > > >> and
> > > > >> > therefore can't be consumed.
> > > > >> >
> > > > >> > After digging around in the Java client code, it looks like the
> > > > >> isolation
> > > > >> > level is being transmitted in the API request to the broker. So
> my
> > > > >> question
> > > > >> > is does the broker use this information when crafting a response
> > > and,
> > > > >> if it
> > > > >> > is taken into account, why does it return log end offsets for
> > > > >> transactions
> > > > >> > that have not yet been committed?
> > > > >> >
> > > > >> > For my own future reference I was struggling to find any details
> > on
> > > > the
> > > > >> > broker API anywhere, if someone could point me in the right
> > > direction,
> > > > >> I'd
> > > > >> > really appreciate it.
> > > > >> >
> > > > >> > Thanks,
> > > > >> >
> > > > >> >
> > > > >> > Chris Jansen
> > > > >> >
> > > > >>
> > > > >>
> > > > >> --
> > > > >> -- Guozhang
> > > > >>
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>

Re: Transactions and `endOffsets` Java client consumer method

Posted by Luke Chen <sh...@gmail.com>.
Hi Chris,

Yes, the transaction marker should be hidden to clients.
There is similar issues reported:
https://issues.apache.org/jira/browse/KAFKA-10683
Welcome to submit KIP/PR to improve it.

Thank you.
Luke


On Tue, Mar 22, 2022 at 5:16 PM Chris Jansen <ch...@permutive.com>
wrote:

> Hi Guozhang,
>
> Sorry, I should have been more clear. By "an unreadable end of the log", I
> mean the `endOffsets` method returns an offset for a record that is never
> surfaced to the caller of `poll`.
>
> I've done some more digging and I think I understand why that is now. The
> API `endOffsets` calls tells the client, at a low level, what offset it can
> read up to in the log. This may include a transaction marker or an aborted
> transaction that gets delivered, but the client skips over when returning
> records via the `poll` method. I understand why the client must be told
> where to read up to, as the decision to skip chunks of the log must be made
> by the client.
>
> What I was looking for when calling `endOffsets` was the last offset that
> would be surfaced via the `poll` method rather than where the low level
> client should read up to. If this is at all possible, it seems like this
> might require a broker change.
>
> Thanks
>
> Chris
>
> On Mon, Mar 21, 2022 at 5:38 PM Guozhang Wang <wa...@gmail.com> wrote:
>
> > Hi Chris,
> >
> > What do you mean by "an unreadable end of the log"? Could you elaborate?
> >
> > The version you used is recent enough, and the configs seems okay. So I
> > think there are some issues elsewhere.
> >
> > On Mon, Mar 21, 2022 at 5:51 AM Chris Jansen <chris.jansen@permutive.com
> >
> > wrote:
> >
> > > So an update on this, it seems that the consumer reports an unreadable
> > end
> > > of the log if the transaction has been aborted. Is this the intended
> > > behaviour?
> > >
> > > Thanks again,
> > >
> > > Chris
> > >
> > > On Mon, Mar 21, 2022 at 12:25 PM Chris Jansen <
> > chris.jansen@permutive.com>
> > > wrote:
> > >
> > > > Hi Guozhang,
> > > >
> > > > Thanks for getting back to me. I'm using Confluent's distribution
> > version
> > > > 6.2.2, so Kafka 2.8.1. Could there be some consumer or broker
> > > configuration
> > > > I'm missing that would change this behaviour?
> > > >
> > > > Just to confirm, here are my consumer settings:
> > > >
> > > > auto.offset.reset -> earliest, isolation.level -> read_committed,
> > > > group.instance.id -> <redacted>, group.id ->
> > > > ephemeral-6c5f8338-3d94-4296-b64a-d3a5a331117e, bootstrap.servers ->
> > > > PLAINTEXT://localhost:58300, enable.auto.commit -> false
> > > >
> > > > Thanks again,
> > > >
> > > >
> > > > Chris
> > > >
> > > >
> > > > On Sun, Mar 20, 2022 at 12:46 AM Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > > >
> > > >> Hi Chris,
> > > >>
> > > >> The broker does take the isolation level in the ListOffset API
> (which
> > is
> > > >> used for the endoffset call) in to consideration:
> > > >>
> > > >> val lastFetchableOffset = isolationLevel match {
> > > >>   case Some(IsolationLevel.READ_COMMITTED) =>
> > localLog.lastStableOffset
> > > >>   case Some(IsolationLevel.READ_UNCOMMITTED) =>
> localLog.highWatermark
> > > >>   case None => localLog.logEndOffset
> > > >> }
> > > >>
> > > >>
> > > >> If it is READ_COMMITTED, it would only return the last stable
> offset.
> > > >>
> > > >> But this requires the broker version to be newer enough to actually
> > > >> know the new format of the request, is your broker on the newer
> > > >> version? Other than that, I cannot think of a reason explaining what
> > > >> you saw.
> > > >>
> > > >>
> > > >>
> > > >> On Wed, Mar 16, 2022 at 5:12 AM Chris Jansen <
> > > chris.jansen@permutive.com>
> > > >> wrote:
> > > >>
> > > >> > Hello all,
> > > >> >
> > > >> > I have the need to query end offsets for a particular topic using
> a
> > > >> > consumer that has been configured with the "READ_COMMITTED"
> > isolation
> > > >> > level. The response I get via the Java client
> > > >> > includes offsets at the end of the log that have not yet been
> > > committed
> > > >> and
> > > >> > therefore can't be consumed.
> > > >> >
> > > >> > After digging around in the Java client code, it looks like the
> > > >> isolation
> > > >> > level is being transmitted in the API request to the broker. So my
> > > >> question
> > > >> > is does the broker use this information when crafting a response
> > and,
> > > >> if it
> > > >> > is taken into account, why does it return log end offsets for
> > > >> transactions
> > > >> > that have not yet been committed?
> > > >> >
> > > >> > For my own future reference I was struggling to find any details
> on
> > > the
> > > >> > broker API anywhere, if someone could point me in the right
> > direction,
> > > >> I'd
> > > >> > really appreciate it.
> > > >> >
> > > >> > Thanks,
> > > >> >
> > > >> >
> > > >> > Chris Jansen
> > > >> >
> > > >>
> > > >>
> > > >> --
> > > >> -- Guozhang
> > > >>
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>

Re: Transactions and `endOffsets` Java client consumer method

Posted by Chris Jansen <ch...@permutive.com>.
Hi Guozhang,

Sorry, I should have been more clear. By "an unreadable end of the log", I
mean the `endOffsets` method returns an offset for a record that is never
surfaced to the caller of `poll`.

I've done some more digging and I think I understand why that is now. The
API `endOffsets` calls tells the client, at a low level, what offset it can
read up to in the log. This may include a transaction marker or an aborted
transaction that gets delivered, but the client skips over when returning
records via the `poll` method. I understand why the client must be told
where to read up to, as the decision to skip chunks of the log must be made
by the client.

What I was looking for when calling `endOffsets` was the last offset that
would be surfaced via the `poll` method rather than where the low level
client should read up to. If this is at all possible, it seems like this
might require a broker change.

Thanks

Chris

On Mon, Mar 21, 2022 at 5:38 PM Guozhang Wang <wa...@gmail.com> wrote:

> Hi Chris,
>
> What do you mean by "an unreadable end of the log"? Could you elaborate?
>
> The version you used is recent enough, and the configs seems okay. So I
> think there are some issues elsewhere.
>
> On Mon, Mar 21, 2022 at 5:51 AM Chris Jansen <ch...@permutive.com>
> wrote:
>
> > So an update on this, it seems that the consumer reports an unreadable
> end
> > of the log if the transaction has been aborted. Is this the intended
> > behaviour?
> >
> > Thanks again,
> >
> > Chris
> >
> > On Mon, Mar 21, 2022 at 12:25 PM Chris Jansen <
> chris.jansen@permutive.com>
> > wrote:
> >
> > > Hi Guozhang,
> > >
> > > Thanks for getting back to me. I'm using Confluent's distribution
> version
> > > 6.2.2, so Kafka 2.8.1. Could there be some consumer or broker
> > configuration
> > > I'm missing that would change this behaviour?
> > >
> > > Just to confirm, here are my consumer settings:
> > >
> > > auto.offset.reset -> earliest, isolation.level -> read_committed,
> > > group.instance.id -> <redacted>, group.id ->
> > > ephemeral-6c5f8338-3d94-4296-b64a-d3a5a331117e, bootstrap.servers ->
> > > PLAINTEXT://localhost:58300, enable.auto.commit -> false
> > >
> > > Thanks again,
> > >
> > >
> > > Chris
> > >
> > >
> > > On Sun, Mar 20, 2022 at 12:46 AM Guozhang Wang <wa...@gmail.com>
> > wrote:
> > >
> > >> Hi Chris,
> > >>
> > >> The broker does take the isolation level in the ListOffset API (which
> is
> > >> used for the endoffset call) in to consideration:
> > >>
> > >> val lastFetchableOffset = isolationLevel match {
> > >>   case Some(IsolationLevel.READ_COMMITTED) =>
> localLog.lastStableOffset
> > >>   case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
> > >>   case None => localLog.logEndOffset
> > >> }
> > >>
> > >>
> > >> If it is READ_COMMITTED, it would only return the last stable offset.
> > >>
> > >> But this requires the broker version to be newer enough to actually
> > >> know the new format of the request, is your broker on the newer
> > >> version? Other than that, I cannot think of a reason explaining what
> > >> you saw.
> > >>
> > >>
> > >>
> > >> On Wed, Mar 16, 2022 at 5:12 AM Chris Jansen <
> > chris.jansen@permutive.com>
> > >> wrote:
> > >>
> > >> > Hello all,
> > >> >
> > >> > I have the need to query end offsets for a particular topic using a
> > >> > consumer that has been configured with the "READ_COMMITTED"
> isolation
> > >> > level. The response I get via the Java client
> > >> > includes offsets at the end of the log that have not yet been
> > committed
> > >> and
> > >> > therefore can't be consumed.
> > >> >
> > >> > After digging around in the Java client code, it looks like the
> > >> isolation
> > >> > level is being transmitted in the API request to the broker. So my
> > >> question
> > >> > is does the broker use this information when crafting a response
> and,
> > >> if it
> > >> > is taken into account, why does it return log end offsets for
> > >> transactions
> > >> > that have not yet been committed?
> > >> >
> > >> > For my own future reference I was struggling to find any details on
> > the
> > >> > broker API anywhere, if someone could point me in the right
> direction,
> > >> I'd
> > >> > really appreciate it.
> > >> >
> > >> > Thanks,
> > >> >
> > >> >
> > >> > Chris Jansen
> > >> >
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
> >
>
>
> --
> -- Guozhang
>

Re: Transactions and `endOffsets` Java client consumer method

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Chris,

What do you mean by "an unreadable end of the log"? Could you elaborate?

The version you used is recent enough, and the configs seems okay. So I
think there are some issues elsewhere.

On Mon, Mar 21, 2022 at 5:51 AM Chris Jansen <ch...@permutive.com>
wrote:

> So an update on this, it seems that the consumer reports an unreadable end
> of the log if the transaction has been aborted. Is this the intended
> behaviour?
>
> Thanks again,
>
> Chris
>
> On Mon, Mar 21, 2022 at 12:25 PM Chris Jansen <ch...@permutive.com>
> wrote:
>
> > Hi Guozhang,
> >
> > Thanks for getting back to me. I'm using Confluent's distribution version
> > 6.2.2, so Kafka 2.8.1. Could there be some consumer or broker
> configuration
> > I'm missing that would change this behaviour?
> >
> > Just to confirm, here are my consumer settings:
> >
> > auto.offset.reset -> earliest, isolation.level -> read_committed,
> > group.instance.id -> <redacted>, group.id ->
> > ephemeral-6c5f8338-3d94-4296-b64a-d3a5a331117e, bootstrap.servers ->
> > PLAINTEXT://localhost:58300, enable.auto.commit -> false
> >
> > Thanks again,
> >
> >
> > Chris
> >
> >
> > On Sun, Mar 20, 2022 at 12:46 AM Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> >> Hi Chris,
> >>
> >> The broker does take the isolation level in the ListOffset API (which is
> >> used for the endoffset call) in to consideration:
> >>
> >> val lastFetchableOffset = isolationLevel match {
> >>   case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
> >>   case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
> >>   case None => localLog.logEndOffset
> >> }
> >>
> >>
> >> If it is READ_COMMITTED, it would only return the last stable offset.
> >>
> >> But this requires the broker version to be newer enough to actually
> >> know the new format of the request, is your broker on the newer
> >> version? Other than that, I cannot think of a reason explaining what
> >> you saw.
> >>
> >>
> >>
> >> On Wed, Mar 16, 2022 at 5:12 AM Chris Jansen <
> chris.jansen@permutive.com>
> >> wrote:
> >>
> >> > Hello all,
> >> >
> >> > I have the need to query end offsets for a particular topic using a
> >> > consumer that has been configured with the "READ_COMMITTED" isolation
> >> > level. The response I get via the Java client
> >> > includes offsets at the end of the log that have not yet been
> committed
> >> and
> >> > therefore can't be consumed.
> >> >
> >> > After digging around in the Java client code, it looks like the
> >> isolation
> >> > level is being transmitted in the API request to the broker. So my
> >> question
> >> > is does the broker use this information when crafting a response and,
> >> if it
> >> > is taken into account, why does it return log end offsets for
> >> transactions
> >> > that have not yet been committed?
> >> >
> >> > For my own future reference I was struggling to find any details on
> the
> >> > broker API anywhere, if someone could point me in the right direction,
> >> I'd
> >> > really appreciate it.
> >> >
> >> > Thanks,
> >> >
> >> >
> >> > Chris Jansen
> >> >
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
>


-- 
-- Guozhang

Re: Transactions and `endOffsets` Java client consumer method

Posted by Chris Jansen <ch...@permutive.com>.
So an update on this, it seems that the consumer reports an unreadable end
of the log if the transaction has been aborted. Is this the intended
behaviour?

Thanks again,

Chris

On Mon, Mar 21, 2022 at 12:25 PM Chris Jansen <ch...@permutive.com>
wrote:

> Hi Guozhang,
>
> Thanks for getting back to me. I'm using Confluent's distribution version
> 6.2.2, so Kafka 2.8.1. Could there be some consumer or broker configuration
> I'm missing that would change this behaviour?
>
> Just to confirm, here are my consumer settings:
>
> auto.offset.reset -> earliest, isolation.level -> read_committed,
> group.instance.id -> <redacted>, group.id ->
> ephemeral-6c5f8338-3d94-4296-b64a-d3a5a331117e, bootstrap.servers ->
> PLAINTEXT://localhost:58300, enable.auto.commit -> false
>
> Thanks again,
>
>
> Chris
>
>
> On Sun, Mar 20, 2022 at 12:46 AM Guozhang Wang <wa...@gmail.com> wrote:
>
>> Hi Chris,
>>
>> The broker does take the isolation level in the ListOffset API (which is
>> used for the endoffset call) in to consideration:
>>
>> val lastFetchableOffset = isolationLevel match {
>>   case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
>>   case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
>>   case None => localLog.logEndOffset
>> }
>>
>>
>> If it is READ_COMMITTED, it would only return the last stable offset.
>>
>> But this requires the broker version to be newer enough to actually
>> know the new format of the request, is your broker on the newer
>> version? Other than that, I cannot think of a reason explaining what
>> you saw.
>>
>>
>>
>> On Wed, Mar 16, 2022 at 5:12 AM Chris Jansen <ch...@permutive.com>
>> wrote:
>>
>> > Hello all,
>> >
>> > I have the need to query end offsets for a particular topic using a
>> > consumer that has been configured with the "READ_COMMITTED" isolation
>> > level. The response I get via the Java client
>> > includes offsets at the end of the log that have not yet been committed
>> and
>> > therefore can't be consumed.
>> >
>> > After digging around in the Java client code, it looks like the
>> isolation
>> > level is being transmitted in the API request to the broker. So my
>> question
>> > is does the broker use this information when crafting a response and,
>> if it
>> > is taken into account, why does it return log end offsets for
>> transactions
>> > that have not yet been committed?
>> >
>> > For my own future reference I was struggling to find any details on the
>> > broker API anywhere, if someone could point me in the right direction,
>> I'd
>> > really appreciate it.
>> >
>> > Thanks,
>> >
>> >
>> > Chris Jansen
>> >
>>
>>
>> --
>> -- Guozhang
>>
>

Re: Transactions and `endOffsets` Java client consumer method

Posted by Chris Jansen <ch...@permutive.com>.
Hi Guozhang,

Thanks for getting back to me. I'm using Confluent's distribution version
6.2.2, so Kafka 2.8.1. Could there be some consumer or broker configuration
I'm missing that would change this behaviour?

Just to confirm, here are my consumer settings:

auto.offset.reset -> earliest, isolation.level -> read_committed,
group.instance.id -> <redacted>, group.id ->
ephemeral-6c5f8338-3d94-4296-b64a-d3a5a331117e, bootstrap.servers ->
PLAINTEXT://localhost:58300, enable.auto.commit -> false

Thanks again,


Chris


On Sun, Mar 20, 2022 at 12:46 AM Guozhang Wang <wa...@gmail.com> wrote:

> Hi Chris,
>
> The broker does take the isolation level in the ListOffset API (which is
> used for the endoffset call) in to consideration:
>
> val lastFetchableOffset = isolationLevel match {
>   case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
>   case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
>   case None => localLog.logEndOffset
> }
>
>
> If it is READ_COMMITTED, it would only return the last stable offset.
>
> But this requires the broker version to be newer enough to actually
> know the new format of the request, is your broker on the newer
> version? Other than that, I cannot think of a reason explaining what
> you saw.
>
>
>
> On Wed, Mar 16, 2022 at 5:12 AM Chris Jansen <ch...@permutive.com>
> wrote:
>
> > Hello all,
> >
> > I have the need to query end offsets for a particular topic using a
> > consumer that has been configured with the "READ_COMMITTED" isolation
> > level. The response I get via the Java client
> > includes offsets at the end of the log that have not yet been committed
> and
> > therefore can't be consumed.
> >
> > After digging around in the Java client code, it looks like the isolation
> > level is being transmitted in the API request to the broker. So my
> question
> > is does the broker use this information when crafting a response and, if
> it
> > is taken into account, why does it return log end offsets for
> transactions
> > that have not yet been committed?
> >
> > For my own future reference I was struggling to find any details on the
> > broker API anywhere, if someone could point me in the right direction,
> I'd
> > really appreciate it.
> >
> > Thanks,
> >
> >
> > Chris Jansen
> >
>
>
> --
> -- Guozhang
>

Re: Transactions and `endOffsets` Java client consumer method

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Chris,

The broker does take the isolation level in the ListOffset API (which is
used for the endoffset call) in to consideration:

val lastFetchableOffset = isolationLevel match {
  case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
  case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
  case None => localLog.logEndOffset
}


If it is READ_COMMITTED, it would only return the last stable offset.

But this requires the broker version to be newer enough to actually
know the new format of the request, is your broker on the newer
version? Other than that, I cannot think of a reason explaining what
you saw.



On Wed, Mar 16, 2022 at 5:12 AM Chris Jansen <ch...@permutive.com>
wrote:

> Hello all,
>
> I have the need to query end offsets for a particular topic using a
> consumer that has been configured with the "READ_COMMITTED" isolation
> level. The response I get via the Java client
> includes offsets at the end of the log that have not yet been committed and
> therefore can't be consumed.
>
> After digging around in the Java client code, it looks like the isolation
> level is being transmitted in the API request to the broker. So my question
> is does the broker use this information when crafting a response and, if it
> is taken into account, why does it return log end offsets for transactions
> that have not yet been committed?
>
> For my own future reference I was struggling to find any details on the
> broker API anywhere, if someone could point me in the right direction, I'd
> really appreciate it.
>
> Thanks,
>
>
> Chris Jansen
>


-- 
-- Guozhang