You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pulsar.apache.org by Richard Yu <yo...@gmail.com> on 2019/02/28 04:35:08 UTC

PIP-31: Add support for transactional messaging

Hi all,

I would like to create a PIP for issue #2664 on Github. The details of the
PIP are below.
I hope we could discuss this thoroughly.

Cheers,
Richard

PIP-31: Add support for transactional messaging

Motivation: Pulsar currently could improve upon their system of sending
packets of data by implementing transactional messaging. This system
enforces eventual consistency within the system, and allows operations to
be performed atomically.

Proposal:

As described in the issue, we would implement the following policy in
Producer and Pulsar Broker:
1. The producer produces the pre-processing transaction message. At this
point, the broker will set the status of this message to unknown.
2. After the local transaction is successfully executed, the commit message
is sent, otherwise the rollback message is sent.
3. The broker receives the message. If it is a commit message, it modifies
the transaction status to commit, and then sends an actual message to the
consumer queue. At this time, the consumer can consume the message.
Otherwise, the transaction status is modified to rollback. The message will
be discarded.
4. If at step 2, the producer is down or abnormal, at this time, the broker
will periodically ask the specific producer for the status of the message,
and update the status according to the producer's response, and process it
according to step 3, the action that comes down.

Specific concerns:
There are a number of things we will improve upon or add:
- A configuration called ```maxMessageUnknownTime```. Consider this
scenario: the pre-processing transaction message is sent, but the commit or
rollback message is never received, which could mean that the status of a
message would be permanently unknown. To avoid this from happening, we
would need a config which limits the amount of time the status of a message
could be unknown (i.e. ```maxMessageUnknownTime```) After that, the message
would be discarded.
- Logging would be updated to log the status of a message i.e. UNKNOWN,
ROLLBACK, or COMMITTED. This would allow the user to know whether or not a
message had failed or fallen through.

Possible Additional API:
- We would add a method which allows the user to query the state of the
message i.e. ```getStateOfMessage(long id)```

Re: PIP-31: Add support for transactional messaging

Posted by Dave Fisher <wa...@apache.org>.
Hi Richard,

I moderated your email onto the list. To participate please subscribe by sending an email to dev-subscribe@pulsar.apache.org and respond to the confirmation email as it instructs.

Regards,
Dave

Sent from my iPhone

> On Feb 27, 2019, at 8:35 PM, Richard Yu <yo...@gmail.com> wrote:
> 
> Hi all,
> 
> I would like to create a PIP for issue #2664 on Github. The details of the
> PIP are below.
> I hope we could discuss this thoroughly.
> 
> Cheers,
> Richard
> 
> PIP-31: Add support for transactional messaging
> 
> Motivation: Pulsar currently could improve upon their system of sending
> packets of data by implementing transactional messaging. This system
> enforces eventual consistency within the system, and allows operations to
> be performed atomically.
> 
> Proposal:
> 
> As described in the issue, we would implement the following policy in
> Producer and Pulsar Broker:
> 1. The producer produces the pre-processing transaction message. At this
> point, the broker will set the status of this message to unknown.
> 2. After the local transaction is successfully executed, the commit message
> is sent, otherwise the rollback message is sent.
> 3. The broker receives the message. If it is a commit message, it modifies
> the transaction status to commit, and then sends an actual message to the
> consumer queue. At this time, the consumer can consume the message.
> Otherwise, the transaction status is modified to rollback. The message will
> be discarded.
> 4. If at step 2, the producer is down or abnormal, at this time, the broker
> will periodically ask the specific producer for the status of the message,
> and update the status according to the producer's response, and process it
> according to step 3, the action that comes down.
> 
> Specific concerns:
> There are a number of things we will improve upon or add:
> - A configuration called ```maxMessageUnknownTime```. Consider this
> scenario: the pre-processing transaction message is sent, but the commit or
> rollback message is never received, which could mean that the status of a
> message would be permanently unknown. To avoid this from happening, we
> would need a config which limits the amount of time the status of a message
> could be unknown (i.e. ```maxMessageUnknownTime```) After that, the message
> would be discarded.
> - Logging would be updated to log the status of a message i.e. UNKNOWN,
> ROLLBACK, or COMMITTED. This would allow the user to know whether or not a
> message had failed or fallen through.
> 
> Possible Additional API:
> - We would add a method which allows the user to query the state of the
> message i.e. ```getStateOfMessage(long id)```


Re: PIP-31: Add support for transactional messaging

Posted by Ivan Kelly <iv...@apache.org>.
Hi Richard,

This is something that has been discussed a few times, but the
outcomes of the discussions never written down. Thanks for starting
the conversation.

Responses to your PIP inline.

> Motivation: Pulsar currently could improve upon their system of sending
> packets of data by implementing transactional messaging. This system
> enforces eventual consistency within the system, and allows operations to
> be performed atomically.

Transactional acknowledgement also needs to be taken into account, so
that clients can say something like when I commit m2 and m3 to topics
X & Y, m1 should be marked as acknowledged on topic Z. Acknowledgement
makes the transactional bit more complicated. With transactions on
publish we don't have to deal with conflict resolution, but with
acknowledgement, we can't acknowledge a message which has already been
acknowledged.

> As described in the issue, we would implement the following policy in
> Producer and Pulsar Broker:
> 1. The producer produces the pre-processing transaction message. At this
> point, the broker will set the status of this message to unknown.

By "set the message to unknown", do you mean the broker will cache the
message, not writing it to any log? This could become a problem on the
commit as they commit may go to another broker, and the broker storing
the message could crash and then you'd end up with a committed message
where the content is missing.

> 2. After the local transaction is successfully executed, the commit message
> is sent, otherwise the rollback message is sent.

What does the commit message contain? And where is it sent? Keep in
mind that for transactional publish to be useful, we need be able to
write to many topics, and topics may live on different brokers. But
the commit message has to be atomic by definition, so it can only go
to one broker. In designs we've discussed previously, this was handled
by a component called the transaction coordinator, which is a logical
component which each broker knows how to talk to. For a transaction
the commit message is sent to the coordinator, which writes it to its
own log, and then goes through each topic in the commit and marks the
transaction as completed.

> - A configuration called ```maxMessageUnknownTime```. Consider this
> scenario: the pre-processing transaction message is sent, but the commit or
> rollback message is never received, which could mean that the status of a
> message would be permanently unknown. To avoid this from happening, we
> would need a config which limits the amount of time the status of a message
> could be unknown (i.e. ```maxMessageUnknownTime```) After that, the message
> would be discarded.

In transactional systems this is normally known as the low watermark,
but it can't be based on wallclock time. Brokers may have wildly
different times, and if one broker could abort a transaction while
that transactions is still live on another broker. In previous
discussions, each transaction was given a transaction ID, which is a
monotonically increasing number (allocated by the transaction
coordinator). As the commit is going through the coordinator as well,
the coordinator can decide whether the transaction ID is below the low
water mark which it also controls.

Cheers,
Ivan

Re: PIP-31: Add support for transactional messaging

Posted by Matteo Merli <ma...@gmail.com>.
Once there's support for transactions in messaging API, there will be
no need for a base class for functions. Rather a config option will
allow to enable transactional mode.
--
Matteo Merli
<ma...@gmail.com>

On Sat, Mar 2, 2019 at 6:39 PM Sijie Guo <gu...@gmail.com> wrote:
>
> Dave,
>
> You mean implementing the transactions in pulsar function?
>
> - Sijie
>
> On Sun, Mar 3, 2019 at 1:52 AM Dave Fisher <da...@comcast.net> wrote:
>
> > Hi -
> >
> > Is this a case where a Pulsar function base class for transactions would
> > help?
> >
> > Regards,
> > Dave
> >
> > Sent from my iPhone
> >
> > > On Mar 2, 2019, at 2:39 AM, Sijie Guo <gu...@gmail.com> wrote:
> > >
> > > Pravega's model is a better model than Kafka - it addressed the
> > > interleaving problems. However Pravega's model is based on a giant
> > > replicated log and rewrite the data to a second tiered storage for
> > > persistence, which basically re-implemented bookkeeper's logic in
> > broker. A
> > > fundamental drawback of Pravega is write amplifications. The
> > amplifications
> > > of both network and IO bandwidth are huge. If you use bookkeeper both for
> > > its first-and-second tier storage and assume the bookkeeper replication
> > > factor is 3, pravega requires 6x network bandwidth and 12x IO bandwidth.
> > > For a given message, it needs to write 3 times into the journal, and 3
> > > times for persistent. The amplifications hugely limit the throughput at
> > > pravega "brokers".
> > >
> > > - Sijie
> > >
> > >
> > >
> > >> On Sat, Mar 2, 2019 at 6:13 PM Ali Ahmed <ah...@gmail.com> wrote:
> > >>
> > >> I agree we many want to review pravega's past efforts in this area also.
> > >>
> > >>
> > >>
> > https://github.com/pravega/pravega/blob/master/documentation/src/docs/transactions.md
> > >>
> > >>
> > https://github.com/pravega/pravega/blob/master/client/src/main/java/io/pravega/client/stream/Transaction.java
> > >>
> > >> -Ali
> > >>
> > >>> On Sat, Mar 2, 2019 at 1:56 AM Sijie Guo <gu...@gmail.com> wrote:
> > >>>
> > >>> Kafka's implementation is interleaving committed messages with
> > >> uncommitted
> > >>> messages at storage. Personally I think it is a very ugly design and
> > >>> implementation.
> > >>>
> > >>> Pulsar is a segment centric system, where we have a shared segment
> > >> storage
> > >>> - bookkeeper. I think a better direction is to leverage the segments
> > (aka
> > >>> ledgers)
> > >>> for buffering uncommitted messages and commit the whole segment when
> > the
> > >>> whole transaction is committed.
> > >>>
> > >>> A rough idea would be:
> > >>>
> > >>> 1) for any transaction, write the messages to a separate ledger (or
> > >>> multiple separate ledger).
> > >>> 2) during the transaction, accumulates the messages in those ledgers.
> > >>> 3) when commit, merge the txn ledgers back to the main data ledger. the
> > >>> merge can be done either adding a meta message where data is stored in
> > >> the
> > >>> txn ledger or actually copying the data to data ledger (depending on
> > the
> > >>> size of data accumulate in the transaction).
> > >>> 4) when abort, delete the txn ledger. No other additional work to be
> > >> done.
> > >>>
> > >>> This would be producing a much clear design than Kafka.
> > >>>
> > >>> On Ivan's comments:
> > >>>
> > >>>> Transactional acknowledgement also needs to be taken into account
> > >>>
> > >>> I don't think we have to treat `transactional acknowledgement` as a
> > >> special
> > >>> case. currently `acknowledgment` are actually "append" operations into
> > >>> cursor ledgers.
> > >>> So the problem set can be reduced as `atomic append` to both data
> > ledgers
> > >>> and cursor ledgers. in that way, we can use one solution for handling
> > >>> appending data and updating cursors.
> > >>>
> > >>> Additionally, I think a related topic about transactions would be
> > >>> supporting large sized message (e.g. >= 5MB). If we take the approach I
> > >>> described above using a separated ledger for accumulating messages for
> > a
> > >>> transaction, that we are easy to model a large size message as a
> > >>> transaction of chunked messages.
> > >>>
> > >>> @Richard, @Ivan let me know what do you think. If you guys think the
> > >>> direction I raised is a good one to go down, I am happy to write them
> > >> down
> > >>> into details, and drive the design and coordinate the implementations
> > in
> > >>> the community.
> > >>>
> > >>> - Sijie
> > >>>
> > >>> On Sat, Mar 2, 2019 at 9:45 AM Richard Yu <yo...@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> Hi all,
> > >>>>
> > >>>> We might be able to get some ideas on implementing this from Kafka:
> > >>>>
> > >>>>
> > >>>
> > >>
> > https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
> > >>>>
> > >>>> Obviously, there is some differences in Kafka and Pulsar internals but
> > >> at
> > >>>> some level, the implementation would be similar.
> > >>>> It should help.
> > >>>>
> > >>>>
> > >>>>
> > >>>> On Thu, Feb 28, 2019 at 4:29 PM Richard Yu <
> > yohan.richard.yu@gmail.com
> > >>>
> > >>>> wrote:
> > >>>>
> > >>>>> Hi,
> > >>>>>
> > >>>>> Per request, I've created a doc so we could get some more input in an
> > >>>>> organized manner:
> > >>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > https://docs.google.com/document/d/1mSUsJvPgThnWizeQqljKU5244BMEiYHabA6OP6QEHmQ/edit?usp=sharing
> > >>>>>
> > >>>>> And for Ivan's questions, I would answer accordingly.
> > >>>>>
> > >>>>>> By "set the message to unknown", do you mean the broker will cache
> > >> the
> > >>>>>> message, not writing it to any log?
> > >>>>>
> > >>>>> We wouldn't cache the message from my interpretation of the steps.
> > >> What
> > >>>>> the producer is first sending is a pre-processing message, not the
> > >> real
> > >>>>> message itself. This step basically notifies the broker that the
> > >>> message
> > >>>> is
> > >>>>> on its way. So all we have to do is store the message id and its
> > >>>>> corresponding status in a map, and depending on the producer's
> > >>> response,
> > >>>>> the status will change accordingly.
> > >>>>>
> > >>>>>> In designs we've discussed previously, this was handled
> > >>>>>> by a component called the transaction coordinator, which is a
> > >> logical
> > >>>>>> component which each broker knows how to talk to. For a transaction
> > >>>>>> the commit message is sent to the coordinator, which writes it to
> > >> its
> > >>>>>> own log, and then goes through each topic in the commit and marks
> > >> the
> > >>>>>> transaction as completed.
> > >>>>>
> > >>>>> I wasn't aware of previous discussions on this topic, but it seems
> > >>> pretty
> > >>>>> good to me. It's certainly better than what I would come up with.
> > >>>>> If there's any more things we need to talk about, I suppose we could
> > >>> move
> > >>>>> it to the google doc to play around with.
> > >>>>>
> > >>>>> Hope we can get this PIP rolling.
> > >>>>>
> > >>>>>
> > >>>>> On Thu, Feb 28, 2019 at 2:53 AM Sijie Guo <gu...@gmail.com>
> > >> wrote:
> > >>>>>
> > >>>>>> Richard,
> > >>>>>>
> > >>>>>> Thank you for putting this put and pushing the discussion forward.
> > >>>>>>
> > >>>>>> I think this is a very large feature. It might be worth creating a
> > >>>> google
> > >>>>>> doc for it (which is better for collaboration). And I believe Ivan
> > >> has
> > >>>>>> some
> > >>>>>> thoughts as well. If you can put up a google doc (make it
> > >>>> world-editable),
> > >>>>>> Ivan can probably dump his thoughts there and we can finalize the
> > >>>>>> discussion and break down into tasks. So the whole community can
> > >>>> actually
> > >>>>>> work together at collaborating this.
> > >>>>>>
> > >>>>>> Thanks,
> > >>>>>> Sijie
> > >>>>>>
> > >>>>>> On Thu, Feb 28, 2019 at 1:08 PM Richard Yu <
> > >>> yohan.richard.yu@gmail.com>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Hi all,
> > >>>>>>>
> > >>>>>>> I would like to create a PIP for issue #2664 on Github. The
> > >> details
> > >>> of
> > >>>>>> the
> > >>>>>>> PIP are below.
> > >>>>>>> I hope we could discuss this thoroughly.
> > >>>>>>>
> > >>>>>>> Cheers,
> > >>>>>>> Richard
> > >>>>>>>
> > >>>>>>> PIP-31: Add support for transactional messaging
> > >>>>>>>
> > >>>>>>> Motivation: Pulsar currently could improve upon their system of
> > >>>> sending
> > >>>>>>> packets of data by implementing transactional messaging. This
> > >> system
> > >>>>>>> enforces eventual consistency within the system, and allows
> > >>> operations
> > >>>>>> to
> > >>>>>>> be performed atomically.
> > >>>>>>>
> > >>>>>>> Proposal:
> > >>>>>>>
> > >>>>>>> As described in the issue, we would implement the following policy
> > >>> in
> > >>>>>>> Producer and Pulsar Broker:
> > >>>>>>> 1. The producer produces the pre-processing transaction message.
> > >> At
> > >>>> this
> > >>>>>>> point, the broker will set the status of this message to unknown.
> > >>>>>>> 2. After the local transaction is successfully executed, the
> > >> commit
> > >>>>>> message
> > >>>>>>> is sent, otherwise the rollback message is sent.
> > >>>>>>> 3. The broker receives the message. If it is a commit message, it
> > >>>>>> modifies
> > >>>>>>> the transaction status to commit, and then sends an actual message
> > >>> to
> > >>>>>> the
> > >>>>>>> consumer queue. At this time, the consumer can consume the
> > >> message.
> > >>>>>>> Otherwise, the transaction status is modified to rollback. The
> > >>> message
> > >>>>>> will
> > >>>>>>> be discarded.
> > >>>>>>> 4. If at step 2, the producer is down or abnormal, at this time,
> > >> the
> > >>>>>> broker
> > >>>>>>> will periodically ask the specific producer for the status of the
> > >>>>>> message,
> > >>>>>>> and update the status according to the producer's response, and
> > >>>> process
> > >>>>>> it
> > >>>>>>> according to step 3, the action that comes down.
> > >>>>>>>
> > >>>>>>> Specific concerns:
> > >>>>>>> There are a number of things we will improve upon or add:
> > >>>>>>> - A configuration called ```maxMessageUnknownTime```. Consider
> > >> this
> > >>>>>>> scenario: the pre-processing transaction message is sent, but the
> > >>>>>> commit or
> > >>>>>>> rollback message is never received, which could mean that the
> > >> status
> > >>>> of
> > >>>>>> a
> > >>>>>>> message would be permanently unknown. To avoid this from
> > >> happening,
> > >>> we
> > >>>>>>> would need a config which limits the amount of time the status of
> > >> a
> > >>>>>> message
> > >>>>>>> could be unknown (i.e. ```maxMessageUnknownTime```) After that,
> > >> the
> > >>>>>> message
> > >>>>>>> would be discarded.
> > >>>>>>> - Logging would be updated to log the status of a message i.e.
> > >>>> UNKNOWN,
> > >>>>>>> ROLLBACK, or COMMITTED. This would allow the user to know whether
> > >> or
> > >>>>>> not a
> > >>>>>>> message had failed or fallen through.
> > >>>>>>>
> > >>>>>>> Possible Additional API:
> > >>>>>>> - We would add a method which allows the user to query the state
> > >> of
> > >>>> the
> > >>>>>>> message i.e. ```getStateOfMessage(long id)```
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >>
> > >> --
> > >> -Ali
> > >>
> >
> >

Re: PIP-31: Add support for transactional messaging

Posted by Sijie Guo <gu...@gmail.com>.
On Mon, Mar 18, 2019 at 6:54 PM Ivan Kelly <iv...@apache.org> wrote:

> Sorry it took me so long to take a look at this, last few weeks have
> been hectic.
>
> I still haven't gone through it fully, but putting the transaction
> buffer outside of the partition is fine with me. The thing I objected
> most to in previous discussions was having a separate transaction
> buffer per transaction as this would create a huge amount of metadata.
>

Sure. The per transaction buffer might still be useful for some use cases.
So I leave it extendable in the message header, so we can still have that
implementation in future.


>
> So, I would suggest go with the sidecar approach and remove the inline
> approach from the document to keep things simple.
>

I think inline approach is there just for comparison and make sure the
decisions are made based on tradeoffs.

The full proposal is describe in "a full proposal" section.


>
> I still have a bunch of questions about the design, but I need to chew
> it over for a while. It would be good to have some sequence diagrams
> of the interactions (https://sequencediagram.org/ <- this is a really
> handy tool for building them).


Does the diagram in overview work?
If it doesn't work, I can try this tool.


> We should precisely define how
> retention will work between the partition and the partition's
> transaction buffer, as it seems that that is the biggest can of worms
> in the whole thing.
>

The retention is done via a retention cursor. So we don't reimplement any
retention logic just for transaction buffer.
A simple way to think about this :

- if a transaction is aborted, the messages in transaction buffer are
marked as `DELETED` in retention cursor.
- if a transaction is committed, the messages in transaction buffer are
kept there. The messages are only marked as `DELETED` in retention cursor,
when the segment contains the transaction marker is deleted. We simply
maintains a mapping between transaction and its transaction marker in the
data partition, so we can quickly figure out whether the transaction
markers are deleted or not.

So all these are done via the "cursors" in Pulsar. There is really nothing
reimplemented.


>
> -Ivan
>
>
> On Fri, Mar 15, 2019 at 4:01 AM Sijie Guo <gu...@gmail.com> wrote:
> >
> > Any other more comments on this topic?
> >
> > - Sijie
> >
> > On Sun, Mar 10, 2019 at 8:57 PM Jia Zhai <zh...@gmail.com> wrote:
> >
> > > Thanks @Sijie for the PIP.
> > > It has with enough details for me, It looks great, especially for the
> > > sidecar
> > > approach. Left some comments.
> > >
> > > Best Regards.
> > >
> > >
> > > Jia Zhai
> > >
> > > Beijing, China
> > >
> > > Mobile: +86 15810491983
> > >
> > >
> > >
> > >
> > > On Fri, Mar 8, 2019 at 9:58 PM Sijie Guo <gu...@gmail.com> wrote:
> > >
> > > > Hi Team,
> > > >
> > > > I have written down all my thoughts around supporting transactional
> > > > streaming at Pulsar.
> > > >
> > > >
> > > >
> > >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> > > >
> > > > Please take a look and feel free to comment on the google doc. We can
> > > start
> > > > from there.
> > > >
> > > > Also apologies first if there are in-consistency or typos or language
> > > > errors in the doc. feel free to fix them.
> > > >
> > > > Thanks,
> > > > Sijie
> > > >
> > > > On Tue, Mar 5, 2019 at 1:49 PM Sijie Guo <gu...@gmail.com> wrote:
> > > >
> > > > > Will send the detailed proposal. We can go from there.
> > > > >
> > > > > One interesting question I would like to reply here.
> > > > >
> > > > > > But this is more microbatching than streaming.
> > > > >
> > > > > I think people usually have a wrong impression about
> "microbatching" vs
> > > > > "streaming".
> > > > > The "microbatching" vs "streaming" are usually found in the context
> > > > > talking about spark streaming vs storm/flink.
> > > > > The context is more about how computing engine "scheduling"
> > > computations.
> > > > >
> > > > > In reality, "batching" (microbatching) is almost everywhere in a
> > > > > "streaming" pipeline. e.g. even in pulsar client, bookie journal.
> > > > > In the streaming world, you will still do "microbatching" for many
> > > > reasons
> > > > > (such as throughput, windowing semantics and such).
> > > > > but the "microbatching" here is not about "scheduling" anymore.
> > > > >
> > > > > - Sijie
> > > > >
> > > > > On Tue, Mar 5, 2019 at 4:20 AM Ivan Kelly <iv...@apache.org>
> wrote:
> > > > >
> > > > >> > > My replies inline assume the above, so if you have a different
> > > view
> > > > of
> > > > >> > > the general shape let me know.
> > > > >> > >
> > > > >> >
> > > > >> > Yes. We are on the same view of the general shape. I will write
> down
> > > > the
> > > > >> > details of my proposal and will share it with the community
> > > tomorrow.
> > > > >>
> > > > >> Please do. I think there's a lot of details missing.
> > > > >>
> > > > >> > Diagrams can easily drawn to compare the differences here. I
> will
> > > > >> > incorporate into the proposal and show the differences.
> > > > >>
> > > > >> Diagrams and detailed design would be very useful. I still see a
> bunch
> > > > >> of unknowns in your design.
> > > > >>
> > > > >> > I don't think it is a hard dependency. All these components
> should
> > > be
> > > > >> done
> > > > >> > by interfaces.
> > > > >>
> > > > >> The architecture you're proposing requires a metadata service
> that can
> > > > >> scale horizontally to be able to scale, so it is a hard
> dependency. An
> > > > >> implementation backed by ZK would be only a toy.
> > > > >>
> > > > >> > I think I know why do you think interleaving is okay now. In
> your
> > > > mind,
> > > > >> > transactions are carrying one message per partition.
> > > > >>
> > > > >> Well yes, or only a few. I think we have a very different view of
> how
> > > > >> transactions will be used. You seem to be favouring few large
> > > > >> transactions, where as what Matteo and I have discussed is many
> > > > >> smaller transactions, and this informs both designs. With large
> > > > >> transactions, you're basically micro batching, and you can afford
> to
> > > > >> make the individual transactions more expensive since you have
> fewer.
> > > > >> For many smaller transactions, we need to make the transaction
> itself
> > > > >> as cheap as possible.
> > > > >>
> > > > >> > A common case of transaction in streaming, is
> read-transfer-write:
> > > > read
> > > > >> a
> > > > >> > batch of messages, process them and write the results to pulsar
> and
> > > > >> > acknowledges the messages. If you are doing this in a 100ms
> window,
> > > > the
> > > > >> > data can still be large enough, especially the results can be
> > > multiple
> > > > >> > times of the input messages. With that being said, at high
> > > throughput
> > > > >> > transactional streaming, data can be large per transaction,
> > > > continuously
> > > > >> > storing entries of same transaction will have a huge benefit.
> > > > >>
> > > > >> Ok, sure, I'll give you that. If you are reusing a single
> transaction
> > > > >> for a lot of inputs there may be a lot of output messages on the
> same
> > > > >> topic. But this is more microbatching than streaming.
> > > > >>
> > > > >> > Another large category of use cases should be considered is
> "batch"
> > > > data
> > > > >> > processing. You definitely don't want your future "batch"-ish
> data
> > > > >> > processing workload to jump back-and-forth in ledgers. In that
> way,
> > > > >> entries
> > > > >> > of same transaction in same partition stored continuously will
> huge
> > > > >> > benefits.
> > > > >>
> > > > >> Why would you use transaction here? This is more of a usecase for
> > > > >> idempotent producer.
> > > > >>
> > > > >> > Transaction coordinators are responsible for committing,
> aborting
> > > and
> > > > >> > cleaning up transactions.
> > > > >>
> > > > >> How, not where. I'm my experience, the trickiest part in
> transaction
> > > > >> systems is the cleanup.
> > > > >>
> > > > >> > > I think having a ledger, or a shadow topic, per topic would
> work
> > > > fine.
> > > > >> > > There's no need for indexing. We can already look up  an
> message
> > > by
> > > > >> > > messageid. This message id should be part of the transaction
> > > commit
> > > > >> > > message, and subsequently part of the commit marker written to
> > > each
> > > > >> > > topic involved in the transaction.
> > > > >> >
> > > > >> >
> > > > >> > The assumption you have here is all the message ids can be
> stored
> > > into
> > > > >> > within one commit message or the commit markers.
> > > > >>
> > > > >> Are you expecting to have more than 100k messages per transaction?
> > > > >>
> > > > >> > > Caching is no different to any
> > > > >> > > other choice of storage for the message data.
> > > > >> >
> > > > >> > Caching behavior is very different from normal streaming case.
> Since
> > > > >> > accessing those entries of same transaction will be jump
> > > > back-and-forth
> > > > >> in
> > > > >> > the shadow topic.
> > > > >>
> > > > >> Why would you be jumping back and forth for the same transaction
> > > > >> within the same topic? You jump to the first message from that
> topic
> > > > >> in the transaction and read forward.
> > > > >>
> > > > >> > How are you going to delete data of aborted transactions in any
> form
> > > > of
> > > > >> > interleaving storage?
> > > > >> > If we don't compact, those data of aborted transactions will be
> left
> > > > >> there
> > > > >> > forever.
> > > > >>
> > > > >> Aborted transactions should be very rare. The only conflict
> should be
> > > > >> on acknowledge, and these should only conflict if multiple
> consumers
> > > > >> get the same message, which shouldn't be the case. In any case,
> they
> > > > >> should be naturally cleaned up with topic itself (see below).
> > > > >>
> > > > >> > If you don't rewrite shadow topic to the main topic, you either
> have
> > > > to
> > > > >> do
> > > > >> > compaction or retain the shadow topic forever, no?
> > > > >>
> > > > >> No. Lets say that the maximum age of a single transaction is 1h.
> That
> > > > >> means you only need to retain an hour more of data more in the
> shadow
> > > > >> topic as you would have to retain in the main topic. Of course we
> > > > >> wouldn't use wallclock time for this, but something around the low
> > > > >> watermark or something, but that's the basic idea. I haven't
> worked
> > > > >> out all the details.
> > > > >>
> > > > >> I look forward to seeing your full design.
> > > > >>
> > > > >> -Ivan
> > > > >>
> > > > >
> > > >
> > >
>

Re: PIP-31: Add support for transactional messaging

Posted by Ivan Kelly <iv...@apache.org>.
Sorry it took me so long to take a look at this, last few weeks have
been hectic.

I still haven't gone through it fully, but putting the transaction
buffer outside of the partition is fine with me. The thing I objected
most to in previous discussions was having a separate transaction
buffer per transaction as this would create a huge amount of metadata.

So, I would suggest go with the sidecar approach and remove the inline
approach from the document to keep things simple.

I still have a bunch of questions about the design, but I need to chew
it over for a while. It would be good to have some sequence diagrams
of the interactions (https://sequencediagram.org/ <- this is a really
handy tool for building them). We should precisely define how
retention will work between the partition and the partition's
transaction buffer, as it seems that that is the biggest can of worms
in the whole thing.

-Ivan


On Fri, Mar 15, 2019 at 4:01 AM Sijie Guo <gu...@gmail.com> wrote:
>
> Any other more comments on this topic?
>
> - Sijie
>
> On Sun, Mar 10, 2019 at 8:57 PM Jia Zhai <zh...@gmail.com> wrote:
>
> > Thanks @Sijie for the PIP.
> > It has with enough details for me, It looks great, especially for the
> > sidecar
> > approach. Left some comments.
> >
> > Best Regards.
> >
> >
> > Jia Zhai
> >
> > Beijing, China
> >
> > Mobile: +86 15810491983
> >
> >
> >
> >
> > On Fri, Mar 8, 2019 at 9:58 PM Sijie Guo <gu...@gmail.com> wrote:
> >
> > > Hi Team,
> > >
> > > I have written down all my thoughts around supporting transactional
> > > streaming at Pulsar.
> > >
> > >
> > >
> > https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> > >
> > > Please take a look and feel free to comment on the google doc. We can
> > start
> > > from there.
> > >
> > > Also apologies first if there are in-consistency or typos or language
> > > errors in the doc. feel free to fix them.
> > >
> > > Thanks,
> > > Sijie
> > >
> > > On Tue, Mar 5, 2019 at 1:49 PM Sijie Guo <gu...@gmail.com> wrote:
> > >
> > > > Will send the detailed proposal. We can go from there.
> > > >
> > > > One interesting question I would like to reply here.
> > > >
> > > > > But this is more microbatching than streaming.
> > > >
> > > > I think people usually have a wrong impression about "microbatching" vs
> > > > "streaming".
> > > > The "microbatching" vs "streaming" are usually found in the context
> > > > talking about spark streaming vs storm/flink.
> > > > The context is more about how computing engine "scheduling"
> > computations.
> > > >
> > > > In reality, "batching" (microbatching) is almost everywhere in a
> > > > "streaming" pipeline. e.g. even in pulsar client, bookie journal.
> > > > In the streaming world, you will still do "microbatching" for many
> > > reasons
> > > > (such as throughput, windowing semantics and such).
> > > > but the "microbatching" here is not about "scheduling" anymore.
> > > >
> > > > - Sijie
> > > >
> > > > On Tue, Mar 5, 2019 at 4:20 AM Ivan Kelly <iv...@apache.org> wrote:
> > > >
> > > >> > > My replies inline assume the above, so if you have a different
> > view
> > > of
> > > >> > > the general shape let me know.
> > > >> > >
> > > >> >
> > > >> > Yes. We are on the same view of the general shape. I will write down
> > > the
> > > >> > details of my proposal and will share it with the community
> > tomorrow.
> > > >>
> > > >> Please do. I think there's a lot of details missing.
> > > >>
> > > >> > Diagrams can easily drawn to compare the differences here. I will
> > > >> > incorporate into the proposal and show the differences.
> > > >>
> > > >> Diagrams and detailed design would be very useful. I still see a bunch
> > > >> of unknowns in your design.
> > > >>
> > > >> > I don't think it is a hard dependency. All these components should
> > be
> > > >> done
> > > >> > by interfaces.
> > > >>
> > > >> The architecture you're proposing requires a metadata service that can
> > > >> scale horizontally to be able to scale, so it is a hard dependency. An
> > > >> implementation backed by ZK would be only a toy.
> > > >>
> > > >> > I think I know why do you think interleaving is okay now. In your
> > > mind,
> > > >> > transactions are carrying one message per partition.
> > > >>
> > > >> Well yes, or only a few. I think we have a very different view of how
> > > >> transactions will be used. You seem to be favouring few large
> > > >> transactions, where as what Matteo and I have discussed is many
> > > >> smaller transactions, and this informs both designs. With large
> > > >> transactions, you're basically micro batching, and you can afford to
> > > >> make the individual transactions more expensive since you have fewer.
> > > >> For many smaller transactions, we need to make the transaction itself
> > > >> as cheap as possible.
> > > >>
> > > >> > A common case of transaction in streaming, is read-transfer-write:
> > > read
> > > >> a
> > > >> > batch of messages, process them and write the results to pulsar and
> > > >> > acknowledges the messages. If you are doing this in a 100ms window,
> > > the
> > > >> > data can still be large enough, especially the results can be
> > multiple
> > > >> > times of the input messages. With that being said, at high
> > throughput
> > > >> > transactional streaming, data can be large per transaction,
> > > continuously
> > > >> > storing entries of same transaction will have a huge benefit.
> > > >>
> > > >> Ok, sure, I'll give you that. If you are reusing a single transaction
> > > >> for a lot of inputs there may be a lot of output messages on the same
> > > >> topic. But this is more microbatching than streaming.
> > > >>
> > > >> > Another large category of use cases should be considered is "batch"
> > > data
> > > >> > processing. You definitely don't want your future "batch"-ish data
> > > >> > processing workload to jump back-and-forth in ledgers. In that way,
> > > >> entries
> > > >> > of same transaction in same partition stored continuously will huge
> > > >> > benefits.
> > > >>
> > > >> Why would you use transaction here? This is more of a usecase for
> > > >> idempotent producer.
> > > >>
> > > >> > Transaction coordinators are responsible for committing, aborting
> > and
> > > >> > cleaning up transactions.
> > > >>
> > > >> How, not where. I'm my experience, the trickiest part in transaction
> > > >> systems is the cleanup.
> > > >>
> > > >> > > I think having a ledger, or a shadow topic, per topic would work
> > > fine.
> > > >> > > There's no need for indexing. We can already look up  an message
> > by
> > > >> > > messageid. This message id should be part of the transaction
> > commit
> > > >> > > message, and subsequently part of the commit marker written to
> > each
> > > >> > > topic involved in the transaction.
> > > >> >
> > > >> >
> > > >> > The assumption you have here is all the message ids can be stored
> > into
> > > >> > within one commit message or the commit markers.
> > > >>
> > > >> Are you expecting to have more than 100k messages per transaction?
> > > >>
> > > >> > > Caching is no different to any
> > > >> > > other choice of storage for the message data.
> > > >> >
> > > >> > Caching behavior is very different from normal streaming case. Since
> > > >> > accessing those entries of same transaction will be jump
> > > back-and-forth
> > > >> in
> > > >> > the shadow topic.
> > > >>
> > > >> Why would you be jumping back and forth for the same transaction
> > > >> within the same topic? You jump to the first message from that topic
> > > >> in the transaction and read forward.
> > > >>
> > > >> > How are you going to delete data of aborted transactions in any form
> > > of
> > > >> > interleaving storage?
> > > >> > If we don't compact, those data of aborted transactions will be left
> > > >> there
> > > >> > forever.
> > > >>
> > > >> Aborted transactions should be very rare. The only conflict should be
> > > >> on acknowledge, and these should only conflict if multiple consumers
> > > >> get the same message, which shouldn't be the case. In any case, they
> > > >> should be naturally cleaned up with topic itself (see below).
> > > >>
> > > >> > If you don't rewrite shadow topic to the main topic, you either have
> > > to
> > > >> do
> > > >> > compaction or retain the shadow topic forever, no?
> > > >>
> > > >> No. Lets say that the maximum age of a single transaction is 1h. That
> > > >> means you only need to retain an hour more of data more in the shadow
> > > >> topic as you would have to retain in the main topic. Of course we
> > > >> wouldn't use wallclock time for this, but something around the low
> > > >> watermark or something, but that's the basic idea. I haven't worked
> > > >> out all the details.
> > > >>
> > > >> I look forward to seeing your full design.
> > > >>
> > > >> -Ivan
> > > >>
> > > >
> > >
> >

Re: PIP-31: Add support for transactional messaging

Posted by Sijie Guo <gu...@gmail.com>.
Any other more comments on this topic?

- Sijie

On Sun, Mar 10, 2019 at 8:57 PM Jia Zhai <zh...@gmail.com> wrote:

> Thanks @Sijie for the PIP.
> It has with enough details for me, It looks great, especially for the
> sidecar
> approach. Left some comments.
>
> Best Regards.
>
>
> Jia Zhai
>
> Beijing, China
>
> Mobile: +86 15810491983
>
>
>
>
> On Fri, Mar 8, 2019 at 9:58 PM Sijie Guo <gu...@gmail.com> wrote:
>
> > Hi Team,
> >
> > I have written down all my thoughts around supporting transactional
> > streaming at Pulsar.
> >
> >
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> >
> > Please take a look and feel free to comment on the google doc. We can
> start
> > from there.
> >
> > Also apologies first if there are in-consistency or typos or language
> > errors in the doc. feel free to fix them.
> >
> > Thanks,
> > Sijie
> >
> > On Tue, Mar 5, 2019 at 1:49 PM Sijie Guo <gu...@gmail.com> wrote:
> >
> > > Will send the detailed proposal. We can go from there.
> > >
> > > One interesting question I would like to reply here.
> > >
> > > > But this is more microbatching than streaming.
> > >
> > > I think people usually have a wrong impression about "microbatching" vs
> > > "streaming".
> > > The "microbatching" vs "streaming" are usually found in the context
> > > talking about spark streaming vs storm/flink.
> > > The context is more about how computing engine "scheduling"
> computations.
> > >
> > > In reality, "batching" (microbatching) is almost everywhere in a
> > > "streaming" pipeline. e.g. even in pulsar client, bookie journal.
> > > In the streaming world, you will still do "microbatching" for many
> > reasons
> > > (such as throughput, windowing semantics and such).
> > > but the "microbatching" here is not about "scheduling" anymore.
> > >
> > > - Sijie
> > >
> > > On Tue, Mar 5, 2019 at 4:20 AM Ivan Kelly <iv...@apache.org> wrote:
> > >
> > >> > > My replies inline assume the above, so if you have a different
> view
> > of
> > >> > > the general shape let me know.
> > >> > >
> > >> >
> > >> > Yes. We are on the same view of the general shape. I will write down
> > the
> > >> > details of my proposal and will share it with the community
> tomorrow.
> > >>
> > >> Please do. I think there's a lot of details missing.
> > >>
> > >> > Diagrams can easily drawn to compare the differences here. I will
> > >> > incorporate into the proposal and show the differences.
> > >>
> > >> Diagrams and detailed design would be very useful. I still see a bunch
> > >> of unknowns in your design.
> > >>
> > >> > I don't think it is a hard dependency. All these components should
> be
> > >> done
> > >> > by interfaces.
> > >>
> > >> The architecture you're proposing requires a metadata service that can
> > >> scale horizontally to be able to scale, so it is a hard dependency. An
> > >> implementation backed by ZK would be only a toy.
> > >>
> > >> > I think I know why do you think interleaving is okay now. In your
> > mind,
> > >> > transactions are carrying one message per partition.
> > >>
> > >> Well yes, or only a few. I think we have a very different view of how
> > >> transactions will be used. You seem to be favouring few large
> > >> transactions, where as what Matteo and I have discussed is many
> > >> smaller transactions, and this informs both designs. With large
> > >> transactions, you're basically micro batching, and you can afford to
> > >> make the individual transactions more expensive since you have fewer.
> > >> For many smaller transactions, we need to make the transaction itself
> > >> as cheap as possible.
> > >>
> > >> > A common case of transaction in streaming, is read-transfer-write:
> > read
> > >> a
> > >> > batch of messages, process them and write the results to pulsar and
> > >> > acknowledges the messages. If you are doing this in a 100ms window,
> > the
> > >> > data can still be large enough, especially the results can be
> multiple
> > >> > times of the input messages. With that being said, at high
> throughput
> > >> > transactional streaming, data can be large per transaction,
> > continuously
> > >> > storing entries of same transaction will have a huge benefit.
> > >>
> > >> Ok, sure, I'll give you that. If you are reusing a single transaction
> > >> for a lot of inputs there may be a lot of output messages on the same
> > >> topic. But this is more microbatching than streaming.
> > >>
> > >> > Another large category of use cases should be considered is "batch"
> > data
> > >> > processing. You definitely don't want your future "batch"-ish data
> > >> > processing workload to jump back-and-forth in ledgers. In that way,
> > >> entries
> > >> > of same transaction in same partition stored continuously will huge
> > >> > benefits.
> > >>
> > >> Why would you use transaction here? This is more of a usecase for
> > >> idempotent producer.
> > >>
> > >> > Transaction coordinators are responsible for committing, aborting
> and
> > >> > cleaning up transactions.
> > >>
> > >> How, not where. I'm my experience, the trickiest part in transaction
> > >> systems is the cleanup.
> > >>
> > >> > > I think having a ledger, or a shadow topic, per topic would work
> > fine.
> > >> > > There's no need for indexing. We can already look up  an message
> by
> > >> > > messageid. This message id should be part of the transaction
> commit
> > >> > > message, and subsequently part of the commit marker written to
> each
> > >> > > topic involved in the transaction.
> > >> >
> > >> >
> > >> > The assumption you have here is all the message ids can be stored
> into
> > >> > within one commit message or the commit markers.
> > >>
> > >> Are you expecting to have more than 100k messages per transaction?
> > >>
> > >> > > Caching is no different to any
> > >> > > other choice of storage for the message data.
> > >> >
> > >> > Caching behavior is very different from normal streaming case. Since
> > >> > accessing those entries of same transaction will be jump
> > back-and-forth
> > >> in
> > >> > the shadow topic.
> > >>
> > >> Why would you be jumping back and forth for the same transaction
> > >> within the same topic? You jump to the first message from that topic
> > >> in the transaction and read forward.
> > >>
> > >> > How are you going to delete data of aborted transactions in any form
> > of
> > >> > interleaving storage?
> > >> > If we don't compact, those data of aborted transactions will be left
> > >> there
> > >> > forever.
> > >>
> > >> Aborted transactions should be very rare. The only conflict should be
> > >> on acknowledge, and these should only conflict if multiple consumers
> > >> get the same message, which shouldn't be the case. In any case, they
> > >> should be naturally cleaned up with topic itself (see below).
> > >>
> > >> > If you don't rewrite shadow topic to the main topic, you either have
> > to
> > >> do
> > >> > compaction or retain the shadow topic forever, no?
> > >>
> > >> No. Lets say that the maximum age of a single transaction is 1h. That
> > >> means you only need to retain an hour more of data more in the shadow
> > >> topic as you would have to retain in the main topic. Of course we
> > >> wouldn't use wallclock time for this, but something around the low
> > >> watermark or something, but that's the basic idea. I haven't worked
> > >> out all the details.
> > >>
> > >> I look forward to seeing your full design.
> > >>
> > >> -Ivan
> > >>
> > >
> >
>

Re: PIP-31: Add support for transactional messaging

Posted by Jia Zhai <zh...@gmail.com>.
Thanks @Sijie for the PIP.
It has with enough details for me, It looks great, especially for the sidecar
approach. Left some comments.

Best Regards.


Jia Zhai

Beijing, China

Mobile: +86 15810491983




On Fri, Mar 8, 2019 at 9:58 PM Sijie Guo <gu...@gmail.com> wrote:

> Hi Team,
>
> I have written down all my thoughts around supporting transactional
> streaming at Pulsar.
>
>
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
>
> Please take a look and feel free to comment on the google doc. We can start
> from there.
>
> Also apologies first if there are in-consistency or typos or language
> errors in the doc. feel free to fix them.
>
> Thanks,
> Sijie
>
> On Tue, Mar 5, 2019 at 1:49 PM Sijie Guo <gu...@gmail.com> wrote:
>
> > Will send the detailed proposal. We can go from there.
> >
> > One interesting question I would like to reply here.
> >
> > > But this is more microbatching than streaming.
> >
> > I think people usually have a wrong impression about "microbatching" vs
> > "streaming".
> > The "microbatching" vs "streaming" are usually found in the context
> > talking about spark streaming vs storm/flink.
> > The context is more about how computing engine "scheduling" computations.
> >
> > In reality, "batching" (microbatching) is almost everywhere in a
> > "streaming" pipeline. e.g. even in pulsar client, bookie journal.
> > In the streaming world, you will still do "microbatching" for many
> reasons
> > (such as throughput, windowing semantics and such).
> > but the "microbatching" here is not about "scheduling" anymore.
> >
> > - Sijie
> >
> > On Tue, Mar 5, 2019 at 4:20 AM Ivan Kelly <iv...@apache.org> wrote:
> >
> >> > > My replies inline assume the above, so if you have a different view
> of
> >> > > the general shape let me know.
> >> > >
> >> >
> >> > Yes. We are on the same view of the general shape. I will write down
> the
> >> > details of my proposal and will share it with the community tomorrow.
> >>
> >> Please do. I think there's a lot of details missing.
> >>
> >> > Diagrams can easily drawn to compare the differences here. I will
> >> > incorporate into the proposal and show the differences.
> >>
> >> Diagrams and detailed design would be very useful. I still see a bunch
> >> of unknowns in your design.
> >>
> >> > I don't think it is a hard dependency. All these components should be
> >> done
> >> > by interfaces.
> >>
> >> The architecture you're proposing requires a metadata service that can
> >> scale horizontally to be able to scale, so it is a hard dependency. An
> >> implementation backed by ZK would be only a toy.
> >>
> >> > I think I know why do you think interleaving is okay now. In your
> mind,
> >> > transactions are carrying one message per partition.
> >>
> >> Well yes, or only a few. I think we have a very different view of how
> >> transactions will be used. You seem to be favouring few large
> >> transactions, where as what Matteo and I have discussed is many
> >> smaller transactions, and this informs both designs. With large
> >> transactions, you're basically micro batching, and you can afford to
> >> make the individual transactions more expensive since you have fewer.
> >> For many smaller transactions, we need to make the transaction itself
> >> as cheap as possible.
> >>
> >> > A common case of transaction in streaming, is read-transfer-write:
> read
> >> a
> >> > batch of messages, process them and write the results to pulsar and
> >> > acknowledges the messages. If you are doing this in a 100ms window,
> the
> >> > data can still be large enough, especially the results can be multiple
> >> > times of the input messages. With that being said, at high throughput
> >> > transactional streaming, data can be large per transaction,
> continuously
> >> > storing entries of same transaction will have a huge benefit.
> >>
> >> Ok, sure, I'll give you that. If you are reusing a single transaction
> >> for a lot of inputs there may be a lot of output messages on the same
> >> topic. But this is more microbatching than streaming.
> >>
> >> > Another large category of use cases should be considered is "batch"
> data
> >> > processing. You definitely don't want your future "batch"-ish data
> >> > processing workload to jump back-and-forth in ledgers. In that way,
> >> entries
> >> > of same transaction in same partition stored continuously will huge
> >> > benefits.
> >>
> >> Why would you use transaction here? This is more of a usecase for
> >> idempotent producer.
> >>
> >> > Transaction coordinators are responsible for committing, aborting and
> >> > cleaning up transactions.
> >>
> >> How, not where. I'm my experience, the trickiest part in transaction
> >> systems is the cleanup.
> >>
> >> > > I think having a ledger, or a shadow topic, per topic would work
> fine.
> >> > > There's no need for indexing. We can already look up  an message by
> >> > > messageid. This message id should be part of the transaction commit
> >> > > message, and subsequently part of the commit marker written to each
> >> > > topic involved in the transaction.
> >> >
> >> >
> >> > The assumption you have here is all the message ids can be stored into
> >> > within one commit message or the commit markers.
> >>
> >> Are you expecting to have more than 100k messages per transaction?
> >>
> >> > > Caching is no different to any
> >> > > other choice of storage for the message data.
> >> >
> >> > Caching behavior is very different from normal streaming case. Since
> >> > accessing those entries of same transaction will be jump
> back-and-forth
> >> in
> >> > the shadow topic.
> >>
> >> Why would you be jumping back and forth for the same transaction
> >> within the same topic? You jump to the first message from that topic
> >> in the transaction and read forward.
> >>
> >> > How are you going to delete data of aborted transactions in any form
> of
> >> > interleaving storage?
> >> > If we don't compact, those data of aborted transactions will be left
> >> there
> >> > forever.
> >>
> >> Aborted transactions should be very rare. The only conflict should be
> >> on acknowledge, and these should only conflict if multiple consumers
> >> get the same message, which shouldn't be the case. In any case, they
> >> should be naturally cleaned up with topic itself (see below).
> >>
> >> > If you don't rewrite shadow topic to the main topic, you either have
> to
> >> do
> >> > compaction or retain the shadow topic forever, no?
> >>
> >> No. Lets say that the maximum age of a single transaction is 1h. That
> >> means you only need to retain an hour more of data more in the shadow
> >> topic as you would have to retain in the main topic. Of course we
> >> wouldn't use wallclock time for this, but something around the low
> >> watermark or something, but that's the basic idea. I haven't worked
> >> out all the details.
> >>
> >> I look forward to seeing your full design.
> >>
> >> -Ivan
> >>
> >
>

Re: PIP-31: Add support for transactional messaging

Posted by Sijie Guo <gu...@gmail.com>.
Hi Team,

I have written down all my thoughts around supporting transactional
streaming at Pulsar.

https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx

Please take a look and feel free to comment on the google doc. We can start
from there.

Also apologies first if there are in-consistency or typos or language
errors in the doc. feel free to fix them.

Thanks,
Sijie

On Tue, Mar 5, 2019 at 1:49 PM Sijie Guo <gu...@gmail.com> wrote:

> Will send the detailed proposal. We can go from there.
>
> One interesting question I would like to reply here.
>
> > But this is more microbatching than streaming.
>
> I think people usually have a wrong impression about "microbatching" vs
> "streaming".
> The "microbatching" vs "streaming" are usually found in the context
> talking about spark streaming vs storm/flink.
> The context is more about how computing engine "scheduling" computations.
>
> In reality, "batching" (microbatching) is almost everywhere in a
> "streaming" pipeline. e.g. even in pulsar client, bookie journal.
> In the streaming world, you will still do "microbatching" for many reasons
> (such as throughput, windowing semantics and such).
> but the "microbatching" here is not about "scheduling" anymore.
>
> - Sijie
>
> On Tue, Mar 5, 2019 at 4:20 AM Ivan Kelly <iv...@apache.org> wrote:
>
>> > > My replies inline assume the above, so if you have a different view of
>> > > the general shape let me know.
>> > >
>> >
>> > Yes. We are on the same view of the general shape. I will write down the
>> > details of my proposal and will share it with the community tomorrow.
>>
>> Please do. I think there's a lot of details missing.
>>
>> > Diagrams can easily drawn to compare the differences here. I will
>> > incorporate into the proposal and show the differences.
>>
>> Diagrams and detailed design would be very useful. I still see a bunch
>> of unknowns in your design.
>>
>> > I don't think it is a hard dependency. All these components should be
>> done
>> > by interfaces.
>>
>> The architecture you're proposing requires a metadata service that can
>> scale horizontally to be able to scale, so it is a hard dependency. An
>> implementation backed by ZK would be only a toy.
>>
>> > I think I know why do you think interleaving is okay now. In your mind,
>> > transactions are carrying one message per partition.
>>
>> Well yes, or only a few. I think we have a very different view of how
>> transactions will be used. You seem to be favouring few large
>> transactions, where as what Matteo and I have discussed is many
>> smaller transactions, and this informs both designs. With large
>> transactions, you're basically micro batching, and you can afford to
>> make the individual transactions more expensive since you have fewer.
>> For many smaller transactions, we need to make the transaction itself
>> as cheap as possible.
>>
>> > A common case of transaction in streaming, is read-transfer-write: read
>> a
>> > batch of messages, process them and write the results to pulsar and
>> > acknowledges the messages. If you are doing this in a 100ms window, the
>> > data can still be large enough, especially the results can be multiple
>> > times of the input messages. With that being said, at high throughput
>> > transactional streaming, data can be large per transaction, continuously
>> > storing entries of same transaction will have a huge benefit.
>>
>> Ok, sure, I'll give you that. If you are reusing a single transaction
>> for a lot of inputs there may be a lot of output messages on the same
>> topic. But this is more microbatching than streaming.
>>
>> > Another large category of use cases should be considered is "batch" data
>> > processing. You definitely don't want your future "batch"-ish data
>> > processing workload to jump back-and-forth in ledgers. In that way,
>> entries
>> > of same transaction in same partition stored continuously will huge
>> > benefits.
>>
>> Why would you use transaction here? This is more of a usecase for
>> idempotent producer.
>>
>> > Transaction coordinators are responsible for committing, aborting and
>> > cleaning up transactions.
>>
>> How, not where. I'm my experience, the trickiest part in transaction
>> systems is the cleanup.
>>
>> > > I think having a ledger, or a shadow topic, per topic would work fine.
>> > > There's no need for indexing. We can already look up  an message by
>> > > messageid. This message id should be part of the transaction commit
>> > > message, and subsequently part of the commit marker written to each
>> > > topic involved in the transaction.
>> >
>> >
>> > The assumption you have here is all the message ids can be stored into
>> > within one commit message or the commit markers.
>>
>> Are you expecting to have more than 100k messages per transaction?
>>
>> > > Caching is no different to any
>> > > other choice of storage for the message data.
>> >
>> > Caching behavior is very different from normal streaming case. Since
>> > accessing those entries of same transaction will be jump back-and-forth
>> in
>> > the shadow topic.
>>
>> Why would you be jumping back and forth for the same transaction
>> within the same topic? You jump to the first message from that topic
>> in the transaction and read forward.
>>
>> > How are you going to delete data of aborted transactions in any form of
>> > interleaving storage?
>> > If we don't compact, those data of aborted transactions will be left
>> there
>> > forever.
>>
>> Aborted transactions should be very rare. The only conflict should be
>> on acknowledge, and these should only conflict if multiple consumers
>> get the same message, which shouldn't be the case. In any case, they
>> should be naturally cleaned up with topic itself (see below).
>>
>> > If you don't rewrite shadow topic to the main topic, you either have to
>> do
>> > compaction or retain the shadow topic forever, no?
>>
>> No. Lets say that the maximum age of a single transaction is 1h. That
>> means you only need to retain an hour more of data more in the shadow
>> topic as you would have to retain in the main topic. Of course we
>> wouldn't use wallclock time for this, but something around the low
>> watermark or something, but that's the basic idea. I haven't worked
>> out all the details.
>>
>> I look forward to seeing your full design.
>>
>> -Ivan
>>
>

Re: PIP-31: Add support for transactional messaging

Posted by Sijie Guo <gu...@gmail.com>.
Will send the detailed proposal. We can go from there.

One interesting question I would like to reply here.

> But this is more microbatching than streaming.

I think people usually have a wrong impression about "microbatching" vs
"streaming".
The "microbatching" vs "streaming" are usually found in the context talking
about spark streaming vs storm/flink.
The context is more about how computing engine "scheduling" computations.

In reality, "batching" (microbatching) is almost everywhere in a
"streaming" pipeline. e.g. even in pulsar client, bookie journal.
In the streaming world, you will still do "microbatching" for many reasons
(such as throughput, windowing semantics and such).
but the "microbatching" here is not about "scheduling" anymore.

- Sijie

On Tue, Mar 5, 2019 at 4:20 AM Ivan Kelly <iv...@apache.org> wrote:

> > > My replies inline assume the above, so if you have a different view of
> > > the general shape let me know.
> > >
> >
> > Yes. We are on the same view of the general shape. I will write down the
> > details of my proposal and will share it with the community tomorrow.
>
> Please do. I think there's a lot of details missing.
>
> > Diagrams can easily drawn to compare the differences here. I will
> > incorporate into the proposal and show the differences.
>
> Diagrams and detailed design would be very useful. I still see a bunch
> of unknowns in your design.
>
> > I don't think it is a hard dependency. All these components should be
> done
> > by interfaces.
>
> The architecture you're proposing requires a metadata service that can
> scale horizontally to be able to scale, so it is a hard dependency. An
> implementation backed by ZK would be only a toy.
>
> > I think I know why do you think interleaving is okay now. In your mind,
> > transactions are carrying one message per partition.
>
> Well yes, or only a few. I think we have a very different view of how
> transactions will be used. You seem to be favouring few large
> transactions, where as what Matteo and I have discussed is many
> smaller transactions, and this informs both designs. With large
> transactions, you're basically micro batching, and you can afford to
> make the individual transactions more expensive since you have fewer.
> For many smaller transactions, we need to make the transaction itself
> as cheap as possible.
>
> > A common case of transaction in streaming, is read-transfer-write: read a
> > batch of messages, process them and write the results to pulsar and
> > acknowledges the messages. If you are doing this in a 100ms window, the
> > data can still be large enough, especially the results can be multiple
> > times of the input messages. With that being said, at high throughput
> > transactional streaming, data can be large per transaction, continuously
> > storing entries of same transaction will have a huge benefit.
>
> Ok, sure, I'll give you that. If you are reusing a single transaction
> for a lot of inputs there may be a lot of output messages on the same
> topic. But this is more microbatching than streaming.
>
> > Another large category of use cases should be considered is "batch" data
> > processing. You definitely don't want your future "batch"-ish data
> > processing workload to jump back-and-forth in ledgers. In that way,
> entries
> > of same transaction in same partition stored continuously will huge
> > benefits.
>
> Why would you use transaction here? This is more of a usecase for
> idempotent producer.
>
> > Transaction coordinators are responsible for committing, aborting and
> > cleaning up transactions.
>
> How, not where. I'm my experience, the trickiest part in transaction
> systems is the cleanup.
>
> > > I think having a ledger, or a shadow topic, per topic would work fine.
> > > There's no need for indexing. We can already look up  an message by
> > > messageid. This message id should be part of the transaction commit
> > > message, and subsequently part of the commit marker written to each
> > > topic involved in the transaction.
> >
> >
> > The assumption you have here is all the message ids can be stored into
> > within one commit message or the commit markers.
>
> Are you expecting to have more than 100k messages per transaction?
>
> > > Caching is no different to any
> > > other choice of storage for the message data.
> >
> > Caching behavior is very different from normal streaming case. Since
> > accessing those entries of same transaction will be jump back-and-forth
> in
> > the shadow topic.
>
> Why would you be jumping back and forth for the same transaction
> within the same topic? You jump to the first message from that topic
> in the transaction and read forward.
>
> > How are you going to delete data of aborted transactions in any form of
> > interleaving storage?
> > If we don't compact, those data of aborted transactions will be left
> there
> > forever.
>
> Aborted transactions should be very rare. The only conflict should be
> on acknowledge, and these should only conflict if multiple consumers
> get the same message, which shouldn't be the case. In any case, they
> should be naturally cleaned up with topic itself (see below).
>
> > If you don't rewrite shadow topic to the main topic, you either have to
> do
> > compaction or retain the shadow topic forever, no?
>
> No. Lets say that the maximum age of a single transaction is 1h. That
> means you only need to retain an hour more of data more in the shadow
> topic as you would have to retain in the main topic. Of course we
> wouldn't use wallclock time for this, but something around the low
> watermark or something, but that's the basic idea. I haven't worked
> out all the details.
>
> I look forward to seeing your full design.
>
> -Ivan
>

Re: PIP-31: Add support for transactional messaging

Posted by Ivan Kelly <iv...@apache.org>.
> > My replies inline assume the above, so if you have a different view of
> > the general shape let me know.
> >
>
> Yes. We are on the same view of the general shape. I will write down the
> details of my proposal and will share it with the community tomorrow.

Please do. I think there's a lot of details missing.

> Diagrams can easily drawn to compare the differences here. I will
> incorporate into the proposal and show the differences.

Diagrams and detailed design would be very useful. I still see a bunch
of unknowns in your design.

> I don't think it is a hard dependency. All these components should be done
> by interfaces.

The architecture you're proposing requires a metadata service that can
scale horizontally to be able to scale, so it is a hard dependency. An
implementation backed by ZK would be only a toy.

> I think I know why do you think interleaving is okay now. In your mind,
> transactions are carrying one message per partition.

Well yes, or only a few. I think we have a very different view of how
transactions will be used. You seem to be favouring few large
transactions, where as what Matteo and I have discussed is many
smaller transactions, and this informs both designs. With large
transactions, you're basically micro batching, and you can afford to
make the individual transactions more expensive since you have fewer.
For many smaller transactions, we need to make the transaction itself
as cheap as possible.

> A common case of transaction in streaming, is read-transfer-write: read a
> batch of messages, process them and write the results to pulsar and
> acknowledges the messages. If you are doing this in a 100ms window, the
> data can still be large enough, especially the results can be multiple
> times of the input messages. With that being said, at high throughput
> transactional streaming, data can be large per transaction, continuously
> storing entries of same transaction will have a huge benefit.

Ok, sure, I'll give you that. If you are reusing a single transaction
for a lot of inputs there may be a lot of output messages on the same
topic. But this is more microbatching than streaming.

> Another large category of use cases should be considered is "batch" data
> processing. You definitely don't want your future "batch"-ish data
> processing workload to jump back-and-forth in ledgers. In that way, entries
> of same transaction in same partition stored continuously will huge
> benefits.

Why would you use transaction here? This is more of a usecase for
idempotent producer.

> Transaction coordinators are responsible for committing, aborting and
> cleaning up transactions.

How, not where. I'm my experience, the trickiest part in transaction
systems is the cleanup.

> > I think having a ledger, or a shadow topic, per topic would work fine.
> > There's no need for indexing. We can already look up  an message by
> > messageid. This message id should be part of the transaction commit
> > message, and subsequently part of the commit marker written to each
> > topic involved in the transaction.
>
>
> The assumption you have here is all the message ids can be stored into
> within one commit message or the commit markers.

Are you expecting to have more than 100k messages per transaction?

> > Caching is no different to any
> > other choice of storage for the message data.
>
> Caching behavior is very different from normal streaming case. Since
> accessing those entries of same transaction will be jump back-and-forth in
> the shadow topic.

Why would you be jumping back and forth for the same transaction
within the same topic? You jump to the first message from that topic
in the transaction and read forward.

> How are you going to delete data of aborted transactions in any form of
> interleaving storage?
> If we don't compact, those data of aborted transactions will be left there
> forever.

Aborted transactions should be very rare. The only conflict should be
on acknowledge, and these should only conflict if multiple consumers
get the same message, which shouldn't be the case. In any case, they
should be naturally cleaned up with topic itself (see below).

> If you don't rewrite shadow topic to the main topic, you either have to do
> compaction or retain the shadow topic forever, no?

No. Lets say that the maximum age of a single transaction is 1h. That
means you only need to retain an hour more of data more in the shadow
topic as you would have to retain in the main topic. Of course we
wouldn't use wallclock time for this, but something around the low
watermark or something, but that's the basic idea. I haven't worked
out all the details.

I look forward to seeing your full design.

-Ivan

Re: PIP-31: Add support for transactional messaging

Posted by Sijie Guo <gu...@gmail.com>.
On Tue, Mar 5, 2019 at 12:37 AM Ivan Kelly <iv...@apache.org> wrote:

> I think we agree on the general shape of the design as follows.
>
> - Transactions store message data somewhere that it is not
> materialized to the client immediately.
> - On commit, a single message is written to some log
> - Commit messages are then written to the topics of the logs in the
> transaction, which point to the message data and which materialize it
> to the client?
>
> My replies inline assume the above, so if you have a different view of
> the general shape let me know.
>

Yes. We are on the same view of the general shape. I will write down the
details of my proposal and will share it with the community tomorrow.


>
> > Database is very different from a streaming system. Database storage is
> > optimized for randomized
> > accesses, where data tends to be organized in key + version ordering,
> where
> > interleaving committed
> > and uncommitted data is fine.
> > However in a streaming system, interleaving means you have to go
> > back-and-forth between transactions,
>
> Whether we put it interleaved or in a separate ledger, we are going to
> have to do a random read. The only way to avoid a random read is to
> rewrite all the transaction messages after the transaction has
> committed, which means write amplification.
>

There are two layered "random" read. One is "logically" at ledger level,
you need to have some sort of scanning or indexing to jump back-and-forth
between entries, this is at logical level; the other one is the "really" IO.

If you are putting data in a separate ledger, logically the dispatcher will
see entries in order. physically, bookies will optimize storing entries of
same ledgers. for accessing that data of same transaction are most likely
in order.

If you are putting data in an interleaved way, you have implemented logic
at the logical layer (broker) to jump back-and-forth; at storage layer,
since data are interleaved in same ledger, so the access pattern is also
bad. Even you said, the number of transactions can be bound, the access
pattern is not as good as putting in the first option.

Diagrams can easily drawn to compare the differences here. I will
incorporate into the proposal and show the differences.


>
> > I was thinking of one (or multiple) ledger per partition per transaction.
> > this is the simplest solution, where you can delete ledger to abort a
> > transaction. However it will put a pressure to metadata, but we can
> address
> > that by using table service to store those metadata as well as
> transaction
> > statuses.
>
> This is a huge amount of pressure on metadata. With the current
> metadata service, this is 4 ZK writes to commit a transaction (id,
> create, close and anchor it somewhere).
> Then you need to delete the data when the topic gets truncated. You
> need some way to correlate which data ledgers correspond to the
> messages being truncated from the topic, and then the metadata store
> needs to deal with a massive number of delete operations in a short
> time.

I also wouldn't consider table service mature enough to put ledger
> metadata on it yet. It's new, undocumented, and has very little
> production exposure so far. Making a hard dependency on table service,
> would delay transactions by at least a year. Putting transaction
> components themselves on table service is fine, as transactions are
> also a new service, but ledger metadata is the very core of the
> system.
>

I don't think it is a hard dependency. All these components should be done
by interfaces. There should be
no hard dependency on table service. The initial implementation can be
using current zookeeper based ledger storage,
so we can make sure all end-to-end logic are verified. At the same time,
other implementations can be worked on in parallel,
either on table service or on a separate implementation.


>
> > We have a few benefits 1) make sure the entries of a same
> > transaction in the same partitions are kind of continuously stored on
> > bookies. because they will be ordered by ledgerId + entryId.
>
> Clients will not care if the entries from the same transaction are
> contiguous on disk or not. Clients reading are reading from a topic,
> so unless there are multiple entries in the topic from the same
> transaction, they'll only read one entry at a time.
>

I think I know why do you think interleaving is okay now. In your mind,
transactions are carrying one message per partition.

I don't think that's true in streaming space.

A common case of transaction in streaming, is read-transfer-write: read a
batch of messages, process them and write the results to pulsar and
acknowledges the messages. If you are doing this in a 100ms window, the
data can still be large enough, especially the results can be multiple
times of the input messages. With that being said, at high throughput
transactional streaming, data can be large per transaction, continuously
storing entries of same transaction will have a huge benefit.

Another large category of use cases should be considered is "batch" data
processing. You definitely don't want your future "batch"-ish data
processing workload to jump back-and-forth in ledgers. In that way, entries
of same transaction in same partition stored continuously will huge
benefits.


>
> >2) the
> > metadata and transaction status part are handled by the table service,
> > where we are able to support large number of outstanding transactions as
> it
> > scales.
>
> How do you clean up old transactions from table service?
>

Transaction coordinators are responsible for committing, aborting and
cleaning up transactions.


>
> > We can share a ledger across transactions, however it will be similar as
> > interleaving transactions into the data ledgers. You will end up building
> > indexing, caching and compaction logic, which has already implemented on
> > bookies and table service.
>
> I think having a ledger, or a shadow topic, per topic would work fine.
> There's no need for indexing. We can already look up  an message by
> messageid. This message id should be part of the transaction commit
> message, and subsequently part of the commit marker written to each
> topic involved in the transaction.


The assumption you have here is all the message ids can be stored into
within one commit message or the commit markers.



> Caching is no different to any
> other choice of storage for the message data.


Caching behavior is very different from normal streaming case. Since
accessing those entries of same transaction will be jump back-and-forth in
the shadow topic.


> Compaction is unneeded.
>

How are you going to delete data of aborted transactions in any form of
interleaving storage?
If we don't compact, those data of aborted transactions will be left there
forever.

No matter what design we end up with, we will need to put a limit on
> the lifetime of transactions.

This will create an upper bound on the
> amount of extra time we need to retain the shadow topic relative to
> the main topic.
>

If you don't rewrite shadow topic to the main topic, you either have to do
compaction or retain the shadow topic forever, no?


>
> -Ivan
>

Re: PIP-31: Add support for transactional messaging

Posted by Ivan Kelly <iv...@apache.org>.
I think we agree on the general shape of the design as follows.

- Transactions store message data somewhere that it is not
materialized to the client immediately.
- On commit, a single message is written to some log
- Commit messages are then written to the topics of the logs in the
transaction, which point to the message data and which materialize it
to the client?

My replies inline assume the above, so if you have a different view of
the general shape let me know.

> Database is very different from a streaming system. Database storage is
> optimized for randomized
> accesses, where data tends to be organized in key + version ordering, where
> interleaving committed
> and uncommitted data is fine.
> However in a streaming system, interleaving means you have to go
> back-and-forth between transactions,

Whether we put it interleaved or in a separate ledger, we are going to
have to do a random read. The only way to avoid a random read is to
rewrite all the transaction messages after the transaction has
committed, which means write amplification.

> I was thinking of one (or multiple) ledger per partition per transaction.
> this is the simplest solution, where you can delete ledger to abort a
> transaction. However it will put a pressure to metadata, but we can address
> that by using table service to store those metadata as well as transaction
> statuses.

This is a huge amount of pressure on metadata. With the current
metadata service, this is 4 ZK writes to commit a transaction (id,
create, close and anchor it somewhere).
Then you need to delete the data when the topic gets truncated. You
need some way to correlate which data ledgers correspond to the
messages being truncated from the topic, and then the metadata store
needs to deal with a massive number of delete operations in a short
time.
I also wouldn't consider table service mature enough to put ledger
metadata on it yet. It's new, undocumented, and has very little
production exposure so far. Making a hard dependency on table service,
would delay transactions by at least a year. Putting transaction
components themselves on table service is fine, as transactions are
also a new service, but ledger metadata is the very core of the
system.

> We have a few benefits 1) make sure the entries of a same
> transaction in the same partitions are kind of continuously stored on
> bookies. because they will be ordered by ledgerId + entryId.

Clients will not care if the entries from the same transaction are
contiguous on disk or not. Clients reading are reading from a topic,
so unless there are multiple entries in the topic from the same
transaction, they'll only read one entry at a time.

>2) the
> metadata and transaction status part are handled by the table service,
> where we are able to support large number of outstanding transactions as it
> scales.

How do you clean up old transactions from table service?

> We can share a ledger across transactions, however it will be similar as
> interleaving transactions into the data ledgers. You will end up building
> indexing, caching and compaction logic, which has already implemented on
> bookies and table service.

I think having a ledger, or a shadow topic, per topic would work fine.
There's no need for indexing. We can already look up  an message by
messageid. This message id should be part of the transaction commit
message, and subsequently part of the commit marker written to each
topic involved in the transaction. Caching is no different to any
other choice of storage for the message data. Compaction is unneeded.
No matter what design we end up with, we will need to put a limit on
the lifetime of transactions. This will create an upper bound on the
amount of extra time we need to retain the shadow topic relative to
the main topic.

-Ivan

Re: PIP-31: Add support for transactional messaging

Posted by Sijie Guo <gu...@gmail.com>.
On Mon, Mar 4, 2019 at 6:15 PM Ivan Kelly <iv...@apache.org> wrote:

> > > Transactional acknowledgement also needs to be taken into account
> >
> > I don't think we have to treat `transactional acknowledgement` as a
> special
> > case. currently `acknowledgment` are actually "append" operations into
> > cursor ledgers.
> > So the problem set can be reduced as `atomic append` to both data ledgers
> > and cursor ledgers. in that way, we can use one solution for handling
> > appending data and updating cursors.
>
> Acknowledges are different though as they have to take conflicts into
> account. For publishing messages, transactions are only really a case
> of making sure if one message from the transaction is visible, then
> all messages from the transaction are visible, i.e. atomicity. There's
> no need for isolation or consistency in ACID terms because all these
> messages are independent. However, in the case of acknowledgements, we
> want to ensure that only one transaction can acknowledges a specific
> message (leaving cumulative ack aside for now). This means that before
> committing the transaction we need to check if the transaction can be
> committed. If any message acknowledged by the transaction has already
> been acknowledged at this point, then we need to abort the transaction
> as a conflict has occurred.
>

Sure for individual acks. Yes, we need the logic for checking conflicts.


>
> > Kafka's implementation is interleaving committed messages with
> uncommitted
> > messages at storage. Personally I think it is a very ugly design and
> > implementation.
>
> I don't think it's particularly ugly. Databases using MVCC do
> something similar.


Database is very different from a streaming system. Database storage is
optimized for randomized
accesses, where data tends to be organized in key + version ordering, where
interleaving committed
and uncommitted data is fine.

However in a streaming system, interleaving means you have to go
back-and-forth between transactions,
and you have to reply on either client side caching (make client
implementation much complicated), or
broker side caching (make dispatching much complicated).

Caching only works on one assumption, where transactions are live for a
very short time, and there are
no a lot of interleaving transactions. Long transactions mixing with
short-live transactions, or a lot of interleaving
transactions don't work out well.


> This has a nice property for the clients. Our
> transactional system should be optimistic, as the only cause for
> aborts will be client crashes and conflicts on acks, both of which
> should be rare events. Transactions will likely live for a very short
> time.


Kafka can optimize this on caching uncommitted data because Kafka's
consumption model is fairly straightforward
and simple. While Pulsar is relying on broker side dispatch, if we started
caching interleaving transactions at broker side,
that means we have to touch a lot on broker dispatching logic.


> So if the data is interleaved, it can be cached at the client,
> and the client can then surface it to the user when then commit
> arrives shortly afterwards, without any modification to the dispatch
> path.

It also gives clients the ability to read uncommitted data,
> though i don't know why they would want to.
>

Exposing uncommitted data is actually a very confused behavior to clients.
Even we can do so, I don't think we should expose to users.
I would prefer a much clean delivery semantic around transactions.


>
> > 1) for any transaction, write the messages to a separate ledger (or
> multiple separate ledger).
>
> When you say a separate ledger, are you talking about a ledger per
> transaction? Or is there one log for the system?
>

I was thinking of one (or multiple) ledger per partition per transaction.
this is the simplest solution, where you can delete ledger to abort a
transaction. However it will put a pressure to metadata, but we can address
that by using table service to store those metadata as well as transaction
statuses. We have a few benefits 1) make sure the entries of a same
transaction in the same partitions are kind of continuously stored on
bookies. because they will be ordered by ledgerId + entryId. 2) the
metadata and transaction status part are handled by the table service,
where we are able to support large number of outstanding transactions as it
scales.

We can share a ledger across transactions, however it will be similar as
interleaving transactions into the data ledgers. You will end up building
indexing, caching and compaction logic, which has already implemented on
bookies and table service.

- Sijie


>
> -Ivan
>

Re: PIP-31: Add support for transactional messaging

Posted by Ivan Kelly <iv...@apache.org>.
> > Transactional acknowledgement also needs to be taken into account
>
> I don't think we have to treat `transactional acknowledgement` as a special
> case. currently `acknowledgment` are actually "append" operations into
> cursor ledgers.
> So the problem set can be reduced as `atomic append` to both data ledgers
> and cursor ledgers. in that way, we can use one solution for handling
> appending data and updating cursors.

Acknowledges are different though as they have to take conflicts into
account. For publishing messages, transactions are only really a case
of making sure if one message from the transaction is visible, then
all messages from the transaction are visible, i.e. atomicity. There's
no need for isolation or consistency in ACID terms because all these
messages are independent. However, in the case of acknowledgements, we
want to ensure that only one transaction can acknowledges a specific
message (leaving cumulative ack aside for now). This means that before
committing the transaction we need to check if the transaction can be
committed. If any message acknowledged by the transaction has already
been acknowledged at this point, then we need to abort the transaction
as a conflict has occurred.

> Kafka's implementation is interleaving committed messages with uncommitted
> messages at storage. Personally I think it is a very ugly design and
> implementation.

I don't think it's particularly ugly. Databases using MVCC do
something similar. This has a nice property for the clients. Our
transactional system should be optimistic, as the only cause for
aborts will be client crashes and conflicts on acks, both of which
should be rare events. Transactions will likely live for a very short
time. So if the data is interleaved, it can be cached at the client,
and the client can then surface it to the user when then commit
arrives shortly afterwards, without any modification to the dispatch
path. It also gives clients the ability to read uncommitted data,
though i don't know why they would want to.

> 1) for any transaction, write the messages to a separate ledger (or
multiple separate ledger).

When you say a separate ledger, are you talking about a ledger per
transaction? Or is there one log for the system?

-Ivan

Re: PIP-31: Add support for transactional messaging

Posted by Sijie Guo <gu...@gmail.com>.
Richard,

Sure, I can drive the PIP. I will try to write down the details in your PIP
google doc and we can go from there.

Thanks,
Sijie

On Mon, Mar 4, 2019 at 5:01 AM Richard Yu <yo...@gmail.com>
wrote:

> Hi Dave, Mattteo, and Sijie,
>
> Thanks for pitching in on the discussion!
> Sijie, it would be great if you could drive this PIP. To be frank, I don't
> know what the best direction is.
> Oh, and Ivan, if you have any other idea. Let us know. :)
>
> If there is any changes that need to be made, edit the document
>
>
> On Sat, Mar 2, 2019 at 11:11 PM Sijie Guo <gu...@gmail.com> wrote:
>
> > Matteo, Dave,
> >
> > I think you are talking about different things. My comments to both:
> >
> > > Once there's support for transactions in messaging API, there will be
> > > no need for a base class for functions. Rather a config option will
> > > allow to enable transactional mode.
> >
> > Matteo, If I understand your comment correctly, you are talking about
> > functions using transactions for processing semantics. If so, yes that
> > would be the end goal.
> >
> > > Yes, that way there is no additional broker overhead and whatever
> happens
> > when a commit happens is under the control of those making the
> transaction.
> >
> > Dave, this sounds an interesting idea and it is definitely do-able.
> Because
> > Pulsar is a multi-layered system and it is built on top of a reliable
> > storage, so a lot of components are just "stateless", "logical" and not
> > bound to any physical machines. so when we implement a component /
> > functionality, we basically implement a logical unit. How to run the
> logic
> > unit can be very flexible. It can run as a separated service, or as part
> of
> > broker, or in functions.
> >
> > - Sijie
> >
> >
> > On Sun, Mar 3, 2019 at 10:52 AM Dave Fisher <da...@comcast.net>
> wrote:
> >
> > > Hi -
> > >
> > > > On Mar 2, 2019, at 6:39 PM, Sijie Guo <gu...@gmail.com> wrote:
> > > >
> > > > Dave,
> > > >
> > > > You mean implementing the transactions in pulsar function?
> > >
> > > Yes, that way there is no additional broker overhead and whatever
> happens
> > > when a commit happens is under the control of those making the
> > transaction.
> > >
> > > I’m not sure if it would work, but it seems that functions, spouts, and
> > > connectors make sense as opposed to burdening the highly performant
> > brokers.
> > >
> > > Regards,
> > > Dave
> > >
> > > >
> > > > - Sijie
> > > >
> > > >> On Sun, Mar 3, 2019 at 1:52 AM Dave Fisher <da...@comcast.net>
> > > wrote:
> > > >>
> > > >> Hi -
> > > >>
> > > >> Is this a case where a Pulsar function base class for transactions
> > would
> > > >> help?
> > > >>
> > > >> Regards,
> > > >> Dave
> > > >>
> > > >> Sent from my iPhone
> > > >>
> > > >>> On Mar 2, 2019, at 2:39 AM, Sijie Guo <gu...@gmail.com> wrote:
> > > >>>
> > > >>> Pravega's model is a better model than Kafka - it addressed the
> > > >>> interleaving problems. However Pravega's model is based on a giant
> > > >>> replicated log and rewrite the data to a second tiered storage for
> > > >>> persistence, which basically re-implemented bookkeeper's logic in
> > > >> broker. A
> > > >>> fundamental drawback of Pravega is write amplifications. The
> > > >> amplifications
> > > >>> of both network and IO bandwidth are huge. If you use bookkeeper
> both
> > > for
> > > >>> its first-and-second tier storage and assume the bookkeeper
> > replication
> > > >>> factor is 3, pravega requires 6x network bandwidth and 12x IO
> > > bandwidth.
> > > >>> For a given message, it needs to write 3 times into the journal,
> and
> > 3
> > > >>> times for persistent. The amplifications hugely limit the
> throughput
> > at
> > > >>> pravega "brokers".
> > > >>>
> > > >>> - Sijie
> > > >>>
> > > >>>
> > > >>>
> > > >>>> On Sat, Mar 2, 2019 at 6:13 PM Ali Ahmed <ah...@gmail.com>
> > wrote:
> > > >>>>
> > > >>>> I agree we many want to review pravega's past efforts in this area
> > > also.
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>
> > >
> >
> https://github.com/pravega/pravega/blob/master/documentation/src/docs/transactions.md
> > > >>>>
> > > >>>>
> > > >>
> > >
> >
> https://github.com/pravega/pravega/blob/master/client/src/main/java/io/pravega/client/stream/Transaction.java
> > > >>>>
> > > >>>> -Ali
> > > >>>>
> > > >>>>> On Sat, Mar 2, 2019 at 1:56 AM Sijie Guo <gu...@gmail.com>
> > wrote:
> > > >>>>>
> > > >>>>> Kafka's implementation is interleaving committed messages with
> > > >>>> uncommitted
> > > >>>>> messages at storage. Personally I think it is a very ugly design
> > and
> > > >>>>> implementation.
> > > >>>>>
> > > >>>>> Pulsar is a segment centric system, where we have a shared
> segment
> > > >>>> storage
> > > >>>>> - bookkeeper. I think a better direction is to leverage the
> > segments
> > > >> (aka
> > > >>>>> ledgers)
> > > >>>>> for buffering uncommitted messages and commit the whole segment
> > when
> > > >> the
> > > >>>>> whole transaction is committed.
> > > >>>>>
> > > >>>>> A rough idea would be:
> > > >>>>>
> > > >>>>> 1) for any transaction, write the messages to a separate ledger
> (or
> > > >>>>> multiple separate ledger).
> > > >>>>> 2) during the transaction, accumulates the messages in those
> > ledgers.
> > > >>>>> 3) when commit, merge the txn ledgers back to the main data
> ledger.
> > > the
> > > >>>>> merge can be done either adding a meta message where data is
> stored
> > > in
> > > >>>> the
> > > >>>>> txn ledger or actually copying the data to data ledger (depending
> > on
> > > >> the
> > > >>>>> size of data accumulate in the transaction).
> > > >>>>> 4) when abort, delete the txn ledger. No other additional work to
> > be
> > > >>>> done.
> > > >>>>>
> > > >>>>> This would be producing a much clear design than Kafka.
> > > >>>>>
> > > >>>>> On Ivan's comments:
> > > >>>>>
> > > >>>>>> Transactional acknowledgement also needs to be taken into
> account
> > > >>>>>
> > > >>>>> I don't think we have to treat `transactional acknowledgement`
> as a
> > > >>>> special
> > > >>>>> case. currently `acknowledgment` are actually "append" operations
> > > into
> > > >>>>> cursor ledgers.
> > > >>>>> So the problem set can be reduced as `atomic append` to both data
> > > >> ledgers
> > > >>>>> and cursor ledgers. in that way, we can use one solution for
> > handling
> > > >>>>> appending data and updating cursors.
> > > >>>>>
> > > >>>>> Additionally, I think a related topic about transactions would be
> > > >>>>> supporting large sized message (e.g. >= 5MB). If we take the
> > > approach I
> > > >>>>> described above using a separated ledger for accumulating
> messages
> > > for
> > > >> a
> > > >>>>> transaction, that we are easy to model a large size message as a
> > > >>>>> transaction of chunked messages.
> > > >>>>>
> > > >>>>> @Richard, @Ivan let me know what do you think. If you guys think
> > the
> > > >>>>> direction I raised is a good one to go down, I am happy to write
> > them
> > > >>>> down
> > > >>>>> into details, and drive the design and coordinate the
> > implementations
> > > >> in
> > > >>>>> the community.
> > > >>>>>
> > > >>>>> - Sijie
> > > >>>>>
> > > >>>>> On Sat, Mar 2, 2019 at 9:45 AM Richard Yu <
> > > yohan.richard.yu@gmail.com>
> > > >>>>> wrote:
> > > >>>>>
> > > >>>>>> Hi all,
> > > >>>>>>
> > > >>>>>> We might be able to get some ideas on implementing this from
> > Kafka:
> > > >>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
> > > >>>>>>
> > > >>>>>> Obviously, there is some differences in Kafka and Pulsar
> internals
> > > but
> > > >>>> at
> > > >>>>>> some level, the implementation would be similar.
> > > >>>>>> It should help.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On Thu, Feb 28, 2019 at 4:29 PM Richard Yu <
> > > >> yohan.richard.yu@gmail.com
> > > >>>>>
> > > >>>>>> wrote:
> > > >>>>>>
> > > >>>>>>> Hi,
> > > >>>>>>>
> > > >>>>>>> Per request, I've created a doc so we could get some more input
> > in
> > > an
> > > >>>>>>> organized manner:
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>
> > >
> >
> https://docs.google.com/document/d/1mSUsJvPgThnWizeQqljKU5244BMEiYHabA6OP6QEHmQ/edit?usp=sharing
> > > >>>>>>>
> > > >>>>>>> And for Ivan's questions, I would answer accordingly.
> > > >>>>>>>
> > > >>>>>>>> By "set the message to unknown", do you mean the broker will
> > cache
> > > >>>> the
> > > >>>>>>>> message, not writing it to any log?
> > > >>>>>>>
> > > >>>>>>> We wouldn't cache the message from my interpretation of the
> > steps.
> > > >>>> What
> > > >>>>>>> the producer is first sending is a pre-processing message, not
> > the
> > > >>>> real
> > > >>>>>>> message itself. This step basically notifies the broker that
> the
> > > >>>>> message
> > > >>>>>> is
> > > >>>>>>> on its way. So all we have to do is store the message id and
> its
> > > >>>>>>> corresponding status in a map, and depending on the producer's
> > > >>>>> response,
> > > >>>>>>> the status will change accordingly.
> > > >>>>>>>
> > > >>>>>>>> In designs we've discussed previously, this was handled
> > > >>>>>>>> by a component called the transaction coordinator, which is a
> > > >>>> logical
> > > >>>>>>>> component which each broker knows how to talk to. For a
> > > transaction
> > > >>>>>>>> the commit message is sent to the coordinator, which writes it
> > to
> > > >>>> its
> > > >>>>>>>> own log, and then goes through each topic in the commit and
> > marks
> > > >>>> the
> > > >>>>>>>> transaction as completed.
> > > >>>>>>>
> > > >>>>>>> I wasn't aware of previous discussions on this topic, but it
> > seems
> > > >>>>> pretty
> > > >>>>>>> good to me. It's certainly better than what I would come up
> with.
> > > >>>>>>> If there's any more things we need to talk about, I suppose we
> > > could
> > > >>>>> move
> > > >>>>>>> it to the google doc to play around with.
> > > >>>>>>>
> > > >>>>>>> Hope we can get this PIP rolling.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> On Thu, Feb 28, 2019 at 2:53 AM Sijie Guo <gu...@gmail.com>
> > > >>>> wrote:
> > > >>>>>>>
> > > >>>>>>>> Richard,
> > > >>>>>>>>
> > > >>>>>>>> Thank you for putting this put and pushing the discussion
> > forward.
> > > >>>>>>>>
> > > >>>>>>>> I think this is a very large feature. It might be worth
> > creating a
> > > >>>>>> google
> > > >>>>>>>> doc for it (which is better for collaboration). And I believe
> > Ivan
> > > >>>> has
> > > >>>>>>>> some
> > > >>>>>>>> thoughts as well. If you can put up a google doc (make it
> > > >>>>>> world-editable),
> > > >>>>>>>> Ivan can probably dump his thoughts there and we can finalize
> > the
> > > >>>>>>>> discussion and break down into tasks. So the whole community
> can
> > > >>>>>> actually
> > > >>>>>>>> work together at collaborating this.
> > > >>>>>>>>
> > > >>>>>>>> Thanks,
> > > >>>>>>>> Sijie
> > > >>>>>>>>
> > > >>>>>>>> On Thu, Feb 28, 2019 at 1:08 PM Richard Yu <
> > > >>>>> yohan.richard.yu@gmail.com>
> > > >>>>>>>> wrote:
> > > >>>>>>>>
> > > >>>>>>>>> Hi all,
> > > >>>>>>>>>
> > > >>>>>>>>> I would like to create a PIP for issue #2664 on Github. The
> > > >>>> details
> > > >>>>> of
> > > >>>>>>>> the
> > > >>>>>>>>> PIP are below.
> > > >>>>>>>>> I hope we could discuss this thoroughly.
> > > >>>>>>>>>
> > > >>>>>>>>> Cheers,
> > > >>>>>>>>> Richard
> > > >>>>>>>>>
> > > >>>>>>>>> PIP-31: Add support for transactional messaging
> > > >>>>>>>>>
> > > >>>>>>>>> Motivation: Pulsar currently could improve upon their system
> of
> > > >>>>>> sending
> > > >>>>>>>>> packets of data by implementing transactional messaging. This
> > > >>>> system
> > > >>>>>>>>> enforces eventual consistency within the system, and allows
> > > >>>>> operations
> > > >>>>>>>> to
> > > >>>>>>>>> be performed atomically.
> > > >>>>>>>>>
> > > >>>>>>>>> Proposal:
> > > >>>>>>>>>
> > > >>>>>>>>> As described in the issue, we would implement the following
> > > policy
> > > >>>>> in
> > > >>>>>>>>> Producer and Pulsar Broker:
> > > >>>>>>>>> 1. The producer produces the pre-processing transaction
> > message.
> > > >>>> At
> > > >>>>>> this
> > > >>>>>>>>> point, the broker will set the status of this message to
> > unknown.
> > > >>>>>>>>> 2. After the local transaction is successfully executed, the
> > > >>>> commit
> > > >>>>>>>> message
> > > >>>>>>>>> is sent, otherwise the rollback message is sent.
> > > >>>>>>>>> 3. The broker receives the message. If it is a commit
> message,
> > it
> > > >>>>>>>> modifies
> > > >>>>>>>>> the transaction status to commit, and then sends an actual
> > > message
> > > >>>>> to
> > > >>>>>>>> the
> > > >>>>>>>>> consumer queue. At this time, the consumer can consume the
> > > >>>> message.
> > > >>>>>>>>> Otherwise, the transaction status is modified to rollback.
> The
> > > >>>>> message
> > > >>>>>>>> will
> > > >>>>>>>>> be discarded.
> > > >>>>>>>>> 4. If at step 2, the producer is down or abnormal, at this
> > time,
> > > >>>> the
> > > >>>>>>>> broker
> > > >>>>>>>>> will periodically ask the specific producer for the status of
> > the
> > > >>>>>>>> message,
> > > >>>>>>>>> and update the status according to the producer's response,
> and
> > > >>>>>> process
> > > >>>>>>>> it
> > > >>>>>>>>> according to step 3, the action that comes down.
> > > >>>>>>>>>
> > > >>>>>>>>> Specific concerns:
> > > >>>>>>>>> There are a number of things we will improve upon or add:
> > > >>>>>>>>> - A configuration called ```maxMessageUnknownTime```.
> Consider
> > > >>>> this
> > > >>>>>>>>> scenario: the pre-processing transaction message is sent, but
> > the
> > > >>>>>>>> commit or
> > > >>>>>>>>> rollback message is never received, which could mean that the
> > > >>>> status
> > > >>>>>> of
> > > >>>>>>>> a
> > > >>>>>>>>> message would be permanently unknown. To avoid this from
> > > >>>> happening,
> > > >>>>> we
> > > >>>>>>>>> would need a config which limits the amount of time the
> status
> > of
> > > >>>> a
> > > >>>>>>>> message
> > > >>>>>>>>> could be unknown (i.e. ```maxMessageUnknownTime```) After
> that,
> > > >>>> the
> > > >>>>>>>> message
> > > >>>>>>>>> would be discarded.
> > > >>>>>>>>> - Logging would be updated to log the status of a message
> i.e.
> > > >>>>>> UNKNOWN,
> > > >>>>>>>>> ROLLBACK, or COMMITTED. This would allow the user to know
> > whether
> > > >>>> or
> > > >>>>>>>> not a
> > > >>>>>>>>> message had failed or fallen through.
> > > >>>>>>>>>
> > > >>>>>>>>> Possible Additional API:
> > > >>>>>>>>> - We would add a method which allows the user to query the
> > state
> > > >>>> of
> > > >>>>>> the
> > > >>>>>>>>> message i.e. ```getStateOfMessage(long id)```
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>>>
> > > >>>> --
> > > >>>> -Ali
> > > >>>>
> > > >>
> > > >>
> > >
> > >
> >
>

Re: PIP-31: Add support for transactional messaging

Posted by Richard Yu <yo...@gmail.com>.
Hi Dave, Mattteo, and Sijie,

Thanks for pitching in on the discussion!
Sijie, it would be great if you could drive this PIP. To be frank, I don't
know what the best direction is.
Oh, and Ivan, if you have any other idea. Let us know. :)

If there is any changes that need to be made, edit the document


On Sat, Mar 2, 2019 at 11:11 PM Sijie Guo <gu...@gmail.com> wrote:

> Matteo, Dave,
>
> I think you are talking about different things. My comments to both:
>
> > Once there's support for transactions in messaging API, there will be
> > no need for a base class for functions. Rather a config option will
> > allow to enable transactional mode.
>
> Matteo, If I understand your comment correctly, you are talking about
> functions using transactions for processing semantics. If so, yes that
> would be the end goal.
>
> > Yes, that way there is no additional broker overhead and whatever happens
> when a commit happens is under the control of those making the transaction.
>
> Dave, this sounds an interesting idea and it is definitely do-able. Because
> Pulsar is a multi-layered system and it is built on top of a reliable
> storage, so a lot of components are just "stateless", "logical" and not
> bound to any physical machines. so when we implement a component /
> functionality, we basically implement a logical unit. How to run the logic
> unit can be very flexible. It can run as a separated service, or as part of
> broker, or in functions.
>
> - Sijie
>
>
> On Sun, Mar 3, 2019 at 10:52 AM Dave Fisher <da...@comcast.net> wrote:
>
> > Hi -
> >
> > > On Mar 2, 2019, at 6:39 PM, Sijie Guo <gu...@gmail.com> wrote:
> > >
> > > Dave,
> > >
> > > You mean implementing the transactions in pulsar function?
> >
> > Yes, that way there is no additional broker overhead and whatever happens
> > when a commit happens is under the control of those making the
> transaction.
> >
> > I’m not sure if it would work, but it seems that functions, spouts, and
> > connectors make sense as opposed to burdening the highly performant
> brokers.
> >
> > Regards,
> > Dave
> >
> > >
> > > - Sijie
> > >
> > >> On Sun, Mar 3, 2019 at 1:52 AM Dave Fisher <da...@comcast.net>
> > wrote:
> > >>
> > >> Hi -
> > >>
> > >> Is this a case where a Pulsar function base class for transactions
> would
> > >> help?
> > >>
> > >> Regards,
> > >> Dave
> > >>
> > >> Sent from my iPhone
> > >>
> > >>> On Mar 2, 2019, at 2:39 AM, Sijie Guo <gu...@gmail.com> wrote:
> > >>>
> > >>> Pravega's model is a better model than Kafka - it addressed the
> > >>> interleaving problems. However Pravega's model is based on a giant
> > >>> replicated log and rewrite the data to a second tiered storage for
> > >>> persistence, which basically re-implemented bookkeeper's logic in
> > >> broker. A
> > >>> fundamental drawback of Pravega is write amplifications. The
> > >> amplifications
> > >>> of both network and IO bandwidth are huge. If you use bookkeeper both
> > for
> > >>> its first-and-second tier storage and assume the bookkeeper
> replication
> > >>> factor is 3, pravega requires 6x network bandwidth and 12x IO
> > bandwidth.
> > >>> For a given message, it needs to write 3 times into the journal, and
> 3
> > >>> times for persistent. The amplifications hugely limit the throughput
> at
> > >>> pravega "brokers".
> > >>>
> > >>> - Sijie
> > >>>
> > >>>
> > >>>
> > >>>> On Sat, Mar 2, 2019 at 6:13 PM Ali Ahmed <ah...@gmail.com>
> wrote:
> > >>>>
> > >>>> I agree we many want to review pravega's past efforts in this area
> > also.
> > >>>>
> > >>>>
> > >>>>
> > >>
> >
> https://github.com/pravega/pravega/blob/master/documentation/src/docs/transactions.md
> > >>>>
> > >>>>
> > >>
> >
> https://github.com/pravega/pravega/blob/master/client/src/main/java/io/pravega/client/stream/Transaction.java
> > >>>>
> > >>>> -Ali
> > >>>>
> > >>>>> On Sat, Mar 2, 2019 at 1:56 AM Sijie Guo <gu...@gmail.com>
> wrote:
> > >>>>>
> > >>>>> Kafka's implementation is interleaving committed messages with
> > >>>> uncommitted
> > >>>>> messages at storage. Personally I think it is a very ugly design
> and
> > >>>>> implementation.
> > >>>>>
> > >>>>> Pulsar is a segment centric system, where we have a shared segment
> > >>>> storage
> > >>>>> - bookkeeper. I think a better direction is to leverage the
> segments
> > >> (aka
> > >>>>> ledgers)
> > >>>>> for buffering uncommitted messages and commit the whole segment
> when
> > >> the
> > >>>>> whole transaction is committed.
> > >>>>>
> > >>>>> A rough idea would be:
> > >>>>>
> > >>>>> 1) for any transaction, write the messages to a separate ledger (or
> > >>>>> multiple separate ledger).
> > >>>>> 2) during the transaction, accumulates the messages in those
> ledgers.
> > >>>>> 3) when commit, merge the txn ledgers back to the main data ledger.
> > the
> > >>>>> merge can be done either adding a meta message where data is stored
> > in
> > >>>> the
> > >>>>> txn ledger or actually copying the data to data ledger (depending
> on
> > >> the
> > >>>>> size of data accumulate in the transaction).
> > >>>>> 4) when abort, delete the txn ledger. No other additional work to
> be
> > >>>> done.
> > >>>>>
> > >>>>> This would be producing a much clear design than Kafka.
> > >>>>>
> > >>>>> On Ivan's comments:
> > >>>>>
> > >>>>>> Transactional acknowledgement also needs to be taken into account
> > >>>>>
> > >>>>> I don't think we have to treat `transactional acknowledgement` as a
> > >>>> special
> > >>>>> case. currently `acknowledgment` are actually "append" operations
> > into
> > >>>>> cursor ledgers.
> > >>>>> So the problem set can be reduced as `atomic append` to both data
> > >> ledgers
> > >>>>> and cursor ledgers. in that way, we can use one solution for
> handling
> > >>>>> appending data and updating cursors.
> > >>>>>
> > >>>>> Additionally, I think a related topic about transactions would be
> > >>>>> supporting large sized message (e.g. >= 5MB). If we take the
> > approach I
> > >>>>> described above using a separated ledger for accumulating messages
> > for
> > >> a
> > >>>>> transaction, that we are easy to model a large size message as a
> > >>>>> transaction of chunked messages.
> > >>>>>
> > >>>>> @Richard, @Ivan let me know what do you think. If you guys think
> the
> > >>>>> direction I raised is a good one to go down, I am happy to write
> them
> > >>>> down
> > >>>>> into details, and drive the design and coordinate the
> implementations
> > >> in
> > >>>>> the community.
> > >>>>>
> > >>>>> - Sijie
> > >>>>>
> > >>>>> On Sat, Mar 2, 2019 at 9:45 AM Richard Yu <
> > yohan.richard.yu@gmail.com>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Hi all,
> > >>>>>>
> > >>>>>> We might be able to get some ideas on implementing this from
> Kafka:
> > >>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
> > >>>>>>
> > >>>>>> Obviously, there is some differences in Kafka and Pulsar internals
> > but
> > >>>> at
> > >>>>>> some level, the implementation would be similar.
> > >>>>>> It should help.
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> On Thu, Feb 28, 2019 at 4:29 PM Richard Yu <
> > >> yohan.richard.yu@gmail.com
> > >>>>>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Hi,
> > >>>>>>>
> > >>>>>>> Per request, I've created a doc so we could get some more input
> in
> > an
> > >>>>>>> organized manner:
> > >>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>
> >
> https://docs.google.com/document/d/1mSUsJvPgThnWizeQqljKU5244BMEiYHabA6OP6QEHmQ/edit?usp=sharing
> > >>>>>>>
> > >>>>>>> And for Ivan's questions, I would answer accordingly.
> > >>>>>>>
> > >>>>>>>> By "set the message to unknown", do you mean the broker will
> cache
> > >>>> the
> > >>>>>>>> message, not writing it to any log?
> > >>>>>>>
> > >>>>>>> We wouldn't cache the message from my interpretation of the
> steps.
> > >>>> What
> > >>>>>>> the producer is first sending is a pre-processing message, not
> the
> > >>>> real
> > >>>>>>> message itself. This step basically notifies the broker that the
> > >>>>> message
> > >>>>>> is
> > >>>>>>> on its way. So all we have to do is store the message id and its
> > >>>>>>> corresponding status in a map, and depending on the producer's
> > >>>>> response,
> > >>>>>>> the status will change accordingly.
> > >>>>>>>
> > >>>>>>>> In designs we've discussed previously, this was handled
> > >>>>>>>> by a component called the transaction coordinator, which is a
> > >>>> logical
> > >>>>>>>> component which each broker knows how to talk to. For a
> > transaction
> > >>>>>>>> the commit message is sent to the coordinator, which writes it
> to
> > >>>> its
> > >>>>>>>> own log, and then goes through each topic in the commit and
> marks
> > >>>> the
> > >>>>>>>> transaction as completed.
> > >>>>>>>
> > >>>>>>> I wasn't aware of previous discussions on this topic, but it
> seems
> > >>>>> pretty
> > >>>>>>> good to me. It's certainly better than what I would come up with.
> > >>>>>>> If there's any more things we need to talk about, I suppose we
> > could
> > >>>>> move
> > >>>>>>> it to the google doc to play around with.
> > >>>>>>>
> > >>>>>>> Hope we can get this PIP rolling.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Thu, Feb 28, 2019 at 2:53 AM Sijie Guo <gu...@gmail.com>
> > >>>> wrote:
> > >>>>>>>
> > >>>>>>>> Richard,
> > >>>>>>>>
> > >>>>>>>> Thank you for putting this put and pushing the discussion
> forward.
> > >>>>>>>>
> > >>>>>>>> I think this is a very large feature. It might be worth
> creating a
> > >>>>>> google
> > >>>>>>>> doc for it (which is better for collaboration). And I believe
> Ivan
> > >>>> has
> > >>>>>>>> some
> > >>>>>>>> thoughts as well. If you can put up a google doc (make it
> > >>>>>> world-editable),
> > >>>>>>>> Ivan can probably dump his thoughts there and we can finalize
> the
> > >>>>>>>> discussion and break down into tasks. So the whole community can
> > >>>>>> actually
> > >>>>>>>> work together at collaborating this.
> > >>>>>>>>
> > >>>>>>>> Thanks,
> > >>>>>>>> Sijie
> > >>>>>>>>
> > >>>>>>>> On Thu, Feb 28, 2019 at 1:08 PM Richard Yu <
> > >>>>> yohan.richard.yu@gmail.com>
> > >>>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi all,
> > >>>>>>>>>
> > >>>>>>>>> I would like to create a PIP for issue #2664 on Github. The
> > >>>> details
> > >>>>> of
> > >>>>>>>> the
> > >>>>>>>>> PIP are below.
> > >>>>>>>>> I hope we could discuss this thoroughly.
> > >>>>>>>>>
> > >>>>>>>>> Cheers,
> > >>>>>>>>> Richard
> > >>>>>>>>>
> > >>>>>>>>> PIP-31: Add support for transactional messaging
> > >>>>>>>>>
> > >>>>>>>>> Motivation: Pulsar currently could improve upon their system of
> > >>>>>> sending
> > >>>>>>>>> packets of data by implementing transactional messaging. This
> > >>>> system
> > >>>>>>>>> enforces eventual consistency within the system, and allows
> > >>>>> operations
> > >>>>>>>> to
> > >>>>>>>>> be performed atomically.
> > >>>>>>>>>
> > >>>>>>>>> Proposal:
> > >>>>>>>>>
> > >>>>>>>>> As described in the issue, we would implement the following
> > policy
> > >>>>> in
> > >>>>>>>>> Producer and Pulsar Broker:
> > >>>>>>>>> 1. The producer produces the pre-processing transaction
> message.
> > >>>> At
> > >>>>>> this
> > >>>>>>>>> point, the broker will set the status of this message to
> unknown.
> > >>>>>>>>> 2. After the local transaction is successfully executed, the
> > >>>> commit
> > >>>>>>>> message
> > >>>>>>>>> is sent, otherwise the rollback message is sent.
> > >>>>>>>>> 3. The broker receives the message. If it is a commit message,
> it
> > >>>>>>>> modifies
> > >>>>>>>>> the transaction status to commit, and then sends an actual
> > message
> > >>>>> to
> > >>>>>>>> the
> > >>>>>>>>> consumer queue. At this time, the consumer can consume the
> > >>>> message.
> > >>>>>>>>> Otherwise, the transaction status is modified to rollback. The
> > >>>>> message
> > >>>>>>>> will
> > >>>>>>>>> be discarded.
> > >>>>>>>>> 4. If at step 2, the producer is down or abnormal, at this
> time,
> > >>>> the
> > >>>>>>>> broker
> > >>>>>>>>> will periodically ask the specific producer for the status of
> the
> > >>>>>>>> message,
> > >>>>>>>>> and update the status according to the producer's response, and
> > >>>>>> process
> > >>>>>>>> it
> > >>>>>>>>> according to step 3, the action that comes down.
> > >>>>>>>>>
> > >>>>>>>>> Specific concerns:
> > >>>>>>>>> There are a number of things we will improve upon or add:
> > >>>>>>>>> - A configuration called ```maxMessageUnknownTime```. Consider
> > >>>> this
> > >>>>>>>>> scenario: the pre-processing transaction message is sent, but
> the
> > >>>>>>>> commit or
> > >>>>>>>>> rollback message is never received, which could mean that the
> > >>>> status
> > >>>>>> of
> > >>>>>>>> a
> > >>>>>>>>> message would be permanently unknown. To avoid this from
> > >>>> happening,
> > >>>>> we
> > >>>>>>>>> would need a config which limits the amount of time the status
> of
> > >>>> a
> > >>>>>>>> message
> > >>>>>>>>> could be unknown (i.e. ```maxMessageUnknownTime```) After that,
> > >>>> the
> > >>>>>>>> message
> > >>>>>>>>> would be discarded.
> > >>>>>>>>> - Logging would be updated to log the status of a message i.e.
> > >>>>>> UNKNOWN,
> > >>>>>>>>> ROLLBACK, or COMMITTED. This would allow the user to know
> whether
> > >>>> or
> > >>>>>>>> not a
> > >>>>>>>>> message had failed or fallen through.
> > >>>>>>>>>
> > >>>>>>>>> Possible Additional API:
> > >>>>>>>>> - We would add a method which allows the user to query the
> state
> > >>>> of
> > >>>>>> the
> > >>>>>>>>> message i.e. ```getStateOfMessage(long id)```
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>>
> > >>>> --
> > >>>> -Ali
> > >>>>
> > >>
> > >>
> >
> >
>

Re: PIP-31: Add support for transactional messaging

Posted by Sijie Guo <gu...@gmail.com>.
Matteo, Dave,

I think you are talking about different things. My comments to both:

> Once there's support for transactions in messaging API, there will be
> no need for a base class for functions. Rather a config option will
> allow to enable transactional mode.

Matteo, If I understand your comment correctly, you are talking about
functions using transactions for processing semantics. If so, yes that
would be the end goal.

> Yes, that way there is no additional broker overhead and whatever happens
when a commit happens is under the control of those making the transaction.

Dave, this sounds an interesting idea and it is definitely do-able. Because
Pulsar is a multi-layered system and it is built on top of a reliable
storage, so a lot of components are just "stateless", "logical" and not
bound to any physical machines. so when we implement a component /
functionality, we basically implement a logical unit. How to run the logic
unit can be very flexible. It can run as a separated service, or as part of
broker, or in functions.

- Sijie


On Sun, Mar 3, 2019 at 10:52 AM Dave Fisher <da...@comcast.net> wrote:

> Hi -
>
> > On Mar 2, 2019, at 6:39 PM, Sijie Guo <gu...@gmail.com> wrote:
> >
> > Dave,
> >
> > You mean implementing the transactions in pulsar function?
>
> Yes, that way there is no additional broker overhead and whatever happens
> when a commit happens is under the control of those making the transaction.
>
> I’m not sure if it would work, but it seems that functions, spouts, and
> connectors make sense as opposed to burdening the highly performant brokers.
>
> Regards,
> Dave
>
> >
> > - Sijie
> >
> >> On Sun, Mar 3, 2019 at 1:52 AM Dave Fisher <da...@comcast.net>
> wrote:
> >>
> >> Hi -
> >>
> >> Is this a case where a Pulsar function base class for transactions would
> >> help?
> >>
> >> Regards,
> >> Dave
> >>
> >> Sent from my iPhone
> >>
> >>> On Mar 2, 2019, at 2:39 AM, Sijie Guo <gu...@gmail.com> wrote:
> >>>
> >>> Pravega's model is a better model than Kafka - it addressed the
> >>> interleaving problems. However Pravega's model is based on a giant
> >>> replicated log and rewrite the data to a second tiered storage for
> >>> persistence, which basically re-implemented bookkeeper's logic in
> >> broker. A
> >>> fundamental drawback of Pravega is write amplifications. The
> >> amplifications
> >>> of both network and IO bandwidth are huge. If you use bookkeeper both
> for
> >>> its first-and-second tier storage and assume the bookkeeper replication
> >>> factor is 3, pravega requires 6x network bandwidth and 12x IO
> bandwidth.
> >>> For a given message, it needs to write 3 times into the journal, and 3
> >>> times for persistent. The amplifications hugely limit the throughput at
> >>> pravega "brokers".
> >>>
> >>> - Sijie
> >>>
> >>>
> >>>
> >>>> On Sat, Mar 2, 2019 at 6:13 PM Ali Ahmed <ah...@gmail.com> wrote:
> >>>>
> >>>> I agree we many want to review pravega's past efforts in this area
> also.
> >>>>
> >>>>
> >>>>
> >>
> https://github.com/pravega/pravega/blob/master/documentation/src/docs/transactions.md
> >>>>
> >>>>
> >>
> https://github.com/pravega/pravega/blob/master/client/src/main/java/io/pravega/client/stream/Transaction.java
> >>>>
> >>>> -Ali
> >>>>
> >>>>> On Sat, Mar 2, 2019 at 1:56 AM Sijie Guo <gu...@gmail.com> wrote:
> >>>>>
> >>>>> Kafka's implementation is interleaving committed messages with
> >>>> uncommitted
> >>>>> messages at storage. Personally I think it is a very ugly design and
> >>>>> implementation.
> >>>>>
> >>>>> Pulsar is a segment centric system, where we have a shared segment
> >>>> storage
> >>>>> - bookkeeper. I think a better direction is to leverage the segments
> >> (aka
> >>>>> ledgers)
> >>>>> for buffering uncommitted messages and commit the whole segment when
> >> the
> >>>>> whole transaction is committed.
> >>>>>
> >>>>> A rough idea would be:
> >>>>>
> >>>>> 1) for any transaction, write the messages to a separate ledger (or
> >>>>> multiple separate ledger).
> >>>>> 2) during the transaction, accumulates the messages in those ledgers.
> >>>>> 3) when commit, merge the txn ledgers back to the main data ledger.
> the
> >>>>> merge can be done either adding a meta message where data is stored
> in
> >>>> the
> >>>>> txn ledger or actually copying the data to data ledger (depending on
> >> the
> >>>>> size of data accumulate in the transaction).
> >>>>> 4) when abort, delete the txn ledger. No other additional work to be
> >>>> done.
> >>>>>
> >>>>> This would be producing a much clear design than Kafka.
> >>>>>
> >>>>> On Ivan's comments:
> >>>>>
> >>>>>> Transactional acknowledgement also needs to be taken into account
> >>>>>
> >>>>> I don't think we have to treat `transactional acknowledgement` as a
> >>>> special
> >>>>> case. currently `acknowledgment` are actually "append" operations
> into
> >>>>> cursor ledgers.
> >>>>> So the problem set can be reduced as `atomic append` to both data
> >> ledgers
> >>>>> and cursor ledgers. in that way, we can use one solution for handling
> >>>>> appending data and updating cursors.
> >>>>>
> >>>>> Additionally, I think a related topic about transactions would be
> >>>>> supporting large sized message (e.g. >= 5MB). If we take the
> approach I
> >>>>> described above using a separated ledger for accumulating messages
> for
> >> a
> >>>>> transaction, that we are easy to model a large size message as a
> >>>>> transaction of chunked messages.
> >>>>>
> >>>>> @Richard, @Ivan let me know what do you think. If you guys think the
> >>>>> direction I raised is a good one to go down, I am happy to write them
> >>>> down
> >>>>> into details, and drive the design and coordinate the implementations
> >> in
> >>>>> the community.
> >>>>>
> >>>>> - Sijie
> >>>>>
> >>>>> On Sat, Mar 2, 2019 at 9:45 AM Richard Yu <
> yohan.richard.yu@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi all,
> >>>>>>
> >>>>>> We might be able to get some ideas on implementing this from Kafka:
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
> >>>>>>
> >>>>>> Obviously, there is some differences in Kafka and Pulsar internals
> but
> >>>> at
> >>>>>> some level, the implementation would be similar.
> >>>>>> It should help.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Thu, Feb 28, 2019 at 4:29 PM Richard Yu <
> >> yohan.richard.yu@gmail.com
> >>>>>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> Per request, I've created a doc so we could get some more input in
> an
> >>>>>>> organized manner:
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>
> https://docs.google.com/document/d/1mSUsJvPgThnWizeQqljKU5244BMEiYHabA6OP6QEHmQ/edit?usp=sharing
> >>>>>>>
> >>>>>>> And for Ivan's questions, I would answer accordingly.
> >>>>>>>
> >>>>>>>> By "set the message to unknown", do you mean the broker will cache
> >>>> the
> >>>>>>>> message, not writing it to any log?
> >>>>>>>
> >>>>>>> We wouldn't cache the message from my interpretation of the steps.
> >>>> What
> >>>>>>> the producer is first sending is a pre-processing message, not the
> >>>> real
> >>>>>>> message itself. This step basically notifies the broker that the
> >>>>> message
> >>>>>> is
> >>>>>>> on its way. So all we have to do is store the message id and its
> >>>>>>> corresponding status in a map, and depending on the producer's
> >>>>> response,
> >>>>>>> the status will change accordingly.
> >>>>>>>
> >>>>>>>> In designs we've discussed previously, this was handled
> >>>>>>>> by a component called the transaction coordinator, which is a
> >>>> logical
> >>>>>>>> component which each broker knows how to talk to. For a
> transaction
> >>>>>>>> the commit message is sent to the coordinator, which writes it to
> >>>> its
> >>>>>>>> own log, and then goes through each topic in the commit and marks
> >>>> the
> >>>>>>>> transaction as completed.
> >>>>>>>
> >>>>>>> I wasn't aware of previous discussions on this topic, but it seems
> >>>>> pretty
> >>>>>>> good to me. It's certainly better than what I would come up with.
> >>>>>>> If there's any more things we need to talk about, I suppose we
> could
> >>>>> move
> >>>>>>> it to the google doc to play around with.
> >>>>>>>
> >>>>>>> Hope we can get this PIP rolling.
> >>>>>>>
> >>>>>>>
> >>>>>>> On Thu, Feb 28, 2019 at 2:53 AM Sijie Guo <gu...@gmail.com>
> >>>> wrote:
> >>>>>>>
> >>>>>>>> Richard,
> >>>>>>>>
> >>>>>>>> Thank you for putting this put and pushing the discussion forward.
> >>>>>>>>
> >>>>>>>> I think this is a very large feature. It might be worth creating a
> >>>>>> google
> >>>>>>>> doc for it (which is better for collaboration). And I believe Ivan
> >>>> has
> >>>>>>>> some
> >>>>>>>> thoughts as well. If you can put up a google doc (make it
> >>>>>> world-editable),
> >>>>>>>> Ivan can probably dump his thoughts there and we can finalize the
> >>>>>>>> discussion and break down into tasks. So the whole community can
> >>>>>> actually
> >>>>>>>> work together at collaborating this.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Sijie
> >>>>>>>>
> >>>>>>>> On Thu, Feb 28, 2019 at 1:08 PM Richard Yu <
> >>>>> yohan.richard.yu@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi all,
> >>>>>>>>>
> >>>>>>>>> I would like to create a PIP for issue #2664 on Github. The
> >>>> details
> >>>>> of
> >>>>>>>> the
> >>>>>>>>> PIP are below.
> >>>>>>>>> I hope we could discuss this thoroughly.
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Richard
> >>>>>>>>>
> >>>>>>>>> PIP-31: Add support for transactional messaging
> >>>>>>>>>
> >>>>>>>>> Motivation: Pulsar currently could improve upon their system of
> >>>>>> sending
> >>>>>>>>> packets of data by implementing transactional messaging. This
> >>>> system
> >>>>>>>>> enforces eventual consistency within the system, and allows
> >>>>> operations
> >>>>>>>> to
> >>>>>>>>> be performed atomically.
> >>>>>>>>>
> >>>>>>>>> Proposal:
> >>>>>>>>>
> >>>>>>>>> As described in the issue, we would implement the following
> policy
> >>>>> in
> >>>>>>>>> Producer and Pulsar Broker:
> >>>>>>>>> 1. The producer produces the pre-processing transaction message.
> >>>> At
> >>>>>> this
> >>>>>>>>> point, the broker will set the status of this message to unknown.
> >>>>>>>>> 2. After the local transaction is successfully executed, the
> >>>> commit
> >>>>>>>> message
> >>>>>>>>> is sent, otherwise the rollback message is sent.
> >>>>>>>>> 3. The broker receives the message. If it is a commit message, it
> >>>>>>>> modifies
> >>>>>>>>> the transaction status to commit, and then sends an actual
> message
> >>>>> to
> >>>>>>>> the
> >>>>>>>>> consumer queue. At this time, the consumer can consume the
> >>>> message.
> >>>>>>>>> Otherwise, the transaction status is modified to rollback. The
> >>>>> message
> >>>>>>>> will
> >>>>>>>>> be discarded.
> >>>>>>>>> 4. If at step 2, the producer is down or abnormal, at this time,
> >>>> the
> >>>>>>>> broker
> >>>>>>>>> will periodically ask the specific producer for the status of the
> >>>>>>>> message,
> >>>>>>>>> and update the status according to the producer's response, and
> >>>>>> process
> >>>>>>>> it
> >>>>>>>>> according to step 3, the action that comes down.
> >>>>>>>>>
> >>>>>>>>> Specific concerns:
> >>>>>>>>> There are a number of things we will improve upon or add:
> >>>>>>>>> - A configuration called ```maxMessageUnknownTime```. Consider
> >>>> this
> >>>>>>>>> scenario: the pre-processing transaction message is sent, but the
> >>>>>>>> commit or
> >>>>>>>>> rollback message is never received, which could mean that the
> >>>> status
> >>>>>> of
> >>>>>>>> a
> >>>>>>>>> message would be permanently unknown. To avoid this from
> >>>> happening,
> >>>>> we
> >>>>>>>>> would need a config which limits the amount of time the status of
> >>>> a
> >>>>>>>> message
> >>>>>>>>> could be unknown (i.e. ```maxMessageUnknownTime```) After that,
> >>>> the
> >>>>>>>> message
> >>>>>>>>> would be discarded.
> >>>>>>>>> - Logging would be updated to log the status of a message i.e.
> >>>>>> UNKNOWN,
> >>>>>>>>> ROLLBACK, or COMMITTED. This would allow the user to know whether
> >>>> or
> >>>>>>>> not a
> >>>>>>>>> message had failed or fallen through.
> >>>>>>>>>
> >>>>>>>>> Possible Additional API:
> >>>>>>>>> - We would add a method which allows the user to query the state
> >>>> of
> >>>>>> the
> >>>>>>>>> message i.e. ```getStateOfMessage(long id)```
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>> --
> >>>> -Ali
> >>>>
> >>
> >>
>
>

Re: PIP-31: Add support for transactional messaging

Posted by Dave Fisher <da...@comcast.net>.
Hi -

> On Mar 2, 2019, at 6:39 PM, Sijie Guo <gu...@gmail.com> wrote:
> 
> Dave,
> 
> You mean implementing the transactions in pulsar function?

Yes, that way there is no additional broker overhead and whatever happens when a commit happens is under the control of those making the transaction.

I’m not sure if it would work, but it seems that functions, spouts, and connectors make sense as opposed to burdening the highly performant brokers.

Regards,
Dave

> 
> - Sijie
> 
>> On Sun, Mar 3, 2019 at 1:52 AM Dave Fisher <da...@comcast.net> wrote:
>> 
>> Hi -
>> 
>> Is this a case where a Pulsar function base class for transactions would
>> help?
>> 
>> Regards,
>> Dave
>> 
>> Sent from my iPhone
>> 
>>> On Mar 2, 2019, at 2:39 AM, Sijie Guo <gu...@gmail.com> wrote:
>>> 
>>> Pravega's model is a better model than Kafka - it addressed the
>>> interleaving problems. However Pravega's model is based on a giant
>>> replicated log and rewrite the data to a second tiered storage for
>>> persistence, which basically re-implemented bookkeeper's logic in
>> broker. A
>>> fundamental drawback of Pravega is write amplifications. The
>> amplifications
>>> of both network and IO bandwidth are huge. If you use bookkeeper both for
>>> its first-and-second tier storage and assume the bookkeeper replication
>>> factor is 3, pravega requires 6x network bandwidth and 12x IO bandwidth.
>>> For a given message, it needs to write 3 times into the journal, and 3
>>> times for persistent. The amplifications hugely limit the throughput at
>>> pravega "brokers".
>>> 
>>> - Sijie
>>> 
>>> 
>>> 
>>>> On Sat, Mar 2, 2019 at 6:13 PM Ali Ahmed <ah...@gmail.com> wrote:
>>>> 
>>>> I agree we many want to review pravega's past efforts in this area also.
>>>> 
>>>> 
>>>> 
>> https://github.com/pravega/pravega/blob/master/documentation/src/docs/transactions.md
>>>> 
>>>> 
>> https://github.com/pravega/pravega/blob/master/client/src/main/java/io/pravega/client/stream/Transaction.java
>>>> 
>>>> -Ali
>>>> 
>>>>> On Sat, Mar 2, 2019 at 1:56 AM Sijie Guo <gu...@gmail.com> wrote:
>>>>> 
>>>>> Kafka's implementation is interleaving committed messages with
>>>> uncommitted
>>>>> messages at storage. Personally I think it is a very ugly design and
>>>>> implementation.
>>>>> 
>>>>> Pulsar is a segment centric system, where we have a shared segment
>>>> storage
>>>>> - bookkeeper. I think a better direction is to leverage the segments
>> (aka
>>>>> ledgers)
>>>>> for buffering uncommitted messages and commit the whole segment when
>> the
>>>>> whole transaction is committed.
>>>>> 
>>>>> A rough idea would be:
>>>>> 
>>>>> 1) for any transaction, write the messages to a separate ledger (or
>>>>> multiple separate ledger).
>>>>> 2) during the transaction, accumulates the messages in those ledgers.
>>>>> 3) when commit, merge the txn ledgers back to the main data ledger. the
>>>>> merge can be done either adding a meta message where data is stored in
>>>> the
>>>>> txn ledger or actually copying the data to data ledger (depending on
>> the
>>>>> size of data accumulate in the transaction).
>>>>> 4) when abort, delete the txn ledger. No other additional work to be
>>>> done.
>>>>> 
>>>>> This would be producing a much clear design than Kafka.
>>>>> 
>>>>> On Ivan's comments:
>>>>> 
>>>>>> Transactional acknowledgement also needs to be taken into account
>>>>> 
>>>>> I don't think we have to treat `transactional acknowledgement` as a
>>>> special
>>>>> case. currently `acknowledgment` are actually "append" operations into
>>>>> cursor ledgers.
>>>>> So the problem set can be reduced as `atomic append` to both data
>> ledgers
>>>>> and cursor ledgers. in that way, we can use one solution for handling
>>>>> appending data and updating cursors.
>>>>> 
>>>>> Additionally, I think a related topic about transactions would be
>>>>> supporting large sized message (e.g. >= 5MB). If we take the approach I
>>>>> described above using a separated ledger for accumulating messages for
>> a
>>>>> transaction, that we are easy to model a large size message as a
>>>>> transaction of chunked messages.
>>>>> 
>>>>> @Richard, @Ivan let me know what do you think. If you guys think the
>>>>> direction I raised is a good one to go down, I am happy to write them
>>>> down
>>>>> into details, and drive the design and coordinate the implementations
>> in
>>>>> the community.
>>>>> 
>>>>> - Sijie
>>>>> 
>>>>> On Sat, Mar 2, 2019 at 9:45 AM Richard Yu <yo...@gmail.com>
>>>>> wrote:
>>>>> 
>>>>>> Hi all,
>>>>>> 
>>>>>> We might be able to get some ideas on implementing this from Kafka:
>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
>>>>>> 
>>>>>> Obviously, there is some differences in Kafka and Pulsar internals but
>>>> at
>>>>>> some level, the implementation would be similar.
>>>>>> It should help.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Thu, Feb 28, 2019 at 4:29 PM Richard Yu <
>> yohan.richard.yu@gmail.com
>>>>> 
>>>>>> wrote:
>>>>>> 
>>>>>>> Hi,
>>>>>>> 
>>>>>>> Per request, I've created a doc so we could get some more input in an
>>>>>>> organized manner:
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> https://docs.google.com/document/d/1mSUsJvPgThnWizeQqljKU5244BMEiYHabA6OP6QEHmQ/edit?usp=sharing
>>>>>>> 
>>>>>>> And for Ivan's questions, I would answer accordingly.
>>>>>>> 
>>>>>>>> By "set the message to unknown", do you mean the broker will cache
>>>> the
>>>>>>>> message, not writing it to any log?
>>>>>>> 
>>>>>>> We wouldn't cache the message from my interpretation of the steps.
>>>> What
>>>>>>> the producer is first sending is a pre-processing message, not the
>>>> real
>>>>>>> message itself. This step basically notifies the broker that the
>>>>> message
>>>>>> is
>>>>>>> on its way. So all we have to do is store the message id and its
>>>>>>> corresponding status in a map, and depending on the producer's
>>>>> response,
>>>>>>> the status will change accordingly.
>>>>>>> 
>>>>>>>> In designs we've discussed previously, this was handled
>>>>>>>> by a component called the transaction coordinator, which is a
>>>> logical
>>>>>>>> component which each broker knows how to talk to. For a transaction
>>>>>>>> the commit message is sent to the coordinator, which writes it to
>>>> its
>>>>>>>> own log, and then goes through each topic in the commit and marks
>>>> the
>>>>>>>> transaction as completed.
>>>>>>> 
>>>>>>> I wasn't aware of previous discussions on this topic, but it seems
>>>>> pretty
>>>>>>> good to me. It's certainly better than what I would come up with.
>>>>>>> If there's any more things we need to talk about, I suppose we could
>>>>> move
>>>>>>> it to the google doc to play around with.
>>>>>>> 
>>>>>>> Hope we can get this PIP rolling.
>>>>>>> 
>>>>>>> 
>>>>>>> On Thu, Feb 28, 2019 at 2:53 AM Sijie Guo <gu...@gmail.com>
>>>> wrote:
>>>>>>> 
>>>>>>>> Richard,
>>>>>>>> 
>>>>>>>> Thank you for putting this put and pushing the discussion forward.
>>>>>>>> 
>>>>>>>> I think this is a very large feature. It might be worth creating a
>>>>>> google
>>>>>>>> doc for it (which is better for collaboration). And I believe Ivan
>>>> has
>>>>>>>> some
>>>>>>>> thoughts as well. If you can put up a google doc (make it
>>>>>> world-editable),
>>>>>>>> Ivan can probably dump his thoughts there and we can finalize the
>>>>>>>> discussion and break down into tasks. So the whole community can
>>>>>> actually
>>>>>>>> work together at collaborating this.
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> Sijie
>>>>>>>> 
>>>>>>>> On Thu, Feb 28, 2019 at 1:08 PM Richard Yu <
>>>>> yohan.richard.yu@gmail.com>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Hi all,
>>>>>>>>> 
>>>>>>>>> I would like to create a PIP for issue #2664 on Github. The
>>>> details
>>>>> of
>>>>>>>> the
>>>>>>>>> PIP are below.
>>>>>>>>> I hope we could discuss this thoroughly.
>>>>>>>>> 
>>>>>>>>> Cheers,
>>>>>>>>> Richard
>>>>>>>>> 
>>>>>>>>> PIP-31: Add support for transactional messaging
>>>>>>>>> 
>>>>>>>>> Motivation: Pulsar currently could improve upon their system of
>>>>>> sending
>>>>>>>>> packets of data by implementing transactional messaging. This
>>>> system
>>>>>>>>> enforces eventual consistency within the system, and allows
>>>>> operations
>>>>>>>> to
>>>>>>>>> be performed atomically.
>>>>>>>>> 
>>>>>>>>> Proposal:
>>>>>>>>> 
>>>>>>>>> As described in the issue, we would implement the following policy
>>>>> in
>>>>>>>>> Producer and Pulsar Broker:
>>>>>>>>> 1. The producer produces the pre-processing transaction message.
>>>> At
>>>>>> this
>>>>>>>>> point, the broker will set the status of this message to unknown.
>>>>>>>>> 2. After the local transaction is successfully executed, the
>>>> commit
>>>>>>>> message
>>>>>>>>> is sent, otherwise the rollback message is sent.
>>>>>>>>> 3. The broker receives the message. If it is a commit message, it
>>>>>>>> modifies
>>>>>>>>> the transaction status to commit, and then sends an actual message
>>>>> to
>>>>>>>> the
>>>>>>>>> consumer queue. At this time, the consumer can consume the
>>>> message.
>>>>>>>>> Otherwise, the transaction status is modified to rollback. The
>>>>> message
>>>>>>>> will
>>>>>>>>> be discarded.
>>>>>>>>> 4. If at step 2, the producer is down or abnormal, at this time,
>>>> the
>>>>>>>> broker
>>>>>>>>> will periodically ask the specific producer for the status of the
>>>>>>>> message,
>>>>>>>>> and update the status according to the producer's response, and
>>>>>> process
>>>>>>>> it
>>>>>>>>> according to step 3, the action that comes down.
>>>>>>>>> 
>>>>>>>>> Specific concerns:
>>>>>>>>> There are a number of things we will improve upon or add:
>>>>>>>>> - A configuration called ```maxMessageUnknownTime```. Consider
>>>> this
>>>>>>>>> scenario: the pre-processing transaction message is sent, but the
>>>>>>>> commit or
>>>>>>>>> rollback message is never received, which could mean that the
>>>> status
>>>>>> of
>>>>>>>> a
>>>>>>>>> message would be permanently unknown. To avoid this from
>>>> happening,
>>>>> we
>>>>>>>>> would need a config which limits the amount of time the status of
>>>> a
>>>>>>>> message
>>>>>>>>> could be unknown (i.e. ```maxMessageUnknownTime```) After that,
>>>> the
>>>>>>>> message
>>>>>>>>> would be discarded.
>>>>>>>>> - Logging would be updated to log the status of a message i.e.
>>>>>> UNKNOWN,
>>>>>>>>> ROLLBACK, or COMMITTED. This would allow the user to know whether
>>>> or
>>>>>>>> not a
>>>>>>>>> message had failed or fallen through.
>>>>>>>>> 
>>>>>>>>> Possible Additional API:
>>>>>>>>> - We would add a method which allows the user to query the state
>>>> of
>>>>>> the
>>>>>>>>> message i.e. ```getStateOfMessage(long id)```
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>>> 
>>>> --
>>>> -Ali
>>>> 
>> 
>> 


Re: PIP-31: Add support for transactional messaging

Posted by Sijie Guo <gu...@gmail.com>.
Dave,

You mean implementing the transactions in pulsar function?

- Sijie

On Sun, Mar 3, 2019 at 1:52 AM Dave Fisher <da...@comcast.net> wrote:

> Hi -
>
> Is this a case where a Pulsar function base class for transactions would
> help?
>
> Regards,
> Dave
>
> Sent from my iPhone
>
> > On Mar 2, 2019, at 2:39 AM, Sijie Guo <gu...@gmail.com> wrote:
> >
> > Pravega's model is a better model than Kafka - it addressed the
> > interleaving problems. However Pravega's model is based on a giant
> > replicated log and rewrite the data to a second tiered storage for
> > persistence, which basically re-implemented bookkeeper's logic in
> broker. A
> > fundamental drawback of Pravega is write amplifications. The
> amplifications
> > of both network and IO bandwidth are huge. If you use bookkeeper both for
> > its first-and-second tier storage and assume the bookkeeper replication
> > factor is 3, pravega requires 6x network bandwidth and 12x IO bandwidth.
> > For a given message, it needs to write 3 times into the journal, and 3
> > times for persistent. The amplifications hugely limit the throughput at
> > pravega "brokers".
> >
> > - Sijie
> >
> >
> >
> >> On Sat, Mar 2, 2019 at 6:13 PM Ali Ahmed <ah...@gmail.com> wrote:
> >>
> >> I agree we many want to review pravega's past efforts in this area also.
> >>
> >>
> >>
> https://github.com/pravega/pravega/blob/master/documentation/src/docs/transactions.md
> >>
> >>
> https://github.com/pravega/pravega/blob/master/client/src/main/java/io/pravega/client/stream/Transaction.java
> >>
> >> -Ali
> >>
> >>> On Sat, Mar 2, 2019 at 1:56 AM Sijie Guo <gu...@gmail.com> wrote:
> >>>
> >>> Kafka's implementation is interleaving committed messages with
> >> uncommitted
> >>> messages at storage. Personally I think it is a very ugly design and
> >>> implementation.
> >>>
> >>> Pulsar is a segment centric system, where we have a shared segment
> >> storage
> >>> - bookkeeper. I think a better direction is to leverage the segments
> (aka
> >>> ledgers)
> >>> for buffering uncommitted messages and commit the whole segment when
> the
> >>> whole transaction is committed.
> >>>
> >>> A rough idea would be:
> >>>
> >>> 1) for any transaction, write the messages to a separate ledger (or
> >>> multiple separate ledger).
> >>> 2) during the transaction, accumulates the messages in those ledgers.
> >>> 3) when commit, merge the txn ledgers back to the main data ledger. the
> >>> merge can be done either adding a meta message where data is stored in
> >> the
> >>> txn ledger or actually copying the data to data ledger (depending on
> the
> >>> size of data accumulate in the transaction).
> >>> 4) when abort, delete the txn ledger. No other additional work to be
> >> done.
> >>>
> >>> This would be producing a much clear design than Kafka.
> >>>
> >>> On Ivan's comments:
> >>>
> >>>> Transactional acknowledgement also needs to be taken into account
> >>>
> >>> I don't think we have to treat `transactional acknowledgement` as a
> >> special
> >>> case. currently `acknowledgment` are actually "append" operations into
> >>> cursor ledgers.
> >>> So the problem set can be reduced as `atomic append` to both data
> ledgers
> >>> and cursor ledgers. in that way, we can use one solution for handling
> >>> appending data and updating cursors.
> >>>
> >>> Additionally, I think a related topic about transactions would be
> >>> supporting large sized message (e.g. >= 5MB). If we take the approach I
> >>> described above using a separated ledger for accumulating messages for
> a
> >>> transaction, that we are easy to model a large size message as a
> >>> transaction of chunked messages.
> >>>
> >>> @Richard, @Ivan let me know what do you think. If you guys think the
> >>> direction I raised is a good one to go down, I am happy to write them
> >> down
> >>> into details, and drive the design and coordinate the implementations
> in
> >>> the community.
> >>>
> >>> - Sijie
> >>>
> >>> On Sat, Mar 2, 2019 at 9:45 AM Richard Yu <yo...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi all,
> >>>>
> >>>> We might be able to get some ideas on implementing this from Kafka:
> >>>>
> >>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
> >>>>
> >>>> Obviously, there is some differences in Kafka and Pulsar internals but
> >> at
> >>>> some level, the implementation would be similar.
> >>>> It should help.
> >>>>
> >>>>
> >>>>
> >>>> On Thu, Feb 28, 2019 at 4:29 PM Richard Yu <
> yohan.richard.yu@gmail.com
> >>>
> >>>> wrote:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>> Per request, I've created a doc so we could get some more input in an
> >>>>> organized manner:
> >>>>>
> >>>>>
> >>>>
> >>>
> >>
> https://docs.google.com/document/d/1mSUsJvPgThnWizeQqljKU5244BMEiYHabA6OP6QEHmQ/edit?usp=sharing
> >>>>>
> >>>>> And for Ivan's questions, I would answer accordingly.
> >>>>>
> >>>>>> By "set the message to unknown", do you mean the broker will cache
> >> the
> >>>>>> message, not writing it to any log?
> >>>>>
> >>>>> We wouldn't cache the message from my interpretation of the steps.
> >> What
> >>>>> the producer is first sending is a pre-processing message, not the
> >> real
> >>>>> message itself. This step basically notifies the broker that the
> >>> message
> >>>> is
> >>>>> on its way. So all we have to do is store the message id and its
> >>>>> corresponding status in a map, and depending on the producer's
> >>> response,
> >>>>> the status will change accordingly.
> >>>>>
> >>>>>> In designs we've discussed previously, this was handled
> >>>>>> by a component called the transaction coordinator, which is a
> >> logical
> >>>>>> component which each broker knows how to talk to. For a transaction
> >>>>>> the commit message is sent to the coordinator, which writes it to
> >> its
> >>>>>> own log, and then goes through each topic in the commit and marks
> >> the
> >>>>>> transaction as completed.
> >>>>>
> >>>>> I wasn't aware of previous discussions on this topic, but it seems
> >>> pretty
> >>>>> good to me. It's certainly better than what I would come up with.
> >>>>> If there's any more things we need to talk about, I suppose we could
> >>> move
> >>>>> it to the google doc to play around with.
> >>>>>
> >>>>> Hope we can get this PIP rolling.
> >>>>>
> >>>>>
> >>>>> On Thu, Feb 28, 2019 at 2:53 AM Sijie Guo <gu...@gmail.com>
> >> wrote:
> >>>>>
> >>>>>> Richard,
> >>>>>>
> >>>>>> Thank you for putting this put and pushing the discussion forward.
> >>>>>>
> >>>>>> I think this is a very large feature. It might be worth creating a
> >>>> google
> >>>>>> doc for it (which is better for collaboration). And I believe Ivan
> >> has
> >>>>>> some
> >>>>>> thoughts as well. If you can put up a google doc (make it
> >>>> world-editable),
> >>>>>> Ivan can probably dump his thoughts there and we can finalize the
> >>>>>> discussion and break down into tasks. So the whole community can
> >>>> actually
> >>>>>> work together at collaborating this.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Sijie
> >>>>>>
> >>>>>> On Thu, Feb 28, 2019 at 1:08 PM Richard Yu <
> >>> yohan.richard.yu@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> I would like to create a PIP for issue #2664 on Github. The
> >> details
> >>> of
> >>>>>> the
> >>>>>>> PIP are below.
> >>>>>>> I hope we could discuss this thoroughly.
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Richard
> >>>>>>>
> >>>>>>> PIP-31: Add support for transactional messaging
> >>>>>>>
> >>>>>>> Motivation: Pulsar currently could improve upon their system of
> >>>> sending
> >>>>>>> packets of data by implementing transactional messaging. This
> >> system
> >>>>>>> enforces eventual consistency within the system, and allows
> >>> operations
> >>>>>> to
> >>>>>>> be performed atomically.
> >>>>>>>
> >>>>>>> Proposal:
> >>>>>>>
> >>>>>>> As described in the issue, we would implement the following policy
> >>> in
> >>>>>>> Producer and Pulsar Broker:
> >>>>>>> 1. The producer produces the pre-processing transaction message.
> >> At
> >>>> this
> >>>>>>> point, the broker will set the status of this message to unknown.
> >>>>>>> 2. After the local transaction is successfully executed, the
> >> commit
> >>>>>> message
> >>>>>>> is sent, otherwise the rollback message is sent.
> >>>>>>> 3. The broker receives the message. If it is a commit message, it
> >>>>>> modifies
> >>>>>>> the transaction status to commit, and then sends an actual message
> >>> to
> >>>>>> the
> >>>>>>> consumer queue. At this time, the consumer can consume the
> >> message.
> >>>>>>> Otherwise, the transaction status is modified to rollback. The
> >>> message
> >>>>>> will
> >>>>>>> be discarded.
> >>>>>>> 4. If at step 2, the producer is down or abnormal, at this time,
> >> the
> >>>>>> broker
> >>>>>>> will periodically ask the specific producer for the status of the
> >>>>>> message,
> >>>>>>> and update the status according to the producer's response, and
> >>>> process
> >>>>>> it
> >>>>>>> according to step 3, the action that comes down.
> >>>>>>>
> >>>>>>> Specific concerns:
> >>>>>>> There are a number of things we will improve upon or add:
> >>>>>>> - A configuration called ```maxMessageUnknownTime```. Consider
> >> this
> >>>>>>> scenario: the pre-processing transaction message is sent, but the
> >>>>>> commit or
> >>>>>>> rollback message is never received, which could mean that the
> >> status
> >>>> of
> >>>>>> a
> >>>>>>> message would be permanently unknown. To avoid this from
> >> happening,
> >>> we
> >>>>>>> would need a config which limits the amount of time the status of
> >> a
> >>>>>> message
> >>>>>>> could be unknown (i.e. ```maxMessageUnknownTime```) After that,
> >> the
> >>>>>> message
> >>>>>>> would be discarded.
> >>>>>>> - Logging would be updated to log the status of a message i.e.
> >>>> UNKNOWN,
> >>>>>>> ROLLBACK, or COMMITTED. This would allow the user to know whether
> >> or
> >>>>>> not a
> >>>>>>> message had failed or fallen through.
> >>>>>>>
> >>>>>>> Possible Additional API:
> >>>>>>> - We would add a method which allows the user to query the state
> >> of
> >>>> the
> >>>>>>> message i.e. ```getStateOfMessage(long id)```
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >>
> >> --
> >> -Ali
> >>
>
>

Re: PIP-31: Add support for transactional messaging

Posted by Dave Fisher <da...@comcast.net>.
Hi -

Is this a case where a Pulsar function base class for transactions would help?

Regards,
Dave

Sent from my iPhone

> On Mar 2, 2019, at 2:39 AM, Sijie Guo <gu...@gmail.com> wrote:
> 
> Pravega's model is a better model than Kafka - it addressed the
> interleaving problems. However Pravega's model is based on a giant
> replicated log and rewrite the data to a second tiered storage for
> persistence, which basically re-implemented bookkeeper's logic in broker. A
> fundamental drawback of Pravega is write amplifications. The amplifications
> of both network and IO bandwidth are huge. If you use bookkeeper both for
> its first-and-second tier storage and assume the bookkeeper replication
> factor is 3, pravega requires 6x network bandwidth and 12x IO bandwidth.
> For a given message, it needs to write 3 times into the journal, and 3
> times for persistent. The amplifications hugely limit the throughput at
> pravega "brokers".
> 
> - Sijie
> 
> 
> 
>> On Sat, Mar 2, 2019 at 6:13 PM Ali Ahmed <ah...@gmail.com> wrote:
>> 
>> I agree we many want to review pravega's past efforts in this area also.
>> 
>> 
>> https://github.com/pravega/pravega/blob/master/documentation/src/docs/transactions.md
>> 
>> https://github.com/pravega/pravega/blob/master/client/src/main/java/io/pravega/client/stream/Transaction.java
>> 
>> -Ali
>> 
>>> On Sat, Mar 2, 2019 at 1:56 AM Sijie Guo <gu...@gmail.com> wrote:
>>> 
>>> Kafka's implementation is interleaving committed messages with
>> uncommitted
>>> messages at storage. Personally I think it is a very ugly design and
>>> implementation.
>>> 
>>> Pulsar is a segment centric system, where we have a shared segment
>> storage
>>> - bookkeeper. I think a better direction is to leverage the segments (aka
>>> ledgers)
>>> for buffering uncommitted messages and commit the whole segment when the
>>> whole transaction is committed.
>>> 
>>> A rough idea would be:
>>> 
>>> 1) for any transaction, write the messages to a separate ledger (or
>>> multiple separate ledger).
>>> 2) during the transaction, accumulates the messages in those ledgers.
>>> 3) when commit, merge the txn ledgers back to the main data ledger. the
>>> merge can be done either adding a meta message where data is stored in
>> the
>>> txn ledger or actually copying the data to data ledger (depending on the
>>> size of data accumulate in the transaction).
>>> 4) when abort, delete the txn ledger. No other additional work to be
>> done.
>>> 
>>> This would be producing a much clear design than Kafka.
>>> 
>>> On Ivan's comments:
>>> 
>>>> Transactional acknowledgement also needs to be taken into account
>>> 
>>> I don't think we have to treat `transactional acknowledgement` as a
>> special
>>> case. currently `acknowledgment` are actually "append" operations into
>>> cursor ledgers.
>>> So the problem set can be reduced as `atomic append` to both data ledgers
>>> and cursor ledgers. in that way, we can use one solution for handling
>>> appending data and updating cursors.
>>> 
>>> Additionally, I think a related topic about transactions would be
>>> supporting large sized message (e.g. >= 5MB). If we take the approach I
>>> described above using a separated ledger for accumulating messages for a
>>> transaction, that we are easy to model a large size message as a
>>> transaction of chunked messages.
>>> 
>>> @Richard, @Ivan let me know what do you think. If you guys think the
>>> direction I raised is a good one to go down, I am happy to write them
>> down
>>> into details, and drive the design and coordinate the implementations in
>>> the community.
>>> 
>>> - Sijie
>>> 
>>> On Sat, Mar 2, 2019 at 9:45 AM Richard Yu <yo...@gmail.com>
>>> wrote:
>>> 
>>>> Hi all,
>>>> 
>>>> We might be able to get some ideas on implementing this from Kafka:
>>>> 
>>>> 
>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
>>>> 
>>>> Obviously, there is some differences in Kafka and Pulsar internals but
>> at
>>>> some level, the implementation would be similar.
>>>> It should help.
>>>> 
>>>> 
>>>> 
>>>> On Thu, Feb 28, 2019 at 4:29 PM Richard Yu <yohan.richard.yu@gmail.com
>>> 
>>>> wrote:
>>>> 
>>>>> Hi,
>>>>> 
>>>>> Per request, I've created a doc so we could get some more input in an
>>>>> organized manner:
>>>>> 
>>>>> 
>>>> 
>>> 
>> https://docs.google.com/document/d/1mSUsJvPgThnWizeQqljKU5244BMEiYHabA6OP6QEHmQ/edit?usp=sharing
>>>>> 
>>>>> And for Ivan's questions, I would answer accordingly.
>>>>> 
>>>>>> By "set the message to unknown", do you mean the broker will cache
>> the
>>>>>> message, not writing it to any log?
>>>>> 
>>>>> We wouldn't cache the message from my interpretation of the steps.
>> What
>>>>> the producer is first sending is a pre-processing message, not the
>> real
>>>>> message itself. This step basically notifies the broker that the
>>> message
>>>> is
>>>>> on its way. So all we have to do is store the message id and its
>>>>> corresponding status in a map, and depending on the producer's
>>> response,
>>>>> the status will change accordingly.
>>>>> 
>>>>>> In designs we've discussed previously, this was handled
>>>>>> by a component called the transaction coordinator, which is a
>> logical
>>>>>> component which each broker knows how to talk to. For a transaction
>>>>>> the commit message is sent to the coordinator, which writes it to
>> its
>>>>>> own log, and then goes through each topic in the commit and marks
>> the
>>>>>> transaction as completed.
>>>>> 
>>>>> I wasn't aware of previous discussions on this topic, but it seems
>>> pretty
>>>>> good to me. It's certainly better than what I would come up with.
>>>>> If there's any more things we need to talk about, I suppose we could
>>> move
>>>>> it to the google doc to play around with.
>>>>> 
>>>>> Hope we can get this PIP rolling.
>>>>> 
>>>>> 
>>>>> On Thu, Feb 28, 2019 at 2:53 AM Sijie Guo <gu...@gmail.com>
>> wrote:
>>>>> 
>>>>>> Richard,
>>>>>> 
>>>>>> Thank you for putting this put and pushing the discussion forward.
>>>>>> 
>>>>>> I think this is a very large feature. It might be worth creating a
>>>> google
>>>>>> doc for it (which is better for collaboration). And I believe Ivan
>> has
>>>>>> some
>>>>>> thoughts as well. If you can put up a google doc (make it
>>>> world-editable),
>>>>>> Ivan can probably dump his thoughts there and we can finalize the
>>>>>> discussion and break down into tasks. So the whole community can
>>>> actually
>>>>>> work together at collaborating this.
>>>>>> 
>>>>>> Thanks,
>>>>>> Sijie
>>>>>> 
>>>>>> On Thu, Feb 28, 2019 at 1:08 PM Richard Yu <
>>> yohan.richard.yu@gmail.com>
>>>>>> wrote:
>>>>>> 
>>>>>>> Hi all,
>>>>>>> 
>>>>>>> I would like to create a PIP for issue #2664 on Github. The
>> details
>>> of
>>>>>> the
>>>>>>> PIP are below.
>>>>>>> I hope we could discuss this thoroughly.
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> Richard
>>>>>>> 
>>>>>>> PIP-31: Add support for transactional messaging
>>>>>>> 
>>>>>>> Motivation: Pulsar currently could improve upon their system of
>>>> sending
>>>>>>> packets of data by implementing transactional messaging. This
>> system
>>>>>>> enforces eventual consistency within the system, and allows
>>> operations
>>>>>> to
>>>>>>> be performed atomically.
>>>>>>> 
>>>>>>> Proposal:
>>>>>>> 
>>>>>>> As described in the issue, we would implement the following policy
>>> in
>>>>>>> Producer and Pulsar Broker:
>>>>>>> 1. The producer produces the pre-processing transaction message.
>> At
>>>> this
>>>>>>> point, the broker will set the status of this message to unknown.
>>>>>>> 2. After the local transaction is successfully executed, the
>> commit
>>>>>> message
>>>>>>> is sent, otherwise the rollback message is sent.
>>>>>>> 3. The broker receives the message. If it is a commit message, it
>>>>>> modifies
>>>>>>> the transaction status to commit, and then sends an actual message
>>> to
>>>>>> the
>>>>>>> consumer queue. At this time, the consumer can consume the
>> message.
>>>>>>> Otherwise, the transaction status is modified to rollback. The
>>> message
>>>>>> will
>>>>>>> be discarded.
>>>>>>> 4. If at step 2, the producer is down or abnormal, at this time,
>> the
>>>>>> broker
>>>>>>> will periodically ask the specific producer for the status of the
>>>>>> message,
>>>>>>> and update the status according to the producer's response, and
>>>> process
>>>>>> it
>>>>>>> according to step 3, the action that comes down.
>>>>>>> 
>>>>>>> Specific concerns:
>>>>>>> There are a number of things we will improve upon or add:
>>>>>>> - A configuration called ```maxMessageUnknownTime```. Consider
>> this
>>>>>>> scenario: the pre-processing transaction message is sent, but the
>>>>>> commit or
>>>>>>> rollback message is never received, which could mean that the
>> status
>>>> of
>>>>>> a
>>>>>>> message would be permanently unknown. To avoid this from
>> happening,
>>> we
>>>>>>> would need a config which limits the amount of time the status of
>> a
>>>>>> message
>>>>>>> could be unknown (i.e. ```maxMessageUnknownTime```) After that,
>> the
>>>>>> message
>>>>>>> would be discarded.
>>>>>>> - Logging would be updated to log the status of a message i.e.
>>>> UNKNOWN,
>>>>>>> ROLLBACK, or COMMITTED. This would allow the user to know whether
>> or
>>>>>> not a
>>>>>>> message had failed or fallen through.
>>>>>>> 
>>>>>>> Possible Additional API:
>>>>>>> - We would add a method which allows the user to query the state
>> of
>>>> the
>>>>>>> message i.e. ```getStateOfMessage(long id)```
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
>> 
>> --
>> -Ali
>> 


Re: PIP-31: Add support for transactional messaging

Posted by Sijie Guo <gu...@gmail.com>.
Pravega's model is a better model than Kafka - it addressed the
interleaving problems. However Pravega's model is based on a giant
replicated log and rewrite the data to a second tiered storage for
persistence, which basically re-implemented bookkeeper's logic in broker. A
fundamental drawback of Pravega is write amplifications. The amplifications
of both network and IO bandwidth are huge. If you use bookkeeper both for
its first-and-second tier storage and assume the bookkeeper replication
factor is 3, pravega requires 6x network bandwidth and 12x IO bandwidth.
For a given message, it needs to write 3 times into the journal, and 3
times for persistent. The amplifications hugely limit the throughput at
pravega "brokers".

- Sijie



On Sat, Mar 2, 2019 at 6:13 PM Ali Ahmed <ah...@gmail.com> wrote:

> I agree we many want to review pravega's past efforts in this area also.
>
>
> https://github.com/pravega/pravega/blob/master/documentation/src/docs/transactions.md
>
> https://github.com/pravega/pravega/blob/master/client/src/main/java/io/pravega/client/stream/Transaction.java
>
> -Ali
>
> On Sat, Mar 2, 2019 at 1:56 AM Sijie Guo <gu...@gmail.com> wrote:
>
> > Kafka's implementation is interleaving committed messages with
> uncommitted
> > messages at storage. Personally I think it is a very ugly design and
> > implementation.
> >
> > Pulsar is a segment centric system, where we have a shared segment
> storage
> > - bookkeeper. I think a better direction is to leverage the segments (aka
> > ledgers)
> > for buffering uncommitted messages and commit the whole segment when the
> > whole transaction is committed.
> >
> > A rough idea would be:
> >
> > 1) for any transaction, write the messages to a separate ledger (or
> > multiple separate ledger).
> > 2) during the transaction, accumulates the messages in those ledgers.
> > 3) when commit, merge the txn ledgers back to the main data ledger. the
> > merge can be done either adding a meta message where data is stored in
> the
> > txn ledger or actually copying the data to data ledger (depending on the
> > size of data accumulate in the transaction).
> > 4) when abort, delete the txn ledger. No other additional work to be
> done.
> >
> > This would be producing a much clear design than Kafka.
> >
> > On Ivan's comments:
> >
> > > Transactional acknowledgement also needs to be taken into account
> >
> > I don't think we have to treat `transactional acknowledgement` as a
> special
> > case. currently `acknowledgment` are actually "append" operations into
> > cursor ledgers.
> > So the problem set can be reduced as `atomic append` to both data ledgers
> > and cursor ledgers. in that way, we can use one solution for handling
> > appending data and updating cursors.
> >
> > Additionally, I think a related topic about transactions would be
> > supporting large sized message (e.g. >= 5MB). If we take the approach I
> > described above using a separated ledger for accumulating messages for a
> > transaction, that we are easy to model a large size message as a
> > transaction of chunked messages.
> >
> > @Richard, @Ivan let me know what do you think. If you guys think the
> > direction I raised is a good one to go down, I am happy to write them
> down
> > into details, and drive the design and coordinate the implementations in
> > the community.
> >
> > - Sijie
> >
> > On Sat, Mar 2, 2019 at 9:45 AM Richard Yu <yo...@gmail.com>
> > wrote:
> >
> > > Hi all,
> > >
> > > We might be able to get some ideas on implementing this from Kafka:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
> > >
> > > Obviously, there is some differences in Kafka and Pulsar internals but
> at
> > > some level, the implementation would be similar.
> > > It should help.
> > >
> > >
> > >
> > > On Thu, Feb 28, 2019 at 4:29 PM Richard Yu <yohan.richard.yu@gmail.com
> >
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > Per request, I've created a doc so we could get some more input in an
> > > > organized manner:
> > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/1mSUsJvPgThnWizeQqljKU5244BMEiYHabA6OP6QEHmQ/edit?usp=sharing
> > > >
> > > > And for Ivan's questions, I would answer accordingly.
> > > >
> > > > >By "set the message to unknown", do you mean the broker will cache
> the
> > > > >message, not writing it to any log?
> > > >
> > > > We wouldn't cache the message from my interpretation of the steps.
> What
> > > > the producer is first sending is a pre-processing message, not the
> real
> > > > message itself. This step basically notifies the broker that the
> > message
> > > is
> > > > on its way. So all we have to do is store the message id and its
> > > > corresponding status in a map, and depending on the producer's
> > response,
> > > > the status will change accordingly.
> > > >
> > > > > In designs we've discussed previously, this was handled
> > > > > by a component called the transaction coordinator, which is a
> logical
> > > > > component which each broker knows how to talk to. For a transaction
> > > > > the commit message is sent to the coordinator, which writes it to
> its
> > > > > own log, and then goes through each topic in the commit and marks
> the
> > > > > transaction as completed.
> > > >
> > > > I wasn't aware of previous discussions on this topic, but it seems
> > pretty
> > > > good to me. It's certainly better than what I would come up with.
> > > > If there's any more things we need to talk about, I suppose we could
> > move
> > > > it to the google doc to play around with.
> > > >
> > > > Hope we can get this PIP rolling.
> > > >
> > > >
> > > > On Thu, Feb 28, 2019 at 2:53 AM Sijie Guo <gu...@gmail.com>
> wrote:
> > > >
> > > >> Richard,
> > > >>
> > > >> Thank you for putting this put and pushing the discussion forward.
> > > >>
> > > >> I think this is a very large feature. It might be worth creating a
> > > google
> > > >> doc for it (which is better for collaboration). And I believe Ivan
> has
> > > >> some
> > > >> thoughts as well. If you can put up a google doc (make it
> > > world-editable),
> > > >> Ivan can probably dump his thoughts there and we can finalize the
> > > >> discussion and break down into tasks. So the whole community can
> > > actually
> > > >> work together at collaborating this.
> > > >>
> > > >> Thanks,
> > > >> Sijie
> > > >>
> > > >> On Thu, Feb 28, 2019 at 1:08 PM Richard Yu <
> > yohan.richard.yu@gmail.com>
> > > >> wrote:
> > > >>
> > > >> > Hi all,
> > > >> >
> > > >> > I would like to create a PIP for issue #2664 on Github. The
> details
> > of
> > > >> the
> > > >> > PIP are below.
> > > >> > I hope we could discuss this thoroughly.
> > > >> >
> > > >> > Cheers,
> > > >> > Richard
> > > >> >
> > > >> > PIP-31: Add support for transactional messaging
> > > >> >
> > > >> > Motivation: Pulsar currently could improve upon their system of
> > > sending
> > > >> > packets of data by implementing transactional messaging. This
> system
> > > >> > enforces eventual consistency within the system, and allows
> > operations
> > > >> to
> > > >> > be performed atomically.
> > > >> >
> > > >> > Proposal:
> > > >> >
> > > >> > As described in the issue, we would implement the following policy
> > in
> > > >> > Producer and Pulsar Broker:
> > > >> > 1. The producer produces the pre-processing transaction message.
> At
> > > this
> > > >> > point, the broker will set the status of this message to unknown.
> > > >> > 2. After the local transaction is successfully executed, the
> commit
> > > >> message
> > > >> > is sent, otherwise the rollback message is sent.
> > > >> > 3. The broker receives the message. If it is a commit message, it
> > > >> modifies
> > > >> > the transaction status to commit, and then sends an actual message
> > to
> > > >> the
> > > >> > consumer queue. At this time, the consumer can consume the
> message.
> > > >> > Otherwise, the transaction status is modified to rollback. The
> > message
> > > >> will
> > > >> > be discarded.
> > > >> > 4. If at step 2, the producer is down or abnormal, at this time,
> the
> > > >> broker
> > > >> > will periodically ask the specific producer for the status of the
> > > >> message,
> > > >> > and update the status according to the producer's response, and
> > > process
> > > >> it
> > > >> > according to step 3, the action that comes down.
> > > >> >
> > > >> > Specific concerns:
> > > >> > There are a number of things we will improve upon or add:
> > > >> > - A configuration called ```maxMessageUnknownTime```. Consider
> this
> > > >> > scenario: the pre-processing transaction message is sent, but the
> > > >> commit or
> > > >> > rollback message is never received, which could mean that the
> status
> > > of
> > > >> a
> > > >> > message would be permanently unknown. To avoid this from
> happening,
> > we
> > > >> > would need a config which limits the amount of time the status of
> a
> > > >> message
> > > >> > could be unknown (i.e. ```maxMessageUnknownTime```) After that,
> the
> > > >> message
> > > >> > would be discarded.
> > > >> > - Logging would be updated to log the status of a message i.e.
> > > UNKNOWN,
> > > >> > ROLLBACK, or COMMITTED. This would allow the user to know whether
> or
> > > >> not a
> > > >> > message had failed or fallen through.
> > > >> >
> > > >> > Possible Additional API:
> > > >> > - We would add a method which allows the user to query the state
> of
> > > the
> > > >> > message i.e. ```getStateOfMessage(long id)```
> > > >> >
> > > >>
> > > >
> > >
> >
>
>
> --
> -Ali
>

Re: PIP-31: Add support for transactional messaging

Posted by Ali Ahmed <ah...@gmail.com>.
I agree we many want to review pravega's past efforts in this area also.

https://github.com/pravega/pravega/blob/master/documentation/src/docs/transactions.md
https://github.com/pravega/pravega/blob/master/client/src/main/java/io/pravega/client/stream/Transaction.java

-Ali

On Sat, Mar 2, 2019 at 1:56 AM Sijie Guo <gu...@gmail.com> wrote:

> Kafka's implementation is interleaving committed messages with uncommitted
> messages at storage. Personally I think it is a very ugly design and
> implementation.
>
> Pulsar is a segment centric system, where we have a shared segment storage
> - bookkeeper. I think a better direction is to leverage the segments (aka
> ledgers)
> for buffering uncommitted messages and commit the whole segment when the
> whole transaction is committed.
>
> A rough idea would be:
>
> 1) for any transaction, write the messages to a separate ledger (or
> multiple separate ledger).
> 2) during the transaction, accumulates the messages in those ledgers.
> 3) when commit, merge the txn ledgers back to the main data ledger. the
> merge can be done either adding a meta message where data is stored in the
> txn ledger or actually copying the data to data ledger (depending on the
> size of data accumulate in the transaction).
> 4) when abort, delete the txn ledger. No other additional work to be done.
>
> This would be producing a much clear design than Kafka.
>
> On Ivan's comments:
>
> > Transactional acknowledgement also needs to be taken into account
>
> I don't think we have to treat `transactional acknowledgement` as a special
> case. currently `acknowledgment` are actually "append" operations into
> cursor ledgers.
> So the problem set can be reduced as `atomic append` to both data ledgers
> and cursor ledgers. in that way, we can use one solution for handling
> appending data and updating cursors.
>
> Additionally, I think a related topic about transactions would be
> supporting large sized message (e.g. >= 5MB). If we take the approach I
> described above using a separated ledger for accumulating messages for a
> transaction, that we are easy to model a large size message as a
> transaction of chunked messages.
>
> @Richard, @Ivan let me know what do you think. If you guys think the
> direction I raised is a good one to go down, I am happy to write them down
> into details, and drive the design and coordinate the implementations in
> the community.
>
> - Sijie
>
> On Sat, Mar 2, 2019 at 9:45 AM Richard Yu <yo...@gmail.com>
> wrote:
>
> > Hi all,
> >
> > We might be able to get some ideas on implementing this from Kafka:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
> >
> > Obviously, there is some differences in Kafka and Pulsar internals but at
> > some level, the implementation would be similar.
> > It should help.
> >
> >
> >
> > On Thu, Feb 28, 2019 at 4:29 PM Richard Yu <yo...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > Per request, I've created a doc so we could get some more input in an
> > > organized manner:
> > >
> > >
> >
> https://docs.google.com/document/d/1mSUsJvPgThnWizeQqljKU5244BMEiYHabA6OP6QEHmQ/edit?usp=sharing
> > >
> > > And for Ivan's questions, I would answer accordingly.
> > >
> > > >By "set the message to unknown", do you mean the broker will cache the
> > > >message, not writing it to any log?
> > >
> > > We wouldn't cache the message from my interpretation of the steps. What
> > > the producer is first sending is a pre-processing message, not the real
> > > message itself. This step basically notifies the broker that the
> message
> > is
> > > on its way. So all we have to do is store the message id and its
> > > corresponding status in a map, and depending on the producer's
> response,
> > > the status will change accordingly.
> > >
> > > > In designs we've discussed previously, this was handled
> > > > by a component called the transaction coordinator, which is a logical
> > > > component which each broker knows how to talk to. For a transaction
> > > > the commit message is sent to the coordinator, which writes it to its
> > > > own log, and then goes through each topic in the commit and marks the
> > > > transaction as completed.
> > >
> > > I wasn't aware of previous discussions on this topic, but it seems
> pretty
> > > good to me. It's certainly better than what I would come up with.
> > > If there's any more things we need to talk about, I suppose we could
> move
> > > it to the google doc to play around with.
> > >
> > > Hope we can get this PIP rolling.
> > >
> > >
> > > On Thu, Feb 28, 2019 at 2:53 AM Sijie Guo <gu...@gmail.com> wrote:
> > >
> > >> Richard,
> > >>
> > >> Thank you for putting this put and pushing the discussion forward.
> > >>
> > >> I think this is a very large feature. It might be worth creating a
> > google
> > >> doc for it (which is better for collaboration). And I believe Ivan has
> > >> some
> > >> thoughts as well. If you can put up a google doc (make it
> > world-editable),
> > >> Ivan can probably dump his thoughts there and we can finalize the
> > >> discussion and break down into tasks. So the whole community can
> > actually
> > >> work together at collaborating this.
> > >>
> > >> Thanks,
> > >> Sijie
> > >>
> > >> On Thu, Feb 28, 2019 at 1:08 PM Richard Yu <
> yohan.richard.yu@gmail.com>
> > >> wrote:
> > >>
> > >> > Hi all,
> > >> >
> > >> > I would like to create a PIP for issue #2664 on Github. The details
> of
> > >> the
> > >> > PIP are below.
> > >> > I hope we could discuss this thoroughly.
> > >> >
> > >> > Cheers,
> > >> > Richard
> > >> >
> > >> > PIP-31: Add support for transactional messaging
> > >> >
> > >> > Motivation: Pulsar currently could improve upon their system of
> > sending
> > >> > packets of data by implementing transactional messaging. This system
> > >> > enforces eventual consistency within the system, and allows
> operations
> > >> to
> > >> > be performed atomically.
> > >> >
> > >> > Proposal:
> > >> >
> > >> > As described in the issue, we would implement the following policy
> in
> > >> > Producer and Pulsar Broker:
> > >> > 1. The producer produces the pre-processing transaction message. At
> > this
> > >> > point, the broker will set the status of this message to unknown.
> > >> > 2. After the local transaction is successfully executed, the commit
> > >> message
> > >> > is sent, otherwise the rollback message is sent.
> > >> > 3. The broker receives the message. If it is a commit message, it
> > >> modifies
> > >> > the transaction status to commit, and then sends an actual message
> to
> > >> the
> > >> > consumer queue. At this time, the consumer can consume the message.
> > >> > Otherwise, the transaction status is modified to rollback. The
> message
> > >> will
> > >> > be discarded.
> > >> > 4. If at step 2, the producer is down or abnormal, at this time, the
> > >> broker
> > >> > will periodically ask the specific producer for the status of the
> > >> message,
> > >> > and update the status according to the producer's response, and
> > process
> > >> it
> > >> > according to step 3, the action that comes down.
> > >> >
> > >> > Specific concerns:
> > >> > There are a number of things we will improve upon or add:
> > >> > - A configuration called ```maxMessageUnknownTime```. Consider this
> > >> > scenario: the pre-processing transaction message is sent, but the
> > >> commit or
> > >> > rollback message is never received, which could mean that the status
> > of
> > >> a
> > >> > message would be permanently unknown. To avoid this from happening,
> we
> > >> > would need a config which limits the amount of time the status of a
> > >> message
> > >> > could be unknown (i.e. ```maxMessageUnknownTime```) After that, the
> > >> message
> > >> > would be discarded.
> > >> > - Logging would be updated to log the status of a message i.e.
> > UNKNOWN,
> > >> > ROLLBACK, or COMMITTED. This would allow the user to know whether or
> > >> not a
> > >> > message had failed or fallen through.
> > >> >
> > >> > Possible Additional API:
> > >> > - We would add a method which allows the user to query the state of
> > the
> > >> > message i.e. ```getStateOfMessage(long id)```
> > >> >
> > >>
> > >
> >
>


-- 
-Ali

Re: PIP-31: Add support for transactional messaging

Posted by Sijie Guo <gu...@gmail.com>.
Kafka's implementation is interleaving committed messages with uncommitted
messages at storage. Personally I think it is a very ugly design and
implementation.

Pulsar is a segment centric system, where we have a shared segment storage
- bookkeeper. I think a better direction is to leverage the segments (aka
ledgers)
for buffering uncommitted messages and commit the whole segment when the
whole transaction is committed.

A rough idea would be:

1) for any transaction, write the messages to a separate ledger (or
multiple separate ledger).
2) during the transaction, accumulates the messages in those ledgers.
3) when commit, merge the txn ledgers back to the main data ledger. the
merge can be done either adding a meta message where data is stored in the
txn ledger or actually copying the data to data ledger (depending on the
size of data accumulate in the transaction).
4) when abort, delete the txn ledger. No other additional work to be done.

This would be producing a much clear design than Kafka.

On Ivan's comments:

> Transactional acknowledgement also needs to be taken into account

I don't think we have to treat `transactional acknowledgement` as a special
case. currently `acknowledgment` are actually "append" operations into
cursor ledgers.
So the problem set can be reduced as `atomic append` to both data ledgers
and cursor ledgers. in that way, we can use one solution for handling
appending data and updating cursors.

Additionally, I think a related topic about transactions would be
supporting large sized message (e.g. >= 5MB). If we take the approach I
described above using a separated ledger for accumulating messages for a
transaction, that we are easy to model a large size message as a
transaction of chunked messages.

@Richard, @Ivan let me know what do you think. If you guys think the
direction I raised is a good one to go down, I am happy to write them down
into details, and drive the design and coordinate the implementations in
the community.

- Sijie

On Sat, Mar 2, 2019 at 9:45 AM Richard Yu <yo...@gmail.com>
wrote:

> Hi all,
>
> We might be able to get some ideas on implementing this from Kafka:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
>
> Obviously, there is some differences in Kafka and Pulsar internals but at
> some level, the implementation would be similar.
> It should help.
>
>
>
> On Thu, Feb 28, 2019 at 4:29 PM Richard Yu <yo...@gmail.com>
> wrote:
>
> > Hi,
> >
> > Per request, I've created a doc so we could get some more input in an
> > organized manner:
> >
> >
> https://docs.google.com/document/d/1mSUsJvPgThnWizeQqljKU5244BMEiYHabA6OP6QEHmQ/edit?usp=sharing
> >
> > And for Ivan's questions, I would answer accordingly.
> >
> > >By "set the message to unknown", do you mean the broker will cache the
> > >message, not writing it to any log?
> >
> > We wouldn't cache the message from my interpretation of the steps. What
> > the producer is first sending is a pre-processing message, not the real
> > message itself. This step basically notifies the broker that the message
> is
> > on its way. So all we have to do is store the message id and its
> > corresponding status in a map, and depending on the producer's response,
> > the status will change accordingly.
> >
> > > In designs we've discussed previously, this was handled
> > > by a component called the transaction coordinator, which is a logical
> > > component which each broker knows how to talk to. For a transaction
> > > the commit message is sent to the coordinator, which writes it to its
> > > own log, and then goes through each topic in the commit and marks the
> > > transaction as completed.
> >
> > I wasn't aware of previous discussions on this topic, but it seems pretty
> > good to me. It's certainly better than what I would come up with.
> > If there's any more things we need to talk about, I suppose we could move
> > it to the google doc to play around with.
> >
> > Hope we can get this PIP rolling.
> >
> >
> > On Thu, Feb 28, 2019 at 2:53 AM Sijie Guo <gu...@gmail.com> wrote:
> >
> >> Richard,
> >>
> >> Thank you for putting this put and pushing the discussion forward.
> >>
> >> I think this is a very large feature. It might be worth creating a
> google
> >> doc for it (which is better for collaboration). And I believe Ivan has
> >> some
> >> thoughts as well. If you can put up a google doc (make it
> world-editable),
> >> Ivan can probably dump his thoughts there and we can finalize the
> >> discussion and break down into tasks. So the whole community can
> actually
> >> work together at collaborating this.
> >>
> >> Thanks,
> >> Sijie
> >>
> >> On Thu, Feb 28, 2019 at 1:08 PM Richard Yu <yo...@gmail.com>
> >> wrote:
> >>
> >> > Hi all,
> >> >
> >> > I would like to create a PIP for issue #2664 on Github. The details of
> >> the
> >> > PIP are below.
> >> > I hope we could discuss this thoroughly.
> >> >
> >> > Cheers,
> >> > Richard
> >> >
> >> > PIP-31: Add support for transactional messaging
> >> >
> >> > Motivation: Pulsar currently could improve upon their system of
> sending
> >> > packets of data by implementing transactional messaging. This system
> >> > enforces eventual consistency within the system, and allows operations
> >> to
> >> > be performed atomically.
> >> >
> >> > Proposal:
> >> >
> >> > As described in the issue, we would implement the following policy in
> >> > Producer and Pulsar Broker:
> >> > 1. The producer produces the pre-processing transaction message. At
> this
> >> > point, the broker will set the status of this message to unknown.
> >> > 2. After the local transaction is successfully executed, the commit
> >> message
> >> > is sent, otherwise the rollback message is sent.
> >> > 3. The broker receives the message. If it is a commit message, it
> >> modifies
> >> > the transaction status to commit, and then sends an actual message to
> >> the
> >> > consumer queue. At this time, the consumer can consume the message.
> >> > Otherwise, the transaction status is modified to rollback. The message
> >> will
> >> > be discarded.
> >> > 4. If at step 2, the producer is down or abnormal, at this time, the
> >> broker
> >> > will periodically ask the specific producer for the status of the
> >> message,
> >> > and update the status according to the producer's response, and
> process
> >> it
> >> > according to step 3, the action that comes down.
> >> >
> >> > Specific concerns:
> >> > There are a number of things we will improve upon or add:
> >> > - A configuration called ```maxMessageUnknownTime```. Consider this
> >> > scenario: the pre-processing transaction message is sent, but the
> >> commit or
> >> > rollback message is never received, which could mean that the status
> of
> >> a
> >> > message would be permanently unknown. To avoid this from happening, we
> >> > would need a config which limits the amount of time the status of a
> >> message
> >> > could be unknown (i.e. ```maxMessageUnknownTime```) After that, the
> >> message
> >> > would be discarded.
> >> > - Logging would be updated to log the status of a message i.e.
> UNKNOWN,
> >> > ROLLBACK, or COMMITTED. This would allow the user to know whether or
> >> not a
> >> > message had failed or fallen through.
> >> >
> >> > Possible Additional API:
> >> > - We would add a method which allows the user to query the state of
> the
> >> > message i.e. ```getStateOfMessage(long id)```
> >> >
> >>
> >
>

Re: PIP-31: Add support for transactional messaging

Posted by Richard Yu <yo...@gmail.com>.
Hi all,

We might be able to get some ideas on implementing this from Kafka:
https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka

Obviously, there is some differences in Kafka and Pulsar internals but at
some level, the implementation would be similar.
It should help.



On Thu, Feb 28, 2019 at 4:29 PM Richard Yu <yo...@gmail.com>
wrote:

> Hi,
>
> Per request, I've created a doc so we could get some more input in an
> organized manner:
>
> https://docs.google.com/document/d/1mSUsJvPgThnWizeQqljKU5244BMEiYHabA6OP6QEHmQ/edit?usp=sharing
>
> And for Ivan's questions, I would answer accordingly.
>
> >By "set the message to unknown", do you mean the broker will cache the
> >message, not writing it to any log?
>
> We wouldn't cache the message from my interpretation of the steps. What
> the producer is first sending is a pre-processing message, not the real
> message itself. This step basically notifies the broker that the message is
> on its way. So all we have to do is store the message id and its
> corresponding status in a map, and depending on the producer's response,
> the status will change accordingly.
>
> > In designs we've discussed previously, this was handled
> > by a component called the transaction coordinator, which is a logical
> > component which each broker knows how to talk to. For a transaction
> > the commit message is sent to the coordinator, which writes it to its
> > own log, and then goes through each topic in the commit and marks the
> > transaction as completed.
>
> I wasn't aware of previous discussions on this topic, but it seems pretty
> good to me. It's certainly better than what I would come up with.
> If there's any more things we need to talk about, I suppose we could move
> it to the google doc to play around with.
>
> Hope we can get this PIP rolling.
>
>
> On Thu, Feb 28, 2019 at 2:53 AM Sijie Guo <gu...@gmail.com> wrote:
>
>> Richard,
>>
>> Thank you for putting this put and pushing the discussion forward.
>>
>> I think this is a very large feature. It might be worth creating a google
>> doc for it (which is better for collaboration). And I believe Ivan has
>> some
>> thoughts as well. If you can put up a google doc (make it world-editable),
>> Ivan can probably dump his thoughts there and we can finalize the
>> discussion and break down into tasks. So the whole community can actually
>> work together at collaborating this.
>>
>> Thanks,
>> Sijie
>>
>> On Thu, Feb 28, 2019 at 1:08 PM Richard Yu <yo...@gmail.com>
>> wrote:
>>
>> > Hi all,
>> >
>> > I would like to create a PIP for issue #2664 on Github. The details of
>> the
>> > PIP are below.
>> > I hope we could discuss this thoroughly.
>> >
>> > Cheers,
>> > Richard
>> >
>> > PIP-31: Add support for transactional messaging
>> >
>> > Motivation: Pulsar currently could improve upon their system of sending
>> > packets of data by implementing transactional messaging. This system
>> > enforces eventual consistency within the system, and allows operations
>> to
>> > be performed atomically.
>> >
>> > Proposal:
>> >
>> > As described in the issue, we would implement the following policy in
>> > Producer and Pulsar Broker:
>> > 1. The producer produces the pre-processing transaction message. At this
>> > point, the broker will set the status of this message to unknown.
>> > 2. After the local transaction is successfully executed, the commit
>> message
>> > is sent, otherwise the rollback message is sent.
>> > 3. The broker receives the message. If it is a commit message, it
>> modifies
>> > the transaction status to commit, and then sends an actual message to
>> the
>> > consumer queue. At this time, the consumer can consume the message.
>> > Otherwise, the transaction status is modified to rollback. The message
>> will
>> > be discarded.
>> > 4. If at step 2, the producer is down or abnormal, at this time, the
>> broker
>> > will periodically ask the specific producer for the status of the
>> message,
>> > and update the status according to the producer's response, and process
>> it
>> > according to step 3, the action that comes down.
>> >
>> > Specific concerns:
>> > There are a number of things we will improve upon or add:
>> > - A configuration called ```maxMessageUnknownTime```. Consider this
>> > scenario: the pre-processing transaction message is sent, but the
>> commit or
>> > rollback message is never received, which could mean that the status of
>> a
>> > message would be permanently unknown. To avoid this from happening, we
>> > would need a config which limits the amount of time the status of a
>> message
>> > could be unknown (i.e. ```maxMessageUnknownTime```) After that, the
>> message
>> > would be discarded.
>> > - Logging would be updated to log the status of a message i.e. UNKNOWN,
>> > ROLLBACK, or COMMITTED. This would allow the user to know whether or
>> not a
>> > message had failed or fallen through.
>> >
>> > Possible Additional API:
>> > - We would add a method which allows the user to query the state of the
>> > message i.e. ```getStateOfMessage(long id)```
>> >
>>
>

Re: PIP-31: Add support for transactional messaging

Posted by Richard Yu <yo...@gmail.com>.
Hi,

Per request, I've created a doc so we could get some more input in an
organized manner:
https://docs.google.com/document/d/1mSUsJvPgThnWizeQqljKU5244BMEiYHabA6OP6QEHmQ/edit?usp=sharing

And for Ivan's questions, I would answer accordingly.

>By "set the message to unknown", do you mean the broker will cache the
>message, not writing it to any log?

We wouldn't cache the message from my interpretation of the steps. What the
producer is first sending is a pre-processing message, not the real message
itself. This step basically notifies the broker that the message is on its
way. So all we have to do is store the message id and its corresponding
status in a map, and depending on the producer's response, the status will
change accordingly.

> In designs we've discussed previously, this was handled
> by a component called the transaction coordinator, which is a logical
> component which each broker knows how to talk to. For a transaction
> the commit message is sent to the coordinator, which writes it to its
> own log, and then goes through each topic in the commit and marks the
> transaction as completed.

I wasn't aware of previous discussions on this topic, but it seems pretty
good to me. It's certainly better than what I would come up with.
If there's any more things we need to talk about, I suppose we could move
it to the google doc to play around with.

Hope we can get this PIP rolling.


On Thu, Feb 28, 2019 at 2:53 AM Sijie Guo <gu...@gmail.com> wrote:

> Richard,
>
> Thank you for putting this put and pushing the discussion forward.
>
> I think this is a very large feature. It might be worth creating a google
> doc for it (which is better for collaboration). And I believe Ivan has some
> thoughts as well. If you can put up a google doc (make it world-editable),
> Ivan can probably dump his thoughts there and we can finalize the
> discussion and break down into tasks. So the whole community can actually
> work together at collaborating this.
>
> Thanks,
> Sijie
>
> On Thu, Feb 28, 2019 at 1:08 PM Richard Yu <yo...@gmail.com>
> wrote:
>
> > Hi all,
> >
> > I would like to create a PIP for issue #2664 on Github. The details of
> the
> > PIP are below.
> > I hope we could discuss this thoroughly.
> >
> > Cheers,
> > Richard
> >
> > PIP-31: Add support for transactional messaging
> >
> > Motivation: Pulsar currently could improve upon their system of sending
> > packets of data by implementing transactional messaging. This system
> > enforces eventual consistency within the system, and allows operations to
> > be performed atomically.
> >
> > Proposal:
> >
> > As described in the issue, we would implement the following policy in
> > Producer and Pulsar Broker:
> > 1. The producer produces the pre-processing transaction message. At this
> > point, the broker will set the status of this message to unknown.
> > 2. After the local transaction is successfully executed, the commit
> message
> > is sent, otherwise the rollback message is sent.
> > 3. The broker receives the message. If it is a commit message, it
> modifies
> > the transaction status to commit, and then sends an actual message to the
> > consumer queue. At this time, the consumer can consume the message.
> > Otherwise, the transaction status is modified to rollback. The message
> will
> > be discarded.
> > 4. If at step 2, the producer is down or abnormal, at this time, the
> broker
> > will periodically ask the specific producer for the status of the
> message,
> > and update the status according to the producer's response, and process
> it
> > according to step 3, the action that comes down.
> >
> > Specific concerns:
> > There are a number of things we will improve upon or add:
> > - A configuration called ```maxMessageUnknownTime```. Consider this
> > scenario: the pre-processing transaction message is sent, but the commit
> or
> > rollback message is never received, which could mean that the status of a
> > message would be permanently unknown. To avoid this from happening, we
> > would need a config which limits the amount of time the status of a
> message
> > could be unknown (i.e. ```maxMessageUnknownTime```) After that, the
> message
> > would be discarded.
> > - Logging would be updated to log the status of a message i.e. UNKNOWN,
> > ROLLBACK, or COMMITTED. This would allow the user to know whether or not
> a
> > message had failed or fallen through.
> >
> > Possible Additional API:
> > - We would add a method which allows the user to query the state of the
> > message i.e. ```getStateOfMessage(long id)```
> >
>

Re: PIP-31: Add support for transactional messaging

Posted by Sijie Guo <gu...@gmail.com>.
Richard,

Thank you for putting this put and pushing the discussion forward.

I think this is a very large feature. It might be worth creating a google
doc for it (which is better for collaboration). And I believe Ivan has some
thoughts as well. If you can put up a google doc (make it world-editable),
Ivan can probably dump his thoughts there and we can finalize the
discussion and break down into tasks. So the whole community can actually
work together at collaborating this.

Thanks,
Sijie

On Thu, Feb 28, 2019 at 1:08 PM Richard Yu <yo...@gmail.com>
wrote:

> Hi all,
>
> I would like to create a PIP for issue #2664 on Github. The details of the
> PIP are below.
> I hope we could discuss this thoroughly.
>
> Cheers,
> Richard
>
> PIP-31: Add support for transactional messaging
>
> Motivation: Pulsar currently could improve upon their system of sending
> packets of data by implementing transactional messaging. This system
> enforces eventual consistency within the system, and allows operations to
> be performed atomically.
>
> Proposal:
>
> As described in the issue, we would implement the following policy in
> Producer and Pulsar Broker:
> 1. The producer produces the pre-processing transaction message. At this
> point, the broker will set the status of this message to unknown.
> 2. After the local transaction is successfully executed, the commit message
> is sent, otherwise the rollback message is sent.
> 3. The broker receives the message. If it is a commit message, it modifies
> the transaction status to commit, and then sends an actual message to the
> consumer queue. At this time, the consumer can consume the message.
> Otherwise, the transaction status is modified to rollback. The message will
> be discarded.
> 4. If at step 2, the producer is down or abnormal, at this time, the broker
> will periodically ask the specific producer for the status of the message,
> and update the status according to the producer's response, and process it
> according to step 3, the action that comes down.
>
> Specific concerns:
> There are a number of things we will improve upon or add:
> - A configuration called ```maxMessageUnknownTime```. Consider this
> scenario: the pre-processing transaction message is sent, but the commit or
> rollback message is never received, which could mean that the status of a
> message would be permanently unknown. To avoid this from happening, we
> would need a config which limits the amount of time the status of a message
> could be unknown (i.e. ```maxMessageUnknownTime```) After that, the message
> would be discarded.
> - Logging would be updated to log the status of a message i.e. UNKNOWN,
> ROLLBACK, or COMMITTED. This would allow the user to know whether or not a
> message had failed or fallen through.
>
> Possible Additional API:
> - We would add a method which allows the user to query the state of the
> message i.e. ```getStateOfMessage(long id)```
>