You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Marco Robles <ma...@wizeline.com> on 2021/09/17 20:14:07 UTC

Re: Help/Inputs with PulsarIO connector

Hi,

I am dealing with some blockers during the PulsarIO SDF implementation,
checking back the comments you mentioned before. What do you mean with the
Second idea of using a pull model for messages, request N messages and
output them all, will it be something like I fetched N messages, processed
them, and the next iteration or split will be the same amount of N messages
to process, so the N will be a fixed number (let's say 100), so each split
will be splitting into (0, 100], (101, 200] ... and so on until it
finished? Do I get it wrong?

Thanks in advance.

On Wed, Aug 4, 2021 at 11:02 AM Luke Cwik <lc...@google.com> wrote:

> Your research into the SDF Kafka implementation seems spot on.
>
> I took a quick look at the links you had provided and for partitioned
> topics it looks like you don't have a choice where a Consumer is able to
> resume from as you have a typical get message and ack scheme client. In
> this kind of setup for an initial implementation it is best if you can:
> 1) Occasionally poll to see how many messages are still in the queue ahead
> of you so you can report the remaining work as 1 / numberOfInitialSplits *
> numOutstandngMessages
> *2) Use a pull model for messages (e.g. request N messages and output them
> all). This prevents an issue where the client library instances effectively
> are holding onto unprocessed messages while the bundle isn't being
> processed.*
> 3) Only support checkpointing in the RestrictionTracker (adding support
> for dynamic splitting would be great but no runner would exercise it right
> now in a streaming pipeline)
>
> It looks like the above would work for both the multi-partition and single
> partition scenarios and still could parallelize to the capacity of what the
> brokers could handle. Note that in the future you could still have a single
> SDF implementation that handles two types of restrictions one being the
> Consumer based one and the other being the Reader based one (See
> Watch.java[1] for a growing and nongrowing restriction for what I mean by
> having different branching logic). In the future you would update the
> initial splitting logic to check whether the broker has a single partition
> and then you could create "Reader" restrictions but this would only be
> useful if you felt as though there was something to be gained from using
> it. For the Reader based interface:
> 4) Do you expect the user to supply the message id for the first message?
> (if so is there a way to partition the message id space? (e.g. in Kafka the
> id is a number that increments and you know where you are and can poll for
> the latest id so you can split the numerical range easily))
> 5) What value do you see it providing?
>
> 1:
> https://github.com/apache/beam/blob/03a1cca42ceeec2e963ec14c9bc344956a8683b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L885
>
> On Tue, Aug 3, 2021 at 1:17 PM Marco Robles Pulido <
> marco.robles@wizeline.com> wrote:
>
>> Hi folks,
>>
>> I am working with the new PulsarIO connector with Beam, and most of my
>> work has been in researching how Pulsar works, as many of you know we
>> already have KafkaIO connector which is kind of similar to Pulsar but there
>> is some difference that I have found during my research and I would like to
>> know your input in how would you handle the implementation for SDF. Here
>> are my main concerns:
>> - As you may know kafka handles by default partitioned topics where each
>> message within the partition gets an incremental id, called offset. Having
>> this in mind SDF implementation for kafka works something like this, where
>> the element to evaluate is the topic/partition and the restrictions are the
>> start and end offsets.
>> - For Pulsar, partitioned topics are optional
>> <https://pulsar.apache.org/docs/en/concepts-messaging/#partitioned-topics> or
>> well by default are handled by single broker, there is a possibility where
>> you can use the partitioned topics, but you will limit the final user to
>> use only partitioned topics with pulsar, as well, there is a possibility
>> to manually handle cursors
>> <https://pulsar.apache.org/docs/en/2.5.1/concepts-clients/#reader-interface>
>> which will be the earliest and latest message available that may be used as
>> restrictions (but implementing this will not allow to use partitioned
>> topics). So with this in mind I was thinking there should be two
>> implementations one that use partitioned topics with pulsar and the other
>> one that manually handle cursors.
>>
>> So, let me know your ideas/input about it. And maybe If i am wrong help
>> to clarify the SDF restrictions for KafkaIO.
>>
>> Thanks,
>>
>> --
>>
>> *Marco Robles* *|* WIZELINE
>>
>> Software Engineer
>>
>> marco.robles@wizeline.com
>>
>> Amado Nervo 2200, Esfera P6, Col. Ciudad del Sol, 45050 Zapopan, Jal.
>>
>>
>>
>>
>>
>>
>>
>>
>> *This email and its contents (including any attachments) are being sent
>> toyou on the condition of confidentiality and may be protected by
>> legalprivilege. Access to this email by anyone other than the intended
>> recipientis unauthorized. If you are not the intended recipient, please
>> immediatelynotify the sender by replying to this message and delete the
>> materialimmediately from your system. Any further use, dissemination,
>> distributionor reproduction of this email is strictly prohibited. Further,
>> norepresentation is made with respect to any content contained in this
>> email.*
>
>

-- 

*Marco Robles* *|* WIZELINE

Software Engineer

marco.robles@wizeline.com

Amado Nervo 2200, Esfera P6, Col. Ciudad del Sol, 45050 Zapopan, Jal.

-- 
*This email and its contents (including any attachments) are being sent to
you on the condition of confidentiality and may be protected by legal
privilege. Access to this email by anyone other than the intended recipient
is unauthorized. If you are not the intended recipient, please immediately
notify the sender by replying to this message and delete the material
immediately from your system. Any further use, dissemination, distribution
or reproduction of this email is strictly prohibited. Further, no
representation is made with respect to any content contained in this email.*

Re: Help/Inputs with PulsarIO connector

Posted by Luke Cwik <lc...@google.com>.
Sometimes classes within util packages might be expected to be used only
internally within the project (e.g. Apache Beam). If it is meant to be
consumed publicly then using the offsets sounds like a good alternative.

On Tue, Sep 28, 2021 at 4:16 PM Marco Robles <ma...@wizeline.com>
wrote:

> Hi,
>
> Thanks for the proposal, I will take a look at it!
>
> I was following the next approach, I would like to know your input on it
>
> I found the following util class that helps to convert MessageId into
> numerical offset, MessageIdUtils
> <https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/util/MessageIdUtils.java> ,
> and I am using it to get the offset restriction for SDF. So for each new
> restriction, we get the last message id committed and from there we do the
> next restriction, which will be something like [currentOffset,
> latestMessageIdCommitedOffset).
>
> Here is an example <https://github.com/apache/beam/pull/15572> of the
> initial approach, here is more detail about the initial approach.
>
> https://github.com/apache/beam/pull/15572
>
> Thanks in advance
>
> On Tue, Sep 28, 2021 at 3:48 PM Luke Cwik <lc...@google.com> wrote:
>
>> My second proposal wasn't about each split representing a fixed number of
>> messages, it was just that you pull the messages and output them and then
>> ack them when the output has been durably committed (e.g. in a
>> BundleFinalizer[1] if duplicates are OK or in a downstream transform
>> separated by a Reshuffle assuming you can ack messages in a different
>> worker from the one that did the reading). This is very similar to that the
>> GCP Pubsub implementation does (note this is implemented as
>> UnboundedSource).
>>
>> Thanks for the details about using the Reader API to enumerate single
>> partitions from a partitioned topic as that would make it easy to build
>> something that doesn't produce duplicates all the time and would allow for
>> great parallelization. The Reader API allows you to seek to latest[2] and
>> also to seek to a specific message timestamp[3]. So you could create two
>> types of restrictions, "read from timestamp X" and "read timestamp range
>> [Y, Z)". The idea is that you always have one restriction covering "read
>> from timestamp X" and as it makes progress it updates what X is whenever a
>> process continuation resume is returned or a split at fraction of remainder
>> of 0 is done. When a split request comes in with a non zero fraction of
>> remainder, you can find the latest message id and its timestamp Z and split
>> the range using the fraction between X and Z into a "read range [X, Y)" and
>> "read from Y".
>>
>> Unless the seek via timestamp is prohibitively expensive, I would highly
>> recommend using the reader API as this would mirror what is happening in
>> Kafka and would allow for really good parallelization.
>>
>> 1:
>> https://github.com/apache/beam/blob/5e7e66f116da5b947cb271ccaeb9a6a15fb1d6f6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L1359
>> 2:
>> https://pulsar.incubator.apache.org/api/client/2.8.0-SNAPSHOT/org/apache/pulsar/client/api/MessageId.html#latest
>> 3:
>> https://pulsar.incubator.apache.org/api/client/2.8.0-SNAPSHOT/org/apache/pulsar/client/api/Reader.html#seek-long-
>>
>>
>> On Mon, Sep 20, 2021 at 8:06 AM Marco Robles <ma...@wizeline.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Yes, It is correct.
>>>
>>> Thanks
>>>
>>> On Sun, Sep 19, 2021 at 11:34 PM Matteo Merli <mm...@apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> I just wanted to clarify the behavior of readers on partitioned topics
>>>> in Pulsar.
>>>>
>>>> You have 2 main ways of consuming messages from pulsar topics:
>>>>   1. Consumers -> cursor is managed by the system, based on acks.
>>>> (still allows for seek() operations)
>>>>   2. Readers -> reading position is managed by application (eg: by
>>>> storing message ids into a state checkpoint).
>>>>
>>>> Consumers are automatically handling partitions, while readers are
>>>> meant to work at the individual partition level.
>>>>
>>>> When using readers, it's definitely possible to use them on partitioned
>>>> topics, just by creating 1 reader per partition. There is an easy way to
>>>> discover the list of partitions:
>>>>
>>>> List<String> partitions =
>>>> pulsarClient.getPartitionsForTopic("my-topic").join();
>>>> for (String p : partitions) {
>>>>     Reader<byte[]> reader = pulsarClient.newReader()
>>>>                   .topic(p)
>>>>                   .startMessageId(....)
>>>>                   .create();
>>>>
>>>>     ///...
>>>> }
>>>>
>>>> Matteo
>>>>
>>>> On 2021/09/17 20:14:07, Marco Robles <ma...@wizeline.com>
>>>> wrote:
>>>> > Hi,
>>>> >
>>>> > I am dealing with some blockers during the PulsarIO SDF
>>>> implementation,
>>>> > checking back the comments you mentioned before. What do you mean
>>>> with the
>>>> > Second idea of using a pull model for messages, request N messages and
>>>> > output them all, will it be something like I fetched N messages,
>>>> processed
>>>> > them, and the next iteration or split will be the same amount of N
>>>> messages
>>>> > to process, so the N will be a fixed number (let's say 100), so each
>>>> split
>>>> > will be splitting into (0, 100], (101, 200] ... and so on until it
>>>> > finished? Do I get it wrong?
>>>> >
>>>> > Thanks in advance.
>>>> >
>>>> > On Wed, Aug 4, 2021 at 11:02 AM Luke Cwik <lc...@google.com> wrote:
>>>> >
>>>> > > Your research into the SDF Kafka implementation seems spot on.
>>>> > >
>>>> > > I took a quick look at the links you had provided and for
>>>> partitioned
>>>> > > topics it looks like you don't have a choice where a Consumer is
>>>> able to
>>>> > > resume from as you have a typical get message and ack scheme
>>>> client. In
>>>> > > this kind of setup for an initial implementation it is best if you
>>>> can:
>>>> > > 1) Occasionally poll to see how many messages are still in the
>>>> queue ahead
>>>> > > of you so you can report the remaining work as 1 /
>>>> numberOfInitialSplits *
>>>> > > numOutstandngMessages
>>>> > > *2) Use a pull model for messages (e.g. request N messages and
>>>> output them
>>>> > > all). This prevents an issue where the client library instances
>>>> effectively
>>>> > > are holding onto unprocessed messages while the bundle isn't being
>>>> > > processed.*
>>>> > > 3) Only support checkpointing in the RestrictionTracker (adding
>>>> support
>>>> > > for dynamic splitting would be great but no runner would exercise
>>>> it right
>>>> > > now in a streaming pipeline)
>>>> > >
>>>> > > It looks like the above would work for both the multi-partition and
>>>> single
>>>> > > partition scenarios and still could parallelize to the capacity of
>>>> what the
>>>> > > brokers could handle. Note that in the future you could still have
>>>> a single
>>>> > > SDF implementation that handles two types of restrictions one being
>>>> the
>>>> > > Consumer based one and the other being the Reader based one (See
>>>> > > Watch.java[1] for a growing and nongrowing restriction for what I
>>>> mean by
>>>> > > having different branching logic). In the future you would update
>>>> the
>>>> > > initial splitting logic to check whether the broker has a single
>>>> partition
>>>> > > and then you could create "Reader" restrictions but this would only
>>>> be
>>>> > > useful if you felt as though there was something to be gained from
>>>> using
>>>> > > it. For the Reader based interface:
>>>> > > 4) Do you expect the user to supply the message id for the first
>>>> message?
>>>> > > (if so is there a way to partition the message id space? (e.g. in
>>>> Kafka the
>>>> > > id is a number that increments and you know where you are and can
>>>> poll for
>>>> > > the latest id so you can split the numerical range easily))
>>>> > > 5) What value do you see it providing?
>>>> > >
>>>> > > 1:
>>>> > >
>>>> https://github.com/apache/beam/blob/03a1cca42ceeec2e963ec14c9bc344956a8683b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L885
>>>> > >
>>>> > > On Tue, Aug 3, 2021 at 1:17 PM Marco Robles Pulido <
>>>> > > marco.robles@wizeline.com> wrote:
>>>> > >
>>>> > >> Hi folks,
>>>> > >>
>>>> > >> I am working with the new PulsarIO connector with Beam, and most
>>>> of my
>>>> > >> work has been in researching how Pulsar works, as many of you know
>>>> we
>>>> > >> already have KafkaIO connector which is kind of similar to Pulsar
>>>> but there
>>>> > >> is some difference that I have found during my research and I
>>>> would like to
>>>> > >> know your input in how would you handle the implementation for
>>>> SDF. Here
>>>> > >> are my main concerns:
>>>> > >> - As you may know kafka handles by default partitioned topics
>>>> where each
>>>> > >> message within the partition gets an incremental id, called
>>>> offset. Having
>>>> > >> this in mind SDF implementation for kafka works something like
>>>> this, where
>>>> > >> the element to evaluate is the topic/partition and the
>>>> restrictions are the
>>>> > >> start and end offsets.
>>>> > >> - For Pulsar, partitioned topics are optional
>>>> > >> <
>>>> https://pulsar.apache.org/docs/en/concepts-messaging/#partitioned-topics>
>>>> or
>>>> > >> well by default are handled by single broker, there is a
>>>> possibility where
>>>> > >> you can use the partitioned topics, but you will limit the final
>>>> user to
>>>> > >> use only partitioned topics with pulsar, as well, there is a
>>>> possibility
>>>> > >> to manually handle cursors
>>>> > >> <
>>>> https://pulsar.apache.org/docs/en/2.5.1/concepts-clients/#reader-interface
>>>> >
>>>> > >> which will be the earliest and latest message available that may
>>>> be used as
>>>> > >> restrictions (but implementing this will not allow to use
>>>> partitioned
>>>> > >> topics). So with this in mind I was thinking there should be two
>>>> > >> implementations one that use partitioned topics with pulsar and
>>>> the other
>>>> > >> one that manually handle cursors.
>>>> > >>
>>>> > >> So, let me know your ideas/input about it. And maybe If i am wrong
>>>> help
>>>> > >> to clarify the SDF restrictions for KafkaIO.
>>>> > >>
>>>> > >> Thanks,
>>>> > >>
>>>> > >> --
>>>> > >>
>>>> > >> *Marco Robles* *|* WIZELINE
>>>> > >>
>>>> > >> Software Engineer
>>>> > >>
>>>> > >> marco.robles@wizeline.com
>>>> > >>
>>>> > >> Amado Nervo 2200, Esfera P6, Col. Ciudad del Sol, 45050 Zapopan,
>>>> Jal.
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >> *This email and its contents (including any attachments) are being
>>>> sent
>>>> > >> toyou on the condition of confidentiality and may be protected by
>>>> > >> legalprivilege. Access to this email by anyone other than the
>>>> intended
>>>> > >> recipientis unauthorized. If you are not the intended recipient,
>>>> please
>>>> > >> immediatelynotify the sender by replying to this message and
>>>> delete the
>>>> > >> materialimmediately from your system. Any further use,
>>>> dissemination,
>>>> > >> distributionor reproduction of this email is strictly prohibited.
>>>> Further,
>>>> > >> norepresentation is made with respect to any content contained in
>>>> this
>>>> > >> email.*
>>>> > >
>>>> > >
>>>> >
>>>> > --
>>>> >
>>>> > *Marco Robles* *|* WIZELINE
>>>> >
>>>> > Software Engineer
>>>> >
>>>> > marco.robles@wizeline.com
>>>> >
>>>> > Amado Nervo 2200, Esfera P6, Col. Ciudad del Sol, 45050 Zapopan, Jal.
>>>> >
>>>> > --
>>>> > *This email and its contents (including any attachments) are being
>>>> sent to
>>>> > you on the condition of confidentiality and may be protected by legal
>>>> > privilege. Access to this email by anyone other than the intended
>>>> recipient
>>>> > is unauthorized. If you are not the intended recipient, please
>>>> immediately
>>>> > notify the sender by replying to this message and delete the material
>>>> > immediately from your system. Any further use, dissemination,
>>>> distribution
>>>> > or reproduction of this email is strictly prohibited. Further, no
>>>> > representation is made with respect to any content contained in this
>>>> email.*
>>>> >
>>>>
>>>
>>>
>>> --
>>>
>>> *Marco Robles* *|* WIZELINE
>>>
>>> Software Engineer
>>>
>>> marco.robles@wizeline.com
>>>
>>> Amado Nervo 2200, Esfera P6, Col. Ciudad del Sol, 45050 Zapopan, Jal.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> *This email and its contents (including any attachments) are being sent
>>> toyou on the condition of confidentiality and may be protected by
>>> legalprivilege. Access to this email by anyone other than the intended
>>> recipientis unauthorized. If you are not the intended recipient, please
>>> immediatelynotify the sender by replying to this message and delete the
>>> materialimmediately from your system. Any further use, dissemination,
>>> distributionor reproduction of this email is strictly prohibited. Further,
>>> norepresentation is made with respect to any content contained in this
>>> email.*
>>
>>
>
> --
>
> *Marco Robles* *|* WIZELINE
>
> Software Engineer
>
> marco.robles@wizeline.com
>
> Amado Nervo 2200, Esfera P6, Col. Ciudad del Sol, 45050 Zapopan, Jal.
>
>
>
>
>
>
>
>
> *This email and its contents (including any attachments) are being sent
> toyou on the condition of confidentiality and may be protected by
> legalprivilege. Access to this email by anyone other than the intended
> recipientis unauthorized. If you are not the intended recipient, please
> immediatelynotify the sender by replying to this message and delete the
> materialimmediately from your system. Any further use, dissemination,
> distributionor reproduction of this email is strictly prohibited. Further,
> norepresentation is made with respect to any content contained in this
> email.*

Re: Help/Inputs with PulsarIO connector

Posted by Marco Robles <ma...@wizeline.com>.
Hi,

Thanks for the proposal, I will take a look at it!

I was following the next approach, I would like to know your input on it

I found the following util class that helps to convert MessageId into
numerical offset, MessageIdUtils
<https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/util/MessageIdUtils.java>
,
and I am using it to get the offset restriction for SDF. So for each new
restriction, we get the last message id committed and from there we do the
next restriction, which will be something like [currentOffset,
latestMessageIdCommitedOffset).

Here is an example <https://github.com/apache/beam/pull/15572> of the
initial approach, here is more detail about the initial approach.

https://github.com/apache/beam/pull/15572

Thanks in advance

On Tue, Sep 28, 2021 at 3:48 PM Luke Cwik <lc...@google.com> wrote:

> My second proposal wasn't about each split representing a fixed number of
> messages, it was just that you pull the messages and output them and then
> ack them when the output has been durably committed (e.g. in a
> BundleFinalizer[1] if duplicates are OK or in a downstream transform
> separated by a Reshuffle assuming you can ack messages in a different
> worker from the one that did the reading). This is very similar to that the
> GCP Pubsub implementation does (note this is implemented as
> UnboundedSource).
>
> Thanks for the details about using the Reader API to enumerate single
> partitions from a partitioned topic as that would make it easy to build
> something that doesn't produce duplicates all the time and would allow for
> great parallelization. The Reader API allows you to seek to latest[2] and
> also to seek to a specific message timestamp[3]. So you could create two
> types of restrictions, "read from timestamp X" and "read timestamp range
> [Y, Z)". The idea is that you always have one restriction covering "read
> from timestamp X" and as it makes progress it updates what X is whenever a
> process continuation resume is returned or a split at fraction of remainder
> of 0 is done. When a split request comes in with a non zero fraction of
> remainder, you can find the latest message id and its timestamp Z and split
> the range using the fraction between X and Z into a "read range [X, Y)" and
> "read from Y".
>
> Unless the seek via timestamp is prohibitively expensive, I would highly
> recommend using the reader API as this would mirror what is happening in
> Kafka and would allow for really good parallelization.
>
> 1:
> https://github.com/apache/beam/blob/5e7e66f116da5b947cb271ccaeb9a6a15fb1d6f6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L1359
> 2:
> https://pulsar.incubator.apache.org/api/client/2.8.0-SNAPSHOT/org/apache/pulsar/client/api/MessageId.html#latest
> 3:
> https://pulsar.incubator.apache.org/api/client/2.8.0-SNAPSHOT/org/apache/pulsar/client/api/Reader.html#seek-long-
>
>
> On Mon, Sep 20, 2021 at 8:06 AM Marco Robles <ma...@wizeline.com>
> wrote:
>
>> Hi,
>>
>> Yes, It is correct.
>>
>> Thanks
>>
>> On Sun, Sep 19, 2021 at 11:34 PM Matteo Merli <mm...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> I just wanted to clarify the behavior of readers on partitioned topics
>>> in Pulsar.
>>>
>>> You have 2 main ways of consuming messages from pulsar topics:
>>>   1. Consumers -> cursor is managed by the system, based on acks. (still
>>> allows for seek() operations)
>>>   2. Readers -> reading position is managed by application (eg: by
>>> storing message ids into a state checkpoint).
>>>
>>> Consumers are automatically handling partitions, while readers are meant
>>> to work at the individual partition level.
>>>
>>> When using readers, it's definitely possible to use them on partitioned
>>> topics, just by creating 1 reader per partition. There is an easy way to
>>> discover the list of partitions:
>>>
>>> List<String> partitions =
>>> pulsarClient.getPartitionsForTopic("my-topic").join();
>>> for (String p : partitions) {
>>>     Reader<byte[]> reader = pulsarClient.newReader()
>>>                   .topic(p)
>>>                   .startMessageId(....)
>>>                   .create();
>>>
>>>     ///...
>>> }
>>>
>>> Matteo
>>>
>>> On 2021/09/17 20:14:07, Marco Robles <ma...@wizeline.com> wrote:
>>> > Hi,
>>> >
>>> > I am dealing with some blockers during the PulsarIO SDF implementation,
>>> > checking back the comments you mentioned before. What do you mean with
>>> the
>>> > Second idea of using a pull model for messages, request N messages and
>>> > output them all, will it be something like I fetched N messages,
>>> processed
>>> > them, and the next iteration or split will be the same amount of N
>>> messages
>>> > to process, so the N will be a fixed number (let's say 100), so each
>>> split
>>> > will be splitting into (0, 100], (101, 200] ... and so on until it
>>> > finished? Do I get it wrong?
>>> >
>>> > Thanks in advance.
>>> >
>>> > On Wed, Aug 4, 2021 at 11:02 AM Luke Cwik <lc...@google.com> wrote:
>>> >
>>> > > Your research into the SDF Kafka implementation seems spot on.
>>> > >
>>> > > I took a quick look at the links you had provided and for partitioned
>>> > > topics it looks like you don't have a choice where a Consumer is
>>> able to
>>> > > resume from as you have a typical get message and ack scheme client.
>>> In
>>> > > this kind of setup for an initial implementation it is best if you
>>> can:
>>> > > 1) Occasionally poll to see how many messages are still in the queue
>>> ahead
>>> > > of you so you can report the remaining work as 1 /
>>> numberOfInitialSplits *
>>> > > numOutstandngMessages
>>> > > *2) Use a pull model for messages (e.g. request N messages and
>>> output them
>>> > > all). This prevents an issue where the client library instances
>>> effectively
>>> > > are holding onto unprocessed messages while the bundle isn't being
>>> > > processed.*
>>> > > 3) Only support checkpointing in the RestrictionTracker (adding
>>> support
>>> > > for dynamic splitting would be great but no runner would exercise it
>>> right
>>> > > now in a streaming pipeline)
>>> > >
>>> > > It looks like the above would work for both the multi-partition and
>>> single
>>> > > partition scenarios and still could parallelize to the capacity of
>>> what the
>>> > > brokers could handle. Note that in the future you could still have a
>>> single
>>> > > SDF implementation that handles two types of restrictions one being
>>> the
>>> > > Consumer based one and the other being the Reader based one (See
>>> > > Watch.java[1] for a growing and nongrowing restriction for what I
>>> mean by
>>> > > having different branching logic). In the future you would update the
>>> > > initial splitting logic to check whether the broker has a single
>>> partition
>>> > > and then you could create "Reader" restrictions but this would only
>>> be
>>> > > useful if you felt as though there was something to be gained from
>>> using
>>> > > it. For the Reader based interface:
>>> > > 4) Do you expect the user to supply the message id for the first
>>> message?
>>> > > (if so is there a way to partition the message id space? (e.g. in
>>> Kafka the
>>> > > id is a number that increments and you know where you are and can
>>> poll for
>>> > > the latest id so you can split the numerical range easily))
>>> > > 5) What value do you see it providing?
>>> > >
>>> > > 1:
>>> > >
>>> https://github.com/apache/beam/blob/03a1cca42ceeec2e963ec14c9bc344956a8683b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L885
>>> > >
>>> > > On Tue, Aug 3, 2021 at 1:17 PM Marco Robles Pulido <
>>> > > marco.robles@wizeline.com> wrote:
>>> > >
>>> > >> Hi folks,
>>> > >>
>>> > >> I am working with the new PulsarIO connector with Beam, and most of
>>> my
>>> > >> work has been in researching how Pulsar works, as many of you know
>>> we
>>> > >> already have KafkaIO connector which is kind of similar to Pulsar
>>> but there
>>> > >> is some difference that I have found during my research and I would
>>> like to
>>> > >> know your input in how would you handle the implementation for SDF.
>>> Here
>>> > >> are my main concerns:
>>> > >> - As you may know kafka handles by default partitioned topics where
>>> each
>>> > >> message within the partition gets an incremental id, called offset.
>>> Having
>>> > >> this in mind SDF implementation for kafka works something like
>>> this, where
>>> > >> the element to evaluate is the topic/partition and the restrictions
>>> are the
>>> > >> start and end offsets.
>>> > >> - For Pulsar, partitioned topics are optional
>>> > >> <
>>> https://pulsar.apache.org/docs/en/concepts-messaging/#partitioned-topics>
>>> or
>>> > >> well by default are handled by single broker, there is a
>>> possibility where
>>> > >> you can use the partitioned topics, but you will limit the final
>>> user to
>>> > >> use only partitioned topics with pulsar, as well, there is a
>>> possibility
>>> > >> to manually handle cursors
>>> > >> <
>>> https://pulsar.apache.org/docs/en/2.5.1/concepts-clients/#reader-interface
>>> >
>>> > >> which will be the earliest and latest message available that may be
>>> used as
>>> > >> restrictions (but implementing this will not allow to use
>>> partitioned
>>> > >> topics). So with this in mind I was thinking there should be two
>>> > >> implementations one that use partitioned topics with pulsar and the
>>> other
>>> > >> one that manually handle cursors.
>>> > >>
>>> > >> So, let me know your ideas/input about it. And maybe If i am wrong
>>> help
>>> > >> to clarify the SDF restrictions for KafkaIO.
>>> > >>
>>> > >> Thanks,
>>> > >>
>>> > >> --
>>> > >>
>>> > >> *Marco Robles* *|* WIZELINE
>>> > >>
>>> > >> Software Engineer
>>> > >>
>>> > >> marco.robles@wizeline.com
>>> > >>
>>> > >> Amado Nervo 2200, Esfera P6, Col. Ciudad del Sol, 45050 Zapopan,
>>> Jal.
>>> > >>
>>> > >>
>>> > >>
>>> > >>
>>> > >>
>>> > >>
>>> > >>
>>> > >>
>>> > >> *This email and its contents (including any attachments) are being
>>> sent
>>> > >> toyou on the condition of confidentiality and may be protected by
>>> > >> legalprivilege. Access to this email by anyone other than the
>>> intended
>>> > >> recipientis unauthorized. If you are not the intended recipient,
>>> please
>>> > >> immediatelynotify the sender by replying to this message and delete
>>> the
>>> > >> materialimmediately from your system. Any further use,
>>> dissemination,
>>> > >> distributionor reproduction of this email is strictly prohibited.
>>> Further,
>>> > >> norepresentation is made with respect to any content contained in
>>> this
>>> > >> email.*
>>> > >
>>> > >
>>> >
>>> > --
>>> >
>>> > *Marco Robles* *|* WIZELINE
>>> >
>>> > Software Engineer
>>> >
>>> > marco.robles@wizeline.com
>>> >
>>> > Amado Nervo 2200, Esfera P6, Col. Ciudad del Sol, 45050 Zapopan, Jal.
>>> >
>>> > --
>>> > *This email and its contents (including any attachments) are being
>>> sent to
>>> > you on the condition of confidentiality and may be protected by legal
>>> > privilege. Access to this email by anyone other than the intended
>>> recipient
>>> > is unauthorized. If you are not the intended recipient, please
>>> immediately
>>> > notify the sender by replying to this message and delete the material
>>> > immediately from your system. Any further use, dissemination,
>>> distribution
>>> > or reproduction of this email is strictly prohibited. Further, no
>>> > representation is made with respect to any content contained in this
>>> email.*
>>> >
>>>
>>
>>
>> --
>>
>> *Marco Robles* *|* WIZELINE
>>
>> Software Engineer
>>
>> marco.robles@wizeline.com
>>
>> Amado Nervo 2200, Esfera P6, Col. Ciudad del Sol, 45050 Zapopan, Jal.
>>
>>
>>
>>
>>
>>
>>
>>
>> *This email and its contents (including any attachments) are being sent
>> toyou on the condition of confidentiality and may be protected by
>> legalprivilege. Access to this email by anyone other than the intended
>> recipientis unauthorized. If you are not the intended recipient, please
>> immediatelynotify the sender by replying to this message and delete the
>> materialimmediately from your system. Any further use, dissemination,
>> distributionor reproduction of this email is strictly prohibited. Further,
>> norepresentation is made with respect to any content contained in this
>> email.*
>
>

-- 

*Marco Robles* *|* WIZELINE

Software Engineer

marco.robles@wizeline.com

Amado Nervo 2200, Esfera P6, Col. Ciudad del Sol, 45050 Zapopan, Jal.

-- 
*This email and its contents (including any attachments) are being sent to
you on the condition of confidentiality and may be protected by legal
privilege. Access to this email by anyone other than the intended recipient
is unauthorized. If you are not the intended recipient, please immediately
notify the sender by replying to this message and delete the material
immediately from your system. Any further use, dissemination, distribution
or reproduction of this email is strictly prohibited. Further, no
representation is made with respect to any content contained in this email.*

Re: Help/Inputs with PulsarIO connector

Posted by Luke Cwik <lc...@google.com>.
My second proposal wasn't about each split representing a fixed number of
messages, it was just that you pull the messages and output them and then
ack them when the output has been durably committed (e.g. in a
BundleFinalizer[1] if duplicates are OK or in a downstream transform
separated by a Reshuffle assuming you can ack messages in a different
worker from the one that did the reading). This is very similar to that the
GCP Pubsub implementation does (note this is implemented as
UnboundedSource).

Thanks for the details about using the Reader API to enumerate single
partitions from a partitioned topic as that would make it easy to build
something that doesn't produce duplicates all the time and would allow for
great parallelization. The Reader API allows you to seek to latest[2] and
also to seek to a specific message timestamp[3]. So you could create two
types of restrictions, "read from timestamp X" and "read timestamp range
[Y, Z)". The idea is that you always have one restriction covering "read
from timestamp X" and as it makes progress it updates what X is whenever a
process continuation resume is returned or a split at fraction of remainder
of 0 is done. When a split request comes in with a non zero fraction of
remainder, you can find the latest message id and its timestamp Z and split
the range using the fraction between X and Z into a "read range [X, Y)" and
"read from Y".

Unless the seek via timestamp is prohibitively expensive, I would highly
recommend using the reader API as this would mirror what is happening in
Kafka and would allow for really good parallelization.

1:
https://github.com/apache/beam/blob/5e7e66f116da5b947cb271ccaeb9a6a15fb1d6f6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L1359
2:
https://pulsar.incubator.apache.org/api/client/2.8.0-SNAPSHOT/org/apache/pulsar/client/api/MessageId.html#latest
3:
https://pulsar.incubator.apache.org/api/client/2.8.0-SNAPSHOT/org/apache/pulsar/client/api/Reader.html#seek-long-


On Mon, Sep 20, 2021 at 8:06 AM Marco Robles <ma...@wizeline.com>
wrote:

> Hi,
>
> Yes, It is correct.
>
> Thanks
>
> On Sun, Sep 19, 2021 at 11:34 PM Matteo Merli <mm...@apache.org> wrote:
>
>> Hi,
>>
>> I just wanted to clarify the behavior of readers on partitioned topics in
>> Pulsar.
>>
>> You have 2 main ways of consuming messages from pulsar topics:
>>   1. Consumers -> cursor is managed by the system, based on acks. (still
>> allows for seek() operations)
>>   2. Readers -> reading position is managed by application (eg: by
>> storing message ids into a state checkpoint).
>>
>> Consumers are automatically handling partitions, while readers are meant
>> to work at the individual partition level.
>>
>> When using readers, it's definitely possible to use them on partitioned
>> topics, just by creating 1 reader per partition. There is an easy way to
>> discover the list of partitions:
>>
>> List<String> partitions =
>> pulsarClient.getPartitionsForTopic("my-topic").join();
>> for (String p : partitions) {
>>     Reader<byte[]> reader = pulsarClient.newReader()
>>                   .topic(p)
>>                   .startMessageId(....)
>>                   .create();
>>
>>     ///...
>> }
>>
>> Matteo
>>
>> On 2021/09/17 20:14:07, Marco Robles <ma...@wizeline.com> wrote:
>> > Hi,
>> >
>> > I am dealing with some blockers during the PulsarIO SDF implementation,
>> > checking back the comments you mentioned before. What do you mean with
>> the
>> > Second idea of using a pull model for messages, request N messages and
>> > output them all, will it be something like I fetched N messages,
>> processed
>> > them, and the next iteration or split will be the same amount of N
>> messages
>> > to process, so the N will be a fixed number (let's say 100), so each
>> split
>> > will be splitting into (0, 100], (101, 200] ... and so on until it
>> > finished? Do I get it wrong?
>> >
>> > Thanks in advance.
>> >
>> > On Wed, Aug 4, 2021 at 11:02 AM Luke Cwik <lc...@google.com> wrote:
>> >
>> > > Your research into the SDF Kafka implementation seems spot on.
>> > >
>> > > I took a quick look at the links you had provided and for partitioned
>> > > topics it looks like you don't have a choice where a Consumer is able
>> to
>> > > resume from as you have a typical get message and ack scheme client.
>> In
>> > > this kind of setup for an initial implementation it is best if you
>> can:
>> > > 1) Occasionally poll to see how many messages are still in the queue
>> ahead
>> > > of you so you can report the remaining work as 1 /
>> numberOfInitialSplits *
>> > > numOutstandngMessages
>> > > *2) Use a pull model for messages (e.g. request N messages and output
>> them
>> > > all). This prevents an issue where the client library instances
>> effectively
>> > > are holding onto unprocessed messages while the bundle isn't being
>> > > processed.*
>> > > 3) Only support checkpointing in the RestrictionTracker (adding
>> support
>> > > for dynamic splitting would be great but no runner would exercise it
>> right
>> > > now in a streaming pipeline)
>> > >
>> > > It looks like the above would work for both the multi-partition and
>> single
>> > > partition scenarios and still could parallelize to the capacity of
>> what the
>> > > brokers could handle. Note that in the future you could still have a
>> single
>> > > SDF implementation that handles two types of restrictions one being
>> the
>> > > Consumer based one and the other being the Reader based one (See
>> > > Watch.java[1] for a growing and nongrowing restriction for what I
>> mean by
>> > > having different branching logic). In the future you would update the
>> > > initial splitting logic to check whether the broker has a single
>> partition
>> > > and then you could create "Reader" restrictions but this would only be
>> > > useful if you felt as though there was something to be gained from
>> using
>> > > it. For the Reader based interface:
>> > > 4) Do you expect the user to supply the message id for the first
>> message?
>> > > (if so is there a way to partition the message id space? (e.g. in
>> Kafka the
>> > > id is a number that increments and you know where you are and can
>> poll for
>> > > the latest id so you can split the numerical range easily))
>> > > 5) What value do you see it providing?
>> > >
>> > > 1:
>> > >
>> https://github.com/apache/beam/blob/03a1cca42ceeec2e963ec14c9bc344956a8683b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L885
>> > >
>> > > On Tue, Aug 3, 2021 at 1:17 PM Marco Robles Pulido <
>> > > marco.robles@wizeline.com> wrote:
>> > >
>> > >> Hi folks,
>> > >>
>> > >> I am working with the new PulsarIO connector with Beam, and most of
>> my
>> > >> work has been in researching how Pulsar works, as many of you know we
>> > >> already have KafkaIO connector which is kind of similar to Pulsar
>> but there
>> > >> is some difference that I have found during my research and I would
>> like to
>> > >> know your input in how would you handle the implementation for SDF.
>> Here
>> > >> are my main concerns:
>> > >> - As you may know kafka handles by default partitioned topics where
>> each
>> > >> message within the partition gets an incremental id, called offset.
>> Having
>> > >> this in mind SDF implementation for kafka works something like this,
>> where
>> > >> the element to evaluate is the topic/partition and the restrictions
>> are the
>> > >> start and end offsets.
>> > >> - For Pulsar, partitioned topics are optional
>> > >> <
>> https://pulsar.apache.org/docs/en/concepts-messaging/#partitioned-topics>
>> or
>> > >> well by default are handled by single broker, there is a possibility
>> where
>> > >> you can use the partitioned topics, but you will limit the final
>> user to
>> > >> use only partitioned topics with pulsar, as well, there is a
>> possibility
>> > >> to manually handle cursors
>> > >> <
>> https://pulsar.apache.org/docs/en/2.5.1/concepts-clients/#reader-interface
>> >
>> > >> which will be the earliest and latest message available that may be
>> used as
>> > >> restrictions (but implementing this will not allow to use partitioned
>> > >> topics). So with this in mind I was thinking there should be two
>> > >> implementations one that use partitioned topics with pulsar and the
>> other
>> > >> one that manually handle cursors.
>> > >>
>> > >> So, let me know your ideas/input about it. And maybe If i am wrong
>> help
>> > >> to clarify the SDF restrictions for KafkaIO.
>> > >>
>> > >> Thanks,
>> > >>
>> > >> --
>> > >>
>> > >> *Marco Robles* *|* WIZELINE
>> > >>
>> > >> Software Engineer
>> > >>
>> > >> marco.robles@wizeline.com
>> > >>
>> > >> Amado Nervo 2200, Esfera P6, Col. Ciudad del Sol, 45050 Zapopan, Jal.
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >> *This email and its contents (including any attachments) are being
>> sent
>> > >> toyou on the condition of confidentiality and may be protected by
>> > >> legalprivilege. Access to this email by anyone other than the
>> intended
>> > >> recipientis unauthorized. If you are not the intended recipient,
>> please
>> > >> immediatelynotify the sender by replying to this message and delete
>> the
>> > >> materialimmediately from your system. Any further use, dissemination,
>> > >> distributionor reproduction of this email is strictly prohibited.
>> Further,
>> > >> norepresentation is made with respect to any content contained in
>> this
>> > >> email.*
>> > >
>> > >
>> >
>> > --
>> >
>> > *Marco Robles* *|* WIZELINE
>> >
>> > Software Engineer
>> >
>> > marco.robles@wizeline.com
>> >
>> > Amado Nervo 2200, Esfera P6, Col. Ciudad del Sol, 45050 Zapopan, Jal.
>> >
>> > --
>> > *This email and its contents (including any attachments) are being sent
>> to
>> > you on the condition of confidentiality and may be protected by legal
>> > privilege. Access to this email by anyone other than the intended
>> recipient
>> > is unauthorized. If you are not the intended recipient, please
>> immediately
>> > notify the sender by replying to this message and delete the material
>> > immediately from your system. Any further use, dissemination,
>> distribution
>> > or reproduction of this email is strictly prohibited. Further, no
>> > representation is made with respect to any content contained in this
>> email.*
>> >
>>
>
>
> --
>
> *Marco Robles* *|* WIZELINE
>
> Software Engineer
>
> marco.robles@wizeline.com
>
> Amado Nervo 2200, Esfera P6, Col. Ciudad del Sol, 45050 Zapopan, Jal.
>
>
>
>
>
>
>
>
> *This email and its contents (including any attachments) are being sent
> toyou on the condition of confidentiality and may be protected by
> legalprivilege. Access to this email by anyone other than the intended
> recipientis unauthorized. If you are not the intended recipient, please
> immediatelynotify the sender by replying to this message and delete the
> materialimmediately from your system. Any further use, dissemination,
> distributionor reproduction of this email is strictly prohibited. Further,
> norepresentation is made with respect to any content contained in this
> email.*

Re: Help/Inputs with PulsarIO connector

Posted by Marco Robles <ma...@wizeline.com>.
Hi,

Yes, It is correct.

Thanks

On Sun, Sep 19, 2021 at 11:34 PM Matteo Merli <mm...@apache.org> wrote:

> Hi,
>
> I just wanted to clarify the behavior of readers on partitioned topics in
> Pulsar.
>
> You have 2 main ways of consuming messages from pulsar topics:
>   1. Consumers -> cursor is managed by the system, based on acks. (still
> allows for seek() operations)
>   2. Readers -> reading position is managed by application (eg: by storing
> message ids into a state checkpoint).
>
> Consumers are automatically handling partitions, while readers are meant
> to work at the individual partition level.
>
> When using readers, it's definitely possible to use them on partitioned
> topics, just by creating 1 reader per partition. There is an easy way to
> discover the list of partitions:
>
> List<String> partitions =
> pulsarClient.getPartitionsForTopic("my-topic").join();
> for (String p : partitions) {
>     Reader<byte[]> reader = pulsarClient.newReader()
>                   .topic(p)
>                   .startMessageId(....)
>                   .create();
>
>     ///...
> }
>
> Matteo
>
> On 2021/09/17 20:14:07, Marco Robles <ma...@wizeline.com> wrote:
> > Hi,
> >
> > I am dealing with some blockers during the PulsarIO SDF implementation,
> > checking back the comments you mentioned before. What do you mean with
> the
> > Second idea of using a pull model for messages, request N messages and
> > output them all, will it be something like I fetched N messages,
> processed
> > them, and the next iteration or split will be the same amount of N
> messages
> > to process, so the N will be a fixed number (let's say 100), so each
> split
> > will be splitting into (0, 100], (101, 200] ... and so on until it
> > finished? Do I get it wrong?
> >
> > Thanks in advance.
> >
> > On Wed, Aug 4, 2021 at 11:02 AM Luke Cwik <lc...@google.com> wrote:
> >
> > > Your research into the SDF Kafka implementation seems spot on.
> > >
> > > I took a quick look at the links you had provided and for partitioned
> > > topics it looks like you don't have a choice where a Consumer is able
> to
> > > resume from as you have a typical get message and ack scheme client. In
> > > this kind of setup for an initial implementation it is best if you can:
> > > 1) Occasionally poll to see how many messages are still in the queue
> ahead
> > > of you so you can report the remaining work as 1 /
> numberOfInitialSplits *
> > > numOutstandngMessages
> > > *2) Use a pull model for messages (e.g. request N messages and output
> them
> > > all). This prevents an issue where the client library instances
> effectively
> > > are holding onto unprocessed messages while the bundle isn't being
> > > processed.*
> > > 3) Only support checkpointing in the RestrictionTracker (adding support
> > > for dynamic splitting would be great but no runner would exercise it
> right
> > > now in a streaming pipeline)
> > >
> > > It looks like the above would work for both the multi-partition and
> single
> > > partition scenarios and still could parallelize to the capacity of
> what the
> > > brokers could handle. Note that in the future you could still have a
> single
> > > SDF implementation that handles two types of restrictions one being the
> > > Consumer based one and the other being the Reader based one (See
> > > Watch.java[1] for a growing and nongrowing restriction for what I mean
> by
> > > having different branching logic). In the future you would update the
> > > initial splitting logic to check whether the broker has a single
> partition
> > > and then you could create "Reader" restrictions but this would only be
> > > useful if you felt as though there was something to be gained from
> using
> > > it. For the Reader based interface:
> > > 4) Do you expect the user to supply the message id for the first
> message?
> > > (if so is there a way to partition the message id space? (e.g. in
> Kafka the
> > > id is a number that increments and you know where you are and can poll
> for
> > > the latest id so you can split the numerical range easily))
> > > 5) What value do you see it providing?
> > >
> > > 1:
> > >
> https://github.com/apache/beam/blob/03a1cca42ceeec2e963ec14c9bc344956a8683b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L885
> > >
> > > On Tue, Aug 3, 2021 at 1:17 PM Marco Robles Pulido <
> > > marco.robles@wizeline.com> wrote:
> > >
> > >> Hi folks,
> > >>
> > >> I am working with the new PulsarIO connector with Beam, and most of my
> > >> work has been in researching how Pulsar works, as many of you know we
> > >> already have KafkaIO connector which is kind of similar to Pulsar but
> there
> > >> is some difference that I have found during my research and I would
> like to
> > >> know your input in how would you handle the implementation for SDF.
> Here
> > >> are my main concerns:
> > >> - As you may know kafka handles by default partitioned topics where
> each
> > >> message within the partition gets an incremental id, called offset.
> Having
> > >> this in mind SDF implementation for kafka works something like this,
> where
> > >> the element to evaluate is the topic/partition and the restrictions
> are the
> > >> start and end offsets.
> > >> - For Pulsar, partitioned topics are optional
> > >> <
> https://pulsar.apache.org/docs/en/concepts-messaging/#partitioned-topics>
> or
> > >> well by default are handled by single broker, there is a possibility
> where
> > >> you can use the partitioned topics, but you will limit the final user
> to
> > >> use only partitioned topics with pulsar, as well, there is a
> possibility
> > >> to manually handle cursors
> > >> <
> https://pulsar.apache.org/docs/en/2.5.1/concepts-clients/#reader-interface
> >
> > >> which will be the earliest and latest message available that may be
> used as
> > >> restrictions (but implementing this will not allow to use partitioned
> > >> topics). So with this in mind I was thinking there should be two
> > >> implementations one that use partitioned topics with pulsar and the
> other
> > >> one that manually handle cursors.
> > >>
> > >> So, let me know your ideas/input about it. And maybe If i am wrong
> help
> > >> to clarify the SDF restrictions for KafkaIO.
> > >>
> > >> Thanks,
> > >>
> > >> --
> > >>
> > >> *Marco Robles* *|* WIZELINE
> > >>
> > >> Software Engineer
> > >>
> > >> marco.robles@wizeline.com
> > >>
> > >> Amado Nervo 2200, Esfera P6, Col. Ciudad del Sol, 45050 Zapopan, Jal.
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> *This email and its contents (including any attachments) are being
> sent
> > >> toyou on the condition of confidentiality and may be protected by
> > >> legalprivilege. Access to this email by anyone other than the intended
> > >> recipientis unauthorized. If you are not the intended recipient,
> please
> > >> immediatelynotify the sender by replying to this message and delete
> the
> > >> materialimmediately from your system. Any further use, dissemination,
> > >> distributionor reproduction of this email is strictly prohibited.
> Further,
> > >> norepresentation is made with respect to any content contained in this
> > >> email.*
> > >
> > >
> >
> > --
> >
> > *Marco Robles* *|* WIZELINE
> >
> > Software Engineer
> >
> > marco.robles@wizeline.com
> >
> > Amado Nervo 2200, Esfera P6, Col. Ciudad del Sol, 45050 Zapopan, Jal.
> >
> > --
> > *This email and its contents (including any attachments) are being sent
> to
> > you on the condition of confidentiality and may be protected by legal
> > privilege. Access to this email by anyone other than the intended
> recipient
> > is unauthorized. If you are not the intended recipient, please
> immediately
> > notify the sender by replying to this message and delete the material
> > immediately from your system. Any further use, dissemination,
> distribution
> > or reproduction of this email is strictly prohibited. Further, no
> > representation is made with respect to any content contained in this
> email.*
> >
>


-- 

*Marco Robles* *|* WIZELINE

Software Engineer

marco.robles@wizeline.com

Amado Nervo 2200, Esfera P6, Col. Ciudad del Sol, 45050 Zapopan, Jal.

-- 
*This email and its contents (including any attachments) are being sent to
you on the condition of confidentiality and may be protected by legal
privilege. Access to this email by anyone other than the intended recipient
is unauthorized. If you are not the intended recipient, please immediately
notify the sender by replying to this message and delete the material
immediately from your system. Any further use, dissemination, distribution
or reproduction of this email is strictly prohibited. Further, no
representation is made with respect to any content contained in this email.*

Re: Help/Inputs with PulsarIO connector

Posted by Matteo Merli <mm...@apache.org>.
Hi, 

I just wanted to clarify the behavior of readers on partitioned topics in Pulsar. 

You have 2 main ways of consuming messages from pulsar topics: 
  1. Consumers -> cursor is managed by the system, based on acks. (still allows for seek() operations)
  2. Readers -> reading position is managed by application (eg: by storing message ids into a state checkpoint).

Consumers are automatically handling partitions, while readers are meant to work at the individual partition level.

When using readers, it's definitely possible to use them on partitioned topics, just by creating 1 reader per partition. There is an easy way to discover the list of partitions: 

List<String> partitions = pulsarClient.getPartitionsForTopic("my-topic").join();
for (String p : partitions) {
    Reader<byte[]> reader = pulsarClient.newReader()
                  .topic(p)
                  .startMessageId(....)
                  .create();

    ///... 
}

Matteo

On 2021/09/17 20:14:07, Marco Robles <ma...@wizeline.com> wrote: 
> Hi,
> 
> I am dealing with some blockers during the PulsarIO SDF implementation,
> checking back the comments you mentioned before. What do you mean with the
> Second idea of using a pull model for messages, request N messages and
> output them all, will it be something like I fetched N messages, processed
> them, and the next iteration or split will be the same amount of N messages
> to process, so the N will be a fixed number (let's say 100), so each split
> will be splitting into (0, 100], (101, 200] ... and so on until it
> finished? Do I get it wrong?
> 
> Thanks in advance.
> 
> On Wed, Aug 4, 2021 at 11:02 AM Luke Cwik <lc...@google.com> wrote:
> 
> > Your research into the SDF Kafka implementation seems spot on.
> >
> > I took a quick look at the links you had provided and for partitioned
> > topics it looks like you don't have a choice where a Consumer is able to
> > resume from as you have a typical get message and ack scheme client. In
> > this kind of setup for an initial implementation it is best if you can:
> > 1) Occasionally poll to see how many messages are still in the queue ahead
> > of you so you can report the remaining work as 1 / numberOfInitialSplits *
> > numOutstandngMessages
> > *2) Use a pull model for messages (e.g. request N messages and output them
> > all). This prevents an issue where the client library instances effectively
> > are holding onto unprocessed messages while the bundle isn't being
> > processed.*
> > 3) Only support checkpointing in the RestrictionTracker (adding support
> > for dynamic splitting would be great but no runner would exercise it right
> > now in a streaming pipeline)
> >
> > It looks like the above would work for both the multi-partition and single
> > partition scenarios and still could parallelize to the capacity of what the
> > brokers could handle. Note that in the future you could still have a single
> > SDF implementation that handles two types of restrictions one being the
> > Consumer based one and the other being the Reader based one (See
> > Watch.java[1] for a growing and nongrowing restriction for what I mean by
> > having different branching logic). In the future you would update the
> > initial splitting logic to check whether the broker has a single partition
> > and then you could create "Reader" restrictions but this would only be
> > useful if you felt as though there was something to be gained from using
> > it. For the Reader based interface:
> > 4) Do you expect the user to supply the message id for the first message?
> > (if so is there a way to partition the message id space? (e.g. in Kafka the
> > id is a number that increments and you know where you are and can poll for
> > the latest id so you can split the numerical range easily))
> > 5) What value do you see it providing?
> >
> > 1:
> > https://github.com/apache/beam/blob/03a1cca42ceeec2e963ec14c9bc344956a8683b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L885
> >
> > On Tue, Aug 3, 2021 at 1:17 PM Marco Robles Pulido <
> > marco.robles@wizeline.com> wrote:
> >
> >> Hi folks,
> >>
> >> I am working with the new PulsarIO connector with Beam, and most of my
> >> work has been in researching how Pulsar works, as many of you know we
> >> already have KafkaIO connector which is kind of similar to Pulsar but there
> >> is some difference that I have found during my research and I would like to
> >> know your input in how would you handle the implementation for SDF. Here
> >> are my main concerns:
> >> - As you may know kafka handles by default partitioned topics where each
> >> message within the partition gets an incremental id, called offset. Having
> >> this in mind SDF implementation for kafka works something like this, where
> >> the element to evaluate is the topic/partition and the restrictions are the
> >> start and end offsets.
> >> - For Pulsar, partitioned topics are optional
> >> <https://pulsar.apache.org/docs/en/concepts-messaging/#partitioned-topics> or
> >> well by default are handled by single broker, there is a possibility where
> >> you can use the partitioned topics, but you will limit the final user to
> >> use only partitioned topics with pulsar, as well, there is a possibility
> >> to manually handle cursors
> >> <https://pulsar.apache.org/docs/en/2.5.1/concepts-clients/#reader-interface>
> >> which will be the earliest and latest message available that may be used as
> >> restrictions (but implementing this will not allow to use partitioned
> >> topics). So with this in mind I was thinking there should be two
> >> implementations one that use partitioned topics with pulsar and the other
> >> one that manually handle cursors.
> >>
> >> So, let me know your ideas/input about it. And maybe If i am wrong help
> >> to clarify the SDF restrictions for KafkaIO.
> >>
> >> Thanks,
> >>
> >> --
> >>
> >> *Marco Robles* *|* WIZELINE
> >>
> >> Software Engineer
> >>
> >> marco.robles@wizeline.com
> >>
> >> Amado Nervo 2200, Esfera P6, Col. Ciudad del Sol, 45050 Zapopan, Jal.
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> *This email and its contents (including any attachments) are being sent
> >> toyou on the condition of confidentiality and may be protected by
> >> legalprivilege. Access to this email by anyone other than the intended
> >> recipientis unauthorized. If you are not the intended recipient, please
> >> immediatelynotify the sender by replying to this message and delete the
> >> materialimmediately from your system. Any further use, dissemination,
> >> distributionor reproduction of this email is strictly prohibited. Further,
> >> norepresentation is made with respect to any content contained in this
> >> email.*
> >
> >
> 
> -- 
> 
> *Marco Robles* *|* WIZELINE
> 
> Software Engineer
> 
> marco.robles@wizeline.com
> 
> Amado Nervo 2200, Esfera P6, Col. Ciudad del Sol, 45050 Zapopan, Jal.
> 
> -- 
> *This email and its contents (including any attachments) are being sent to
> you on the condition of confidentiality and may be protected by legal
> privilege. Access to this email by anyone other than the intended recipient
> is unauthorized. If you are not the intended recipient, please immediately
> notify the sender by replying to this message and delete the material
> immediately from your system. Any further use, dissemination, distribution
> or reproduction of this email is strictly prohibited. Further, no
> representation is made with respect to any content contained in this email.*
>