You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Edward Capriolo <ed...@gmail.com> on 2019/11/09 16:08:51 UTC

Re: Attempt to prove Kafka transactions work

On Thu, Oct 31, 2019 at 6:11 AM Matthias J. Sax <ma...@confluent.io>
wrote:

> Quite a project to test transactions...
>
> The current system test suite is part of the code base:
> https://github.com/apache/kafka/tree/trunk/tests/kafkatest/tests
>
> There is course also some unit/integration test for transactions.
>
> There is also a blog post that describes in a high level what testing
> was done when EOS was introduced:
>
> https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
>
> And yes, transactions are built on some assumptions and if you configure
> your system incorrectly of violate those assumptions, it may break. We
> also fixed some bugs since the first release. And there might be more
> bugs --- software is always buggy. However, for practical consideration,
> transactions should work.
>
> We would of course love if you could share your test results! If you
> discover a bug, please report it, so we can fix it.
>
>
> -Matthias
>
> On 10/28/19 10:06 AM, Edward Capriolo wrote:
> > On Sunday, October 27, 2019, Boyang Chen <re...@gmail.com>
> wrote:
> >
> >> Hey Edward,
> >>
> >> just to summarize and make sure I understood your question, you want to
> >> implement some Chaos testing to validate Kafka EOS model, but not sure
> how
> >> to start or curious about whether there are already works in the
> community
> >> doing that?
> >>
> >> For the correctness of Kafka EOS, we have tons of unit tests and system
> >> tests to prove its functionality. They could be found inside the repo.
> You
> >> could check them out and see if we still have gaps (which I believe we
> >> definitely have).
> >>
> >> Boyang
> >>
> >> On Fri, Oct 25, 2019 at 7:25 PM Edward Capriolo <ed...@gmail.com>
> >> wrote:
> >>
> >>> Hello all,
> >>>
> >>> I used to work in adtech. Adtech was great. CPM for ads is 1-$5 per
> >>> thousand ad impression. If numbers are 5% off you can blame javascript
> >>> click trackers.
> >>>
> >>> Now, I work in a non addtech industry and they are really, really
> serious
> >>> about exactly once.
> >>>
> >>> So there is this blog:
> >>>
> >>> https://www.confluent.io/blog/transactions-apache-kafka/
> >>>
> >>> Great little snippet of code. I think I can copy it and implement it
> >>> correctly.
> >>> You know, but if you read that section about the zombie fencing, you
> >> learn
> >>> that you either need to manually assign partitions or use the rebalance
> >>> listener and have N producers. Hunting around github is not super
> >> helpful,
> >>> some code snippets are less complete then even the snipped in the blog.
> >>>
> >>> I looked at what spring-kafka does. It does get the zombie fencing
> >> correct
> >>> with respect to fencing id, other bits other bits of the code seem
> >>> plausible.
> >>>
> >>> Notice I said "plausible", because I do not count a few end to end
> tests
> >>> running single VM as a solid enough evidence that this works in the
> face
> >> of
> >>> failures.
> >>>
> >>> I have been contemplating how one stress tests this exactly once
> concept,
> >>> with something Jepsen like or something brute force that I can run for
> 5
> >>> hours in a row
> >>>
> >>> If I faithfully implemented the code in the transactional read-write
> loop
> >>> and I feed it into my jepsen like black box tester it should:
> >>>
> >>> Create a topic with 10 partitions, Start launching read-write
> transaction
> >>> code, start feeding input data, Maybe strings like 1 -1000, now start
> >>> randomly killing vms with kill -9 kill graceful exits, maybe even
> killing
> >>> kafka, and make sure 1-1000, pop out on the other end.
> >>>
> >>> I thought of some other "crazy" ideas. One such idea:
> >>>
> >>> If I make a transactional "echo", read x; write x back to the same
> topic.
> >>> RunN instances of that and kill them randomly. If I am loosing messages
> >>> (and not duplicating messages) then the topic would eventually have no
> >>> data..
> >>>
> >>> Or should I make a program with some math formula like receive x write
> >> xx.
> >>> If duplication is happening I would start seeing multiple xx's
> >>>
> >>> Or send 1,000,000,000 messages through and consumer logs them to a
> file.
> >>> Then use an etl tool to validate messages come out on the other side.
> >>>
> >>> Or should I use a nosql with increments and count up and ensure no key
> >> has
> >>> been incremented twice.
> >>>
> >>> note: I realize I can just use kafka streams or storm, which has its
> own
> >>> systems to guarantee "at most once" but Iooking for a way to prove what
> >> can
> >>> be done with pure kafka. (and not just prove it adtech work (good
> enough
> >> 5%
> >>> here or there) )
> >>>
> >>> I imagine someone somewhere must be doing this. How where tips? Is it
> >> part
> >>> of some kafka release stress test? I'm down to write it if it does not
> >>> exist.
> >>>
> >>> Thanks,
> >>> Edward,
> >>>
> >>> Thanks,
> >>> Edward
> >>>
> >>
> >
> > Boyang,
> >
> > I just to summarize and make sure I understood your question, you want to
> > implement some Chaos testing to validate Kafka EOS model, but not sure
> how
> > to start or curious about whether there are already works in the
> community
> > doing that?
> >
> > Yes.
> >
> > I am not an expert in this field, but I know what distributed systems can
> > mask failures. For example if you have atomic increment you might unit
> test
> > it and it works fine, but if you ran it for 40 days it might double
> count 1
> > time.
> >
> >  of Kafka EOS, we have tons of unit tests and system
> > tests to prove its functionality. They could be found inside the repo.
> >
> > I've been a developer for a while so the phrase "there are tests" never
> > tells me everything. Tests reveal the presence of bugs not the absence.
> >
> > Can you please point me at the tests? My curiosity is if there is a
> > systematic in-depth strategy here and how much rigor there is.
> >
> > In my environment I need to quantify and use rigor to prove out these
> > things. Things that you might take for granted. For example, I have to
> > prove that zookeeper works as expected when we lose a datacenter. Most
> > people 'in the know' take it for granted that kafka and zk do what is
> > advertised when configured properly. I have to test that out and document
> > my findings.
> >
> > For kafka transactions. The user space code needs to be written properly
> > and configured properly along with the server being setup properly. It is
> > not enough for me to check out kafka run 'sbt test' and declare victory
> > after the unit tests pass.
> >
> > What I am effectively looking for is the anti jepsen blog that says...We
> > threw the kitchen sink at this and these transactions are bullet proof.
> > Here is our methodology, here is some charts, here is xyz. Here is how we
> > run it every minor release
> >
> > I'm not trying to be a pita, educate me on how bullet proof this is and
> how
> > I can reproduce the results.
> >
> >
> >
> >
> >
> >
> >
>
>

All,

After a few weeks of hacking at this I have found a few interesting things.
First things first, introducing Kafos

https://github.com/edwardcapriolo/kafos

Kafos (think Kafka + Chaos) . Kafos is a project that attempts to prove out
Kafka transactions do exacty-once by launching multiple servers and
consume/producers, killing them off and counting how many messages land on
the other end.

Note: The project is a little raw at this point, (it has hard code to my
home folders and some long Thread.sleep() calls in there. I will clean it
up as I go,) but I have some early findings.

From the blog here, I want to point out the following flaws:

https://www.confluent.io/blog/transactions-apache-kafka/

KafkaProducer producer = createKafkaProducer(
“bootstrap.servers”, “localhost:9092”,
“transactional.id”, “my-transactional-id”);

1) a fixed transaction.id effectively means the first worker fences all the
other ones.While being exactly-once, there is exactly one worker and no
parallelism.

Spring has two schemes that seem to implement random information in the
fencing ids, which I think defeats the purpose. But I need to dive in on
that more.
https://docs.spring.io/spring-kafka/reference/html/

Transactions are enabled by providing the DefaultKafkaProducerFactory with
a transactionIdPrefix. In that case, instead of managing a single shared
Producer, the factory maintains a cache of transactional producers. When
the user calls close() on a producer, it is returned to the cache for reuse
instead of actually being closed. The transactional.id property of each
producer is transactionIdPrefix + n, where n starts with 0 and is
incremented for each new producer, unless the transaction is started by a
listener container with a record-based listener. In that case, the
transactional.id is <transactionIdPrefix>.<group.id>.<topic>.<partition>.
This is to properly support fencing zombies, as described here
<https://www.confluent.io/blog/transactions-apache-kafka/>. This new
behavior was added in versions 1.3.7, 2.0.6, 2.1.10, and 2.2.0. If you wish
to revert to the previous behavior, you can set the
producerPerConsumerPartition property on the DefaultKafkaProducerFactory to
false.

While transactions are supported with batch listeners, by default, zombie
fencing is not supported because a batch may contain records from multiple
topics or partitions. However, starting with version 2.3.2, zombie fencing
is supported if you set the container property subBatchPerPartition to
true. In that case, the batch listener is invoked once per partition
received from the last poll, as if each poll only returned records for a
single partition.

How to fix this? Here be the dragons:

"Practically, one would either have to store the mapping between input
partitions and transactional.ids in an external store, or have some static
encoding of it. Kafka Streams opts for the latter approach to solve this
problem."

Well then practically, the rest of this code is not useful.

while (true) {
   ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
   producer.beginTransaction();
   for (ConsumerRecord record : records)
    producer.send(producerRecord(“outputTopic”, record));
   producer.sendOffsetsToTransaction(currentOffsets(consumer), group);
   producer.commitTransaction();
}

When you call consumer.poll() (unless you manually assign to a single
partiion) you are going to receive records from one or more partitions.  As
consumers join and leave the the group workers are going to move.

There are two ways I can find to do this:
1) use a consumerRebalanceListener and a concurrent map of producers
https://github.com/edwardcapriolo/kafos/blob/68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/java/pure8/multipartitionrpw/MultiPartitionRPWBuffered.java#L123

2) as you are polling create/reuse producers as needed.

With scalable correctly fenced producers the loop looks more like this:

ConsumerRecords<K,V> records = null;
try {
   records = consumer.poll(2000);
} catch (KafkaException e){
   return;
}

for(ConsumerRecord<K,V> record: records) {
   int partition = record.partition();
   BufferedTrackingProducer<K,V> producer = producers.get(partition);
   List<ProducerRecord<K, V>> results = processor.process(record);
   producer.send(results);

}
for (Entry<Integer, BufferedTrackingProducer<K, V>> entry:
producers.entrySet())
{
   entry.getValue().commitAndClear(records, groupId);
}

Implementation here:
https://github.com/edwardcapriolo/kafos/blob/68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/java/pure8/multipartitionrpw/MultiPartitionRPWBuffered.java#L1

BTW maybe I am wrong about this and there is a better approach. With the
multiple producer threads for single consumer.poll figuring out exactly how
to layer the try/catch and exception handling becomes a bit harder than the
example in the blog.

Anyway,  here is what a Kafos chaos "test" looks like.

https://github.com/edwardcapriolo/kafos/blob/68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/java/pure8/multipartitionrpw/MutliRPWWithConsumerRestartingTest.java#L1

It is a bit silly that we are forking off processes like this, but I did
not want processes running in the same VM.I am not ready to run
testcontainers/docker at this point. But hey we launch kafka, we launch zk,
we start  workers, we kill them off we count things. Results look like
this.


Verification stats
------------------------------------------------------------------
notFound: 0 foundCorrect: 4000 foundDuplications: 0
------------------------------------------------------------------
not found list[]
serverid: 0 out line Stream closed

I am going to do more work killing off servers and making more test
scenarios, but so far off to a good start. When you get all the code right,
(by cobbling together all the sources out there with half the code) you get
things that seem to do exactly once, Help wanted in acedemic review of my
assertions and code or PRS

Thanks
Edward

Re: Attempt to prove Kafka transactions work

Posted by Edward Capriolo <ed...@gmail.com>.
On Wed, Nov 20, 2019 at 6:35 PM Edward Capriolo <ed...@gmail.com>
wrote:

>
>
> On Wednesday, November 20, 2019, Matthias J. Sax <ma...@confluent.io>
> wrote:
>
>> I am not sure what Spring does, but using Kafka Streams writing the
>> output and committing offset would be part of the same transaction.
>>
>> It seems Spring is doing something else and thus, is seems it does not
>> use the EOS API correctly.
>>
>> If you use transactions to copy data from input to output topic,
>> committing offsets must be done on the producer as part to the
>> transaction; the consumer would not commit offsets.
>>
>> To me, it seems that Spring is committing offset using the consumer
>> independent if the transaction was successful or not. This would be an
>> incorrect usage of the API.
>>
>>
>> -Matthias
>>
>> On 11/20/19 6:16 AM, Edward Capriolo wrote:
>> > Ok. I'm at a point where I believe the exactly once is in question.
>> >
>> > Topic input 10 partitions topic output 10 partitions.
>> >
>> > Producer writes messages 1 to 100 to topic input.
>> >
>> > CTP process calls poll. It receives 100 messages 10 in each partiton.
>> >
>> > Process is simple mirroring take from input write to output.
>> >
>> > 10 producers with 10 transactional ids are created. During processing 1
>> of
>> > the 10 producers throws kafka exception. 90 out of 100 writes are
>> committed
>> > tranactionally 10 are not.
>> >
>> > If poll is called again 10 messages do not appear in the next poll. Are
>> > they lost?
>> >
>> >
>> >
>> > On Saturday, November 9, 2019, Edward Capriolo <ed...@gmail.com>
>> > wrote:
>> >
>> >>
>> >> On Thu, Oct 31, 2019 at 6:11 AM Matthias J. Sax <matthias@confluent.io
>> >
>> >> wrote:
>> >>
>> >>> Quite a project to test transactions...
>> >>>
>> >>> The current system test suite is part of the code base:
>> >>> https://github.com/apache/kafka/tree/trunk/tests/kafkatest/tests
>> >>>
>> >>> There is course also some unit/integration test for transactions.
>> >>>
>> >>> There is also a blog post that describes in a high level what testing
>> >>> was done when EOS was introduced:
>> >>> https://www.confluent.io/blog/exactly-once-semantics-are-
>> >>> possible-heres-how-apache-kafka-does-it/
>> >>>
>> >>> And yes, transactions are built on some assumptions and if you
>> configure
>> >>> your system incorrectly of violate those assumptions, it may break. We
>> >>> also fixed some bugs since the first release. And there might be more
>> >>> bugs --- software is always buggy. However, for practical
>> consideration,
>> >>> transactions should work.
>> >>>
>> >>> We would of course love if you could share your test results! If you
>> >>> discover a bug, please report it, so we can fix it.
>> >>>
>> >>>
>> >>> -Matthias
>> >>>
>> >>> On 10/28/19 10:06 AM, Edward Capriolo wrote:
>> >>>> On Sunday, October 27, 2019, Boyang Chen <reluctanthero104@gmail.com
>> >
>> >>> wrote:
>> >>>>
>> >>>>> Hey Edward,
>> >>>>>
>> >>>>> just to summarize and make sure I understood your question, you
>> want to
>> >>>>> implement some Chaos testing to validate Kafka EOS model, but not
>> sure
>> >>> how
>> >>>>> to start or curious about whether there are already works in the
>> >>> community
>> >>>>> doing that?
>> >>>>>
>> >>>>> For the correctness of Kafka EOS, we have tons of unit tests and
>> system
>> >>>>> tests to prove its functionality. They could be found inside the
>> repo.
>> >>> You
>> >>>>> could check them out and see if we still have gaps (which I believe
>> we
>> >>>>> definitely have).
>> >>>>>
>> >>>>> Boyang
>> >>>>>
>> >>>>> On Fri, Oct 25, 2019 at 7:25 PM Edward Capriolo <
>> edlinuxguru@gmail.com
>> >>>>
>> >>>>> wrote:
>> >>>>>
>> >>>>>> Hello all,
>> >>>>>>
>> >>>>>> I used to work in adtech. Adtech was great. CPM for ads is 1-$5 per
>> >>>>>> thousand ad impression. If numbers are 5% off you can blame
>> javascript
>> >>>>>> click trackers.
>> >>>>>>
>> >>>>>> Now, I work in a non addtech industry and they are really, really
>> >>> serious
>> >>>>>> about exactly once.
>> >>>>>>
>> >>>>>> So there is this blog:
>> >>>>>>
>> >>>>>> https://www.confluent.io/blog/transactions-apache-kafka/
>> >>>>>>
>> >>>>>> Great little snippet of code. I think I can copy it and implement
>> it
>> >>>>>> correctly.
>> >>>>>> You know, but if you read that section about the zombie fencing,
>> you
>> >>>>> learn
>> >>>>>> that you either need to manually assign partitions or use the
>> >>> rebalance
>> >>>>>> listener and have N producers. Hunting around github is not super
>> >>>>> helpful,
>> >>>>>> some code snippets are less complete then even the snipped in the
>> >>> blog.
>> >>>>>>
>> >>>>>> I looked at what spring-kafka does. It does get the zombie fencing
>> >>>>> correct
>> >>>>>> with respect to fencing id, other bits other bits of the code seem
>> >>>>>> plausible.
>> >>>>>>
>> >>>>>> Notice I said "plausible", because I do not count a few end to end
>> >>> tests
>> >>>>>> running single VM as a solid enough evidence that this works in the
>> >>> face
>> >>>>> of
>> >>>>>> failures.
>> >>>>>>
>> >>>>>> I have been contemplating how one stress tests this exactly once
>> >>> concept,
>> >>>>>> with something Jepsen like or something brute force that I can run
>> >>> for 5
>> >>>>>> hours in a row
>> >>>>>>
>> >>>>>> If I faithfully implemented the code in the transactional
>> read-write
>> >>> loop
>> >>>>>> and I feed it into my jepsen like black box tester it should:
>> >>>>>>
>> >>>>>> Create a topic with 10 partitions, Start launching read-write
>> >>> transaction
>> >>>>>> code, start feeding input data, Maybe strings like 1 -1000, now
>> start
>> >>>>>> randomly killing vms with kill -9 kill graceful exits, maybe even
>> >>> killing
>> >>>>>> kafka, and make sure 1-1000, pop out on the other end.
>> >>>>>>
>> >>>>>> I thought of some other "crazy" ideas. One such idea:
>> >>>>>>
>> >>>>>> If I make a transactional "echo", read x; write x back to the same
>> >>> topic.
>> >>>>>> RunN instances of that and kill them randomly. If I am loosing
>> >>> messages
>> >>>>>> (and not duplicating messages) then the topic would eventually
>> have no
>> >>>>>> data..
>> >>>>>>
>> >>>>>> Or should I make a program with some math formula like receive x
>> write
>> >>>>> xx.
>> >>>>>> If duplication is happening I would start seeing multiple xx's
>> >>>>>>
>> >>>>>> Or send 1,000,000,000 messages through and consumer logs them to a
>> >>> file.
>> >>>>>> Then use an etl tool to validate messages come out on the other
>> side.
>> >>>>>>
>> >>>>>> Or should I use a nosql with increments and count up and ensure no
>> key
>> >>>>> has
>> >>>>>> been incremented twice.
>> >>>>>>
>> >>>>>> note: I realize I can just use kafka streams or storm, which has
>> its
>> >>> own
>> >>>>>> systems to guarantee "at most once" but Iooking for a way to prove
>> >>> what
>> >>>>> can
>> >>>>>> be done with pure kafka. (and not just prove it adtech work (good
>> >>> enough
>> >>>>> 5%
>> >>>>>> here or there) )
>> >>>>>>
>> >>>>>> I imagine someone somewhere must be doing this. How where tips? Is
>> it
>> >>>>> part
>> >>>>>> of some kafka release stress test? I'm down to write it if it does
>> not
>> >>>>>> exist.
>> >>>>>>
>> >>>>>> Thanks,
>> >>>>>> Edward,
>> >>>>>>
>> >>>>>> Thanks,
>> >>>>>> Edward
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>> Boyang,
>> >>>>
>> >>>> I just to summarize and make sure I understood your question, you
>> want
>> >>> to
>> >>>> implement some Chaos testing to validate Kafka EOS model, but not
>> sure
>> >>> how
>> >>>> to start or curious about whether there are already works in the
>> >>> community
>> >>>> doing that?
>> >>>>
>> >>>> Yes.
>> >>>>
>> >>>> I am not an expert in this field, but I know what distributed systems
>> >>> can
>> >>>> mask failures. For example if you have atomic increment you might
>> unit
>> >>> test
>> >>>> it and it works fine, but if you ran it for 40 days it might double
>> >>> count 1
>> >>>> time.
>> >>>>
>> >>>>  of Kafka EOS, we have tons of unit tests and system
>> >>>> tests to prove its functionality. They could be found inside the
>> repo.
>> >>>>
>> >>>> I've been a developer for a while so the phrase "there are tests"
>> never
>> >>>> tells me everything. Tests reveal the presence of bugs not the
>> absence.
>> >>>>
>> >>>> Can you please point me at the tests? My curiosity is if there is a
>> >>>> systematic in-depth strategy here and how much rigor there is.
>> >>>>
>> >>>> In my environment I need to quantify and use rigor to prove out these
>> >>>> things. Things that you might take for granted. For example, I have
>> to
>> >>>> prove that zookeeper works as expected when we lose a datacenter.
>> Most
>> >>>> people 'in the know' take it for granted that kafka and zk do what is
>> >>>> advertised when configured properly. I have to test that out and
>> >>> document
>> >>>> my findings.
>> >>>>
>> >>>> For kafka transactions. The user space code needs to be written
>> properly
>> >>>> and configured properly along with the server being setup properly.
>> It
>> >>> is
>> >>>> not enough for me to check out kafka run 'sbt test' and declare
>> victory
>> >>>> after the unit tests pass.
>> >>>>
>> >>>> What I am effectively looking for is the anti jepsen blog that
>> says...We
>> >>>> threw the kitchen sink at this and these transactions are bullet
>> proof.
>> >>>> Here is our methodology, here is some charts, here is xyz. Here is
>> how
>> >>> we
>> >>>> run it every minor release
>> >>>>
>> >>>> I'm not trying to be a pita, educate me on how bullet proof this is
>> and
>> >>> how
>> >>>> I can reproduce the results.
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>
>> >>>
>> >>
>> >> All,
>> >>
>> >> After a few weeks of hacking at this I have found a few interesting
>> >> things. First things first, introducing Kafos
>> >>
>> >> https://github.com/edwardcapriolo/kafos
>> >>
>> >> Kafos (think Kafka + Chaos) . Kafos is a project that attempts to prove
>> >> out Kafka transactions do exacty-once by launching multiple servers and
>> >> consume/producers, killing them off and counting how many messages
>> land on
>> >> the other end.
>> >>
>> >> Note: The project is a little raw at this point, (it has hard code to
>> my
>> >> home folders and some long Thread.sleep() calls in there. I will clean
>> it
>> >> up as I go,) but I have some early findings.
>> >>
>> >> From the blog here, I want to point out the following flaws:
>> >>
>> >> https://www.confluent.io/blog/transactions-apache-kafka/
>> >>
>> >> KafkaProducer producer = createKafkaProducer(
>> >> “bootstrap.servers”, “localhost:9092”,
>> >> “transactional.id”, “my-transactional-id”);
>> >>
>> >> 1) a fixed transaction.id effectively means the first worker fences
>> all
>> >> the other ones.While being exactly-once, there is exactly one worker
>> and no
>> >> parallelism.
>> >>
>> >> Spring has two schemes that seem to implement random information in the
>> >> fencing ids, which I think defeats the purpose. But I need to dive in
>> on
>> >> that more.
>> >> https://docs.spring.io/spring-kafka/reference/html/
>> >>
>> >> Transactions are enabled by providing the DefaultKafkaProducerFactory
>> >> with a transactionIdPrefix. In that case, instead of managing a single
>> >> shared Producer, the factory maintains a cache of transactional
>> >> producers. When the user calls close() on a producer, it is returned to
>> >> the cache for reuse instead of actually being closed. The
>> transactional.id
>> >> property of each producer is transactionIdPrefix + n, where n starts
>> with
>> >> 0 and is incremented for each new producer, unless the transaction is
>> >> started by a listener container with a record-based listener. In that
>> case,
>> >> the transactional.id is <transactionIdPrefix>.<group.id
>> >>> .<topic>.<partition>. This is to properly support fencing zombies, as
>> >> described here <
>> https://www.confluent.io/blog/transactions-apache-kafka/>.
>> >> This new behavior was added in versions 1.3.7, 2.0.6, 2.1.10, and
>> 2.2.0. If
>> >> you wish to revert to the previous behavior, you can set the
>> >> producerPerConsumerPartition property on the
>> DefaultKafkaProducerFactory
>> >> to false.
>> >>
>> >> While transactions are supported with batch listeners, by default,
>> zombie
>> >> fencing is not supported because a batch may contain records from
>> multiple
>> >> topics or partitions. However, starting with version 2.3.2, zombie
>> fencing
>> >> is supported if you set the container property subBatchPerPartition to
>> >> true. In that case, the batch listener is invoked once per partition
>> >> received from the last poll, as if each poll only returned records for
>> a
>> >> single partition.
>> >>
>> >> How to fix this? Here be the dragons:
>> >>
>> >> "Practically, one would either have to store the mapping between input
>> >> partitions and transactional.ids in an external store, or have some
>> static
>> >> encoding of it. Kafka Streams opts for the latter approach to solve
>> this
>> >> problem."
>> >>
>> >> Well then practically, the rest of this code is not useful.
>> >>
>> >> while (true) {
>> >>    ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
>> >>    producer.beginTransaction();
>> >>    for (ConsumerRecord record : records)
>> >>     producer.send(producerRecord(“outputTopic”, record));
>> >>    producer.sendOffsetsToTransaction(currentOffsets(consumer), group);
>> >>    producer.commitTransaction();
>> >> }
>> >>
>> >> When you call consumer.poll() (unless you manually assign to a single
>> >> partiion) you are going to receive records from one or more
>> partitions.  As
>> >> consumers join and leave the the group workers are going to move.
>> >>
>> >> There are two ways I can find to do this:
>> >> 1) use a consumerRebalanceListener and a concurrent map of producers
>> >> https://github.com/edwardcapriolo/kafos/blob/
>> >> 68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/
>> >> java/pure8/multipartitionrpw/MultiPartitionRPWBuffered.java#L123
>> >>
>> >> 2) as you are polling create/reuse producers as needed.
>> >>
>> >> With scalable correctly fenced producers the loop looks more like this:
>> >>
>> >> ConsumerRecords<K,V> records = null;
>> >> try {
>> >>    records = consumer.poll(2000);
>> >> } catch (KafkaException e){
>> >>    return;
>> >> }
>> >>
>> >> for(ConsumerRecord<K,V> record: records) {
>> >>    int partition = record.partition();
>> >>    BufferedTrackingProducer<K,V> producer = producers.get(partition);
>> >>    List<ProducerRecord<K, V>> results = processor.process(record);
>> >>    producer.send(results);
>> >>
>> >> }
>> >> for (Entry<Integer, BufferedTrackingProducer<K, V>> entry:
>> producers.entrySet())
>> >> {
>> >>    entry.getValue().commitAndClear(records, groupId);
>> >> }
>> >>
>> >> Implementation here: https://github.com/edwardcapriolo/kafos/blob/
>> >> 68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/
>> >> java/pure8/multipartitionrpw/MultiPartitionRPWBuffered.java#L1
>> >>
>> >> BTW maybe I am wrong about this and there is a better approach. With
>> the
>> >> multiple producer threads for single consumer.poll figuring out
>> exactly how
>> >> to layer the try/catch and exception handling becomes a bit harder
>> than the
>> >> example in the blog.
>> >>
>> >> Anyway,  here is what a Kafos chaos "test" looks like.
>> >>
>> >> https://github.com/edwardcapriolo/kafos/blob/
>> >> 68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/
>> >> java/pure8/multipartitionrpw/MutliRPWWithConsumerRestartingTest.java#L1
>> >>
>> >> It is a bit silly that we are forking off processes like this, but I
>> did
>> >> not want processes running in the same VM.I am not ready to run
>> >> testcontainers/docker at this point. But hey we launch kafka, we
>> launch zk,
>> >> we start  workers, we kill them off we count things. Results look like
>> >> this.
>> >>
>> >>
>> >> Verification stats
>> >> ------------------------------------------------------------------
>> >> notFound: 0 foundCorrect: 4000 foundDuplications: 0
>> >> ------------------------------------------------------------------
>> >> not found list[]
>> >> serverid: 0 out line Stream closed
>> >>
>> >> I am going to do more work killing off servers and making more test
>> >> scenarios, but so far off to a good start. When you get all the code
>> right,
>> >> (by cobbling together all the sources out there with half the code)
>> you get
>> >> things that seem to do exactly once, Help wanted in acedemic review of
>> my
>> >> assertions and code or PRS
>> >>
>> >> Thanks
>> >> Edward
>> >>
>> >
>> >
>>
>>
> Kafka will throw you a warning if you try to mix consumer commit with
> transaction commits.
>
>
> --
> Sorry this was sent from mobile. Will do less grammar and spell check than
> usual.
>


Hey all!

I spent some more time hacking on this and I came up with a few updates

https://github.com/edwardcapriolo/kafos/commit/ca0f5ea6e5650799d5d60d892e75f7af5bcaba4c

I updated my code to randomly inject in Kafka exceptions during a
transaction. The working solution I have found looks like this:


   1. Store a (concurrent) map of int (partitionId) Producer
   2. Implement a ConsumerRebalanceListener
   3. onPartitionsAssigned(Collection<TopicPartition> topics) create a
   Producer with proper fencing id (topic-partition-group). Note according to
   the docs this MAY not be the entire list of partitions this instance is
   assigned
   4.  onPartitionsRevoked(Collection<TopicPartition> completeTopicList)
   clean the concurrent map of all entries where the key is NOT in this list
   of partitions
   5. In your main loop, poll with your consumer and process (in my case I
   keep wrap a the KafkaProducer with a buffer so that the transactions are
   only open for a short time)
   6. Once you are done processing all the records from the poll, attempt
   to commit for every producer
   7. If any producer fails
   8. close all the producers, close the consumer (do not let it call poll
   again)


The implementation looks like this:

https://github.com/edwardcapriolo/kafos/blob/ca0f5ea6e5650799d5d60d892e75f7af5bcaba4c/pure8/src/main/java/pure8/multipartitionrpw/MultiPartitionRPW.java#L1

Some notes: You REALLY REALLY needs a rebalance listener, because
fencing-ids! If you do not close the producer when the partition moves away
from you, whoever the partition rebalances to is fenced out!

Re: Attempt to prove Kafka transactions work

Posted by Edward Capriolo <ed...@gmail.com>.
On Wednesday, November 20, 2019, Matthias J. Sax <ma...@confluent.io>
wrote:

> I am not sure what Spring does, but using Kafka Streams writing the
> output and committing offset would be part of the same transaction.
>
> It seems Spring is doing something else and thus, is seems it does not
> use the EOS API correctly.
>
> If you use transactions to copy data from input to output topic,
> committing offsets must be done on the producer as part to the
> transaction; the consumer would not commit offsets.
>
> To me, it seems that Spring is committing offset using the consumer
> independent if the transaction was successful or not. This would be an
> incorrect usage of the API.
>
>
> -Matthias
>
> On 11/20/19 6:16 AM, Edward Capriolo wrote:
> > Ok. I'm at a point where I believe the exactly once is in question.
> >
> > Topic input 10 partitions topic output 10 partitions.
> >
> > Producer writes messages 1 to 100 to topic input.
> >
> > CTP process calls poll. It receives 100 messages 10 in each partiton.
> >
> > Process is simple mirroring take from input write to output.
> >
> > 10 producers with 10 transactional ids are created. During processing 1
> of
> > the 10 producers throws kafka exception. 90 out of 100 writes are
> committed
> > tranactionally 10 are not.
> >
> > If poll is called again 10 messages do not appear in the next poll. Are
> > they lost?
> >
> >
> >
> > On Saturday, November 9, 2019, Edward Capriolo <ed...@gmail.com>
> > wrote:
> >
> >>
> >> On Thu, Oct 31, 2019 at 6:11 AM Matthias J. Sax <ma...@confluent.io>
> >> wrote:
> >>
> >>> Quite a project to test transactions...
> >>>
> >>> The current system test suite is part of the code base:
> >>> https://github.com/apache/kafka/tree/trunk/tests/kafkatest/tests
> >>>
> >>> There is course also some unit/integration test for transactions.
> >>>
> >>> There is also a blog post that describes in a high level what testing
> >>> was done when EOS was introduced:
> >>> https://www.confluent.io/blog/exactly-once-semantics-are-
> >>> possible-heres-how-apache-kafka-does-it/
> >>>
> >>> And yes, transactions are built on some assumptions and if you
> configure
> >>> your system incorrectly of violate those assumptions, it may break. We
> >>> also fixed some bugs since the first release. And there might be more
> >>> bugs --- software is always buggy. However, for practical
> consideration,
> >>> transactions should work.
> >>>
> >>> We would of course love if you could share your test results! If you
> >>> discover a bug, please report it, so we can fix it.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> On 10/28/19 10:06 AM, Edward Capriolo wrote:
> >>>> On Sunday, October 27, 2019, Boyang Chen <re...@gmail.com>
> >>> wrote:
> >>>>
> >>>>> Hey Edward,
> >>>>>
> >>>>> just to summarize and make sure I understood your question, you want
> to
> >>>>> implement some Chaos testing to validate Kafka EOS model, but not
> sure
> >>> how
> >>>>> to start or curious about whether there are already works in the
> >>> community
> >>>>> doing that?
> >>>>>
> >>>>> For the correctness of Kafka EOS, we have tons of unit tests and
> system
> >>>>> tests to prove its functionality. They could be found inside the
> repo.
> >>> You
> >>>>> could check them out and see if we still have gaps (which I believe
> we
> >>>>> definitely have).
> >>>>>
> >>>>> Boyang
> >>>>>
> >>>>> On Fri, Oct 25, 2019 at 7:25 PM Edward Capriolo <
> edlinuxguru@gmail.com
> >>>>
> >>>>> wrote:
> >>>>>
> >>>>>> Hello all,
> >>>>>>
> >>>>>> I used to work in adtech. Adtech was great. CPM for ads is 1-$5 per
> >>>>>> thousand ad impression. If numbers are 5% off you can blame
> javascript
> >>>>>> click trackers.
> >>>>>>
> >>>>>> Now, I work in a non addtech industry and they are really, really
> >>> serious
> >>>>>> about exactly once.
> >>>>>>
> >>>>>> So there is this blog:
> >>>>>>
> >>>>>> https://www.confluent.io/blog/transactions-apache-kafka/
> >>>>>>
> >>>>>> Great little snippet of code. I think I can copy it and implement it
> >>>>>> correctly.
> >>>>>> You know, but if you read that section about the zombie fencing, you
> >>>>> learn
> >>>>>> that you either need to manually assign partitions or use the
> >>> rebalance
> >>>>>> listener and have N producers. Hunting around github is not super
> >>>>> helpful,
> >>>>>> some code snippets are less complete then even the snipped in the
> >>> blog.
> >>>>>>
> >>>>>> I looked at what spring-kafka does. It does get the zombie fencing
> >>>>> correct
> >>>>>> with respect to fencing id, other bits other bits of the code seem
> >>>>>> plausible.
> >>>>>>
> >>>>>> Notice I said "plausible", because I do not count a few end to end
> >>> tests
> >>>>>> running single VM as a solid enough evidence that this works in the
> >>> face
> >>>>> of
> >>>>>> failures.
> >>>>>>
> >>>>>> I have been contemplating how one stress tests this exactly once
> >>> concept,
> >>>>>> with something Jepsen like or something brute force that I can run
> >>> for 5
> >>>>>> hours in a row
> >>>>>>
> >>>>>> If I faithfully implemented the code in the transactional read-write
> >>> loop
> >>>>>> and I feed it into my jepsen like black box tester it should:
> >>>>>>
> >>>>>> Create a topic with 10 partitions, Start launching read-write
> >>> transaction
> >>>>>> code, start feeding input data, Maybe strings like 1 -1000, now
> start
> >>>>>> randomly killing vms with kill -9 kill graceful exits, maybe even
> >>> killing
> >>>>>> kafka, and make sure 1-1000, pop out on the other end.
> >>>>>>
> >>>>>> I thought of some other "crazy" ideas. One such idea:
> >>>>>>
> >>>>>> If I make a transactional "echo", read x; write x back to the same
> >>> topic.
> >>>>>> RunN instances of that and kill them randomly. If I am loosing
> >>> messages
> >>>>>> (and not duplicating messages) then the topic would eventually have
> no
> >>>>>> data..
> >>>>>>
> >>>>>> Or should I make a program with some math formula like receive x
> write
> >>>>> xx.
> >>>>>> If duplication is happening I would start seeing multiple xx's
> >>>>>>
> >>>>>> Or send 1,000,000,000 messages through and consumer logs them to a
> >>> file.
> >>>>>> Then use an etl tool to validate messages come out on the other
> side.
> >>>>>>
> >>>>>> Or should I use a nosql with increments and count up and ensure no
> key
> >>>>> has
> >>>>>> been incremented twice.
> >>>>>>
> >>>>>> note: I realize I can just use kafka streams or storm, which has its
> >>> own
> >>>>>> systems to guarantee "at most once" but Iooking for a way to prove
> >>> what
> >>>>> can
> >>>>>> be done with pure kafka. (and not just prove it adtech work (good
> >>> enough
> >>>>> 5%
> >>>>>> here or there) )
> >>>>>>
> >>>>>> I imagine someone somewhere must be doing this. How where tips? Is
> it
> >>>>> part
> >>>>>> of some kafka release stress test? I'm down to write it if it does
> not
> >>>>>> exist.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Edward,
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Edward
> >>>>>>
> >>>>>
> >>>>
> >>>> Boyang,
> >>>>
> >>>> I just to summarize and make sure I understood your question, you want
> >>> to
> >>>> implement some Chaos testing to validate Kafka EOS model, but not sure
> >>> how
> >>>> to start or curious about whether there are already works in the
> >>> community
> >>>> doing that?
> >>>>
> >>>> Yes.
> >>>>
> >>>> I am not an expert in this field, but I know what distributed systems
> >>> can
> >>>> mask failures. For example if you have atomic increment you might unit
> >>> test
> >>>> it and it works fine, but if you ran it for 40 days it might double
> >>> count 1
> >>>> time.
> >>>>
> >>>>  of Kafka EOS, we have tons of unit tests and system
> >>>> tests to prove its functionality. They could be found inside the repo.
> >>>>
> >>>> I've been a developer for a while so the phrase "there are tests"
> never
> >>>> tells me everything. Tests reveal the presence of bugs not the
> absence.
> >>>>
> >>>> Can you please point me at the tests? My curiosity is if there is a
> >>>> systematic in-depth strategy here and how much rigor there is.
> >>>>
> >>>> In my environment I need to quantify and use rigor to prove out these
> >>>> things. Things that you might take for granted. For example, I have to
> >>>> prove that zookeeper works as expected when we lose a datacenter. Most
> >>>> people 'in the know' take it for granted that kafka and zk do what is
> >>>> advertised when configured properly. I have to test that out and
> >>> document
> >>>> my findings.
> >>>>
> >>>> For kafka transactions. The user space code needs to be written
> properly
> >>>> and configured properly along with the server being setup properly. It
> >>> is
> >>>> not enough for me to check out kafka run 'sbt test' and declare
> victory
> >>>> after the unit tests pass.
> >>>>
> >>>> What I am effectively looking for is the anti jepsen blog that
> says...We
> >>>> threw the kitchen sink at this and these transactions are bullet
> proof.
> >>>> Here is our methodology, here is some charts, here is xyz. Here is how
> >>> we
> >>>> run it every minor release
> >>>>
> >>>> I'm not trying to be a pita, educate me on how bullet proof this is
> and
> >>> how
> >>>> I can reproduce the results.
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>
> >>>
> >>
> >> All,
> >>
> >> After a few weeks of hacking at this I have found a few interesting
> >> things. First things first, introducing Kafos
> >>
> >> https://github.com/edwardcapriolo/kafos
> >>
> >> Kafos (think Kafka + Chaos) . Kafos is a project that attempts to prove
> >> out Kafka transactions do exacty-once by launching multiple servers and
> >> consume/producers, killing them off and counting how many messages land
> on
> >> the other end.
> >>
> >> Note: The project is a little raw at this point, (it has hard code to my
> >> home folders and some long Thread.sleep() calls in there. I will clean
> it
> >> up as I go,) but I have some early findings.
> >>
> >> From the blog here, I want to point out the following flaws:
> >>
> >> https://www.confluent.io/blog/transactions-apache-kafka/
> >>
> >> KafkaProducer producer = createKafkaProducer(
> >> “bootstrap.servers”, “localhost:9092”,
> >> “transactional.id”, “my-transactional-id”);
> >>
> >> 1) a fixed transaction.id effectively means the first worker fences all
> >> the other ones.While being exactly-once, there is exactly one worker
> and no
> >> parallelism.
> >>
> >> Spring has two schemes that seem to implement random information in the
> >> fencing ids, which I think defeats the purpose. But I need to dive in on
> >> that more.
> >> https://docs.spring.io/spring-kafka/reference/html/
> >>
> >> Transactions are enabled by providing the DefaultKafkaProducerFactory
> >> with a transactionIdPrefix. In that case, instead of managing a single
> >> shared Producer, the factory maintains a cache of transactional
> >> producers. When the user calls close() on a producer, it is returned to
> >> the cache for reuse instead of actually being closed. The
> transactional.id
> >> property of each producer is transactionIdPrefix + n, where n starts
> with
> >> 0 and is incremented for each new producer, unless the transaction is
> >> started by a listener container with a record-based listener. In that
> case,
> >> the transactional.id is <transactionIdPrefix>.<group.id
> >>> .<topic>.<partition>. This is to properly support fencing zombies, as
> >> described here <https://www.confluent.io/blog/transactions-apache-
> kafka/>.
> >> This new behavior was added in versions 1.3.7, 2.0.6, 2.1.10, and
> 2.2.0. If
> >> you wish to revert to the previous behavior, you can set the
> >> producerPerConsumerPartition property on the DefaultKafkaProducerFactory
> >> to false.
> >>
> >> While transactions are supported with batch listeners, by default,
> zombie
> >> fencing is not supported because a batch may contain records from
> multiple
> >> topics or partitions. However, starting with version 2.3.2, zombie
> fencing
> >> is supported if you set the container property subBatchPerPartition to
> >> true. In that case, the batch listener is invoked once per partition
> >> received from the last poll, as if each poll only returned records for a
> >> single partition.
> >>
> >> How to fix this? Here be the dragons:
> >>
> >> "Practically, one would either have to store the mapping between input
> >> partitions and transactional.ids in an external store, or have some
> static
> >> encoding of it. Kafka Streams opts for the latter approach to solve this
> >> problem."
> >>
> >> Well then practically, the rest of this code is not useful.
> >>
> >> while (true) {
> >>    ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
> >>    producer.beginTransaction();
> >>    for (ConsumerRecord record : records)
> >>     producer.send(producerRecord(“outputTopic”, record));
> >>    producer.sendOffsetsToTransaction(currentOffsets(consumer), group);
> >>    producer.commitTransaction();
> >> }
> >>
> >> When you call consumer.poll() (unless you manually assign to a single
> >> partiion) you are going to receive records from one or more
> partitions.  As
> >> consumers join and leave the the group workers are going to move.
> >>
> >> There are two ways I can find to do this:
> >> 1) use a consumerRebalanceListener and a concurrent map of producers
> >> https://github.com/edwardcapriolo/kafos/blob/
> >> 68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/
> >> java/pure8/multipartitionrpw/MultiPartitionRPWBuffered.java#L123
> >>
> >> 2) as you are polling create/reuse producers as needed.
> >>
> >> With scalable correctly fenced producers the loop looks more like this:
> >>
> >> ConsumerRecords<K,V> records = null;
> >> try {
> >>    records = consumer.poll(2000);
> >> } catch (KafkaException e){
> >>    return;
> >> }
> >>
> >> for(ConsumerRecord<K,V> record: records) {
> >>    int partition = record.partition();
> >>    BufferedTrackingProducer<K,V> producer = producers.get(partition);
> >>    List<ProducerRecord<K, V>> results = processor.process(record);
> >>    producer.send(results);
> >>
> >> }
> >> for (Entry<Integer, BufferedTrackingProducer<K, V>> entry:
> producers.entrySet())
> >> {
> >>    entry.getValue().commitAndClear(records, groupId);
> >> }
> >>
> >> Implementation here: https://github.com/edwardcapriolo/kafos/blob/
> >> 68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/
> >> java/pure8/multipartitionrpw/MultiPartitionRPWBuffered.java#L1
> >>
> >> BTW maybe I am wrong about this and there is a better approach. With the
> >> multiple producer threads for single consumer.poll figuring out exactly
> how
> >> to layer the try/catch and exception handling becomes a bit harder than
> the
> >> example in the blog.
> >>
> >> Anyway,  here is what a Kafos chaos "test" looks like.
> >>
> >> https://github.com/edwardcapriolo/kafos/blob/
> >> 68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/
> >> java/pure8/multipartitionrpw/MutliRPWWithConsumerRestartingTest.java#L1
> >>
> >> It is a bit silly that we are forking off processes like this, but I did
> >> not want processes running in the same VM.I am not ready to run
> >> testcontainers/docker at this point. But hey we launch kafka, we launch
> zk,
> >> we start  workers, we kill them off we count things. Results look like
> >> this.
> >>
> >>
> >> Verification stats
> >> ------------------------------------------------------------------
> >> notFound: 0 foundCorrect: 4000 foundDuplications: 0
> >> ------------------------------------------------------------------
> >> not found list[]
> >> serverid: 0 out line Stream closed
> >>
> >> I am going to do more work killing off servers and making more test
> >> scenarios, but so far off to a good start. When you get all the code
> right,
> >> (by cobbling together all the sources out there with half the code) you
> get
> >> things that seem to do exactly once, Help wanted in acedemic review of
> my
> >> assertions and code or PRS
> >>
> >> Thanks
> >> Edward
> >>
> >
> >
>
>
Kafka will throw you a warning if you try to mix consumer commit with
transaction commits.


-- 
Sorry this was sent from mobile. Will do less grammar and spell check than
usual.

Re: Attempt to prove Kafka transactions work

Posted by "Matthias J. Sax" <ma...@confluent.io>.
I am not sure what Spring does, but using Kafka Streams writing the
output and committing offset would be part of the same transaction.

It seems Spring is doing something else and thus, is seems it does not
use the EOS API correctly.

If you use transactions to copy data from input to output topic,
committing offsets must be done on the producer as part to the
transaction; the consumer would not commit offsets.

To me, it seems that Spring is committing offset using the consumer
independent if the transaction was successful or not. This would be an
incorrect usage of the API.


-Matthias

On 11/20/19 6:16 AM, Edward Capriolo wrote:
> Ok. I'm at a point where I believe the exactly once is in question.
> 
> Topic input 10 partitions topic output 10 partitions.
> 
> Producer writes messages 1 to 100 to topic input.
> 
> CTP process calls poll. It receives 100 messages 10 in each partiton.
> 
> Process is simple mirroring take from input write to output.
> 
> 10 producers with 10 transactional ids are created. During processing 1 of
> the 10 producers throws kafka exception. 90 out of 100 writes are committed
> tranactionally 10 are not.
> 
> If poll is called again 10 messages do not appear in the next poll. Are
> they lost?
> 
> 
> 
> On Saturday, November 9, 2019, Edward Capriolo <ed...@gmail.com>
> wrote:
> 
>>
>> On Thu, Oct 31, 2019 at 6:11 AM Matthias J. Sax <ma...@confluent.io>
>> wrote:
>>
>>> Quite a project to test transactions...
>>>
>>> The current system test suite is part of the code base:
>>> https://github.com/apache/kafka/tree/trunk/tests/kafkatest/tests
>>>
>>> There is course also some unit/integration test for transactions.
>>>
>>> There is also a blog post that describes in a high level what testing
>>> was done when EOS was introduced:
>>> https://www.confluent.io/blog/exactly-once-semantics-are-
>>> possible-heres-how-apache-kafka-does-it/
>>>
>>> And yes, transactions are built on some assumptions and if you configure
>>> your system incorrectly of violate those assumptions, it may break. We
>>> also fixed some bugs since the first release. And there might be more
>>> bugs --- software is always buggy. However, for practical consideration,
>>> transactions should work.
>>>
>>> We would of course love if you could share your test results! If you
>>> discover a bug, please report it, so we can fix it.
>>>
>>>
>>> -Matthias
>>>
>>> On 10/28/19 10:06 AM, Edward Capriolo wrote:
>>>> On Sunday, October 27, 2019, Boyang Chen <re...@gmail.com>
>>> wrote:
>>>>
>>>>> Hey Edward,
>>>>>
>>>>> just to summarize and make sure I understood your question, you want to
>>>>> implement some Chaos testing to validate Kafka EOS model, but not sure
>>> how
>>>>> to start or curious about whether there are already works in the
>>> community
>>>>> doing that?
>>>>>
>>>>> For the correctness of Kafka EOS, we have tons of unit tests and system
>>>>> tests to prove its functionality. They could be found inside the repo.
>>> You
>>>>> could check them out and see if we still have gaps (which I believe we
>>>>> definitely have).
>>>>>
>>>>> Boyang
>>>>>
>>>>> On Fri, Oct 25, 2019 at 7:25 PM Edward Capriolo <edlinuxguru@gmail.com
>>>>
>>>>> wrote:
>>>>>
>>>>>> Hello all,
>>>>>>
>>>>>> I used to work in adtech. Adtech was great. CPM for ads is 1-$5 per
>>>>>> thousand ad impression. If numbers are 5% off you can blame javascript
>>>>>> click trackers.
>>>>>>
>>>>>> Now, I work in a non addtech industry and they are really, really
>>> serious
>>>>>> about exactly once.
>>>>>>
>>>>>> So there is this blog:
>>>>>>
>>>>>> https://www.confluent.io/blog/transactions-apache-kafka/
>>>>>>
>>>>>> Great little snippet of code. I think I can copy it and implement it
>>>>>> correctly.
>>>>>> You know, but if you read that section about the zombie fencing, you
>>>>> learn
>>>>>> that you either need to manually assign partitions or use the
>>> rebalance
>>>>>> listener and have N producers. Hunting around github is not super
>>>>> helpful,
>>>>>> some code snippets are less complete then even the snipped in the
>>> blog.
>>>>>>
>>>>>> I looked at what spring-kafka does. It does get the zombie fencing
>>>>> correct
>>>>>> with respect to fencing id, other bits other bits of the code seem
>>>>>> plausible.
>>>>>>
>>>>>> Notice I said "plausible", because I do not count a few end to end
>>> tests
>>>>>> running single VM as a solid enough evidence that this works in the
>>> face
>>>>> of
>>>>>> failures.
>>>>>>
>>>>>> I have been contemplating how one stress tests this exactly once
>>> concept,
>>>>>> with something Jepsen like or something brute force that I can run
>>> for 5
>>>>>> hours in a row
>>>>>>
>>>>>> If I faithfully implemented the code in the transactional read-write
>>> loop
>>>>>> and I feed it into my jepsen like black box tester it should:
>>>>>>
>>>>>> Create a topic with 10 partitions, Start launching read-write
>>> transaction
>>>>>> code, start feeding input data, Maybe strings like 1 -1000, now start
>>>>>> randomly killing vms with kill -9 kill graceful exits, maybe even
>>> killing
>>>>>> kafka, and make sure 1-1000, pop out on the other end.
>>>>>>
>>>>>> I thought of some other "crazy" ideas. One such idea:
>>>>>>
>>>>>> If I make a transactional "echo", read x; write x back to the same
>>> topic.
>>>>>> RunN instances of that and kill them randomly. If I am loosing
>>> messages
>>>>>> (and not duplicating messages) then the topic would eventually have no
>>>>>> data..
>>>>>>
>>>>>> Or should I make a program with some math formula like receive x write
>>>>> xx.
>>>>>> If duplication is happening I would start seeing multiple xx's
>>>>>>
>>>>>> Or send 1,000,000,000 messages through and consumer logs them to a
>>> file.
>>>>>> Then use an etl tool to validate messages come out on the other side.
>>>>>>
>>>>>> Or should I use a nosql with increments and count up and ensure no key
>>>>> has
>>>>>> been incremented twice.
>>>>>>
>>>>>> note: I realize I can just use kafka streams or storm, which has its
>>> own
>>>>>> systems to guarantee "at most once" but Iooking for a way to prove
>>> what
>>>>> can
>>>>>> be done with pure kafka. (and not just prove it adtech work (good
>>> enough
>>>>> 5%
>>>>>> here or there) )
>>>>>>
>>>>>> I imagine someone somewhere must be doing this. How where tips? Is it
>>>>> part
>>>>>> of some kafka release stress test? I'm down to write it if it does not
>>>>>> exist.
>>>>>>
>>>>>> Thanks,
>>>>>> Edward,
>>>>>>
>>>>>> Thanks,
>>>>>> Edward
>>>>>>
>>>>>
>>>>
>>>> Boyang,
>>>>
>>>> I just to summarize and make sure I understood your question, you want
>>> to
>>>> implement some Chaos testing to validate Kafka EOS model, but not sure
>>> how
>>>> to start or curious about whether there are already works in the
>>> community
>>>> doing that?
>>>>
>>>> Yes.
>>>>
>>>> I am not an expert in this field, but I know what distributed systems
>>> can
>>>> mask failures. For example if you have atomic increment you might unit
>>> test
>>>> it and it works fine, but if you ran it for 40 days it might double
>>> count 1
>>>> time.
>>>>
>>>>  of Kafka EOS, we have tons of unit tests and system
>>>> tests to prove its functionality. They could be found inside the repo.
>>>>
>>>> I've been a developer for a while so the phrase "there are tests" never
>>>> tells me everything. Tests reveal the presence of bugs not the absence.
>>>>
>>>> Can you please point me at the tests? My curiosity is if there is a
>>>> systematic in-depth strategy here and how much rigor there is.
>>>>
>>>> In my environment I need to quantify and use rigor to prove out these
>>>> things. Things that you might take for granted. For example, I have to
>>>> prove that zookeeper works as expected when we lose a datacenter. Most
>>>> people 'in the know' take it for granted that kafka and zk do what is
>>>> advertised when configured properly. I have to test that out and
>>> document
>>>> my findings.
>>>>
>>>> For kafka transactions. The user space code needs to be written properly
>>>> and configured properly along with the server being setup properly. It
>>> is
>>>> not enough for me to check out kafka run 'sbt test' and declare victory
>>>> after the unit tests pass.
>>>>
>>>> What I am effectively looking for is the anti jepsen blog that says...We
>>>> threw the kitchen sink at this and these transactions are bullet proof.
>>>> Here is our methodology, here is some charts, here is xyz. Here is how
>>> we
>>>> run it every minor release
>>>>
>>>> I'm not trying to be a pita, educate me on how bullet proof this is and
>>> how
>>>> I can reproduce the results.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>
>> All,
>>
>> After a few weeks of hacking at this I have found a few interesting
>> things. First things first, introducing Kafos
>>
>> https://github.com/edwardcapriolo/kafos
>>
>> Kafos (think Kafka + Chaos) . Kafos is a project that attempts to prove
>> out Kafka transactions do exacty-once by launching multiple servers and
>> consume/producers, killing them off and counting how many messages land on
>> the other end.
>>
>> Note: The project is a little raw at this point, (it has hard code to my
>> home folders and some long Thread.sleep() calls in there. I will clean it
>> up as I go,) but I have some early findings.
>>
>> From the blog here, I want to point out the following flaws:
>>
>> https://www.confluent.io/blog/transactions-apache-kafka/
>>
>> KafkaProducer producer = createKafkaProducer(
>> “bootstrap.servers”, “localhost:9092”,
>> “transactional.id”, “my-transactional-id”);
>>
>> 1) a fixed transaction.id effectively means the first worker fences all
>> the other ones.While being exactly-once, there is exactly one worker and no
>> parallelism.
>>
>> Spring has two schemes that seem to implement random information in the
>> fencing ids, which I think defeats the purpose. But I need to dive in on
>> that more.
>> https://docs.spring.io/spring-kafka/reference/html/
>>
>> Transactions are enabled by providing the DefaultKafkaProducerFactory
>> with a transactionIdPrefix. In that case, instead of managing a single
>> shared Producer, the factory maintains a cache of transactional
>> producers. When the user calls close() on a producer, it is returned to
>> the cache for reuse instead of actually being closed. The transactional.id
>> property of each producer is transactionIdPrefix + n, where n starts with
>> 0 and is incremented for each new producer, unless the transaction is
>> started by a listener container with a record-based listener. In that case,
>> the transactional.id is <transactionIdPrefix>.<group.id
>>> .<topic>.<partition>. This is to properly support fencing zombies, as
>> described here <https://www.confluent.io/blog/transactions-apache-kafka/>.
>> This new behavior was added in versions 1.3.7, 2.0.6, 2.1.10, and 2.2.0. If
>> you wish to revert to the previous behavior, you can set the
>> producerPerConsumerPartition property on the DefaultKafkaProducerFactory
>> to false.
>>
>> While transactions are supported with batch listeners, by default, zombie
>> fencing is not supported because a batch may contain records from multiple
>> topics or partitions. However, starting with version 2.3.2, zombie fencing
>> is supported if you set the container property subBatchPerPartition to
>> true. In that case, the batch listener is invoked once per partition
>> received from the last poll, as if each poll only returned records for a
>> single partition.
>>
>> How to fix this? Here be the dragons:
>>
>> "Practically, one would either have to store the mapping between input
>> partitions and transactional.ids in an external store, or have some static
>> encoding of it. Kafka Streams opts for the latter approach to solve this
>> problem."
>>
>> Well then practically, the rest of this code is not useful.
>>
>> while (true) {
>>    ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
>>    producer.beginTransaction();
>>    for (ConsumerRecord record : records)
>>     producer.send(producerRecord(“outputTopic”, record));
>>    producer.sendOffsetsToTransaction(currentOffsets(consumer), group);
>>    producer.commitTransaction();
>> }
>>
>> When you call consumer.poll() (unless you manually assign to a single
>> partiion) you are going to receive records from one or more partitions.  As
>> consumers join and leave the the group workers are going to move.
>>
>> There are two ways I can find to do this:
>> 1) use a consumerRebalanceListener and a concurrent map of producers
>> https://github.com/edwardcapriolo/kafos/blob/
>> 68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/
>> java/pure8/multipartitionrpw/MultiPartitionRPWBuffered.java#L123
>>
>> 2) as you are polling create/reuse producers as needed.
>>
>> With scalable correctly fenced producers the loop looks more like this:
>>
>> ConsumerRecords<K,V> records = null;
>> try {
>>    records = consumer.poll(2000);
>> } catch (KafkaException e){
>>    return;
>> }
>>
>> for(ConsumerRecord<K,V> record: records) {
>>    int partition = record.partition();
>>    BufferedTrackingProducer<K,V> producer = producers.get(partition);
>>    List<ProducerRecord<K, V>> results = processor.process(record);
>>    producer.send(results);
>>
>> }
>> for (Entry<Integer, BufferedTrackingProducer<K, V>> entry: producers.entrySet())
>> {
>>    entry.getValue().commitAndClear(records, groupId);
>> }
>>
>> Implementation here: https://github.com/edwardcapriolo/kafos/blob/
>> 68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/
>> java/pure8/multipartitionrpw/MultiPartitionRPWBuffered.java#L1
>>
>> BTW maybe I am wrong about this and there is a better approach. With the
>> multiple producer threads for single consumer.poll figuring out exactly how
>> to layer the try/catch and exception handling becomes a bit harder than the
>> example in the blog.
>>
>> Anyway,  here is what a Kafos chaos "test" looks like.
>>
>> https://github.com/edwardcapriolo/kafos/blob/
>> 68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/
>> java/pure8/multipartitionrpw/MutliRPWWithConsumerRestartingTest.java#L1
>>
>> It is a bit silly that we are forking off processes like this, but I did
>> not want processes running in the same VM.I am not ready to run
>> testcontainers/docker at this point. But hey we launch kafka, we launch zk,
>> we start  workers, we kill them off we count things. Results look like
>> this.
>>
>>
>> Verification stats
>> ------------------------------------------------------------------
>> notFound: 0 foundCorrect: 4000 foundDuplications: 0
>> ------------------------------------------------------------------
>> not found list[]
>> serverid: 0 out line Stream closed
>>
>> I am going to do more work killing off servers and making more test
>> scenarios, but so far off to a good start. When you get all the code right,
>> (by cobbling together all the sources out there with half the code) you get
>> things that seem to do exactly once, Help wanted in acedemic review of my
>> assertions and code or PRS
>>
>> Thanks
>> Edward
>>
> 
> 


Re: Attempt to prove Kafka transactions work

Posted by Edward Capriolo <ed...@gmail.com>.
On Wednesday, November 20, 2019, Eric Azama <ea...@gmail.com> wrote:

> Calls to KafkaConsumer#poll() are completely independent of commits. As
> such they will always return the next set of records, even if the previous
> set have not been committed. This is how the consumer acts, regardless of
> the Exactly Once semantics.
>
> In order for the Consumer to reset to the currently committed offsets, you
> need to either initiate a Consumer Group Rebalance, or use a combination of
> the KafkaConsumer#committed() and KafkaConsumer#seek() methods.
>
> On Wed, Nov 20, 2019 at 6:16 AM Edward Capriolo <ed...@gmail.com>
> wrote:
>
> > Ok. I'm at a point where I believe the exactly once is in question.
> >
> > Topic input 10 partitions topic output 10 partitions.
> >
> > Producer writes messages 1 to 100 to topic input.
> >
> > CTP process calls poll. It receives 100 messages 10 in each partiton.
> >
> > Process is simple mirroring take from input write to output.
> >
> > 10 producers with 10 transactional ids are created. During processing 1
> of
> > the 10 producers throws kafka exception. 90 out of 100 writes are
> committed
> > tranactionally 10 are not.
> >
> > If poll is called again 10 messages do not appear in the next poll. Are
> > they lost?
> >
> >
> >
> > On Saturday, November 9, 2019, Edward Capriolo <ed...@gmail.com>
> > wrote:
> >
> > >
> > > On Thu, Oct 31, 2019 at 6:11 AM Matthias J. Sax <matthias@confluent.io
> >
> > > wrote:
> > >
> > >> Quite a project to test transactions...
> > >>
> > >> The current system test suite is part of the code base:
> > >> https://github.com/apache/kafka/tree/trunk/tests/kafkatest/tests
> > >>
> > >> There is course also some unit/integration test for transactions.
> > >>
> > >> There is also a blog post that describes in a high level what testing
> > >> was done when EOS was introduced:
> > >> https://www.confluent.io/blog/exactly-once-semantics-are-
> > >> possible-heres-how-apache-kafka-does-it/
> > >>
> > >> And yes, transactions are built on some assumptions and if you
> configure
> > >> your system incorrectly of violate those assumptions, it may break. We
> > >> also fixed some bugs since the first release. And there might be more
> > >> bugs --- software is always buggy. However, for practical
> consideration,
> > >> transactions should work.
> > >>
> > >> We would of course love if you could share your test results! If you
> > >> discover a bug, please report it, so we can fix it.
> > >>
> > >>
> > >> -Matthias
> > >>
> > >> On 10/28/19 10:06 AM, Edward Capriolo wrote:
> > >> > On Sunday, October 27, 2019, Boyang Chen <
> reluctanthero104@gmail.com>
> > >> wrote:
> > >> >
> > >> >> Hey Edward,
> > >> >>
> > >> >> just to summarize and make sure I understood your question, you
> want
> > to
> > >> >> implement some Chaos testing to validate Kafka EOS model, but not
> > sure
> > >> how
> > >> >> to start or curious about whether there are already works in the
> > >> community
> > >> >> doing that?
> > >> >>
> > >> >> For the correctness of Kafka EOS, we have tons of unit tests and
> > system
> > >> >> tests to prove its functionality. They could be found inside the
> > repo.
> > >> You
> > >> >> could check them out and see if we still have gaps (which I believe
> > we
> > >> >> definitely have).
> > >> >>
> > >> >> Boyang
> > >> >>
> > >> >> On Fri, Oct 25, 2019 at 7:25 PM Edward Capriolo <
> > edlinuxguru@gmail.com
> > >> >
> > >> >> wrote:
> > >> >>
> > >> >>> Hello all,
> > >> >>>
> > >> >>> I used to work in adtech. Adtech was great. CPM for ads is 1-$5
> per
> > >> >>> thousand ad impression. If numbers are 5% off you can blame
> > javascript
> > >> >>> click trackers.
> > >> >>>
> > >> >>> Now, I work in a non addtech industry and they are really, really
> > >> serious
> > >> >>> about exactly once.
> > >> >>>
> > >> >>> So there is this blog:
> > >> >>>
> > >> >>> https://www.confluent.io/blog/transactions-apache-kafka/
> > >> >>>
> > >> >>> Great little snippet of code. I think I can copy it and implement
> it
> > >> >>> correctly.
> > >> >>> You know, but if you read that section about the zombie fencing,
> you
> > >> >> learn
> > >> >>> that you either need to manually assign partitions or use the
> > >> rebalance
> > >> >>> listener and have N producers. Hunting around github is not super
> > >> >> helpful,
> > >> >>> some code snippets are less complete then even the snipped in the
> > >> blog.
> > >> >>>
> > >> >>> I looked at what spring-kafka does. It does get the zombie fencing
> > >> >> correct
> > >> >>> with respect to fencing id, other bits other bits of the code seem
> > >> >>> plausible.
> > >> >>>
> > >> >>> Notice I said "plausible", because I do not count a few end to end
> > >> tests
> > >> >>> running single VM as a solid enough evidence that this works in
> the
> > >> face
> > >> >> of
> > >> >>> failures.
> > >> >>>
> > >> >>> I have been contemplating how one stress tests this exactly once
> > >> concept,
> > >> >>> with something Jepsen like or something brute force that I can run
> > >> for 5
> > >> >>> hours in a row
> > >> >>>
> > >> >>> If I faithfully implemented the code in the transactional
> read-write
> > >> loop
> > >> >>> and I feed it into my jepsen like black box tester it should:
> > >> >>>
> > >> >>> Create a topic with 10 partitions, Start launching read-write
> > >> transaction
> > >> >>> code, start feeding input data, Maybe strings like 1 -1000, now
> > start
> > >> >>> randomly killing vms with kill -9 kill graceful exits, maybe even
> > >> killing
> > >> >>> kafka, and make sure 1-1000, pop out on the other end.
> > >> >>>
> > >> >>> I thought of some other "crazy" ideas. One such idea:
> > >> >>>
> > >> >>> If I make a transactional "echo", read x; write x back to the same
> > >> topic.
> > >> >>> RunN instances of that and kill them randomly. If I am loosing
> > >> messages
> > >> >>> (and not duplicating messages) then the topic would eventually
> have
> > no
> > >> >>> data..
> > >> >>>
> > >> >>> Or should I make a program with some math formula like receive x
> > write
> > >> >> xx.
> > >> >>> If duplication is happening I would start seeing multiple xx's
> > >> >>>
> > >> >>> Or send 1,000,000,000 messages through and consumer logs them to a
> > >> file.
> > >> >>> Then use an etl tool to validate messages come out on the other
> > side.
> > >> >>>
> > >> >>> Or should I use a nosql with increments and count up and ensure no
> > key
> > >> >> has
> > >> >>> been incremented twice.
> > >> >>>
> > >> >>> note: I realize I can just use kafka streams or storm, which has
> its
> > >> own
> > >> >>> systems to guarantee "at most once" but Iooking for a way to prove
> > >> what
> > >> >> can
> > >> >>> be done with pure kafka. (and not just prove it adtech work (good
> > >> enough
> > >> >> 5%
> > >> >>> here or there) )
> > >> >>>
> > >> >>> I imagine someone somewhere must be doing this. How where tips? Is
> > it
> > >> >> part
> > >> >>> of some kafka release stress test? I'm down to write it if it does
> > not
> > >> >>> exist.
> > >> >>>
> > >> >>> Thanks,
> > >> >>> Edward,
> > >> >>>
> > >> >>> Thanks,
> > >> >>> Edward
> > >> >>>
> > >> >>
> > >> >
> > >> > Boyang,
> > >> >
> > >> > I just to summarize and make sure I understood your question, you
> want
> > >> to
> > >> > implement some Chaos testing to validate Kafka EOS model, but not
> sure
> > >> how
> > >> > to start or curious about whether there are already works in the
> > >> community
> > >> > doing that?
> > >> >
> > >> > Yes.
> > >> >
> > >> > I am not an expert in this field, but I know what distributed
> systems
> > >> can
> > >> > mask failures. For example if you have atomic increment you might
> unit
> > >> test
> > >> > it and it works fine, but if you ran it for 40 days it might double
> > >> count 1
> > >> > time.
> > >> >
> > >> >  of Kafka EOS, we have tons of unit tests and system
> > >> > tests to prove its functionality. They could be found inside the
> repo.
> > >> >
> > >> > I've been a developer for a while so the phrase "there are tests"
> > never
> > >> > tells me everything. Tests reveal the presence of bugs not the
> > absence.
> > >> >
> > >> > Can you please point me at the tests? My curiosity is if there is a
> > >> > systematic in-depth strategy here and how much rigor there is.
> > >> >
> > >> > In my environment I need to quantify and use rigor to prove out
> these
> > >> > things. Things that you might take for granted. For example, I have
> to
> > >> > prove that zookeeper works as expected when we lose a datacenter.
> Most
> > >> > people 'in the know' take it for granted that kafka and zk do what
> is
> > >> > advertised when configured properly. I have to test that out and
> > >> document
> > >> > my findings.
> > >> >
> > >> > For kafka transactions. The user space code needs to be written
> > properly
> > >> > and configured properly along with the server being setup properly.
> It
> > >> is
> > >> > not enough for me to check out kafka run 'sbt test' and declare
> > victory
> > >> > after the unit tests pass.
> > >> >
> > >> > What I am effectively looking for is the anti jepsen blog that
> > says...We
> > >> > threw the kitchen sink at this and these transactions are bullet
> > proof.
> > >> > Here is our methodology, here is some charts, here is xyz. Here is
> how
> > >> we
> > >> > run it every minor release
> > >> >
> > >> > I'm not trying to be a pita, educate me on how bullet proof this is
> > and
> > >> how
> > >> > I can reproduce the results.
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >>
> > >>
> > >
> > > All,
> > >
> > > After a few weeks of hacking at this I have found a few interesting
> > > things. First things first, introducing Kafos
> > >
> > > https://github.com/edwardcapriolo/kafos
> > >
> > > Kafos (think Kafka + Chaos) . Kafos is a project that attempts to prove
> > > out Kafka transactions do exacty-once by launching multiple servers and
> > > consume/producers, killing them off and counting how many messages land
> > on
> > > the other end.
> > >
> > > Note: The project is a little raw at this point, (it has hard code to
> my
> > > home folders and some long Thread.sleep() calls in there. I will clean
> it
> > > up as I go,) but I have some early findings.
> > >
> > > From the blog here, I want to point out the following flaws:
> > >
> > > https://www.confluent.io/blog/transactions-apache-kafka/
> > >
> > > KafkaProducer producer = createKafkaProducer(
> > > “bootstrap.servers”, “localhost:9092”,
> > > “transactional.id”, “my-transactional-id”);
> > >
> > > 1) a fixed transaction.id effectively means the first worker fences
> all
> > > the other ones.While being exactly-once, there is exactly one worker
> and
> > no
> > > parallelism.
> > >
> > > Spring has two schemes that seem to implement random information in the
> > > fencing ids, which I think defeats the purpose. But I need to dive in
> on
> > > that more.
> > > https://docs.spring.io/spring-kafka/reference/html/
> > >
> > > Transactions are enabled by providing the DefaultKafkaProducerFactory
> > > with a transactionIdPrefix. In that case, instead of managing a single
> > > shared Producer, the factory maintains a cache of transactional
> > > producers. When the user calls close() on a producer, it is returned to
> > > the cache for reuse instead of actually being closed. The
> > transactional.id
> > > property of each producer is transactionIdPrefix + n, where n starts
> with
> > > 0 and is incremented for each new producer, unless the transaction is
> > > started by a listener container with a record-based listener. In that
> > case,
> > > the transactional.id is <transactionIdPrefix>.<group.id
> > > >.<topic>.<partition>. This is to properly support fencing zombies, as
> > > described here <https://www.confluent.io/blog/transactions-apache-
> kafka/
> > >.
> > > This new behavior was added in versions 1.3.7, 2.0.6, 2.1.10, and
> 2.2.0.
> > If
> > > you wish to revert to the previous behavior, you can set the
> > > producerPerConsumerPartition property on the
> DefaultKafkaProducerFactory
> > > to false.
> > >
> > > While transactions are supported with batch listeners, by default,
> zombie
> > > fencing is not supported because a batch may contain records from
> > multiple
> > > topics or partitions. However, starting with version 2.3.2, zombie
> > fencing
> > > is supported if you set the container property subBatchPerPartition to
> > > true. In that case, the batch listener is invoked once per partition
> > > received from the last poll, as if each poll only returned records for
> a
> > > single partition.
> > >
> > > How to fix this? Here be the dragons:
> > >
> > > "Practically, one would either have to store the mapping between input
> > > partitions and transactional.ids in an external store, or have some
> > static
> > > encoding of it. Kafka Streams opts for the latter approach to solve
> this
> > > problem."
> > >
> > > Well then practically, the rest of this code is not useful.
> > >
> > > while (true) {
> > >    ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
> > >    producer.beginTransaction();
> > >    for (ConsumerRecord record : records)
> > >     producer.send(producerRecord(“outputTopic”, record));
> > >    producer.sendOffsetsToTransaction(currentOffsets(consumer), group);
> > >    producer.commitTransaction();
> > > }
> > >
> > > When you call consumer.poll() (unless you manually assign to a single
> > > partiion) you are going to receive records from one or more partitions.
> > As
> > > consumers join and leave the the group workers are going to move.
> > >
> > > There are two ways I can find to do this:
> > > 1) use a consumerRebalanceListener and a concurrent map of producers
> > > https://github.com/edwardcapriolo/kafos/blob/
> > > 68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/
> > > java/pure8/multipartitionrpw/MultiPartitionRPWBuffered.java#L123
> > >
> > > 2) as you are polling create/reuse producers as needed.
> > >
> > > With scalable correctly fenced producers the loop looks more like this:
> > >
> > > ConsumerRecords<K,V> records = null;
> > > try {
> > >    records = consumer.poll(2000);
> > > } catch (KafkaException e){
> > >    return;
> > > }
> > >
> > > for(ConsumerRecord<K,V> record: records) {
> > >    int partition = record.partition();
> > >    BufferedTrackingProducer<K,V> producer = producers.get(partition);
> > >    List<ProducerRecord<K, V>> results = processor.process(record);
> > >    producer.send(results);
> > >
> > > }
> > > for (Entry<Integer, BufferedTrackingProducer<K, V>> entry:
> > producers.entrySet())
> > > {
> > >    entry.getValue().commitAndClear(records, groupId);
> > > }
> > >
> > > Implementation here: https://github.com/edwardcapriolo/kafos/blob/
> > > 68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/
> > > java/pure8/multipartitionrpw/MultiPartitionRPWBuffered.java#L1
> > >
> > > BTW maybe I am wrong about this and there is a better approach. With
> the
> > > multiple producer threads for single consumer.poll figuring out exactly
> > how
> > > to layer the try/catch and exception handling becomes a bit harder than
> > the
> > > example in the blog.
> > >
> > > Anyway,  here is what a Kafos chaos "test" looks like.
> > >
> > > https://github.com/edwardcapriolo/kafos/blob/
> > > 68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/
> > > java/pure8/multipartitionrpw/MutliRPWWithConsumerRestarting
> Test.java#L1
> > >
> > > It is a bit silly that we are forking off processes like this, but I
> did
> > > not want processes running in the same VM.I am not ready to run
> > > testcontainers/docker at this point. But hey we launch kafka, we launch
> > zk,
> > > we start  workers, we kill them off we count things. Results look like
> > > this.
> > >
> > >
> > > Verification stats
> > > ------------------------------------------------------------------
> > > notFound: 0 foundCorrect: 4000 foundDuplications: 0
> > > ------------------------------------------------------------------
> > > not found list[]
> > > serverid: 0 out line Stream closed
> > >
> > > I am going to do more work killing off servers and making more test
> > > scenarios, but so far off to a good start. When you get all the code
> > right,
> > > (by cobbling together all the sources out there with half the code) you
> > get
> > > things that seem to do exactly once, Help wanted in acedemic review of
> my
> > > assertions and code or PRS
> > >
> > > Thanks
> > > Edward
> > >
> >
> >
> > --
> > Sorry this was sent from mobile. Will do less grammar and spell check
> than
> > usual.
> >
>

Right. It looks like what you need to do.

In my scenario above you have n transactional producers. You loop thought
them calling commit. If any of them fail. Do not call poll() close all the
producers. Close the consumer. Now restart it..ef fectively you create a
rebalance.

Can you psuedo code your method of

use a combination of
the KafkaConsumer#committed() and KafkaConsumer#seek()

I like this approach because avoiding the rebalance might be a good idea.


-- 
Sorry this was sent from mobile. Will do less grammar and spell check than
usual.

Re: Attempt to prove Kafka transactions work

Posted by Eric Azama <ea...@gmail.com>.
Calls to KafkaConsumer#poll() are completely independent of commits. As
such they will always return the next set of records, even if the previous
set have not been committed. This is how the consumer acts, regardless of
the Exactly Once semantics.

In order for the Consumer to reset to the currently committed offsets, you
need to either initiate a Consumer Group Rebalance, or use a combination of
the KafkaConsumer#committed() and KafkaConsumer#seek() methods.

On Wed, Nov 20, 2019 at 6:16 AM Edward Capriolo <ed...@gmail.com>
wrote:

> Ok. I'm at a point where I believe the exactly once is in question.
>
> Topic input 10 partitions topic output 10 partitions.
>
> Producer writes messages 1 to 100 to topic input.
>
> CTP process calls poll. It receives 100 messages 10 in each partiton.
>
> Process is simple mirroring take from input write to output.
>
> 10 producers with 10 transactional ids are created. During processing 1 of
> the 10 producers throws kafka exception. 90 out of 100 writes are committed
> tranactionally 10 are not.
>
> If poll is called again 10 messages do not appear in the next poll. Are
> they lost?
>
>
>
> On Saturday, November 9, 2019, Edward Capriolo <ed...@gmail.com>
> wrote:
>
> >
> > On Thu, Oct 31, 2019 at 6:11 AM Matthias J. Sax <ma...@confluent.io>
> > wrote:
> >
> >> Quite a project to test transactions...
> >>
> >> The current system test suite is part of the code base:
> >> https://github.com/apache/kafka/tree/trunk/tests/kafkatest/tests
> >>
> >> There is course also some unit/integration test for transactions.
> >>
> >> There is also a blog post that describes in a high level what testing
> >> was done when EOS was introduced:
> >> https://www.confluent.io/blog/exactly-once-semantics-are-
> >> possible-heres-how-apache-kafka-does-it/
> >>
> >> And yes, transactions are built on some assumptions and if you configure
> >> your system incorrectly of violate those assumptions, it may break. We
> >> also fixed some bugs since the first release. And there might be more
> >> bugs --- software is always buggy. However, for practical consideration,
> >> transactions should work.
> >>
> >> We would of course love if you could share your test results! If you
> >> discover a bug, please report it, so we can fix it.
> >>
> >>
> >> -Matthias
> >>
> >> On 10/28/19 10:06 AM, Edward Capriolo wrote:
> >> > On Sunday, October 27, 2019, Boyang Chen <re...@gmail.com>
> >> wrote:
> >> >
> >> >> Hey Edward,
> >> >>
> >> >> just to summarize and make sure I understood your question, you want
> to
> >> >> implement some Chaos testing to validate Kafka EOS model, but not
> sure
> >> how
> >> >> to start or curious about whether there are already works in the
> >> community
> >> >> doing that?
> >> >>
> >> >> For the correctness of Kafka EOS, we have tons of unit tests and
> system
> >> >> tests to prove its functionality. They could be found inside the
> repo.
> >> You
> >> >> could check them out and see if we still have gaps (which I believe
> we
> >> >> definitely have).
> >> >>
> >> >> Boyang
> >> >>
> >> >> On Fri, Oct 25, 2019 at 7:25 PM Edward Capriolo <
> edlinuxguru@gmail.com
> >> >
> >> >> wrote:
> >> >>
> >> >>> Hello all,
> >> >>>
> >> >>> I used to work in adtech. Adtech was great. CPM for ads is 1-$5 per
> >> >>> thousand ad impression. If numbers are 5% off you can blame
> javascript
> >> >>> click trackers.
> >> >>>
> >> >>> Now, I work in a non addtech industry and they are really, really
> >> serious
> >> >>> about exactly once.
> >> >>>
> >> >>> So there is this blog:
> >> >>>
> >> >>> https://www.confluent.io/blog/transactions-apache-kafka/
> >> >>>
> >> >>> Great little snippet of code. I think I can copy it and implement it
> >> >>> correctly.
> >> >>> You know, but if you read that section about the zombie fencing, you
> >> >> learn
> >> >>> that you either need to manually assign partitions or use the
> >> rebalance
> >> >>> listener and have N producers. Hunting around github is not super
> >> >> helpful,
> >> >>> some code snippets are less complete then even the snipped in the
> >> blog.
> >> >>>
> >> >>> I looked at what spring-kafka does. It does get the zombie fencing
> >> >> correct
> >> >>> with respect to fencing id, other bits other bits of the code seem
> >> >>> plausible.
> >> >>>
> >> >>> Notice I said "plausible", because I do not count a few end to end
> >> tests
> >> >>> running single VM as a solid enough evidence that this works in the
> >> face
> >> >> of
> >> >>> failures.
> >> >>>
> >> >>> I have been contemplating how one stress tests this exactly once
> >> concept,
> >> >>> with something Jepsen like or something brute force that I can run
> >> for 5
> >> >>> hours in a row
> >> >>>
> >> >>> If I faithfully implemented the code in the transactional read-write
> >> loop
> >> >>> and I feed it into my jepsen like black box tester it should:
> >> >>>
> >> >>> Create a topic with 10 partitions, Start launching read-write
> >> transaction
> >> >>> code, start feeding input data, Maybe strings like 1 -1000, now
> start
> >> >>> randomly killing vms with kill -9 kill graceful exits, maybe even
> >> killing
> >> >>> kafka, and make sure 1-1000, pop out on the other end.
> >> >>>
> >> >>> I thought of some other "crazy" ideas. One such idea:
> >> >>>
> >> >>> If I make a transactional "echo", read x; write x back to the same
> >> topic.
> >> >>> RunN instances of that and kill them randomly. If I am loosing
> >> messages
> >> >>> (and not duplicating messages) then the topic would eventually have
> no
> >> >>> data..
> >> >>>
> >> >>> Or should I make a program with some math formula like receive x
> write
> >> >> xx.
> >> >>> If duplication is happening I would start seeing multiple xx's
> >> >>>
> >> >>> Or send 1,000,000,000 messages through and consumer logs them to a
> >> file.
> >> >>> Then use an etl tool to validate messages come out on the other
> side.
> >> >>>
> >> >>> Or should I use a nosql with increments and count up and ensure no
> key
> >> >> has
> >> >>> been incremented twice.
> >> >>>
> >> >>> note: I realize I can just use kafka streams or storm, which has its
> >> own
> >> >>> systems to guarantee "at most once" but Iooking for a way to prove
> >> what
> >> >> can
> >> >>> be done with pure kafka. (and not just prove it adtech work (good
> >> enough
> >> >> 5%
> >> >>> here or there) )
> >> >>>
> >> >>> I imagine someone somewhere must be doing this. How where tips? Is
> it
> >> >> part
> >> >>> of some kafka release stress test? I'm down to write it if it does
> not
> >> >>> exist.
> >> >>>
> >> >>> Thanks,
> >> >>> Edward,
> >> >>>
> >> >>> Thanks,
> >> >>> Edward
> >> >>>
> >> >>
> >> >
> >> > Boyang,
> >> >
> >> > I just to summarize and make sure I understood your question, you want
> >> to
> >> > implement some Chaos testing to validate Kafka EOS model, but not sure
> >> how
> >> > to start or curious about whether there are already works in the
> >> community
> >> > doing that?
> >> >
> >> > Yes.
> >> >
> >> > I am not an expert in this field, but I know what distributed systems
> >> can
> >> > mask failures. For example if you have atomic increment you might unit
> >> test
> >> > it and it works fine, but if you ran it for 40 days it might double
> >> count 1
> >> > time.
> >> >
> >> >  of Kafka EOS, we have tons of unit tests and system
> >> > tests to prove its functionality. They could be found inside the repo.
> >> >
> >> > I've been a developer for a while so the phrase "there are tests"
> never
> >> > tells me everything. Tests reveal the presence of bugs not the
> absence.
> >> >
> >> > Can you please point me at the tests? My curiosity is if there is a
> >> > systematic in-depth strategy here and how much rigor there is.
> >> >
> >> > In my environment I need to quantify and use rigor to prove out these
> >> > things. Things that you might take for granted. For example, I have to
> >> > prove that zookeeper works as expected when we lose a datacenter. Most
> >> > people 'in the know' take it for granted that kafka and zk do what is
> >> > advertised when configured properly. I have to test that out and
> >> document
> >> > my findings.
> >> >
> >> > For kafka transactions. The user space code needs to be written
> properly
> >> > and configured properly along with the server being setup properly. It
> >> is
> >> > not enough for me to check out kafka run 'sbt test' and declare
> victory
> >> > after the unit tests pass.
> >> >
> >> > What I am effectively looking for is the anti jepsen blog that
> says...We
> >> > threw the kitchen sink at this and these transactions are bullet
> proof.
> >> > Here is our methodology, here is some charts, here is xyz. Here is how
> >> we
> >> > run it every minor release
> >> >
> >> > I'm not trying to be a pita, educate me on how bullet proof this is
> and
> >> how
> >> > I can reproduce the results.
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >>
> >>
> >
> > All,
> >
> > After a few weeks of hacking at this I have found a few interesting
> > things. First things first, introducing Kafos
> >
> > https://github.com/edwardcapriolo/kafos
> >
> > Kafos (think Kafka + Chaos) . Kafos is a project that attempts to prove
> > out Kafka transactions do exacty-once by launching multiple servers and
> > consume/producers, killing them off and counting how many messages land
> on
> > the other end.
> >
> > Note: The project is a little raw at this point, (it has hard code to my
> > home folders and some long Thread.sleep() calls in there. I will clean it
> > up as I go,) but I have some early findings.
> >
> > From the blog here, I want to point out the following flaws:
> >
> > https://www.confluent.io/blog/transactions-apache-kafka/
> >
> > KafkaProducer producer = createKafkaProducer(
> > “bootstrap.servers”, “localhost:9092”,
> > “transactional.id”, “my-transactional-id”);
> >
> > 1) a fixed transaction.id effectively means the first worker fences all
> > the other ones.While being exactly-once, there is exactly one worker and
> no
> > parallelism.
> >
> > Spring has two schemes that seem to implement random information in the
> > fencing ids, which I think defeats the purpose. But I need to dive in on
> > that more.
> > https://docs.spring.io/spring-kafka/reference/html/
> >
> > Transactions are enabled by providing the DefaultKafkaProducerFactory
> > with a transactionIdPrefix. In that case, instead of managing a single
> > shared Producer, the factory maintains a cache of transactional
> > producers. When the user calls close() on a producer, it is returned to
> > the cache for reuse instead of actually being closed. The
> transactional.id
> > property of each producer is transactionIdPrefix + n, where n starts with
> > 0 and is incremented for each new producer, unless the transaction is
> > started by a listener container with a record-based listener. In that
> case,
> > the transactional.id is <transactionIdPrefix>.<group.id
> > >.<topic>.<partition>. This is to properly support fencing zombies, as
> > described here <https://www.confluent.io/blog/transactions-apache-kafka/
> >.
> > This new behavior was added in versions 1.3.7, 2.0.6, 2.1.10, and 2.2.0.
> If
> > you wish to revert to the previous behavior, you can set the
> > producerPerConsumerPartition property on the DefaultKafkaProducerFactory
> > to false.
> >
> > While transactions are supported with batch listeners, by default, zombie
> > fencing is not supported because a batch may contain records from
> multiple
> > topics or partitions. However, starting with version 2.3.2, zombie
> fencing
> > is supported if you set the container property subBatchPerPartition to
> > true. In that case, the batch listener is invoked once per partition
> > received from the last poll, as if each poll only returned records for a
> > single partition.
> >
> > How to fix this? Here be the dragons:
> >
> > "Practically, one would either have to store the mapping between input
> > partitions and transactional.ids in an external store, or have some
> static
> > encoding of it. Kafka Streams opts for the latter approach to solve this
> > problem."
> >
> > Well then practically, the rest of this code is not useful.
> >
> > while (true) {
> >    ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
> >    producer.beginTransaction();
> >    for (ConsumerRecord record : records)
> >     producer.send(producerRecord(“outputTopic”, record));
> >    producer.sendOffsetsToTransaction(currentOffsets(consumer), group);
> >    producer.commitTransaction();
> > }
> >
> > When you call consumer.poll() (unless you manually assign to a single
> > partiion) you are going to receive records from one or more partitions.
> As
> > consumers join and leave the the group workers are going to move.
> >
> > There are two ways I can find to do this:
> > 1) use a consumerRebalanceListener and a concurrent map of producers
> > https://github.com/edwardcapriolo/kafos/blob/
> > 68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/
> > java/pure8/multipartitionrpw/MultiPartitionRPWBuffered.java#L123
> >
> > 2) as you are polling create/reuse producers as needed.
> >
> > With scalable correctly fenced producers the loop looks more like this:
> >
> > ConsumerRecords<K,V> records = null;
> > try {
> >    records = consumer.poll(2000);
> > } catch (KafkaException e){
> >    return;
> > }
> >
> > for(ConsumerRecord<K,V> record: records) {
> >    int partition = record.partition();
> >    BufferedTrackingProducer<K,V> producer = producers.get(partition);
> >    List<ProducerRecord<K, V>> results = processor.process(record);
> >    producer.send(results);
> >
> > }
> > for (Entry<Integer, BufferedTrackingProducer<K, V>> entry:
> producers.entrySet())
> > {
> >    entry.getValue().commitAndClear(records, groupId);
> > }
> >
> > Implementation here: https://github.com/edwardcapriolo/kafos/blob/
> > 68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/
> > java/pure8/multipartitionrpw/MultiPartitionRPWBuffered.java#L1
> >
> > BTW maybe I am wrong about this and there is a better approach. With the
> > multiple producer threads for single consumer.poll figuring out exactly
> how
> > to layer the try/catch and exception handling becomes a bit harder than
> the
> > example in the blog.
> >
> > Anyway,  here is what a Kafos chaos "test" looks like.
> >
> > https://github.com/edwardcapriolo/kafos/blob/
> > 68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/
> > java/pure8/multipartitionrpw/MutliRPWWithConsumerRestartingTest.java#L1
> >
> > It is a bit silly that we are forking off processes like this, but I did
> > not want processes running in the same VM.I am not ready to run
> > testcontainers/docker at this point. But hey we launch kafka, we launch
> zk,
> > we start  workers, we kill them off we count things. Results look like
> > this.
> >
> >
> > Verification stats
> > ------------------------------------------------------------------
> > notFound: 0 foundCorrect: 4000 foundDuplications: 0
> > ------------------------------------------------------------------
> > not found list[]
> > serverid: 0 out line Stream closed
> >
> > I am going to do more work killing off servers and making more test
> > scenarios, but so far off to a good start. When you get all the code
> right,
> > (by cobbling together all the sources out there with half the code) you
> get
> > things that seem to do exactly once, Help wanted in acedemic review of my
> > assertions and code or PRS
> >
> > Thanks
> > Edward
> >
>
>
> --
> Sorry this was sent from mobile. Will do less grammar and spell check than
> usual.
>

Re: Attempt to prove Kafka transactions work

Posted by Edward Capriolo <ed...@gmail.com>.
Ok. I'm at a point where I believe the exactly once is in question.

Topic input 10 partitions topic output 10 partitions.

Producer writes messages 1 to 100 to topic input.

CTP process calls poll. It receives 100 messages 10 in each partiton.

Process is simple mirroring take from input write to output.

10 producers with 10 transactional ids are created. During processing 1 of
the 10 producers throws kafka exception. 90 out of 100 writes are committed
tranactionally 10 are not.

If poll is called again 10 messages do not appear in the next poll. Are
they lost?



On Saturday, November 9, 2019, Edward Capriolo <ed...@gmail.com>
wrote:

>
> On Thu, Oct 31, 2019 at 6:11 AM Matthias J. Sax <ma...@confluent.io>
> wrote:
>
>> Quite a project to test transactions...
>>
>> The current system test suite is part of the code base:
>> https://github.com/apache/kafka/tree/trunk/tests/kafkatest/tests
>>
>> There is course also some unit/integration test for transactions.
>>
>> There is also a blog post that describes in a high level what testing
>> was done when EOS was introduced:
>> https://www.confluent.io/blog/exactly-once-semantics-are-
>> possible-heres-how-apache-kafka-does-it/
>>
>> And yes, transactions are built on some assumptions and if you configure
>> your system incorrectly of violate those assumptions, it may break. We
>> also fixed some bugs since the first release. And there might be more
>> bugs --- software is always buggy. However, for practical consideration,
>> transactions should work.
>>
>> We would of course love if you could share your test results! If you
>> discover a bug, please report it, so we can fix it.
>>
>>
>> -Matthias
>>
>> On 10/28/19 10:06 AM, Edward Capriolo wrote:
>> > On Sunday, October 27, 2019, Boyang Chen <re...@gmail.com>
>> wrote:
>> >
>> >> Hey Edward,
>> >>
>> >> just to summarize and make sure I understood your question, you want to
>> >> implement some Chaos testing to validate Kafka EOS model, but not sure
>> how
>> >> to start or curious about whether there are already works in the
>> community
>> >> doing that?
>> >>
>> >> For the correctness of Kafka EOS, we have tons of unit tests and system
>> >> tests to prove its functionality. They could be found inside the repo.
>> You
>> >> could check them out and see if we still have gaps (which I believe we
>> >> definitely have).
>> >>
>> >> Boyang
>> >>
>> >> On Fri, Oct 25, 2019 at 7:25 PM Edward Capriolo <edlinuxguru@gmail.com
>> >
>> >> wrote:
>> >>
>> >>> Hello all,
>> >>>
>> >>> I used to work in adtech. Adtech was great. CPM for ads is 1-$5 per
>> >>> thousand ad impression. If numbers are 5% off you can blame javascript
>> >>> click trackers.
>> >>>
>> >>> Now, I work in a non addtech industry and they are really, really
>> serious
>> >>> about exactly once.
>> >>>
>> >>> So there is this blog:
>> >>>
>> >>> https://www.confluent.io/blog/transactions-apache-kafka/
>> >>>
>> >>> Great little snippet of code. I think I can copy it and implement it
>> >>> correctly.
>> >>> You know, but if you read that section about the zombie fencing, you
>> >> learn
>> >>> that you either need to manually assign partitions or use the
>> rebalance
>> >>> listener and have N producers. Hunting around github is not super
>> >> helpful,
>> >>> some code snippets are less complete then even the snipped in the
>> blog.
>> >>>
>> >>> I looked at what spring-kafka does. It does get the zombie fencing
>> >> correct
>> >>> with respect to fencing id, other bits other bits of the code seem
>> >>> plausible.
>> >>>
>> >>> Notice I said "plausible", because I do not count a few end to end
>> tests
>> >>> running single VM as a solid enough evidence that this works in the
>> face
>> >> of
>> >>> failures.
>> >>>
>> >>> I have been contemplating how one stress tests this exactly once
>> concept,
>> >>> with something Jepsen like or something brute force that I can run
>> for 5
>> >>> hours in a row
>> >>>
>> >>> If I faithfully implemented the code in the transactional read-write
>> loop
>> >>> and I feed it into my jepsen like black box tester it should:
>> >>>
>> >>> Create a topic with 10 partitions, Start launching read-write
>> transaction
>> >>> code, start feeding input data, Maybe strings like 1 -1000, now start
>> >>> randomly killing vms with kill -9 kill graceful exits, maybe even
>> killing
>> >>> kafka, and make sure 1-1000, pop out on the other end.
>> >>>
>> >>> I thought of some other "crazy" ideas. One such idea:
>> >>>
>> >>> If I make a transactional "echo", read x; write x back to the same
>> topic.
>> >>> RunN instances of that and kill them randomly. If I am loosing
>> messages
>> >>> (and not duplicating messages) then the topic would eventually have no
>> >>> data..
>> >>>
>> >>> Or should I make a program with some math formula like receive x write
>> >> xx.
>> >>> If duplication is happening I would start seeing multiple xx's
>> >>>
>> >>> Or send 1,000,000,000 messages through and consumer logs them to a
>> file.
>> >>> Then use an etl tool to validate messages come out on the other side.
>> >>>
>> >>> Or should I use a nosql with increments and count up and ensure no key
>> >> has
>> >>> been incremented twice.
>> >>>
>> >>> note: I realize I can just use kafka streams or storm, which has its
>> own
>> >>> systems to guarantee "at most once" but Iooking for a way to prove
>> what
>> >> can
>> >>> be done with pure kafka. (and not just prove it adtech work (good
>> enough
>> >> 5%
>> >>> here or there) )
>> >>>
>> >>> I imagine someone somewhere must be doing this. How where tips? Is it
>> >> part
>> >>> of some kafka release stress test? I'm down to write it if it does not
>> >>> exist.
>> >>>
>> >>> Thanks,
>> >>> Edward,
>> >>>
>> >>> Thanks,
>> >>> Edward
>> >>>
>> >>
>> >
>> > Boyang,
>> >
>> > I just to summarize and make sure I understood your question, you want
>> to
>> > implement some Chaos testing to validate Kafka EOS model, but not sure
>> how
>> > to start or curious about whether there are already works in the
>> community
>> > doing that?
>> >
>> > Yes.
>> >
>> > I am not an expert in this field, but I know what distributed systems
>> can
>> > mask failures. For example if you have atomic increment you might unit
>> test
>> > it and it works fine, but if you ran it for 40 days it might double
>> count 1
>> > time.
>> >
>> >  of Kafka EOS, we have tons of unit tests and system
>> > tests to prove its functionality. They could be found inside the repo.
>> >
>> > I've been a developer for a while so the phrase "there are tests" never
>> > tells me everything. Tests reveal the presence of bugs not the absence.
>> >
>> > Can you please point me at the tests? My curiosity is if there is a
>> > systematic in-depth strategy here and how much rigor there is.
>> >
>> > In my environment I need to quantify and use rigor to prove out these
>> > things. Things that you might take for granted. For example, I have to
>> > prove that zookeeper works as expected when we lose a datacenter. Most
>> > people 'in the know' take it for granted that kafka and zk do what is
>> > advertised when configured properly. I have to test that out and
>> document
>> > my findings.
>> >
>> > For kafka transactions. The user space code needs to be written properly
>> > and configured properly along with the server being setup properly. It
>> is
>> > not enough for me to check out kafka run 'sbt test' and declare victory
>> > after the unit tests pass.
>> >
>> > What I am effectively looking for is the anti jepsen blog that says...We
>> > threw the kitchen sink at this and these transactions are bullet proof.
>> > Here is our methodology, here is some charts, here is xyz. Here is how
>> we
>> > run it every minor release
>> >
>> > I'm not trying to be a pita, educate me on how bullet proof this is and
>> how
>> > I can reproduce the results.
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>>
>>
>
> All,
>
> After a few weeks of hacking at this I have found a few interesting
> things. First things first, introducing Kafos
>
> https://github.com/edwardcapriolo/kafos
>
> Kafos (think Kafka + Chaos) . Kafos is a project that attempts to prove
> out Kafka transactions do exacty-once by launching multiple servers and
> consume/producers, killing them off and counting how many messages land on
> the other end.
>
> Note: The project is a little raw at this point, (it has hard code to my
> home folders and some long Thread.sleep() calls in there. I will clean it
> up as I go,) but I have some early findings.
>
> From the blog here, I want to point out the following flaws:
>
> https://www.confluent.io/blog/transactions-apache-kafka/
>
> KafkaProducer producer = createKafkaProducer(
> “bootstrap.servers”, “localhost:9092”,
> “transactional.id”, “my-transactional-id”);
>
> 1) a fixed transaction.id effectively means the first worker fences all
> the other ones.While being exactly-once, there is exactly one worker and no
> parallelism.
>
> Spring has two schemes that seem to implement random information in the
> fencing ids, which I think defeats the purpose. But I need to dive in on
> that more.
> https://docs.spring.io/spring-kafka/reference/html/
>
> Transactions are enabled by providing the DefaultKafkaProducerFactory
> with a transactionIdPrefix. In that case, instead of managing a single
> shared Producer, the factory maintains a cache of transactional
> producers. When the user calls close() on a producer, it is returned to
> the cache for reuse instead of actually being closed. The transactional.id
> property of each producer is transactionIdPrefix + n, where n starts with
> 0 and is incremented for each new producer, unless the transaction is
> started by a listener container with a record-based listener. In that case,
> the transactional.id is <transactionIdPrefix>.<group.id
> >.<topic>.<partition>. This is to properly support fencing zombies, as
> described here <https://www.confluent.io/blog/transactions-apache-kafka/>.
> This new behavior was added in versions 1.3.7, 2.0.6, 2.1.10, and 2.2.0. If
> you wish to revert to the previous behavior, you can set the
> producerPerConsumerPartition property on the DefaultKafkaProducerFactory
> to false.
>
> While transactions are supported with batch listeners, by default, zombie
> fencing is not supported because a batch may contain records from multiple
> topics or partitions. However, starting with version 2.3.2, zombie fencing
> is supported if you set the container property subBatchPerPartition to
> true. In that case, the batch listener is invoked once per partition
> received from the last poll, as if each poll only returned records for a
> single partition.
>
> How to fix this? Here be the dragons:
>
> "Practically, one would either have to store the mapping between input
> partitions and transactional.ids in an external store, or have some static
> encoding of it. Kafka Streams opts for the latter approach to solve this
> problem."
>
> Well then practically, the rest of this code is not useful.
>
> while (true) {
>    ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
>    producer.beginTransaction();
>    for (ConsumerRecord record : records)
>     producer.send(producerRecord(“outputTopic”, record));
>    producer.sendOffsetsToTransaction(currentOffsets(consumer), group);
>    producer.commitTransaction();
> }
>
> When you call consumer.poll() (unless you manually assign to a single
> partiion) you are going to receive records from one or more partitions.  As
> consumers join and leave the the group workers are going to move.
>
> There are two ways I can find to do this:
> 1) use a consumerRebalanceListener and a concurrent map of producers
> https://github.com/edwardcapriolo/kafos/blob/
> 68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/
> java/pure8/multipartitionrpw/MultiPartitionRPWBuffered.java#L123
>
> 2) as you are polling create/reuse producers as needed.
>
> With scalable correctly fenced producers the loop looks more like this:
>
> ConsumerRecords<K,V> records = null;
> try {
>    records = consumer.poll(2000);
> } catch (KafkaException e){
>    return;
> }
>
> for(ConsumerRecord<K,V> record: records) {
>    int partition = record.partition();
>    BufferedTrackingProducer<K,V> producer = producers.get(partition);
>    List<ProducerRecord<K, V>> results = processor.process(record);
>    producer.send(results);
>
> }
> for (Entry<Integer, BufferedTrackingProducer<K, V>> entry: producers.entrySet())
> {
>    entry.getValue().commitAndClear(records, groupId);
> }
>
> Implementation here: https://github.com/edwardcapriolo/kafos/blob/
> 68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/
> java/pure8/multipartitionrpw/MultiPartitionRPWBuffered.java#L1
>
> BTW maybe I am wrong about this and there is a better approach. With the
> multiple producer threads for single consumer.poll figuring out exactly how
> to layer the try/catch and exception handling becomes a bit harder than the
> example in the blog.
>
> Anyway,  here is what a Kafos chaos "test" looks like.
>
> https://github.com/edwardcapriolo/kafos/blob/
> 68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/
> java/pure8/multipartitionrpw/MutliRPWWithConsumerRestartingTest.java#L1
>
> It is a bit silly that we are forking off processes like this, but I did
> not want processes running in the same VM.I am not ready to run
> testcontainers/docker at this point. But hey we launch kafka, we launch zk,
> we start  workers, we kill them off we count things. Results look like
> this.
>
>
> Verification stats
> ------------------------------------------------------------------
> notFound: 0 foundCorrect: 4000 foundDuplications: 0
> ------------------------------------------------------------------
> not found list[]
> serverid: 0 out line Stream closed
>
> I am going to do more work killing off servers and making more test
> scenarios, but so far off to a good start. When you get all the code right,
> (by cobbling together all the sources out there with half the code) you get
> things that seem to do exactly once, Help wanted in acedemic review of my
> assertions and code or PRS
>
> Thanks
> Edward
>


-- 
Sorry this was sent from mobile. Will do less grammar and spell check than
usual.