You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/19 18:04:03 UTC

[GitHub] [kafka] vvcephei opened a new pull request #8903: WIP: dump tool for suppress buffer changelog

vvcephei opened a new pull request #8903:
URL: https://github.com/apache/kafka/pull/8903


   There's currently no good way to look into the changelog for a suppression buffer,
   since the format is a binary schema.
   
   This POC tool would allow scanning over the changelog and parsing the records.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8903: WIP: dump tool for suppress buffer changelog

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8903:
URL: https://github.com/apache/kafka/pull/8903#issuecomment-646798478


   As a POC, I've verfied this tool by hand by dumping the log from a suppression over windowed data:
   
   `consumer.properties`:
   ```
   bootstrap.servers=localhost:9092
   key.deserializer=org.apache.kafka.streams.kstream.TimeWindowedDeserializer
   default.windowed.key.serde.inner=org.apache.kafka.common.serialization.Serdes$StringSerde
   value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
   ```
   
   `bin/kafka-run-class.sh org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBufferChangelogDumpTool consumer.properties stream-soak-test-logData10MinuteSuppressedCount-store-changelog 0`
   
   The result is output like:
   ```
   ...
   topic=[stream-soak-test-logData10MinuteSuppressedCount-store-changelog] partition=[0] offset=[785652987] timestamp=[1592576961370] key=[[gke-k8s-sz-b1-us-central-default-pool-239c28d1-568g@1592528400000/9223372036854775807]] <tombstone>
   topic=[stream-soak-test-logData10MinuteSuppressedCount-store-changelog] partition=[0] offset=[785652988] timestamp=[1592529165408] key=[[gke-k8s-sz-b1-us-central-default-pool-5p6i165g-8vhf@1592529000000/9223372036854775807]] priorValue=[null] oldValue=[null] newValue=[1] serializedContext=[ProcessorRecordContext{topic='node-name-repartition', partition=0, offset=448463657, timestamp=1592529165408, headers=RecordHeaders(headers = [], isReadOnly = false)}]
   topic=[stream-soak-test-logData10MinuteSuppressedCount-store-changelog] partition=[0] offset=[785652989] timestamp=[1592529173959] key=[[gke-k8s-sz-b1-us-central-default-pool-tham13b7-x74o@1592529000000/9223372036854775807]] priorValue=[null] oldValue=[null] newValue=[1] serializedContext=[ProcessorRecordContext{topic='node-name-repartition', partition=0, offset=448463970, timestamp=1592529173959, headers=RecordHeaders(headers = [], isReadOnly = false)}]
   topic=[stream-soak-test-logData10MinuteSuppressedCount-store-changelog] partition=[0] offset=[785652990] timestamp=[1592576961370] key=[[gke-k8s-sz-b1-us-central-default-pool-03wh5r35-2ns2@1592528400000/9223372036854775807]] <tombstone>
   topic=[stream-soak-test-logData10MinuteSuppressedCount-store-changelog] partition=[0] offset=[785652991] timestamp=[1592576961370] key=[[gke-k8s-sz-b1-us-central-default-pool-6l5j5i59-16qk@1592528400000/9223372036854775807]] <tombstone>
   topic=[stream-soak-test-logData10MinuteSuppressedCount-store-changelog] partition=[0] offset=[785652992] timestamp=[1592576961370] key=[[gke-k8s-sz-b1-us-central-default-pool-sma04jt2-l838@1592528400000/9223372036854775807]] <tombstone>
   topic=[stream-soak-test-logData10MinuteSuppressedCount-store-changelog] partition=[0] offset=[785652993] timestamp=[1592529180997] key=[[gke-k8s-sz-b1-us-central-default-pool-t9aqx35k-x999@1592529000000/9223372036854775807]] priorValue=[null] oldValue=[null] newValue=[1] serializedContext=[ProcessorRecordContext{topic='node-name-repartition', partition=0, offset=448464292, timestamp=1592529180997, headers=RecordHeaders(headers = [], isReadOnly = false)}]
   topic=[stream-soak-test-logData10MinuteSuppressedCount-store-changelog] partition=[0] offset=[785652994] timestamp=[1592576961370] key=[[gke-k8s-sz-b1-us-central-default-pool-l21x28ba-4bui@1592528400000/9223372036854775807]] <tombstone>
   Consumed to end of topic.
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org