Posted to users@pulsar.apache.org by Brian Candler <b....@pobox.com> on 2019/10/11 13:12:03 UTC

Message compaction questions

I have a couple of questions about topic compaction.

I have been able to demonstrate it working, using the python API (*).  
After producing messages with repeated keys, I did a manual compaction:

$ apache-pulsar-2.4.1/bin/pulsar-admin topics compact avro-topic
Topic compaction requested for persistent://public/default/avro-topic
$ apache-pulsar-2.4.1/bin/pulsar-admin topics compaction-status avro-topic
Compaction was a success
$ apache-pulsar-2.4.1/bin/pulsar-admin topics reset-cursor persistent://public/default/avro-topic -s my-subscription -t 10h

The consumer (created with is_read_compacted=True) then sees only the 
latest message for each key.  That's great.


However, all the messages in the original topic are still available - I 
can see them using the reader API.

Question 1: at what point, if ever, is the original topic purged to make 
space?  If I set an infinite retention policy on the topic, will 
compaction ever reclaim space?

Question 2: I don't understand what happens if you run compaction on an 
already-compacted topic.  Is the compacted topic compacted, or is the 
original topic re-compacted, or something else?

Regards,

Brian.


(*) Code:

=== producer ===

import pulsar

from pulsar.schema import *

class Foo(Record):
     a = String()
     b = Integer()
     c = Boolean()

client = pulsar.Client('pulsar://localhost:6650')

producer = client.create_producer('avro-topic', schema=AvroSchema(Foo))

for i in range(10):
     producer.send(Foo(a="hello%d" % i, b=i, c=True),
                   partition_key=str(i % 3))

client.close()

# Problem 1: the python API doesn't have "key", but it does have
# "partition_key".  So even though I'm using a non-partitioned topic, I'm
# assuming that "partition_key" equates to "message key".

=== reader ===

import pulsar

from pulsar.schema import *

class Foo(Record):
     a = String()
     b = Integer()
     c = Boolean()

client = pulsar.Client('pulsar://localhost:6650')

msg_id = pulsar.MessageId.earliest

reader = client.create_reader('avro-topic', msg_id,
          schema=AvroSchema(Foo),
          #is_read_compacted=True,
          )

while True:
     msg = reader.read_next()
     print("Received message %r key=%r id=%s" % (msg.value().a,
           msg.partition_key(), msg.message_id()))

# Problem 2: the "reader" API doesn't appear to have "is_read_compacted",
# so it's always reading from the non-compacted version of the topic.

=== consumer ===

import pulsar

from pulsar.schema import *

class Foo(Record):
     a = String()
     b = Integer()
     c = Boolean()

client = pulsar.Client('pulsar://localhost:6650')

consumer = client.subscribe('avro-topic', 'my-subscription',
                            schema=AvroSchema(Foo),
                            is_read_compacted=True)

while True:
     msg = consumer.receive()
     try:
         print("Received message %r key=%r id=%s" % (msg.value().a, 
msg.partition_key(), msg.message_id()))
         consumer.acknowledge(msg)
     except Exception as e:
         # Message failed to be processed
         print("Oops %r" % e)
         consumer.negative_acknowledge(msg)

# Works if the topic is manually compacted and then the cursor is
# manually reset.

======



Re: Message compaction questions

Posted by Brian Candler <b....@pobox.com>.
On 16/10/2019 16:29, Sijie Guo wrote:
> However, I think there are a couple of tasks that have not yet been
> completed for topic compaction, for example the ability to truncate the
> raw data once it has been compacted. Feel free to create a GitHub issue
> requesting this feature.
>
>     Is there a way to recover the storage from the original topic?
>
>
> You mean reclaim the storage occupied by the raw data?

Yes that's what I meant - and I now understand the answer is "not yet".  
Many thanks.


Re: Message compaction questions

Posted by Sijie Guo <gu...@gmail.com>.
On Mon, Oct 14, 2019 at 5:42 AM Brian Candler <b....@pobox.com> wrote:

> On 14/10/2019 13:24, xiaolong ran wrote:
>
> For more details on compact topic, you can refer to:
> https://github.com/apache/pulsar/wiki/PIP-14:-Topic-compaction
>
> There it says:
>
> "Compaction doesn't directly interact with message expiration. Once a
> topic is compacted, the backlog still exists. However, subscribers with
> cursors before the compaction horizon will move quickly through the
> backlog, so it will eventually get cleaned up as there'll be no subscribers.
> "
>
> Now suppose my usage model is: I want to use a pulsar topic as an analogue
> of a "table".  I need to keep the last value for any given key in the
> backlog forever - but previous values of that key can be forgotten.
>
> As far as I can see, the only way to keep a key forever is either:
>
> 1. Set the topic's retention time to infinity (see
> <https://pulsar.apache.org/docs/en/cookbooks-retention-expiry/#retention-policies>)
>
> 2. Have a dummy subscriber which never consumes
>
> But in either case, it seems to me that the main backlog will grow
> forever.  The compacted topic is an optimisation for *readers* who want to
> skip all key/value pairs except the most recent ones for each key; but the
> original topic continues to grow without bounds.
>
> Have I understood that correctly - or else what have I missed?
>

Your understanding is correct. The decision made for topic compaction was to
keep both the raw data and the compacted data, so that users can choose which
copy of the data to consume.
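
For example, roughly, using the Python client from your original message (the
subscription names below are made up), the same topic can be subscribed to
either way:

import pulsar
from pulsar.schema import *

class Foo(Record):
    a = String()
    b = Integer()
    c = Boolean()

client = pulsar.Client('pulsar://localhost:6650')

# Reads the raw data: every retained message, including superseded
# values for a key.
raw_consumer = client.subscribe('avro-topic', 'raw-subscription',
                                schema=AvroSchema(Foo),
                                is_read_compacted=False)

# Reads the compacted data: only the latest value per key up to the
# compaction horizon (plus anything published after it).
compacted_consumer = client.subscribe('avro-topic', 'compacted-subscription',
                                      schema=AvroSchema(Foo),
                                      is_read_compacted=True)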

However, I think there are a couple of tasks that have not yet been completed
for topic compaction, for example the ability to truncate the raw data once it
has been compacted. Feel free to create a GitHub issue requesting this feature.


> Is there a way to recover the storage from the original topic?
>

You mean reclaim the storage occupied by the raw data?


>   Logically what I want to do is replace the original topic with the
> compacted one.  Would the application be expected to copy all messages from
> the compacted topic to a new one periodically?
>
> Thanks,
>
> Brian.
>

Re: Message compaction questions

Posted by Brian Candler <b....@pobox.com>.
On 14/10/2019 13:24, xiaolong ran wrote:
> For more details on compact topic, you can refer to: 
> https://github.com/apache/pulsar/wiki/PIP-14:-Topic-compaction

There it says:

"Compaction doesn't directly interact with message expiration. Once a 
topic is compacted, the backlog still exists. However, subscribers with 
cursors before the compaction horizon will move quickly through the 
backlog, so it will eventually get cleaned up as there'll be no 
subscribers."

Now suppose my usage model is: I want to use a pulsar topic as an 
analogue of a "table".  I need to keep the last value for any given key 
in the backlog forever - but previous values of that key can be forgotten.

As far as I can see, the only way to keep a key forever is either:

1. Set the topic's retention time to infinity (see
<https://pulsar.apache.org/docs/en/cookbooks-retention-expiry/#retention-policies>)

2. Have a dummy subscriber which never consumes

But in either case, it seems to me that the main backlog will grow 
forever.  The compacted topic is an optimisation for *readers* who want 
to skip all key/value pairs except the most recent ones for each key; 
but the original topic continues to grow without bounds.

Have I understood that correctly - or else what have I missed?

Is there a way to recover the storage from the original topic? Logically 
what I want to do is replace the original topic with the compacted one.  
Would the application be expected to copy all messages from the 
compacted topic to a new one periodically?
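
Roughly, the sort of periodic copy I have in mind would be something like
this (sketch only: 'avro-topic-snapshot' and 'copy-subscription' are
placeholder names):

import pulsar
from pulsar.schema import *

class Foo(Record):
    a = String()
    b = Integer()
    c = Boolean()

client = pulsar.Client('pulsar://localhost:6650')

# Consume only the compacted view of the existing topic, starting the
# copy subscription from the beginning of the topic.
src = client.subscribe('avro-topic', 'copy-subscription',
                       schema=AvroSchema(Foo),
                       is_read_compacted=True,
                       initial_position=pulsar.InitialPosition.Earliest)

# Republish the latest value per key into a fresh topic.
dst = client.create_producer('avro-topic-snapshot', schema=AvroSchema(Foo))

while True:
    try:
        # Stop once the currently available backlog has been drained.
        msg = src.receive(timeout_millis=5000)
    except Exception:
        break
    dst.send(msg.value(), partition_key=msg.partition_key())
    src.acknowledge(msg)

client.close()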

Thanks,

Brian.


Re: Message compaction questions

Posted by xiaolong ran <ra...@gmail.com>.
> Question 1: at what point, if ever, is the original topic purged to make space?  If I set an infinite retention policy on the topic, will compaction ever reclaim space?

This comes down to how Pulsar is implemented internally. In Pulsar a topic is
a logical concept: a topic corresponds to a managed ledger, and writing data
to the topic actually writes it into a ledger. A core part of Pulsar's design
is that all operations are asynchronous, so when retention reaches the
specified threshold, deleting the data in the corresponding ledger also
happens asynchronously. The delete operation is never performed on the
currently active ledger; only once the current ledger is full and rolls over
is the retention policy actually applied and the space freed.

> Question 2: I don't understand what happens if you run compaction on an already-compacted topic.  Is the compacted topic compacted, or is the original topic re-compacted, or something else?

As you showed, you can check the state of the last compaction through
compaction-status. If it succeeded, the next compact operation continues from
the already-compacted data instead of starting again from the original topic.
But whether it will actually trigger another compaction run needs further
inspection.

For more details on topic compaction, you can refer to: https://github.com/apache/pulsar/wiki/PIP-14:-Topic-compaction
