Posted to dev@rocketmq.apache.org by 柳尘 <yu...@gmail.com> on 2022/10/19 06:04:17 UTC

Re: [DISCUSS] RIP-47 Producer idempotent send

OK, I have resent it.

Xinyu Zhou <yu...@apache.org> wrote on Thu, Sep 29, 2022 at 20:10:

> Hi, thanks for starting this discussion.
>
> But the format is broken and hard to read. Could you please resend a
> well-formatted version, or paste a doc link?
>
> Regards
>
> On Thu, Sep 29, 2022 at 7:23 PM 柳尘 <yu...@gmail.com> wrote:
>
> > *Status*
> >
> >
> > - Current State: Discussing
> > - Authors: [complone](http://github.com/complone)
> > - Shepherds:
> > - Mailing List discussion: dev@rocketmq.apache.org
> > - Pull Request:
> > - Released: no
> >
> > Background & Motivation
> >
> > What do we need to do
> >
> >
> > - Will we add a new module? Yes
> > - Will we add new APIs? Yes
> > - Will we add a new feature? Yes
> >
> > Why should we do that
> >
> > - Are there any problems of our current project?
> >
> > At present, when RocketMQ produces messages, the network call itself is
> > uncertain: a send can be left in an unknown, so-called "processing"
> > state, so messages may be sent more than once. Many other MQ products
> > have the same problem. The usual approach is to ask consumers to
> > deduplicate messages when consuming them. The idempotency key should be
> > generated by the message producer: when sending a message, the producer
> > can set an id as the message key. The corresponding API is
> > ```org.apache.rocketmq.common.message.Message.setKeys(String keys)```.
> >
> > - What can we benefit from the proposed changes?
> >
> > We introduce the notion of a TransactionalId, which lets users uniquely
> > identify producers in a persistent way. Different instances of a
> > producer with the same TransactionalId will be able to resume (or abort)
> > any transactions instantiated by the previous instance.
> >
> > We introduce the notion of a producer epoch, which ensures that there is
> > only one legitimate active instance of a producer with a given
> > TransactionalId, and hence lets us maintain transaction guarantees in
> > the event of failures.
> >
> > Goals
> >
> > - What problem is this proposal designed to solve?
> >
> > When tuned for reliability, users can guarantee that every message write
> > will be persisted at least once without data loss. Duplicates may appear
> > in the stream due to producer retries. For example, a broker might crash
> > between committing a message and sending an ack to the producer, causing
> > the producer to retry and resulting in duplicate messages in the stream.
> >
> > However, idempotent producers don’t provide guarantees for writes across
> > multiple MessageQueues. For this, one needs stronger transactional
> > guarantees, i.e. the ability to write to several MessageQueues
> > atomically. By atomically, we mean the ability to commit a set of
> > messages across MessageQueues as a unit: either all messages are
> > committed, or none of them are.
> >
> > - To what degree should we solve the problem?
> >
> > Within a transaction, we also need to make sure that there are no
> > duplicate messages generated by the producer. To achieve this, we are
> > going to add sequence numbers to messages to allow the brokers to
> > de-duplicate messages per producer and topic partition. For each topic
> > partition that is written to, the producer maintains a sequence number
> > counter and assigns the next number in the sequence to each new message.
> > The broker verifies that the next message produced has been assigned the
> > next number and otherwise returns an error.
> >
> > In addition, since the sequence number is per producer and topic
> > partition, we also need to uniquely identify a producer across multiple
> > sessions (i.e. when the producer fails and is recreated, etc.). Hence we
> > introduce a new TransactionalId to distinguish producers, along with an
> > epoch number so that zombie writers with the same TransactionalId can be
> > fenced.
> >
> > At any given point in time, a producer can only have one ongoing
> > transaction, so we can distinguish messages that belong to different
> > transactions by their respective TransactionalId. Producers with the
> > same TransactionalId will talk to the same transaction coordinator,
> > which also keeps track of their TransactionalIds in addition to managing
> > their transaction status.
> >
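To make the sequence-number check and epoch fencing described above a bit more concrete, here is a minimal Java sketch. It is only an illustration under assumptions, not part of the proposal and not a RocketMQ API: the class SequenceDedupSketch, the Result values, and the append() signature are hypothetical names. It also assumes that sequence numbers start at 0, that they restart when a producer comes back with a higher epoch, and that the broker reports duplicates and gaps as two separate errors (the proposal only says that an error is returned).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical broker-side bookkeeping; all names here are illustrative only.
class SequenceDedupSketch {
    enum Result { ACCEPTED, DUPLICATE, OUT_OF_ORDER, FENCED }

    // Highest epoch seen so far for each producer id.
    private final Map<String, Integer> epochs = new HashMap<>();
    // Last accepted sequence number per (producer id, epoch, queue).
    private final Map<String, Long> lastSequence = new HashMap<>();

    Result append(String producerId, int epoch, String queue, long sequence) {
        int currentEpoch = epochs.getOrDefault(producerId, epoch);
        if (epoch < currentEpoch) {
            // A newer instance of this producer exists: reject the zombie writer.
            return Result.FENCED;
        }
        epochs.put(producerId, epoch);

        // Assumption: sequences restart at 0 for every new epoch.
        String key = producerId + "#" + epoch + "@" + queue;
        long last = lastSequence.getOrDefault(key, -1L);
        if (sequence <= last) {
            return Result.DUPLICATE;      // a retry of a message already stored
        }
        if (sequence != last + 1) {
            return Result.OUT_OF_ORDER;   // a gap: an earlier message is missing
        }
        lastSequence.put(key, sequence);
        return Result.ACCEPTED;
    }

    public static void main(String[] args) {
        SequenceDedupSketch broker = new SequenceDedupSketch();
        System.out.println(broker.append("producer-1", 0, "queue-0", 0));
        // Same sequence again, as a producer retry would send it.
        System.out.println(broker.append("producer-1", 0, "queue-0", 0));
    }
}
```

With this bookkeeping a retried send is recognised by its sequence number instead of being stored twice, and an old instance that keeps writing after a restart is rejected by the epoch comparison before its sequence number is even looked at.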
> > Non-Goals
> >
> >
> > - What problem is this proposal NOT designed to solve?
> > - Are there any limits of this proposal?
> >
> > Changes
> >
> > Architecture
> >
> > Interface Design/Change
> >
> > - Method signature changes
> >
> > We add a set of new public APIs to the TransactionMQProducer class and
> > describe how these APIs will be implemented.
> >
> > /* initialize the producer as a transactional producer */
> > initTransactions()
> >
> > The following steps will be taken when initTransactions() is called:
> >
> > 1. If no TransactionalId has been provided in configuration, skip to
> >    step 3.
> > 2. Send a FindCoordinatorRequest with the configured TransactionalId and
> >    with CoordinatorType encoded as “transaction” to a random broker.
> >    Block for the corresponding response, which will return the assigned
> >    transaction coordinator for this producer.
> > 3. Send an InitPidRequest to the transaction coordinator, or to a random
> >    broker if no TransactionalId was provided in configuration. Block for
> >    the corresponding response to get the returned PID.
> >
> > /* start a transaction to produce messages */
> > beginTransaction()
> >
> > The following steps are executed on the producer when beginTransaction
> > is called:
> >
> > 1. Check if the producer is transactional (i.e. init has been called);
> >    if not, throw an exception (we omit this step in the rest of the
> >    APIs, but they all need to execute it).
> > 2. Check whether a transaction has already been started. If so, raise an
> >    exception.
> >
> > /* send offsets for a given consumer group within this transaction */
> > sendOffsetsToTransaction(
> >     Map<TopicPartition, OffsetAndMetadata> offsets,
> >     String consumerGroupId)
> >
> > The following steps are executed on the producer when
> > sendOffsetsToTransaction is called:
> >
> > 1. Check if it is currently within a transaction; if not, throw an
> >    exception; otherwise proceed to the next step.
> > 2. Check if this function has ever been called for the given groupId
> >    within this transaction. If not, send an AddOffsetsToTxnRequest to
> >    the transaction coordinator and block until the corresponding
> >    response is received; otherwise proceed to the next step.
> > 3. Send a TxnOffsetCommitRequest to the coordinator returned in the
> >    response from the previous step, and block until the corresponding
> >    response is received.
> >
> > /* commit the transaction with its produced messages */
> > commitTransaction()
> >
> > The following steps are executed on the producer when commitTransaction
> > is called:
> >
> > 1. Check if there is an active transaction; if not, throw an exception;
> >    otherwise proceed to the next step.
> > 2. Call flush to make sure all sent messages in this transaction are
> >    acknowledged.
> > 3. Send an EndTxnRequest with the COMMIT command to the transaction
> >    coordinator, and block until the corresponding response is received.
> >
> > - Method behavior changes
> > - CLI command changes
> > - Log format or content changes
> >
> > Compatibility, Deprecation, and Migration Plan
> >
> >
> > - Are backward and forward compatibility taken into consideration?
> > - Are there deprecated APIs?
> > - How do we do migration?
> >
> > Implementation Outline
> >
> > Phase 1
> >
> >
> > First, before sending data, you need to manually create a transaction
> > coordinator to control the one-to-many relationship between PID and
> > transactionId. To this end, you need to ensure that the offset
> > consumption progress is reported to the transaction coordinator (TC) in
> > a timely manner, and that messages of submitted but unsettled
> > transactions are not deleted by mistake while the scheduled deletion
> > task runs concurrently.
> >
> > Phase 2
> >
> > Decide whether PullLiteProducer needs to compare the progress of the
> > current offset with the globally visible transaction offset of the PID,
> > so that the PID -> transactionId mapping can achieve cross-partition
> > transaction consistency.
> >
> > Phase 3
> >
> >
> > When downstream consumers consume, it must be ensured that messages
> > marked with a transactionId upstream are perceived by the downstream
> > consumers.
> >
> > Rejected Alternatives
> >
> > - How do alternatives solve the issue you proposed?
> > - Pros and cons of alternatives
> > - Why should we reject the above alternatives?
> >
>
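Regarding the state checks listed for initTransactions(), beginTransaction() and commitTransaction() in the Interface Design/Change section quoted above, a rough Java sketch of the producer-side bookkeeping might look like the following. This is a sketch under assumptions, not the proposed implementation: the class TransactionStateSketch, its fields, and the send() and flush() helpers are hypothetical, and every network request (FindCoordinatorRequest, InitPidRequest, EndTxnRequest) is replaced by a comment so that the example stays self-contained.

```java
// Hypothetical producer-side state tracking; no real RocketMQ classes are used.
class TransactionStateSketch {
    private boolean initialized = false;   // set once initTransactions() has run
    private boolean inTransaction = false; // true between begin and commit
    private int unacknowledged = 0;        // sends not yet acknowledged

    void initTransactions() {
        // Here the real producer would look up its coordinator and obtain a PID.
        initialized = true;
    }

    void beginTransaction() {
        requireInitialized();
        if (inTransaction) {
            throw new IllegalStateException("a transaction has already been started");
        }
        inTransaction = true;
    }

    void send(String message) {
        requireActiveTransaction();
        unacknowledged++; // assumed: the ack arrives later, at flush time
    }

    void commitTransaction() {
        requireActiveTransaction();
        flush();
        // Here the real producer would send the COMMIT command to the coordinator.
        inTransaction = false;
    }

    int unacknowledgedCount() {
        return unacknowledged;
    }

    private void flush() {
        // Stand-in for waiting until every sent message is acknowledged.
        unacknowledged = 0;
    }

    private void requireInitialized() {
        if (!initialized) {
            throw new IllegalStateException("initTransactions() has not been called");
        }
    }

    private void requireActiveTransaction() {
        requireInitialized();
        if (!inTransaction) {
            throw new IllegalStateException("no active transaction");
        }
    }

    public static void main(String[] args) {
        TransactionStateSketch producer = new TransactionStateSketch();
        producer.initTransactions();
        producer.beginTransaction();
        producer.send("hello");
        producer.commitTransaction();
        System.out.println("unacknowledged after commit: " + producer.unacknowledgedCount());
    }
}
```

Keeping the two checks in small private helpers mirrors the remark in the quoted steps that every API has to repeat the "is the producer transactional" check, even though the text only spells it out for beginTransaction.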