Posted to users@pulsar.apache.org by Niclas Hedhman <ni...@hedhman.org> on 2023/01/24 08:03:55 UTC

Topic as a buffer?

Hi,
I am trying to set up topics to act as "message buffers" for web clients.

I would like the client to be able to reconnect at any time and receive 
the latest N messages (or, AFAIUI, some number of megabytes).

So, I think I need:
1. non-persistent topics
2. a very long MessageTTL on the namespace used for this
3. RetentionPolicies with a -1 time limit and some number of MB
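Points 2 and 3 are namespace policies. A minimal sketch of setting them through the Pulsar admin REST API (`/admin/v2/namespaces/{tenant}/{namespace}/retention` and `.../messageTTL`), assuming no authentication is configured; the admin URL, the `public/notifications` namespace, 100 MB and 3600 s are example values, and `Poster`, `HTTPPoster` and `ConfigureBufferNamespace` are hypothetical names. As the replies in this thread point out, non-persistent topics keep no backlog, so these policies only give a replay buffer on persistent topics.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"strconv"
)

// RetentionPolicies mirrors the JSON body of the namespace retention endpoint.
// A time of -1 means "no time limit"; only the size bound applies.
type RetentionPolicies struct {
	RetentionTimeInMinutes int   `json:"retentionTimeInMinutes"`
	RetentionSizeInMB      int64 `json:"retentionSizeInMB"`
}

// Poster sends a JSON body to a URL. Abstracted so the logic can be dry-run or tested.
type Poster func(url string, body []byte) error

// HTTPPoster is the real implementation: POST with a JSON content type, fail on non-2xx.
func HTTPPoster(client *http.Client) Poster {
	return func(url string, body []byte) error {
		resp, err := client.Post(url, "application/json", bytes.NewReader(body))
		if err != nil {
			return err
		}
		defer resp.Body.Close()
		if resp.StatusCode < 200 || resp.StatusCode > 299 {
			return fmt.Errorf("POST %s: unexpected status %s", url, resp.Status)
		}
		return nil
	}
}

// ConfigureBufferNamespace sets size-bounded retention (no time limit) and a
// message TTL in seconds on tenant/namespace.
func ConfigureBufferNamespace(post Poster, adminURL, tenant, namespace string, sizeMB int64, ttlSeconds int) error {
	base := fmt.Sprintf("%s/admin/v2/namespaces/%s/%s", adminURL, tenant, namespace)

	retention, err := json.Marshal(RetentionPolicies{RetentionTimeInMinutes: -1, RetentionSizeInMB: sizeMB})
	if err != nil {
		return err
	}
	if err := post(base+"/retention", retention); err != nil {
		return err
	}
	// The messageTTL endpoint takes a bare integer (seconds) as its JSON body.
	return post(base+"/messageTTL", []byte(strconv.Itoa(ttlSeconds)))
}

func main() {
	// Dry run: print what would be sent. Pass HTTPPoster(http.DefaultClient)
	// instead to apply the policies to a real broker.
	dryRun := func(url string, body []byte) error {
		fmt.Printf("POST %s %s\n", url, body)
		return nil
	}
	if err := ConfigureBufferNamespace(dryRun, "http://localhost:8080", "public", "notifications", 100, 3600); err != nil {
		fmt.Println("configuration failed:", err)
	}
}
```

Keeping the HTTP call behind a function type is a design choice for the sketch only; the same two POSTs can be made with any HTTP client.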

The client in question is written in Go.

I am reading the topic (see the code below), not subscribing to it.

Question 1:
When I use 'pulsar.EarliestMessageID()', should I be getting the oldest 
message in the retention buffer, or the oldest unacknowledged message?

If the latter, will subsequent messages be read out, or will the same 
client be served the same message over and over again?

Question 2:
Will RetentionPolicies ("size"/"time") cause messages to be kept in RAM, 
or can I rely on messages being read from disk, even though they are 
non-persistent?


TIA
Niclas


-o-o-o- Relevant code -o-o-o-
Creating reader:

func (p *PulsarClient) CreateReader(topic string, earliest bool) (pulsar.Reader, error) {
    // Start from the oldest retained message, or only from new ones.
    var start pulsar.MessageID
    if earliest {
        start = pulsar.EarliestMessageID()
    } else {
        start = pulsar.LatestMessageID()
    }
    reader, err := p.client.CreateReader(pulsar.ReaderOptions{
        Topic:          topic,
        StartMessageID: start,
    })
    if err != nil {
        log.DefaultLogger.Error(fmt.Sprintf("Failed to create Pulsar Reader for: %s", topic), "error", err)
        // Return the error rather than a nil reader; the caller would
        // otherwise panic when it calls reader.Close().
        return nil, err
    }
    return reader, nil
}


Reading messages:
    reader, err := h.pulsar.CreateReader(model.NotificationTopics+strconv.FormatInt(orgId, 10), true)
    if err != nil {
        return err
    }
    defer reader.Close()
    for {
        msg, err := reader.Next(ctx)
        // Next() returns a nil message together with a non-nil error, so
        // check the error first; otherwise the error branch is unreachable.
        if err != nil {
            if ctx.Err() != nil {
                log.DefaultLogger.Info("Grafana sender: DONE")
                return ctx.Err()
            }
            // Retrying with "continue" would spin if the reader was closed.
            log.DefaultLogger.Error(fmt.Sprintf("Couldn't get the message via reader.Next(): %+v", err))
            return err
        }
        log.DefaultLogger.Info("Sending notification to " + req.PluginContext.User.Login)
        if err := sender.SendJSON(msg.Payload()); err != nil {
            log.DefaultLogger.Error(fmt.Sprintf("Couldn't send frame: %v", err))
            return err
        }
    }
}

Re: Topic as a buffer?

Posted by Niclas Hedhman <ni...@hedhman.org>.
On 2023-01-25 13:59, Asaf Mesika wrote:
> If I understand correctly, you don't really need the whole notion of
> acknowledgement (subscription).
> When the web client connects, it needs to receive the last N messages
> (what happens after that?)

Keep "reading" messages and update the UI as they arrive.

> How about using a non-durable subscription? They only live as long as
> the broker is not restarted and the topic has not been moved between
> brokers (by the load balancer).
> If the subscription vanishes, you just recreate your whole buffer from
> scratch?

> Also take a look at Reader interface:
> https://pulsar.apache.org/docs/2.11.x/concepts-clients/#reader-interface

Ah! I do use a Reader... It wasn't clear to me that it doesn't use 
acknowledgements. I should have realized that; it's pretty obvious.


So, thanks... I now have the Pulsar side of the equation working, and 
need to get the mechanics of Grafana Streaming to play along.


Niclas

Re: Topic as a buffer?

Posted by Asaf Mesika <as...@gmail.com>.
If I understand correctly, you don't really need the whole notion of
acknowledgement (subscription).
When the web client connects, it needs to receive the last N messages (what
happens after that?)

How about using a non-durable subscription? They only live as long as the
broker is not restarted and the topic has not been moved between brokers (by
the load balancer).
If the subscription vanishes, you just recreate your whole buffer from
scratch?
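If the client side ends up keeping its own "last N messages" buffer, as suggested above, a bounded ring is enough. A minimal sketch, assuming payloads are kept as raw `[]byte` (the type `msg.Payload()` returns in the Go client) and that a single goroutine owns the buffer, so there is no locking; `Ring`, `NewRing`, `Add` and `Snapshot` are hypothetical names, not part of the Pulsar client.

```go
package main

import "fmt"

// Ring keeps the most recent len(items) payloads, overwriting the oldest.
// Not safe for concurrent use.
type Ring struct {
	items [][]byte
	next  int  // index where the next payload will be written
	full  bool // true once the buffer has wrapped around at least once
}

// NewRing creates a ring holding at most capacity payloads (minimum 1).
func NewRing(capacity int) *Ring {
	if capacity < 1 {
		capacity = 1
	}
	return &Ring{items: make([][]byte, capacity)}
}

// Add stores p, evicting the oldest payload when the ring is full.
func (r *Ring) Add(p []byte) {
	r.items[r.next] = p
	r.next = (r.next + 1) % len(r.items)
	if r.next == 0 {
		r.full = true
	}
}

// Snapshot returns the buffered payloads from oldest to newest, e.g. to
// replay to a web client that has just (re)connected.
func (r *Ring) Snapshot() [][]byte {
	if !r.full {
		return append([][]byte(nil), r.items[:r.next]...)
	}
	out := make([][]byte, 0, len(r.items))
	out = append(out, r.items[r.next:]...)
	return append(out, r.items[:r.next]...)
}

func main() {
	r := NewRing(3)
	for _, s := range []string{"a", "b", "c", "d"} {
		r.Add([]byte(s))
	}
	var names []string
	for _, p := range r.Snapshot() {
		names = append(names, string(p))
	}
	fmt.Println(names) // [b c d]
}
```

Counting messages rather than bytes is a simplification; a size-bounded variant would track the total payload length and evict until it fits.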

Also take a look at Reader interface:
https://pulsar.apache.org/docs/2.11.x/concepts-clients/#reader-interface



Re: Topic as a buffer?

Posted by Niclas Hedhman <ni...@hedhman.org>.
On 2023-01-24 10:29, Niclas Hedhman wrote:
> What a mess-up! I had already changed it to "persistent://", and I
> misread "non-partitioned" as "non-persistent" in my server code.
> 
> So, my topics ARE persistent already, non-partitioned (to save
> space?). The rest of my original post still applies.

Non-partitioned, so that I can do

     hourAgo := time.Now().Add(-time.Hour)
     seekError := reader.SeekByTime(hourAgo)

So, an additional question: should I stick with this approach, or rely 
on RetentionPolicies?




Re: Topic as a buffer?

Posted by Niclas Hedhman <ni...@hedhman.org>.
What a mess-up! I had already changed it to "persistent://", and I 
misread "non-partitioned" as "non-persistent" in my server code.

So, my topics ARE persistent already, and non-partitioned (to save space?). 
The rest of my original post still applies.
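Since the persistent/non-persistent choice is carried only by the scheme of the topic name, building names in one place makes a mix-up like this harder. A minimal sketch using Pulsar's `{persistent|non-persistent}://tenant/namespace/topic` format; `Domain`, `topicName` and `isPersistent` are hypothetical helpers, and the tenant, namespace and topic values are examples.

```go
package main

import (
	"fmt"
	"strings"
)

// Domain is the scheme prefix of a fully qualified Pulsar topic name.
type Domain string

const (
	Persistent    Domain = "persistent"
	NonPersistent Domain = "non-persistent"
)

// topicName builds "<domain>://<tenant>/<namespace>/<topic>".
func topicName(d Domain, tenant, namespace, topic string) string {
	return fmt.Sprintf("%s://%s/%s/%s", d, tenant, namespace, topic)
}

// isPersistent reports whether name refers to a persistent topic.
// Names without a scheme are treated as persistent, matching Pulsar's default.
func isPersistent(name string) bool {
	return !strings.HasPrefix(name, string(NonPersistent)+"://")
}

func main() {
	name := topicName(Persistent, "public", "notifications", "org-42")
	fmt.Println(name, isPersistent(name)) // persistent://public/notifications/org-42 true
}
```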




Re: Topic as a buffer?

Posted by Niclas Hedhman <ni...@hedhman.org>.
On 2023-01-24 09:46, Asaf Mesika wrote:
> I would add that it's more volatile than you think:

Hmmm....

> There is no buffer in memory as you would imagine.
> 
> I quote from the documentation here [2]:
<snip/>
> Meaning, as soon as the message is received, it is immediately pushed
> to the socket of each connected consumer.
> If your client is a consumer and it is disconnected, once reconnected
> it will start receiving messages from this point on.

So, if no consumer is connected, no messages are forwarded. Got it! That 
explains what I am observing, which I had assumed was a bug on my side.

> Honestly, I find non-persistent topics quite confusing.

Agree.

Now I need to find out (again) why I didn't use persistent topics in the 
first place. At the back of my mind, there was something I am forced to 
use that is not supported by persistent topics... Maybe I have some notes 
on that in the commit logs.

Niclas

Re: Topic as a buffer?

Posted by Asaf Mesika <as...@gmail.com>.
I would add that it's more volatile than you think:

There is *no buffer* in memory as you would imagine.

I quote from the documentation here
<https://pulsar.apache.org/docs/2.11.x/concepts-messaging/#non-persistent-topics>:

With non-persistent topics, message data lives only in memory, without a
specific buffer - which means data *is not* buffered in memory. The
received messages are immediately transmitted to all *connected consumers*.
If a message broker fails or message data can otherwise not be retrieved
from memory, your message data may be lost. Use non-persistent topics only
if you're *certain* that your use case requires it and can sustain it.

Meaning, as soon as the message is received, it is immediately pushed to
the socket of each connected consumer.
If your client is a consumer and it is disconnected, once reconnected it
will start receiving messages from this point on.
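The "no buffer" behaviour described above can be illustrated with a toy model. This is not Pulsar code: `toyTopic` is a hypothetical in-process broadcaster that, like a non-persistent topic as documented, hands each message only to consumers connected at publish time and keeps nothing for consumers that connect later.

```go
package main

import "fmt"

// toyTopic models "no buffer": Publish delivers only to consumers that are
// connected at that moment, and nothing is retained for later connections.
type toyTopic struct {
	consumers map[string][]string // consumer name -> messages it has received
}

func newToyTopic() *toyTopic { return &toyTopic{consumers: map[string][]string{}} }

// Connect registers a consumer; it starts with no messages.
func (t *toyTopic) Connect(name string) {
	if _, ok := t.consumers[name]; !ok {
		t.consumers[name] = nil
	}
}

// Disconnect forgets the consumer; in this model nothing is kept for it.
func (t *toyTopic) Disconnect(name string) { delete(t.consumers, name) }

// Publish pushes msg to every currently connected consumer, and to no one else.
func (t *toyTopic) Publish(msg string) {
	for name := range t.consumers {
		t.consumers[name] = append(t.consumers[name], msg)
	}
}

// Received returns what a connected consumer has seen since it connected.
func (t *toyTopic) Received(name string) []string { return t.consumers[name] }

func main() {
	topic := newToyTopic()
	topic.Connect("web")
	topic.Publish("m1")
	topic.Disconnect("web")
	topic.Publish("m2") // nobody connected: m2 is simply gone
	topic.Connect("web")
	topic.Publish("m3")
	fmt.Println(topic.Received("web")) // [m3]
}
```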

Honestly, I find non-persistent topics quite confusing.



Re: Topic as a buffer?

Posted by Niclas Hedhman <ni...@hedhman.org>.
On 2023-01-24 09:06, Enrico Olivelli wrote:
> A non-persistent topic is volatile: if you restart the broker, you lose 
> the data

Yes, I know that. But for this particular case, that is fine. This 
buffer is roughly the equivalent of "syslog", reporting what is going on 
in the async backend, but at an application-level abstraction (rather 
than a "programmer's view"). And the application has a cyclic "vibe", so 
any important information that is lost will reappear within an hour.

Cheers
Niclas

Re: Topic as a buffer?

Posted by Enrico Olivelli <eo...@gmail.com>.
Niclas,


On Tue, Jan 24, 2023 at 09:04, Niclas Hedhman
<ni...@hedhman.org> wrote:
>
> Hi,
> I am trying to set up topics to act as "message buffers" to web clients.
>
> I would like the client to be able to re-connect at any time, and have
> the latest N messages (or AFAIUI, some number of Megabytes).
>
> So, I think I need;
> 1. non-persistent topics
> 2. very long MessageTTL on Namespace used for this
> 3. RetentionPolicies with -1 time limit and some number of MB
>
> Client in question is GO.
>
> I am reading (see below) the topic, not subscribing.
>
> Question 1;
> When I do 'pulsar.EarliestMessageID()', should I be getting the oldest
> message in the Retention buffer, or the oldest message not acknowledged?
>
> If the latter, will subsequent messages be read out, or will the same
> client be served the same message over and over again?
>
> Question 2;
> Will RetentionPolicies ("size"/"time") cause messages to be kept in RAM,
> or can I rely on messages being read from disk, even though they are
> non-persistent?


A non-persistent topic is volatile: if you restart the broker, you lose the data.


Enrico

