Posted to dev@samza.apache.org by Mart Haitjema <mh...@skyportsystems.com> on 2015/01/13 02:24:31 UTC

state store changelog compaction

Hey guys,

Apologies if this question has been asked before, but is there a way to enable (Kafka) log compaction on the key-value store changelog from the Samza properties file?

I’m experimenting with the state management features of Samza 0.8. I’ve created a simple single-node test setup that deploys a minimal stream processor using the RocksDB state store, and I’ve set the following store-related config properties:
stores.my-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.my-store.changelog=kafka.my-store-changelog
stores.my-store.key.serde=string
stores.my-store.msg.serde=string

When I deployed the configuration, I noticed the state store changelog topic did not have compaction enabled. I was also experimenting with checkpointing and noticed that the Kafka checkpoint topic did have its log cleaner policy set to compaction:

deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe
Topic:__samza_checkpoint_ver_1_for_my-test_1	PartitionCount:1	ReplicationFactor:1	Configs:segment.bytes=26214400,cleanup.policy=compact
	Topic: __samza_checkpoint_ver_1_for_my-test_1	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
Topic:my-store-changelog	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: my-store-changelog	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
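
The describe output above can also be checked mechanically. A minimal sketch, assuming a POSIX shell with awk and the Kafka 0.8.x describe format shown here (topic summary lines start with "Topic:", per-partition lines start with a tab); the function name topics_without_compaction is hypothetical:

```shell
# Hypothetical helper: read `kafka-topics.sh --describe` output on stdin and
# print the name of every topic whose summary line lacks cleanup.policy=compact.
# Assumes the Kafka 0.8.x format: summary lines begin with "Topic:<name>",
# per-partition detail lines begin with a tab and are skipped.
topics_without_compaction() {
  awk '/^Topic:/ {
    name = $1                    # e.g. "Topic:my-store-changelog"
    sub(/^Topic:/, "", name)     # strip the "Topic:" prefix
    if ($0 !~ /cleanup\.policy=compact/) print name
  }'
}
```

For example, piping deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe into topics_without_compaction should print only my-store-changelog for the output above.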

Is there a way to enable log compaction for the state store changelog through Samza's configuration? If not, is it safe to manually enable it using kafka-topics.sh --alter --topic my-store-changelog --config cleanup.policy=compact?

Many thanks,

-Mart

Re: state store changelog compaction

Posted by Chris Riccomini <cr...@linkedin.com.INVALID>.
Hey Mart,

Which version of Samza are you running? Naveen committed this ticket:

  https://issues.apache.org/jira/browse/SAMZA-226

But it's only available in master (0.9.0). This feature will auto-create
your changelog topic with the appropriate compaction settings and
partition count.

> When I deployed the configuration I noticed the state store changelog
>topic did not have compaction enabled.

If you have not manually created your topic using Kafka's kafka-topics.sh
command, then Kafka will use the topic defaults to create the topic. This
is usually not what you want. Usually, you'll need to create the changelog
with the proper partition count and log compaction enabled. Before
SAMZA-226, this was a manual process. If you didn't create the topic
manually, this is likely what happened.
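
A minimal sketch of that manual step, assuming Kafka 0.8.x's kafka-topics.sh (which talks to ZooKeeper via --zookeeper), the paths from Mart's setup, and that the changelog should have one partition per input stream partition, on the assumption that each task backs up its store to one changelog partition. The function create_changelog_topic and the KAFKA_TOPICS and ZK variables are hypothetical:

```shell
# Hypothetical helper: pre-create a Samza store changelog topic before the
# job's first deploy, so Kafka does not auto-create it with broker defaults.
# Assumes Kafka 0.8.x's kafka-topics.sh, which connects to ZooKeeper directly.
KAFKA_TOPICS="${KAFKA_TOPICS:-deploy/kafka/bin/kafka-topics.sh}"
ZK="${ZK:-localhost:2181}"

# Usage: create_changelog_topic <topic> <partitions> <replication-factor>
# <partitions> is assumed to match the partition count of the job's input
# stream (one changelog partition per task).
create_changelog_topic() {
  "$KAFKA_TOPICS" --zookeeper "$ZK" --create \
    --topic "$1" \
    --partitions "$2" \
    --replication-factor "$3" \
    --config cleanup.policy=compact
}
```

For the single-partition test job above, this would be create_changelog_topic my-store-changelog 1 1, run before the job is first deployed.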

This is fixable by modifying the Kafka topic's settings to use log
compaction.
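
A minimal sketch of that fix, plus a check that the setting took effect, assuming Kafka 0.8.x, where kafka-topics.sh --alter accepts --config (later Kafka releases deprecate this in favor of kafka-configs.sh). The helper names enable_compaction and has_compaction, and the KAFKA_TOPICS and ZK variables, are hypothetical:

```shell
# Hypothetical helpers for an existing changelog topic on Kafka 0.8.x.
KAFKA_TOPICS="${KAFKA_TOPICS:-deploy/kafka/bin/kafka-topics.sh}"
ZK="${ZK:-localhost:2181}"

# Usage: enable_compaction <topic>
# Sets the topic-level cleanup.policy override to compact.
enable_compaction() {
  "$KAFKA_TOPICS" --zookeeper "$ZK" --alter \
    --topic "$1" \
    --config cleanup.policy=compact
}

# Usage: has_compaction <topic>
# Exits 0 if the topic's --describe summary line lists cleanup.policy=compact.
has_compaction() {
  "$KAFKA_TOPICS" --zookeeper "$ZK" --describe --topic "$1" \
    | grep "^Topic:" | grep -q "cleanup\.policy=compact"
}
```

Usage would be enable_compaction my-store-changelog && has_compaction my-store-changelog. Note that on 0.8.x brokers the log cleaner is disabled by default (log.cleaner.enable=false in server.properties); unless it is turned on, the topic records cleanup.policy=compact but no compaction actually runs.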

> I was also experimenting with checkpointing and noticed that the kafka
>checkpoint topic did have the log cleaner policy set to compaction

This is surprising. The Kafka checkpoint topic has auto-created itself
with log compaction for a while now. I have seen some issues with Samza
0.7.0, where this happens, but with Samza 0.8.0, the issues were fixed. In
0.8.0, this commit:

  https://github.com/apache/incubator-samza/commit/d733ed961fec0a08a6706be22ca8a6537693bed2

should cause the KafkaCheckpointManager to auto-create the topic with log
compaction.

Cheers,
Chris
