Posted to dev@beam.apache.org by Marco Robles <ma...@wizeline.com> on 2021/09/02 19:40:04 UTC

Re: Help/Inputs with PulsarIO connector (GetSizeVSHasProgress)

Hi folks,

I'm still working on PulsarIO, and I'm somewhat stuck trying to define sizing
and progress
<https://beam.apache.org/documentation/programming-guide/#sizing-and-progress>
for the SDF. The Beam javadoc
<https://beam.apache.org/releases/javadoc/2.32.0/org/apache/beam/sdk/transforms/DoFn.ProcessElement.html>
says that an SDF may define a *GetSize* method or, at minimum, ensure that its
*RestrictionTracker* implements *HasProgress*. For a pub/sub system, would
you recommend *HasProgress* over *GetSize* (which would only be an estimate
of how much work remains to be processed)?

For HasProgress, I assume it should be implemented in the @NewTracker
method, right? Does anyone have experience using HasProgress?

Thanks in advance.

On Mon, Aug 16, 2021 at 11:49 AM Marco Robles <ma...@wizeline.com>
wrote:

> Does the client buffer/reserve any messages?
> - Pulsar retains messages that haven't been acknowledged
> <https://pulsar.apache.org/docs/en/cookbooks-retention-expiry/#retention-policies>;
> a message is acknowledged by calling consumer.acknowledge(msg).
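The retention behavior described above (a message stays retained until it is acknowledged) can be illustrated with a minimal JDK-only sketch. InMemorySubscription and its methods are hypothetical stand-ins, not the Pulsar client API, and the explicit redeliverUnacknowledged() call is a simplification of what happens broker-side when unacknowledged messages are redelivered.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical in-memory stand-in for a subscription (NOT the Pulsar client API).
 * Messages stay retained until acknowledged; unacknowledged ones can be redelivered.
 */
final class InMemorySubscription {
  private final Deque<String> backlog = new ArrayDeque<>();  // retained, not yet delivered
  private final Deque<String> inFlight = new ArrayDeque<>(); // delivered, awaiting acknowledgment

  void publish(String msg) {
    backlog.addLast(msg);
  }

  /** Delivers the next retained message, or returns null if nothing is waiting. */
  String receive() {
    String msg = backlog.pollFirst();
    if (msg != null) {
      inFlight.addLast(msg);
    }
    return msg;
  }

  /** Acknowledging is the only way a message stops being retained. */
  boolean acknowledge(String msg) {
    return inFlight.remove(msg);
  }

  /** Simplified redelivery: unacknowledged messages go back to the front, in order. */
  void redeliverUnacknowledged() {
    while (!inFlight.isEmpty()) {
      backlog.addFirst(inFlight.pollLast());
    }
  }

  /** Messages still retained: waiting for delivery plus delivered but unacknowledged. */
  int retained() {
    return backlog.size() + inFlight.size();
  }
}
```

The point for an SDF is that anything received but not yet acknowledged is still the broker's responsibility, so acknowledging only after output is what keeps a failed bundle from losing messages.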
>
> On Fri, Aug 13, 2021 at 6:58 PM Luke Cwik <lc...@google.com> wrote:
>
>> Does the client buffer/reserve any messages?
>> For example, when you call consumer.receive(), does it fetch the next
>> 100 messages, assuming you'll want them as well, so that the next 99
>> consumer.receive() calls are fast?
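Luke's question is about client-side prefetching. The JDK-only sketch below models it under stated assumptions: PrefetchingConsumer is hypothetical, the "broker" is a plain function returning up to n messages, and 100 is just the number from the example. It shows how messages can sit in a local buffer after leaving the broker but before the pipeline processes them.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.IntFunction;

/**
 * Hypothetical prefetching client (not the Pulsar API): receive() serves messages from a
 * local buffer and only goes to the "broker" when that buffer is empty.
 */
final class PrefetchingConsumer {
  private final IntFunction<List<String>> fetchFromBroker; // stand-in: returns up to n messages
  private final int prefetchSize;
  private final Deque<String> localBuffer = new ArrayDeque<>();
  private int brokerRoundTrips = 0;

  PrefetchingConsumer(IntFunction<List<String>> fetchFromBroker, int prefetchSize) {
    this.fetchFromBroker = fetchFromBroker;
    this.prefetchSize = prefetchSize;
  }

  /** Returns the next message, or null if the broker had nothing to give. */
  String receive() {
    if (localBuffer.isEmpty()) {
      brokerRoundTrips++;
      localBuffer.addAll(fetchFromBroker.apply(prefetchSize));
    }
    return localBuffer.pollFirst();
  }

  /** Messages already pulled from the broker but not yet handed to the caller. */
  int bufferedButUnprocessed() {
    return localBuffer.size();
  }

  int brokerRoundTrips() {
    return brokerRoundTrips;
  }
}
```

After the first receive(), 99 messages are held client-side; if the bundle stopped at that moment, those are exactly the "unprocessed messages" a pull-and-output-all model tries to avoid.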
>>
>> On Fri, Aug 13, 2021 at 4:16 PM Marco Robles Pulido <
>> marco.robles@wizeline.com> wrote:
>>
>>>
>>>
>>>> I am considering this approach for the PulsarIO SDF implementation:
>>>> pull N messages, or use Apache Pulsar's default consumer queue size
>>>> <https://pulsar.apache.org/docs/en/concepts-messaging/#consumers> (which
>>>> is 1000), and keep increasing N as more messages arrive in the queue,
>>>> using the client listener
>>>> <https://pulsar.apache.org/docs/en/concepts-messaging/#listeners>, so
>>>> every time the listener is invoked it would do something like N + 1.
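A minimal sketch of the "start at N, add one per listener invocation" idea, assuming a callback-style listener. OutstandingWorkCounter is hypothetical and uses a plain java.util.function.Consumer in place of Pulsar's MessageListener; an AtomicLong is used because listener callbacks may run on client threads other than the one reading N.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/**
 * Hypothetical sketch of "start from N and do N + 1 on every listener invocation".
 * The Consumer<String> returned by listener() stands in for a Pulsar MessageListener.
 */
final class OutstandingWorkCounter {
  /** Default consumer queue size mentioned in the thread (1000). */
  static final long DEFAULT_QUEUE_SIZE = 1000;

  private final AtomicLong n;

  OutstandingWorkCounter(long initialN) {
    this.n = new AtomicLong(initialN);
  }

  /** Callback for the (stand-in) listener: each delivered message bumps N by one. */
  Consumer<String> listener() {
    return message -> n.incrementAndGet();
  }

  long current() {
    return n.get();
  }
}
```

Note that this counter only ever grows; it does not subtract messages that have already been processed, so on its own it measures arrivals rather than remaining work.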
>>>>
>>>
>>> Does the client "pull" the messages when you create the client or does
>>> it "pull" the messages when you perform an explicit API call like
>>> "getMessages"?
>>>
>>>  - The client pulls messages when you call *consumer.receive()*; here
>>> is a basic example of a simple consumer client
>>> <https://github.com/streamnative/streamnative-academy/blob/master/src/main/java/sn/academy/consumers/SimpleConsumer.java>.
>>> Each call to consumer.receive() dequeues one message from the buffer.
>>> There is also a way to keep listening for messages using a listener, as
>>> shown in this client example
>>> <https://pulsar.apache.org/docs/en/client-libraries-java/#consumer> (see
>>> the MessageListener example above the *Configure consumer* section).
>>>
>>> On Fri, Aug 13, 2021 at 5:14 PM Luke Cwik <lc...@google.com> wrote:
>>>
>>>>
>>>>
>>>> On Thu, Aug 12, 2021 at 3:31 PM Marco Robles Pulido <
>>>> marco.robles@wizeline.com> wrote:
>>>>
>>>>> Sorry, the previous email was sent before I finished it.
>>>>>
>>>>> Hi,
>>>>> Regarding Luke's comments on the first approach:
>>>>> 1) Occasionally poll to see how many messages are still in the queue
>>>>> ahead of you so you can report the remaining work as 1 /
>>>>> numberOfInitialSplits * numOutstandingMessages
>>>>>     - What do you mean by numOutstandingMessages? If it means the
>>>>> messages still pending in the queue, then the formula above gives the
>>>>> remaining work, which can be used as the restriction, so the initial
>>>>> restriction for the SDF would be:
>>>>> [0, 1 / numberOfInitialSplits * numOutstandingMessages)
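The initial restriction above can be computed as follows. This is a sketch, not PulsarIO code: InitialRestriction is a hypothetical name, and rounding each split's share up (so that the splits together cover every outstanding message) is an assumption rather than something stated in the thread.

```java
/**
 * Sketch of the initial restriction [0, numOutstandingMessages / numberOfInitialSplits).
 * Rounding the share up is an assumption so that the splits jointly cover every message.
 */
final class InitialRestriction {
  final long from; // inclusive
  final long to;   // exclusive

  private InitialRestriction(long from, long to) {
    this.from = from;
    this.to = to;
  }

  static InitialRestriction forEachSplit(long numOutstandingMessages, int numberOfInitialSplits) {
    if (numOutstandingMessages < 0 || numberOfInitialSplits <= 0) {
      throw new IllegalArgumentException(
          "need numOutstandingMessages >= 0 and numberOfInitialSplits > 0");
    }
    // Ceiling division for a non-negative numerator and positive denominator.
    long share = (numOutstandingMessages + numberOfInitialSplits - 1) / numberOfInitialSplits;
    return new InitialRestriction(0, share);
  }

  long size() {
    return to - from;
  }
}
```

With 1000 outstanding messages and 4 initial splits, each split starts with [0, 250); with 1001 messages the share rounds up to 251.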
>>>>>
>>>>
>>>> Yes.
>>>>
>>>>
>>>>>
>>>>> I am considering this approach for the PulsarIO SDF implementation:
>>>>> pull N messages, or use Apache Pulsar's default consumer queue size
>>>>> <https://pulsar.apache.org/docs/en/concepts-messaging/#consumers> (which
>>>>> is 1000), and keep increasing N as more messages arrive in the queue,
>>>>> using the client listener
>>>>> <https://pulsar.apache.org/docs/en/concepts-messaging/#listeners>, so
>>>>> every time the listener is invoked it would do something like N + 1.
>>>>>
>>>>
>>>> Does the client "pull" the messages when you create the client or does
>>>> it "pull" the messages when you perform an explicit API call like
>>>> "getMessages"?
>>>>
>>>>
>>>>>
>>>>> Thanks in advance
>>>>>
>>>>> On Thu, Aug 12, 2021 at 5:18 PM Marco Robles Pulido <
>>>>> marco.robles@wizeline.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Regarding Luke's comments on the first approach:
>>>>>> 1) Occasionally poll to see how many messages are still in the queue
>>>>>> ahead of you so you can report the remaining work as 1 /
>>>>>> numberOfInitialSplits * numOutstandingMessages
>>>>>>     - What do you mean by numOutstandingMessages? If it means the
>>>>>> messages still pending in the queue, then the formula above gives the
>>>>>> remaining work, which can be used as the restriction, so the initial
>>>>>> restriction for the SDF would be,
>>>>>>
>>>>>> On Wed, Aug 4, 2021 at 11:02 AM Luke Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> Your research into the SDF Kafka implementation seems spot on.
>>>>>>>
>>>>>>> I took a quick look at the links you provided, and for
>>>>>>> partitioned topics it looks like you don't get to choose where a Consumer
>>>>>>> resumes from, since it is a typical get-message-and-ack
>>>>>>> client. In this kind of setup, for an initial implementation it is best if
>>>>>>> you can:
>>>>>>> 1) Occasionally poll to see how many messages are still in the queue
>>>>>>> ahead of you so you can report the remaining work as 1 /
>>>>>>> numberOfInitialSplits * numOutstandingMessages
>>>>>>> 2) Use a pull model for messages (e.g. request N messages and output
>>>>>>> them all). This prevents client library instances from effectively
>>>>>>> holding onto unprocessed messages while the bundle isn't
>>>>>>> being processed.
>>>>>>> 3) Only support checkpointing in the RestrictionTracker (adding
>>>>>>> support for dynamic splitting would be great, but no runner would exercise
>>>>>>> it right now in a streaming pipeline)
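Suggestions 2) and 3) can be sketched together without Beam on the classpath. CheckpointOnlyTracker is a hypothetical stand-in that only loosely mirrors Beam's RestrictionTracker conventions (tryClaim, and trySplit where a fraction of 0 means "checkpoint" and a null result means the split was declined); positions are message counts in [from, to).

```java
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical, JDK-only stand-in loosely modeled on Beam's RestrictionTracker conventions.
 * Positions are message counts in [from, to). Only checkpointing is supported:
 * trySplit(0.0) hands back the unclaimed remainder; any other fraction is declined (null).
 */
final class CheckpointOnlyTracker {
  private final long from;
  private long to;          // exclusive; shrinks when a checkpoint is taken
  private long lastClaimed; // from - 1 until something is claimed

  CheckpointOnlyTracker(long from, long to) {
    this.from = from;
    this.to = to;
    this.lastClaimed = from - 1;
  }

  boolean tryClaim(long position) {
    if (position <= lastClaimed) {
      throw new IllegalArgumentException("positions must be claimed in increasing order");
    }
    if (position >= to) {
      return false; // outside the (possibly checkpointed) restriction
    }
    lastClaimed = position;
    return true;
  }

  /** Returns the residual {start, end} on a checkpoint, or null if the split is declined. */
  long[] trySplit(double fractionOfRemainder) {
    if (fractionOfRemainder != 0.0) {
      return null; // no dynamic splitting, per suggestion 3)
    }
    long splitPoint = lastClaimed + 1;
    if (splitPoint >= to) {
      return null; // nothing left to hand back
    }
    long[] residual = {splitPoint, to};
    to = splitPoint; // primary becomes [from, splitPoint)
    return residual;
  }

  long[] currentRestriction() {
    return new long[] {from, to};
  }

  /**
   * Pull model, per suggestion 2): a batch of already-pulled messages is claimed and output in
   * order. Messages that cannot be claimed are not output (and should not be acknowledged).
   */
  static int claimAndOutput(CheckpointOnlyTracker tracker, List<String> batch, Consumer<String> out) {
    int emitted = 0;
    for (String message : batch) {
      if (!tracker.tryClaim(tracker.lastClaimed + 1)) {
        break;
      }
      out.accept(message);
      emitted++;
    }
    return emitted;
  }
}
```

Declining every non-zero fraction keeps the tracker simple while still letting a runner checkpoint, which matches Luke's remark that dynamic splitting would not be exercised in streaming today.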
>>>>>>>
>>>>>>> It looks like the above would work for both the multi-partition and
>>>>>>> single-partition scenarios and could still parallelize up to the capacity
>>>>>>> the brokers can handle. Note that in the future you could still have
>>>>>>> a single SDF implementation that handles two types of restrictions, one
>>>>>>> Consumer-based and the other Reader-based (see
>>>>>>> Watch.java[1] for a growing and a non-growing restriction, which is what I mean by
>>>>>>> having different branching logic). In the future you would update the
>>>>>>> initial splitting logic to check whether the broker has a single partition,
>>>>>>> and then you could create "Reader" restrictions, but this would only be
>>>>>>> useful if you felt there was something to be gained from using
>>>>>>> it. For the Reader-based interface:
>>>>>>> 4) Do you expect the user to supply the message id for the first
>>>>>>> message? (If so, is there a way to partition the message id space? For
>>>>>>> example, in Kafka the id is an incrementing number, you know where you are,
>>>>>>> and you can poll for the latest id, so you can split the numerical range easily.)
>>>>>>> 5) What value do you see it providing?
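Item 4) refers to splitting a numeric id range, the way KafkaIO can with offsets. The sketch below splits [from, to) into at most k contiguous, non-empty subranges whose sizes differ by at most one. IdRangeSplitter is hypothetical, and it only applies if the ids really form a dense numeric range, which is exactly what item 4) asks about for Pulsar message ids.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits [from, to) into at most k contiguous, non-empty subranges. */
final class IdRangeSplitter {
  static List<long[]> split(long from, long to, int k) {
    if (from > to || k <= 0) {
      throw new IllegalArgumentException("need from <= to and k > 0");
    }
    long total = to - from;
    long parts = Math.min(k, total); // never emit empty subranges
    List<long[]> result = new ArrayList<>();
    long start = from;
    for (long i = 0; i < parts; i++) {
      // Spread the remainder over the first (total % parts) subranges.
      long size = total / parts + (i < total % parts ? 1 : 0);
      result.add(new long[] {start, start + size});
      start += size;
    }
    return result;
  }
}
```

For example, [100, 110) split three ways gives [100, 104), [104, 107) and [107, 110).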
>>>>>>>
>>>>>>> 1:
>>>>>>> https://github.com/apache/beam/blob/03a1cca42ceeec2e963ec14c9bc344956a8683b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L885
>>>>>>>
>>>>>>> On Tue, Aug 3, 2021 at 1:17 PM Marco Robles Pulido <
>>>>>>> marco.robles@wizeline.com> wrote:
>>>>>>>
>>>>>>>> Hi folks,
>>>>>>>>
>>>>>>>> I am working on the new PulsarIO connector for Beam, and most of
>>>>>>>> my work so far has been researching how Pulsar works. As many of you
>>>>>>>> know, we already have a KafkaIO connector, and Kafka is somewhat similar
>>>>>>>> to Pulsar, but I found some differences during my research, and I would
>>>>>>>> like your input on how you would handle the SDF implementation. Here
>>>>>>>> are my main concerns:
>>>>>>>> - Kafka uses partitioned topics by default, and each message within
>>>>>>>> a partition gets an incremental id called the offset. With this in
>>>>>>>> mind, the SDF implementation for Kafka works roughly like this: the
>>>>>>>> element to evaluate is the topic/partition, and the restrictions
>>>>>>>> are the start and end offsets.
>>>>>>>> - In Pulsar, partitioned topics are optional
>>>>>>>> <https://pulsar.apache.org/docs/en/concepts-messaging/#partitioned-topics>;
>>>>>>>> by default a topic is served by a single broker. We could use
>>>>>>>> partitioned topics, but that would limit end users to partitioned
>>>>>>>> topics only. Alternatively, cursors can be handled manually
>>>>>>>> <https://pulsar.apache.org/docs/en/2.5.1/concepts-clients/#reader-interface>,
>>>>>>>> using the earliest and latest available messages as the
>>>>>>>> restrictions (but this would not allow the use of partitioned
>>>>>>>> topics). So I was thinking there should be two implementations: one
>>>>>>>> that uses partitioned topics and another that handles cursors
>>>>>>>> manually.
>>>>>>>>
>>>>>>>> So, let me know your ideas or input. And if I am wrong, please
>>>>>>>> help clarify the SDF restrictions for KafkaIO.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> *Marco Robles* *|* WIZELINE
>>>>>>>>
>>>>>>>> Software Engineer
>>>>>>>>
>>>>>>>> marco.robles@wizeline.com
>>>>>>>>
>>>>>>>> Amado Nervo 2200, Esfera P6, Col. Ciudad del Sol, 45050 Zapopan,
>>>>>>>> Jal.
>>>>>>>>
>>>>>>>> *This email and its contents (including any attachments) are being
>>>>>>>> sent to you on the condition of confidentiality and may be protected by
>>>>>>>> legal privilege. Access to this email by anyone other than the intended
>>>>>>>> recipient is unauthorized. If you are not the intended recipient, please
>>>>>>>> immediately notify the sender by replying to this message and delete the
>>>>>>>> material immediately from your system. Any further use, dissemination,
>>>>>>>> distribution or reproduction of this email is strictly prohibited. Further,
>>>>>>>> no representation is made with respect to any content contained in this
>>>>>>>> email.*
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
>
>



Re: Help/Inputs with PulsarIO connector (GetSizeVSHasProgress)

Posted by Luke Cwik <lc...@google.com>.
See these for example implementations:
https://github.com/apache/beam/blob/8367b4370973b018c988d8c1cfa993e6de817cae/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java#L141
https://github.com/apache/beam/blob/8367b4370973b018c988d8c1cfa993e6de817cae/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java#L234

The framework uses HasProgress; it is not expected to be invoked directly by
the pipeline author. For starting points showing where it is used, see:
https://github.com/apache/beam/blob/8367b4370973b018c988d8c1cfa993e6de817cae/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L1128
https://github.com/apache/beam/blob/8367b4370973b018c988d8c1cfa993e6de817cae/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L1376
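The progress arithmetic of an offset-range tracker can be sketched with plain doubles. This JDK-only sketch reflects our reading of the OffsetRangeTracker linked above: before any claim, all work is remaining; afterwards, remaining is max(0, to - lastAttemptedOffset) and completed is the total minus remaining. OffsetProgress is a hypothetical name, and the linked class appears to use BigDecimal arithmetic where this sketch uses doubles.

```java
/**
 * JDK-only sketch of offset-range progress, following our reading of the linked
 * OffsetRangeTracker: the last attempted offset still counts as remaining work.
 */
final class OffsetProgress {
  final double workCompleted;
  final double workRemaining;

  private OffsetProgress(double workCompleted, double workRemaining) {
    this.workCompleted = workCompleted;
    this.workRemaining = workRemaining;
  }

  /** lastAttemptedOffset is null while nothing has been claimed yet. */
  static OffsetProgress of(long from, long to, Long lastAttemptedOffset) {
    // Subtract as doubles to avoid long overflow (the linked class appears to use BigDecimal).
    double totalWork = (double) to - (double) from;
    if (lastAttemptedOffset == null) {
      return new OffsetProgress(0.0, totalWork);
    }
    // Clamp at zero in case a claim was attempted beyond the end of the range.
    double remaining = Math.max(0.0, (double) to - (double) lastAttemptedOffset);
    return new OffsetProgress(totalWork - remaining, remaining);
  }
}
```

For a restriction [0, 10) with offset 3 last attempted, this reports 3 units completed and 7 remaining.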

On Wed, Sep 8, 2021 at 10:09 AM Marco Robles <ma...@wizeline.com>
wrote:

> Hi,
>
> Is there an example of how to implement HasProgress, or of how it is
> used?
>
> Thanks
>
> On Fri, Sep 3, 2021 at 4:39 PM Luke Cwik <lc...@google.com> wrote:
>
>> If GetSize isn't implemented, it delegates to HasProgress and reports the
>> work remaining as the size.
>>
>> HasProgress is strictly more powerful, since it can report the size when
>> the restriction isn't being processed and also report work
>> completed/remaining while the restriction is actively being processed.
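The fallback Luke describes can be modeled with two small stand-ins. SizeFallback and HasProgressLike are hypothetical and are not Beam's interfaces; a null explicitGetSize plays the role of an SDF with no GetSize method. For a pub/sub source, work remaining could be the polled backlog from suggestion 1) earlier in the thread, which is the assumption behind backlogTracker.

```java
import java.util.function.DoubleSupplier;

/**
 * Hypothetical stand-ins, not Beam's interfaces: when no explicit GetSize is supplied,
 * the size falls back to the tracker's reported work remaining.
 */
final class SizeFallback {
  /** Stand-in for a tracker that implements HasProgress. */
  interface HasProgressLike {
    double workCompleted();

    double workRemaining();
  }

  /** explicitGetSize == null models an SDF without a GetSize method. */
  static double size(DoubleSupplier explicitGetSize, HasProgressLike tracker) {
    if (explicitGetSize != null) {
      return explicitGetSize.getAsDouble();
    }
    return tracker.workRemaining();
  }

  /** Assumed pub/sub tracker: completed = messages claimed, remaining = polled backlog. */
  static HasProgressLike backlogTracker(long claimed, long polledBacklog) {
    return new HasProgressLike() {
      @Override
      public double workCompleted() {
        return claimed;
      }

      @Override
      public double workRemaining() {
        return polledBacklog;
      }
    };
  }
}
```

Implementing only the progress side therefore gives both numbers: a size before processing starts and completed/remaining while it runs.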

Re: Help/Inputs with PulsarIO connector (GetSizeVSHasProgress)

Posted by Marco Robles <ma...@wizeline.com>.
Hi,

Is there an example of how to implement HasProgress, or of how it is
used?

Thanks

On Fri, Sep 3, 2021 at 4:39 PM Luke Cwik <lc...@google.com> wrote:

> If GetSize isn't implemented, it delegates to HasProgress and reports the
> work remaining as the size.
>
> HasProgress is strictly more powerful, since it can report the size when
> the restriction isn't being processed and also report work
> completed/remaining while the restriction is actively being processed.
>
> On Thu, Sep 2, 2021 at 12:40 PM Marco Robles <ma...@wizeline.com>
> wrote:
>
>> Hi folks,
>>
>> I'm still working on PulsarIO, and I'm somewhat stuck trying to define sizing
>> and progress
>> <https://beam.apache.org/documentation/programming-guide/#sizing-and-progress>
>> for the SDF. The Beam javadoc
>> <https://beam.apache.org/releases/javadoc/2.32.0/org/apache/beam/sdk/transforms/DoFn.ProcessElement.html>
>> says that an SDF may define a *GetSize* method or, at minimum, ensure that its
>> *RestrictionTracker* implements *HasProgress*. For a pub/sub system, would
>> you recommend *HasProgress* over *GetSize* (which would only be an estimate
>> of how much work remains to be processed)?
>>
>> For the HasProgress, I supposed it should be implemented in
>> the @NewTracker method, right? Does anyone have experience using
>> HasProgress?
>>
>> Thanks in advance.
>>
>> On Mon, Aug 16, 2021 at 11:49 AM Marco Robles <ma...@wizeline.com>
>> wrote:
>>
>>> Does the client buffer/reserve any messages?
>>> - Pulsar stores the messages that haven't been acknowledged
>>> <https://pulsar.apache.org/docs/en/cookbooks-retention-expiry/#retention-policies>.
>>> And you acknowledged the message by calling consumer.acknowledge(msg);
>>>
>>> On Fri, Aug 13, 2021 at 6:58 PM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> Does the client buffer/reserve any messages?
>>>> For example, you call consumer.receive() so it goes and fetches the
>>>> next 100 messages assuming that you'll want them as well so the next 99
>>>> consumer.receive() calls will be fast.
>>>>
>>>> On Fri, Aug 13, 2021 at 4:16 PM Marco Robles Pulido <
>>>> marco.robles@wizeline.com> wrote:
>>>>
>>>>>
>>>>>
>>>>>> I am taking in consideration to use this approach for the PulsarIO -
>>>>>> SDF implementation, in which I pull N messages or use the default
>>>>>> size for the queue message in apache pulsar
>>>>>> <https://pulsar.apache.org/docs/en/concepts-messaging/#consumers> (which
>>>>>> is 1000) and keep adding to "N" as many messages are still being received
>>>>>> in the queue with help of the client listener
>>>>>> <https://pulsar.apache.org/docs/en/concepts-messaging/#listeners>,  so
>>>>>> every time the listener is invoked will do something like N + 1.
>>>>>>
>>>>>
>>>>> Does the client "pull" the messages when you create the client or does
>>>>> it "pull" the messages when you perform an explicit API call like
>>>>> "getMessages"?
>>>>>
>>>>>  - The client "pull" the messages when you perform
>>>>> *consumer.receive()*, you can see here a basic example of a Simple
>>>>> Consumer client
>>>>> <https://github.com/streamnative/streamnative-academy/blob/master/src/main/java/sn/academy/consumers/SimpleConsumer.java>,
>>>>> so each time consumer.receive() is called, a message is dequeued from the
>>>>> buffer. As well there is a way to keep listening messages using a listener
>>>>> as is shown in this client example
>>>>> <https://pulsar.apache.org/docs/en/client-libraries-java/#consumer> (see
>>>>> MessageListener example, above *Configure consumer* section)
>>>>>
>>>>> On Fri, Aug 13, 2021 at 5:14 PM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Aug 12, 2021 at 3:31 PM Marco Robles Pulido <
>>>>>> marco.robles@wizeline.com> wrote:
>>>>>>
>>>>>>> Sorry It was sent without finishing the email.
>>>>>>>
>>>>>>> Hi,
>>>>>>>  So regarding to Luke comments for the first approach
>>>>>>> 1) Occasionally poll to see how many messages are still in the queue
>>>>>>> ahead of you so you can report the remaining work as 1 /
>>>>>>> numberOfInitialSplits * numOutstandngMessages
>>>>>>>     - What do you mean for numOutstandingMessages? The messages that
>>>>>>> are still pending in the queue, then using the above formula, it will give
>>>>>>> you the remaining work that can be used for the restriction in which the
>>>>>>> initial restriction for SDF will be like,
>>>>>>> [0, 1 / numberOfInitialSplits * numOutstandngMessages)
>>>>>>>
>>>>>>
>>>>>> Yes.
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> I am taking in consideration to use this approach for the PulsarIO -
>>>>>>> SDF implementation, in which I pull N messages or use the default
>>>>>>> size for the queue message in apache pulsar
>>>>>>> <https://pulsar.apache.org/docs/en/concepts-messaging/#consumers> (which
>>>>>>> is 1000) and keep adding to "N" as many messages are still being received
>>>>>>> in the queue with help of the client listener
>>>>>>> <https://pulsar.apache.org/docs/en/concepts-messaging/#listeners>,  so
>>>>>>> every time the listener is invoked will do something like N + 1.
>>>>>>>
>>>>>>
>>>>>> Does the client "pull" the messages when you create the client or
>>>>>> does it "pull" the messages when you perform an explicit API call like
>>>>>> "getMessages"?
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> Thanks in advance
>>>>>>>
>>>>>>> On Thu, Aug 12, 2021 at 5:18 PM Marco Robles Pulido <
>>>>>>> marco.robles@wizeline.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> So regarding to Luke comments for the first approach
>>>>>>>> 1) Occasionally poll to see how many messages are still in the
>>>>>>>> queue ahead of you so you can report the remaining work as 1 /
>>>>>>>> numberOfInitialSplits * numOutstandngMessages
>>>>>>>>     - What do you mean for numOutstandingMessages? The messages
>>>>>>>> that are still pending in the queue, then using the above formula, it will
>>>>>>>> give you the remaining work that can be used for the restriction in which
>>>>>>>> the initial restriction for SDF will be like,
>>>>>>>>
>>>>>>>> On Wed, Aug 4, 2021 at 11:02 AM Luke Cwik <lc...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Your research into the SDF Kafka implementation seems spot on.
>>>>>>>>>
>>>>>>>>> I took a quick look at the links you had provided and for
>>>>>>>>> partitioned topics it looks like you don't have a choice where a Consumer
>>>>>>>>> is able to resume from as you have a typical get message and ack scheme
>>>>>>>>> client. In this kind of setup for an initial implementation it is best if
>>>>>>>>> you can:
>>>>>>>>> 1) Occasionally poll to see how many messages are still in the
>>>>>>>>> queue ahead of you so you can report the remaining work as 1 /
>>>>>>>>> numberOfInitialSplits * numOutstandngMessages
>>>>>>>>> 2) Use a pull model for messages (e.g. request N messages and
>>>>>>>>> output them all). This prevents an issue where the client library instances
>>>>>>>>> effectively are holding onto unprocessed messages while the bundle isn't
>>>>>>>>> being processed.
>>>>>>>>> 3) Only support checkpointing in the RestrictionTracker (adding
>>>>>>>>> support for dynamic splitting would be great but no runner would exercise
>>>>>>>>> it right now in a streaming pipeline)
>>>>>>>>>
>>>>>>>>> It looks like the above would work for both the multi-partition
>>>>>>>>> and single partition scenarios and still could parallelize to the capacity
>>>>>>>>> of what the brokers could handle. Note that in the future you could still
>>>>>>>>> have a single SDF implementation that handles two types of restrictions one
>>>>>>>>> being the Consumer based one and the other being the Reader based one (See
>>>>>>>>> Watch.java[1] for a growing and nongrowing restriction for what I mean by
>>>>>>>>> having different branching logic). In the future you would update the
>>>>>>>>> initial splitting logic to check whether the broker has a single partition
>>>>>>>>> and then you could create "Reader" restrictions but this would only be
>>>>>>>>> useful if you felt as though there was something to be gained from using
>>>>>>>>> it. For the Reader based interface:
>>>>>>>>> 4) Do you expect the user to supply the message id for the first
>>>>>>>>> message? (if so is there a way to partition the message id space? (e.g. in
>>>>>>>>> Kafka the id is a number that increments and you know where you are and can
>>>>>>>>> poll for the latest id so you can split the numerical range easily))
>>>>>>>>> 5) What value do you see it providing?
>>>>>>>>>
>>>>>>>>> 1:
>>>>>>>>> https://github.com/apache/beam/blob/03a1cca42ceeec2e963ec14c9bc344956a8683b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L885
>>>>>>>>>
>>>>>>>>> On Tue, Aug 3, 2021 at 1:17 PM Marco Robles Pulido <
>>>>>>>>> marco.robles@wizeline.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi folks,
>>>>>>>>>>
>>>>>>>>>> I am working with the new PulsarIO connector with Beam, and most
>>>>>>>>>> of my work has been in researching how Pulsar works, as many of you know we
>>>>>>>>>> already have KafkaIO connector which is kind of similar to Pulsar but there
>>>>>>>>>> is some difference that I have found during my research and I would like to
>>>>>>>>>> know your input in how would you handle the implementation for SDF. Here
>>>>>>>>>> are my main concerns:
>>>>>>>>>> - As you may know kafka handles by default partitioned topics
>>>>>>>>>> where each message within the partition gets an incremental id, called
>>>>>>>>>> offset. Having this in mind SDF implementation for kafka works something
>>>>>>>>>> like this, where the element to evaluate is the topic/partition and the
>>>>>>>>>> restrictions are the start and end offsets.
>>>>>>>>>> - For Pulsar, partitioned topics are optional
>>>>>>>>>> <https://pulsar.apache.org/docs/en/concepts-messaging/#partitioned-topics> or
>>>>>>>>>> well by default are handled by single broker, there is a possibility where
>>>>>>>>>> you can use the partitioned topics, but you will limit the final user to
>>>>>>>>>> use only partitioned topics with pulsar, as well, there is a possibility
>>>>>>>>>> to manually handle cursors
>>>>>>>>>> <https://pulsar.apache.org/docs/en/2.5.1/concepts-clients/#reader-interface>
>>>>>>>>>> which will be the earliest and latest message available that may be used as
>>>>>>>>>> restrictions (but implementing this will not allow to use partitioned
>>>>>>>>>> topics). So with this in mind I was thinking there should be two
>>>>>>>>>> implementations one that use partitioned topics with pulsar and the other
>>>>>>>>>> one that manually handle cursors.
>>>>>>>>>>
>>>>>>>>>> So, let me know your ideas/input about it. And maybe If i am
>>>>>>>>>> wrong help to clarify the SDF restrictions for KafkaIO.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>>
>>>>>>>>>> *Marco Robles* *|* WIZELINE
>>>>>>>>>>
>>>>>>>>>> Software Engineer
>>>>>>>>>>
>>>>>>>>>> marco.robles@wizeline.com
>>>>>>>>>>
>>>>>>>>>> Amado Nervo 2200, Esfera P6, Col. Ciudad del Sol, 45050 Zapopan,
>>>>>>>>>> Jal.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> *This email and its contents (including any attachments) are
>>>>>>>>>> being sent toyou on the condition of confidentiality and may be protected
>>>>>>>>>> by legalprivilege. Access to this email by anyone other than the intended
>>>>>>>>>> recipientis unauthorized. If you are not the intended recipient, please
>>>>>>>>>> immediatelynotify the sender by replying to this message and delete the
>>>>>>>>>> materialimmediately from your system. Any further use, dissemination,
>>>>>>>>>> distributionor reproduction of this email is strictly prohibited. Further,
>>>>>>>>>> norepresentation is made with respect to any content contained in this
>>>>>>>>>> email.*
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> *Marco Robles* *|* WIZELINE
>>>>>>>>
>>>>>>>> Software Engineer
>>>>>>>>
>>>>>>>> marco.robles@wizeline.com
>>>>>>>>
>>>>>>>> Amado Nervo 2200, Esfera P6, Col. Ciudad del Sol, 45050 Zapopan,
>>>>>>>> Jal.
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> *Marco Robles* *|* WIZELINE
>>>>>>>
>>>>>>> Software Engineer
>>>>>>>
>>>>>>> marco.robles@wizeline.com
>>>>>>>
>>>>>>> Amado Nervo 2200, Esfera P6, Col. Ciudad del Sol, 45050 Zapopan, Jal.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *This email and its contents (including any attachments) are being
>>>>>>> sent toyou on the condition of confidentiality and may be protected by
>>>>>>> legalprivilege. Access to this email by anyone other than the intended
>>>>>>> recipientis unauthorized. If you are not the intended recipient, please
>>>>>>> immediatelynotify the sender by replying to this message and delete the
>>>>>>> materialimmediately from your system. Any further use, dissemination,
>>>>>>> distributionor reproduction of this email is strictly prohibited. Further,
>>>>>>> norepresentation is made with respect to any content contained in this
>>>>>>> email.*
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> *Marco Robles* *|* WIZELINE
>>>>>
>>>>> Software Engineer
>>>>>
>>>>> marco.robles@wizeline.com
>>>>>
>>>>> Amado Nervo 2200, Esfera P6, Col. Ciudad del Sol, 45050 Zapopan, Jal.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> *This email and its contents (including any attachments) are being
>>>>> sent toyou on the condition of confidentiality and may be protected by
>>>>> legalprivilege. Access to this email by anyone other than the intended
>>>>> recipientis unauthorized. If you are not the intended recipient, please
>>>>> immediatelynotify the sender by replying to this message and delete the
>>>>> materialimmediately from your system. Any further use, dissemination,
>>>>> distributionor reproduction of this email is strictly prohibited. Further,
>>>>> norepresentation is made with respect to any content contained in this
>>>>> email.*
>>>>
>>>>
>>>
>>> --
>>>
>>> *Marco Robles* *|* WIZELINE
>>>
>>> Software Engineer
>>>
>>> marco.robles@wizeline.com
>>>
>>> Amado Nervo 2200, Esfera P6, Col. Ciudad del Sol, 45050 Zapopan, Jal.
>>>
>>
>>
>> --
>>
>> *Marco Robles* *|* WIZELINE
>>
>> Software Engineer
>>
>> marco.robles@wizeline.com
>>
>> Amado Nervo 2200, Esfera P6, Col. Ciudad del Sol, 45050 Zapopan, Jal.
>>
>>
>>
>>
>>
>>
>>
>>
>> *This email and its contents (including any attachments) are being sent
>> toyou on the condition of confidentiality and may be protected by
>> legalprivilege. Access to this email by anyone other than the intended
>> recipientis unauthorized. If you are not the intended recipient, please
>> immediatelynotify the sender by replying to this message and delete the
>> materialimmediately from your system. Any further use, dissemination,
>> distributionor reproduction of this email is strictly prohibited. Further,
>> norepresentation is made with respect to any content contained in this
>> email.*
>
>

-- 

*Marco Robles* *|* WIZELINE

Software Engineer

marco.robles@wizeline.com

Amado Nervo 2200, Esfera P6, Col. Ciudad del Sol, 45050 Zapopan, Jal.

-- 
*This email and its contents (including any attachments) are being sent to
you on the condition of confidentiality and may be protected by legal
privilege. Access to this email by anyone other than the intended recipient
is unauthorized. If you are not the intended recipient, please immediately
notify the sender by replying to this message and delete the material
immediately from your system. Any further use, dissemination, distribution
or reproduction of this email is strictly prohibited. Further, no
representation is made with respect to any content contained in this email.*

Re: Help/Inputs with PulsarIO connector (GetSizeVSHasProgress)

Posted by Luke Cwik <lc...@google.com>.
If GetSize isn't implemented, it delegates to HasProgress and reports the
work remaining as the size.

HasProgress is strictly more powerful since it can report the size when the
restriction isn't being processed and also report work completed/remaining
when the restriction is actively being processed.
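A minimal plain-Java sketch of the two points above, assuming a restriction that is simply a range of message positions [from, to), with positions being a counter the connector assigns to received messages (Pulsar message IDs are not simple offsets). The range size could be, for example, numOutstandingMessages / numberOfInitialSplits as suggested earlier in the thread. Progress, HasProgress, MessageCountTracker and Sizes are hypothetical, simplified stand-ins that only mirror the shape of Beam's RestrictionTracker.Progress and RestrictionTracker.HasProgress; they are not Beam classes. Only checkpointing is supported, and the 1.0 returned by defaultSize for trackers without progress is an assumption.

```java
// Hypothetical, simplified stand-ins mirroring the shape of Beam's
// RestrictionTracker.Progress and RestrictionTracker.HasProgress (not the real classes).
final class Progress {
  final double workCompleted;
  final double workRemaining;

  Progress(double workCompleted, double workRemaining) {
    this.workCompleted = workCompleted;
    this.workRemaining = workRemaining;
  }
}

interface HasProgress {
  Progress getProgress();
}

// Hypothetical tracker over message positions [from, to). Supports only
// checkpointing (no fractional dynamic splits).
final class MessageCountTracker implements HasProgress {
  private final long from;
  private long to;
  private long lastClaimed; // from - 1 while nothing has been claimed

  MessageCountTracker(long from, long to) {
    if (from > to) {
      throw new IllegalArgumentException("from must be <= to");
    }
    this.from = from;
    this.to = to;
    this.lastClaimed = from - 1;
  }

  // Claims the next message position; returns false once the range is exhausted.
  boolean tryClaim(long position) {
    if (position <= lastClaimed) {
      throw new IllegalArgumentException("positions must be claimed in increasing order");
    }
    if (position >= to) {
      return false;
    }
    lastClaimed = position;
    return true;
  }

  // Checkpoint: this tracker keeps [from, lastClaimed + 1) and returns the
  // unprocessed residual {start, end} to be resumed later.
  long[] checkpoint() {
    long splitAt = lastClaimed + 1;
    long[] residual = {splitAt, to};
    to = splitAt;
    return residual;
  }

  // Works both before processing (nothing claimed: all work remaining)
  // and while processing (claimed messages count as completed).
  @Override
  public Progress getProgress() {
    long completed = lastClaimed + 1 - from;
    return new Progress(completed, to - from - completed);
  }
}

final class Sizes {
  private Sizes() {}

  // Fallback when no GetSize method exists: use the work remaining from
  // HasProgress. The 1.0 returned for other trackers is an assumption.
  static double defaultSize(Object tracker) {
    if (tracker instanceof HasProgress) {
      return ((HasProgress) tracker).getProgress().workRemaining;
    }
    return 1.0;
  }
}
```

Before any claim, the work remaining covers the whole range, which is what a runner would see as the size of an unprocessed restriction; once messages are claimed, the same call splits the range into completed and remaining work. In an actual SDF the tracker class would extend RestrictionTracker, implement RestrictionTracker.HasProgress, and be returned from the @NewTracker method; Beam's own OffsetRangeTracker is worth reading as a reference implementation.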
