Posted to dev@beam.apache.org by Luke Cwik via dev <de...@beam.apache.org> on 2022/09/01 16:48:00 UTC

Re: [JmsIO] => Pull Request to fix message acknowledgement issue

I have a better understanding of the problem after reviewing the doc, and we
need to decide what lifecycle scope we want the `Connection`, `Session`,
and `MessageConsumer` to have.

It looks like for the `Connection` we should try to have at most one
instance for the entire process per connection factory.
https://docs.oracle.com/javaee/7/api/javax/jms/Connection.html says that
the connection should be reused. Having fewer connections would likely be
beneficial, unless you think there would be a performance limitation in
using a single connection per process for all the messages?
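A minimal sketch of the "at most one `Connection` per process per connection factory" idea, assuming a process-wide cache keyed by the factory instance. To stay self-contained it uses hypothetical stand-in interfaces (`ConnectionLike`, `ConnectionFactoryLike`) and a hypothetical `SharedConnections` helper instead of the real `javax.jms` types; it is not JmsIO's actual code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for javax.jms.Connection (not the real JMS API). */
interface ConnectionLike {
  void close();
}

/** Hypothetical stand-in for javax.jms.ConnectionFactory. */
interface ConnectionFactoryLike {
  ConnectionLike createConnection();
}

/** Process-wide cache holding at most one shared connection per factory. */
final class SharedConnections {
  private static final Map<ConnectionFactoryLike, ConnectionLike> CACHE =
      new ConcurrentHashMap<>();

  private SharedConnections() {}

  /** Returns the shared connection for this factory, creating it on first use. */
  static ConnectionLike get(ConnectionFactoryLike factory) {
    // ConcurrentHashMap.computeIfAbsent applies the function at most once per
    // key, so readers racing in the same process still share one connection.
    return CACHE.computeIfAbsent(factory, ConnectionFactoryLike::createConnection);
  }

  /** Closes and forgets every shared connection, e.g. at process shutdown. */
  static void closeAll() {
    CACHE.values().forEach(ConnectionLike::close);
    CACHE.clear();
  }
}
```

Keying by factory instance relies on identity `equals`/`hashCode`. A real implementation would also have to decide when a shared connection can safely be closed, which `closeAll()` glosses over, and it trades away the per-reader connections that Vincent later argues help spread load over a multi-node broker.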

For the `Session` and `MessageConsumer`, I'm not sure what lifecycle scope
they should have. Some ideas:
1. we could make it so that each `Session` is unique per checkpoint, i.e.
we hand off the ownership of the `Session` to the JmsCheckpointMark
every time we checkpoint and create a new `Session` for the next set of
messages we receive. This would mean that we would also close the
`MessageConsumer` at every checkpoint and create a new one.
2. we could make it so that the `Session` is scoped to the reader
start/close and possibly to multiple checkpoint marks, and effectively close
the `Session` once the reader is closed and all checkpoint marks are
finalized/expired. We would close the `MessageConsumer` whenever the reader
is closed.
3. we could make it so that the `Session` is scoped to the `Connection` and
only close it when the `Connection` closes.
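Option 2 amounts to reference counting: the session stays open while the reader is open or any checkpoint mark is still outstanding. A minimal sketch with hypothetical class names (`CountedSession`, `RefCountedSessionOwner`), assuming the owner is notified when a mark is finalized or expires. As far as we know, Beam's `UnboundedSource.CheckpointMark` only exposes `finalizeCheckpoint()`, so the expiry notification here is an assumption, which hints at why this option is harder than it looks.

```java
/** Hypothetical session stand-in that only records whether it was closed. */
final class CountedSession {
  private boolean closed;

  void close() {
    closed = true;
  }

  boolean isClosed() {
    return closed;
  }
}

/**
 * Option 2 sketch: one session for the reader's lifetime, closed only when the
 * reader is closed AND no checkpoint mark is still outstanding.
 */
final class RefCountedSessionOwner {
  private final CountedSession session = new CountedSession();
  private int outstandingMarks;
  private boolean readerClosed;

  CountedSession session() {
    return session;
  }

  /** Called when the reader hands out a new checkpoint mark. */
  synchronized void onCheckpointCreated() {
    outstandingMarks++;
  }

  /** Called from finalizeCheckpoint(), or from a hypothetical expiry hook. */
  synchronized void onCheckpointFinalizedOrExpired() {
    outstandingMarks--;
    maybeClose();
  }

  /** Called from the reader's close(); the MessageConsumer would close here. */
  synchronized void onReaderClosed() {
    readerClosed = true;
    maybeClose();
  }

  private void maybeClose() {
    if (readerClosed && outstandingMarks == 0 && !session.isClosed()) {
      session.close();
    }
  }
}
```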

Option 1 seems pretty simple since the `Session` always has a single
distinct owner. This seems like it would make the most sense if `Session`
creation and management were cheap. Another positive is that once the
`Session` closes, any messages that weren't acknowledged are returned to
the queue, and we will not have to wait for the reader to be closed or for
all the checkpoint marks to be finalized.
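Option 1 can be sketched with an in-memory model, assuming CLIENT_ACKNOWLEDGE-style semantics in which acknowledging covers everything received on the session and closing the session returns unacknowledged messages to the queue. `FakeQueue`, `FakeSession`, `SessionCheckpoint` and `PerCheckpointReader` are hypothetical stand-ins (the fake session also plays the role of the `MessageConsumer`), not the JMS or JmsIO API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical in-memory destination standing in for a JMS queue. */
final class FakeQueue {
  final Deque<String> pending = new ArrayDeque<>();

  FakeSession openSession() {
    return new FakeSession(this);
  }
}

/**
 * Hypothetical stand-in for a CLIENT_ACKNOWLEDGE session plus its consumer:
 * acknowledge() covers everything received so far on this session, and
 * close() puts anything still unacknowledged back at the head of the queue.
 */
final class FakeSession {
  private final FakeQueue queue;
  private final List<String> unacked = new ArrayList<>();
  private boolean closed;

  FakeSession(FakeQueue queue) {
    this.queue = queue;
  }

  String receive() {
    if (closed) throw new IllegalStateException("receive on closed session");
    String message = queue.pending.poll();
    if (message != null) unacked.add(message);
    return message;
  }

  void acknowledge() {
    if (closed) throw new IllegalStateException("acknowledge on closed session");
    unacked.clear();
  }

  void close() {
    if (closed) return;
    closed = true;
    // Walk backwards so redelivered messages keep their original order.
    for (int i = unacked.size() - 1; i >= 0; i--) {
      queue.pending.addFirst(unacked.get(i));
    }
    unacked.clear();
  }
}

/** Option 1: the checkpoint mark becomes the sole owner of one session. */
final class SessionCheckpoint {
  private FakeSession session;

  SessionCheckpoint(FakeSession session) {
    this.session = session;
  }

  /** Acknowledge first, then close, so the ack never hits a closed session. */
  void finalizeCheckpoint() {
    if (session == null) return; // already finalized
    session.acknowledge();
    session.close();
    session = null;
  }
}

/** Hypothetical reader that starts a fresh session for every checkpoint. */
final class PerCheckpointReader {
  private final FakeQueue queue;
  private FakeSession current;

  PerCheckpointReader(FakeQueue queue) {
    this.queue = queue;
    this.current = queue.openSession();
  }

  String advance() {
    return current.receive();
  }

  /** Hands the current session to the mark and opens a new one. */
  SessionCheckpoint getCheckpointMark() {
    SessionCheckpoint mark = new SessionCheckpoint(current);
    current = queue.openSession();
    return mark;
  }

  /** Closes only the reader's own session; marks close theirs on finalize. */
  void close() {
    current.close();
  }
}
```

A mark that is never finalized keeps its session open in this sketch; handling expiry is left out.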

What do you think?

On Mon, Aug 29, 2022 at 10:06 PM Jean-Baptiste Onofré <jb...@nanthrax.net>
wrote:

> Hi Vincent,
>
> Thanks, I will take a look (as the original JmsIO author ;)).
>
> Regards
> JB
>
> On Mon, Aug 29, 2022 at 6:43 PM BALLADA Vincent
> <vi...@renault.com> wrote:
> >
> > Hi all,
> >
> > Here is a PR related to the following issue (Runner acknowledges
> > messages on closed session):
> >
> > https://github.com/apache/beam/issues/20814
> >
> > And here is a document explaining the fix:
> >
> > https://docs.google.com/document/d/19HiNPoJeIlzCFyWGdlw7WEFutceL2AmN_5Z0Vmi1s-I/edit?usp=sharing
> >
> > And finally the PR:
> >
> > https://github.com/apache/beam/pull/22932
> >
> > Regards
> >
> > Vincent BALLADA
> >
> > Confidential C
> >
> > -- Disclaimer ------------------------------------
> >
> > *** This e-mail and any attachments are confidential correspondence
> > intended only for use of the individual or entity named above. If you are
> > not the intended recipient or the agent responsible for delivering the
> > message to the intended recipient, you are hereby notified that any
> > disclosure, distribution or copying of this communication is strictly
> > prohibited. If you have received this communication in error, please notify
> > the sender by phone or by replying to this message, and then delete this
> > message from your system.
>

Re: [JmsIO] => Pull Request to fix message acknowledgement issue

Posted by BALLADA Vincent via dev <de...@beam.apache.org>.
From: Luke Cwik <lc...@google.com>
Date: Thursday, 8 September 2022 at 19:17
To: BALLADA Vincent <vi...@renault.com>
Cc: dev@beam.apache.org <de...@beam.apache.org>
Subject: Re: [JmsIO] => Pull Request to fix message acknowledgement issue

Could we have more than one active checkpoint per reader instance?
Yes. Readers are saved and reused across multiple bundles. They aren't always closed at bundle boundaries.

Are we sure that all checkpoints are finalized when the reader is closed?
No. Readers are closed after a certain period of inactivity. It is likely that all checkpoints will have expired or been finalized by the time the reader is closed, but it is not guaranteed. For example, in multi-language pipelines, downstream processing in another language can delay committing the output to the runner, which can lead to the reader being closed due to inactivity before the checkpoint is finalized.

We could choose to hand off the session ownership to the JmsCheckpoint and create a new session. This way, finalizing the checkpoint would be responsible for closing the session.




On Thu, Sep 8, 2022 at 8:01 AM BALLADA Vincent <vi...@renault.com> wrote:
Hello Luke,

Thanks for your remarks.

Connection reuse
Concerning the use of a single connection for the entire process per connection factory, that would mean we would have one JMS connection per worker, and there may be a downside to doing so:
If the broker is hosted on a multi-node cluster infrastructure and we want to consume messages from all cluster nodes, we have to make sure that we have enough connections to be load balanced across all the nodes.
If for some reason (autoscaling, low backlog size) we have only one worker, we may not consume from all the cluster nodes.
As the number of connections is limited by the number of splits/readers, and as connections are not opened/closed very often (only when workers are killed or created, or when readers are closed/started), I would suggest keeping the connection management as it is currently.

Session and consumer lifecycle


  1.  Session unique per checkpoint
Could we have more than one active checkpoint per reader instance?

Should we close the session/consumer and create a new session/consumer at the end of finalizeCheckpoint? The goal here is to ensure that the message acknowledgement occurs before the session is closed.
If advance and finalizeCheckpoint can be called concurrently, we need to make sure that the session is active in “advance” in order to receive messages.
Are we sure that all checkpoints are finalized when the reader is closed?


  2.  Session scoped to the reader start/close
It seems to be more or less the case currently.

Regards

Vincent BALLADA



Re: [JmsIO] => Pull Request to fix message acknowledgement issue

Posted by Luke Cwik via dev <de...@beam.apache.org>.
Could we have more than one active checkpoint per reader instance?
Yes. Readers are saved and reused across multiple bundles. They aren't
always closed at bundle boundaries.

Are we sure that all checkpoints are finalized when the reader is closed?
No. Readers are closed after a certain period of inactivity. It is likely
that all checkpoints will have expired or been finalized by the time the
reader is closed, but it is not guaranteed. For example, in multi-language
pipelines, downstream processing in another language can delay committing
the output to the runner, which can lead to the reader being closed due to
inactivity before the checkpoint is finalized.

We could choose to hand off the session ownership to the JmsCheckpoint and
create a new session. This way, finalizing the checkpoint would be
responsible for closing the session.
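The difference between a session shared by the reader and a session handed off to the checkpoint can be shown with a small model. `SessionStub`, `StubCheckpointMark` and `StubReader` are hypothetical names; the only assumption about JMS is that acknowledging on a closed session fails, which the model signals with `java.lang.IllegalStateException`. The `handOff` flag switches between the shared-session behavior described in the reported issue and the proposed hand-off.

```java
/** Hypothetical session stand-in: acknowledging after close() is an error. */
final class SessionStub {
  private boolean closed;

  void acknowledge() {
    if (closed) throw new IllegalStateException("acknowledge on closed session");
  }

  void close() {
    closed = true;
  }
}

/** Checkpoint mark that acknowledges on a session it may or may not own. */
final class StubCheckpointMark {
  private final SessionStub session;
  private final boolean ownsSession;

  StubCheckpointMark(SessionStub session, boolean ownsSession) {
    this.session = session;
    this.ownsSession = ownsSession;
  }

  void finalizeCheckpoint() {
    session.acknowledge();
    if (ownsSession) {
      session.close(); // the mark owns the session, so it closes it
    }
  }
}

/** Reader reused across bundles, so several marks can be outstanding at once. */
final class StubReader {
  private final boolean handOff;
  private SessionStub current = new SessionStub();

  StubReader(boolean handOff) {
    this.handOff = handOff;
  }

  StubCheckpointMark getCheckpointMark() {
    if (!handOff) {
      return new StubCheckpointMark(current, false); // shared session
    }
    StubCheckpointMark mark = new StubCheckpointMark(current, true);
    current = new SessionStub(); // fresh session for subsequent messages
    return mark;
  }

  /** May run (e.g. after inactivity) before every mark is finalized. */
  void close() {
    current.close();
  }
}
```

With hand-off enabled, a reader closed after inactivity no longer invalidates outstanding marks, and several marks from the same reused reader can be finalized independently.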





Re: [JmsIO] => Pull Request to fix message acknowledgement issue

Posted by BALLADA Vincent <vi...@renault.com>.
Hello Luke,

Thanks for your remarks.

Connection reuse
Concerning the use of a single connection for the entire process per connection factory, that would mean we would have one JMS connection per worker, and there may be a downside to doing so:
If the broker is hosted on a multi-node cluster infrastructure and we want to consume messages from all cluster nodes, we have to make sure that we have enough connections to be load balanced across all the nodes.
If for some reason (autoscaling, low backlog size) we have only one worker, we may not consume from all the cluster nodes.
As the number of connections is limited by the number of splits/readers, and as connections are not opened/closed very often (only when workers are killed or created, or when readers are closed/started), I would suggest keeping the connection management as it is currently.

Session and consumer lifecycle


  1.  Session unique per checkpoint
Could we have more than one active checkpoint per reader instance?

Should we close the session/consumer and create a new session/consumer at the end of finalizeCheckpoint? The goal here is to ensure that the message acknowledgement occurs before the session is closed.
If advance and finalizeCheckpoint can be called concurrently, we need to make sure that the session is active in “advance” in order to receive messages.
Are we sure that all checkpoints are finalized when the reader is closed?


  2.  Session scoped to the reader start/close
It seems to be more or less the case currently.
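Closing and recreating the session at the end of finalizeCheckpoint means finalizeCheckpoint swaps the session that advance reads from. A minimal sketch, assuming the two may run on different threads, guards both with a single lock so advance never sees a session that is closed but not yet replaced. `SimpleSession` and `SwappingReader` are hypothetical names, not JmsIO classes.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical session stand-in; using it after close() fails. It is not
 * thread-safe by itself and relies on the reader's lock.
 */
final class SimpleSession {
  private static final AtomicInteger IDS = new AtomicInteger();
  private final int id = IDS.incrementAndGet();
  private boolean closed;

  String receive() {
    if (closed) throw new IllegalStateException("receive on closed session " + id);
    return "message-from-session-" + id;
  }

  void acknowledge() {
    if (closed) throw new IllegalStateException("acknowledge on closed session " + id);
  }

  void close() {
    closed = true;
  }
}

/**
 * Variant in which finalizeCheckpoint() acknowledges, closes and replaces the
 * session that advance() reads from. Both methods hold the same lock.
 */
final class SwappingReader {
  private final Object lock = new Object();
  private SimpleSession session = new SimpleSession();

  String advance() {
    synchronized (lock) {
      return session.receive();
    }
  }

  void finalizeCheckpoint() {
    synchronized (lock) {
      session.acknowledge(); // acknowledge while the session is still open
      session.close();
      session = new SimpleSession(); // advance() continues on a live session
    }
  }
}
```

One caveat: under JMS CLIENT_ACKNOWLEDGE semantics, acknowledging a message acknowledges every message consumed so far on that session, so in this variant messages read after the checkpoint was taken would be acknowledged too. Handing the session to the checkpoint mark avoids that, because each session then only holds the messages of one checkpoint.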

Regards

Vincent BALLADA

