Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/07/16 02:52:41 UTC

[GitHub] [pulsar] gaoran10 opened a new pull request #7552: [WIP] Produce transaction messages integration

gaoran10 opened a new pull request #7552:
URL: https://github.com/apache/pulsar/pull/7552


   Fix #2664 
   
   # Motivation
   
   Currently, the transaction components are all independent; the relationship between the transaction client and the transaction server still needs to be established.
   
   The goal of this PR is to enable the Pulsar client to send transaction messages to the Pulsar broker.
   
   ### Modifications
   
   1. Enable the Pulsar client to create a new transaction and obtain its txnID.
   2. Enable the Pulsar client to send transaction messages to the transaction buffer.
   3. Append a committing marker to the transaction buffer.
   4. Append a committed marker to the topic partition.
   5. Append a committed marker to the transaction buffer.
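
The steps above can be sketched roughly as follows. This is only an in-memory illustration of the marker ordering, not the code in this PR: the class `TxnCommitSketch`, its fields, and its methods are all hypothetical names, and plain lists stand in for the transaction buffer and the topic partition.

```java
import java.util.ArrayList;
import java.util.List;

public class TxnCommitSketch {
    // Hypothetical stand-ins for the transaction buffer and the topic partition log.
    public final List<String> transactionBuffer = new ArrayList<>();
    public final List<String> topicPartition = new ArrayList<>();
    private long nextTxnId = 1L;

    // Step 1: the client asks for a new transaction and receives its id.
    public long newTransaction() {
        return nextTxnId++;
    }

    // Step 2: transactional messages are written to the transaction buffer.
    public void send(long txnId, String payload) {
        transactionBuffer.add("txn-" + txnId + ":msg:" + payload);
    }

    // Steps 3-5: committing marker in the buffer, committed marker in the
    // topic partition, then committed marker in the buffer.
    public void commit(long txnId) {
        transactionBuffer.add("txn-" + txnId + ":COMMITTING");
        topicPartition.add("txn-" + txnId + ":COMMITTED");
        transactionBuffer.add("txn-" + txnId + ":COMMITTED");
    }

    public static void main(String[] args) {
        TxnCommitSketch sketch = new TxnCommitSketch();
        long txnId = sketch.newTransaction();
        sketch.send(txnId, "hello");
        sketch.commit(txnId);
        System.out.println("buffer: " + sketch.transactionBuffer);
        System.out.println("partition: " + sketch.topicPartition);
    }
}
```

A real broker would persist each marker asynchronously and handle failures between the steps; the sketch only shows the order in which the markers are appended.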
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   This change adds tests.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (yes)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (yes)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (don't know)
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] gaoran10 commented on pull request #7552: [WIP] Produce transaction messages integration

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on pull request #7552:
URL: https://github.com/apache/pulsar/pull/7552#issuecomment-661021535


   /pulsarbot run-failure-checks





[GitHub] [pulsar] gaoran10 commented on a change in pull request #7552: [Transaction] Produce transaction messages and commit

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #7552:
URL: https://github.com/apache/pulsar/pull/7552#discussion_r462457051



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1674,40 +1687,134 @@ protected void handleAddPartitionToTxn(PulsarApi.CommandAddPartitionToTxn comman
 
     @Override
     protected void handleEndTxn(PulsarApi.CommandEndTxn command) {
-        TxnStatus newStatus = null;
+        TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
+        final TxnStatus newStatus;
         switch (command.getTxnAction()) {
             case COMMIT:
                 newStatus = TxnStatus.COMMITTING;
                 break;
             case ABORT:
                 newStatus = TxnStatus.ABORTING;
                 break;
+            default:
+                UnsupportedTxnActionException exception =
+                        new UnsupportedTxnActionException(txnID, command.getTxnAction());
+                log.error(exception.getMessage());
+                ctx.writeAndFlush(Commands.newEndTxnResponse(command.getRequestId(), txnID.getMostSigBits(),
+                        BrokerServiceException.getClientErrorCode(exception), exception.getMessage()));
+                return;
         }
-        TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
         if (log.isDebugEnabled()) {
             log.debug("Receive end txn by {} request {} from {} with txnId {}", newStatus, command.getRequestId(), remoteAddress, txnID);
         }
+        final long requestId = command.getRequestId();
         service.pulsar().getTransactionMetadataStoreService().updateTxnStatus(txnID, newStatus, TxnStatus.OPEN)
-            .whenComplete((v, ex) -> {
-                if (ex == null) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Send response success for end txn request {}", command.getRequestId());
-                    }
-                    ctx.writeAndFlush(Commands.newEndTxnResponse(command.getRequestId(),
+            .thenRun(() -> {
+                if (log.isDebugEnabled()) {
+                    log.debug("Send response success for end txn request {}", command.getRequestId());
+                }
+                updateTBStatus(txnID, newStatus).thenRun(() -> {
+                    service.pulsar().getTransactionMetadataStoreService()
+                            .updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.COMMITTING);
+                    ctx.writeAndFlush(Commands.newEndTxnResponse(requestId,
                             txnID.getLeastSigBits(), txnID.getMostSigBits()));
+                }).exceptionally(throwable -> {
+                    log.error("Failed to get txn `" + txnID + "` txnMeta.", throwable);
+                    ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(),
+                            BrokerServiceException.getClientErrorCode(throwable), throwable.getMessage()));
+                    return null;
+                });
+            }).exceptionally(throwable -> {
+                if (log.isDebugEnabled()) {
+                    log.debug("Send response error for end txn request {}", command.getRequestId());
+                }
+                ctx.writeAndFlush(Commands.newEndTxnResponse(command.getRequestId(), txnID.getMostSigBits(),
+                        BrokerServiceException.getClientErrorCode(throwable), throwable.getMessage()));
+                return null;
+        });

Review comment:
       Yes, I'll fix this.







[GitHub] [pulsar] gaoran10 removed a comment on pull request #7552: [WIP] Produce transaction messages integration

Posted by GitBox <gi...@apache.org>.
gaoran10 removed a comment on pull request #7552:
URL: https://github.com/apache/pulsar/pull/7552#issuecomment-661021535


   /pulsarbot run-failure-checks





[GitHub] [pulsar] gaoran10 commented on pull request #7552: [Transaction] Produce transaction messages and commit

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on pull request #7552:
URL: https://github.com/apache/pulsar/pull/7552#issuecomment-669659058


   /pulsarbot run-failure-checks





[GitHub] [pulsar] gaoran10 commented on pull request #7552: [Transaction] Produce transaction messages and commit

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on pull request #7552:
URL: https://github.com/apache/pulsar/pull/7552#issuecomment-661490478


   /pulsarbot run-failure-checks





[GitHub] [pulsar] codelipenghui commented on a change in pull request #7552: [Transaction] Produce transaction messages and commit

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #7552:
URL: https://github.com/apache/pulsar/pull/7552#discussion_r461603349



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1212,6 +1221,33 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
 
         startSendOperation(producer, headersAndPayload.readableBytes(), send.getNumMessages());
 
+        final long produceId = producer.getProducerId();

Review comment:
       It's better to handle the transaction message in the topic, not the server cnx. 
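
One way to read this suggestion is sketched below: the connection handler only forwards the message, and the topic decides whether it belongs to a transaction. All names here (`TopicRoutingSketch`, `Message`, `Topic`, `Connection`) are hypothetical and are not the Pulsar classes touched by this PR.

```java
import java.util.ArrayList;
import java.util.List;

public class TopicRoutingSketch {
    /** Hypothetical message; txnId is null for ordinary (non-transactional) messages. */
    public static final class Message {
        final Long txnId;
        final String payload;

        public Message(Long txnId, String payload) {
            this.txnId = txnId;
            this.payload = payload;
        }
    }

    /** Hypothetical topic that owns the routing decision. */
    public static final class Topic {
        public final List<String> log = new ArrayList<>();
        public final List<String> transactionBuffer = new ArrayList<>();

        public void publish(Message msg) {
            if (msg.txnId != null) {
                // Transactional messages are held in the buffer until commit.
                transactionBuffer.add("txn-" + msg.txnId + ":" + msg.payload);
            } else {
                log.add(msg.payload);
            }
        }
    }

    /** Hypothetical connection handler: no transaction logic, it only delegates. */
    public static final class Connection {
        private final Topic topic;

        public Connection(Topic topic) {
            this.topic = topic;
        }

        public void handleSend(Message msg) {
            topic.publish(msg);
        }
    }

    public static void main(String[] args) {
        Topic topic = new Topic();
        Connection cnx = new Connection(topic);
        cnx.handleSend(new Message(null, "plain"));
        cnx.handleSend(new Message(7L, "in-txn"));
        System.out.println("log: " + topic.log);
        System.out.println("buffer: " + topic.transactionBuffer);
    }
}
```

Keeping the branch inside the topic means every entry point that publishes to the topic gets the same transactional behaviour, not just the one connection handler.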

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
##########
@@ -549,6 +552,9 @@ public Boolean get() {
                 transactionMetadataStoreService = new TransactionMetadataStoreService(TransactionMetadataStoreProvider
                         .newProvider(config.getTransactionMetadataStoreProviderClassName()), this);
                 transactionMetadataStoreService.start();
+
+                transactionBufferProvider = TransactionBufferProvider.newProvider(
+                        PersistentTransactionBufferProvider.class.getName());

Review comment:
       It's better to make this configurable in broker.conf.
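
A minimal sketch of what a configuration-driven provider could look like, assuming the class name is read from a properties-style file. The key `transactionBufferProviderClassName`, the `BufferProvider` interface, and `PersistentBufferProvider` are assumptions made for illustration; they are not taken from this PR.

```java
import java.util.Properties;

public class ProviderConfigSketch {
    /** Hypothetical stand-in for a transaction buffer provider interface. */
    public interface BufferProvider {
        String name();
    }

    /** Hypothetical default implementation. */
    public static class PersistentBufferProvider implements BufferProvider {
        @Override
        public String name() {
            return "persistent";
        }
    }

    // Assumed configuration key; the real broker.conf key may differ.
    public static final String KEY = "transactionBufferProviderClassName";

    // Instantiate the provider named in the configuration, falling back to a default.
    public static BufferProvider newProvider(Properties conf) throws ReflectiveOperationException {
        String className = conf.getProperty(KEY, PersistentBufferProvider.class.getName());
        Class<?> clazz = Class.forName(className);
        return (BufferProvider) clazz.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        Properties conf = new Properties();
        conf.setProperty(KEY, PersistentBufferProvider.class.getName());
        System.out.println(newProvider(conf).name());
    }
}
```

With this shape, swapping the implementation becomes a configuration change rather than a code change.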







[GitHub] [pulsar] gaoran10 commented on pull request #7552: [Transaction] Produce transaction messages and commit

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on pull request #7552:
URL: https://github.com/apache/pulsar/pull/7552#issuecomment-670845279


   /pulsarbot run-failure-checks





[GitHub] [pulsar] gaoran10 commented on a change in pull request #7552: [Transaction] Produce transaction messages and commit

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #7552:
URL: https://github.com/apache/pulsar/pull/7552#discussion_r461812971



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
##########
@@ -549,6 +552,9 @@ public Boolean get() {
                 transactionMetadataStoreService = new TransactionMetadataStoreService(TransactionMetadataStoreProvider
                         .newProvider(config.getTransactionMetadataStoreProviderClassName()), this);
                 transactionMetadataStoreService.start();
+
+                transactionBufferProvider = TransactionBufferProvider.newProvider(
+                        PersistentTransactionBufferProvider.class.getName());

Review comment:
       Ok, I'll fix it.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1212,6 +1221,33 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
 
         startSendOperation(producer, headersAndPayload.readableBytes(), send.getNumMessages());
 
+        final long produceId = producer.getProducerId();

Review comment:
       Yes, that's more reasonable.







[GitHub] [pulsar] gaoran10 commented on pull request #7552: [WIP] Produce transaction messages integration

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on pull request #7552:
URL: https://github.com/apache/pulsar/pull/7552#issuecomment-661158943


   /pulsarbot run-failure-checks





[GitHub] [pulsar] gaoran10 commented on pull request #7552: [WIP] Produce transaction messages integration

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on pull request #7552:
URL: https://github.com/apache/pulsar/pull/7552#issuecomment-661069848


   /pulsarbot run-failure-checks





[GitHub] [pulsar] gaoran10 commented on pull request #7552: [WIP] Produce transaction messages integration

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on pull request #7552:
URL: https://github.com/apache/pulsar/pull/7552#issuecomment-660787709


   /pulsarbot run-failure-checks





[GitHub] [pulsar] gaoran10 commented on pull request #7552: [WIP] Produce transaction messages integration

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on pull request #7552:
URL: https://github.com/apache/pulsar/pull/7552#issuecomment-661100267


   /pulsarbot run-failure-checks





[GitHub] [pulsar] codelipenghui commented on a change in pull request #7552: [Transaction] Produce transaction messages and commit

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #7552:
URL: https://github.com/apache/pulsar/pull/7552#discussion_r462299542



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1674,40 +1687,134 @@ protected void handleAddPartitionToTxn(PulsarApi.CommandAddPartitionToTxn comman
 
     @Override
     protected void handleEndTxn(PulsarApi.CommandEndTxn command) {
-        TxnStatus newStatus = null;
+        TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
+        final TxnStatus newStatus;
         switch (command.getTxnAction()) {
             case COMMIT:
                 newStatus = TxnStatus.COMMITTING;
                 break;
             case ABORT:
                 newStatus = TxnStatus.ABORTING;
                 break;
+            default:
+                UnsupportedTxnActionException exception =
+                        new UnsupportedTxnActionException(txnID, command.getTxnAction());
+                log.error(exception.getMessage());
+                ctx.writeAndFlush(Commands.newEndTxnResponse(command.getRequestId(), txnID.getMostSigBits(),
+                        BrokerServiceException.getClientErrorCode(exception), exception.getMessage()));
+                return;
         }
-        TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
         if (log.isDebugEnabled()) {
             log.debug("Receive end txn by {} request {} from {} with txnId {}", newStatus, command.getRequestId(), remoteAddress, txnID);
         }
+        final long requestId = command.getRequestId();
         service.pulsar().getTransactionMetadataStoreService().updateTxnStatus(txnID, newStatus, TxnStatus.OPEN)
-            .whenComplete((v, ex) -> {
-                if (ex == null) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Send response success for end txn request {}", command.getRequestId());
-                    }
-                    ctx.writeAndFlush(Commands.newEndTxnResponse(command.getRequestId(),
+            .thenRun(() -> {
+                if (log.isDebugEnabled()) {
+                    log.debug("Send response success for end txn request {}", command.getRequestId());
+                }
+                updateTBStatus(txnID, newStatus).thenRun(() -> {
+                    service.pulsar().getTransactionMetadataStoreService()
+                            .updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.COMMITTING);
+                    ctx.writeAndFlush(Commands.newEndTxnResponse(requestId,
                             txnID.getLeastSigBits(), txnID.getMostSigBits()));
+                }).exceptionally(throwable -> {
+                    log.error("Failed to get txn `" + txnID + "` txnMeta.", throwable);
+                    ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(),
+                            BrokerServiceException.getClientErrorCode(throwable), throwable.getMessage()));
+                    return null;
+                });
+            }).exceptionally(throwable -> {
+                if (log.isDebugEnabled()) {
+                    log.debug("Send response error for end txn request {}", command.getRequestId());
+                }
+                ctx.writeAndFlush(Commands.newEndTxnResponse(command.getRequestId(), txnID.getMostSigBits(),
+                        BrokerServiceException.getClientErrorCode(throwable), throwable.getMessage()));
+                return null;
+        });

Review comment:
       We should handle this in the TC (transaction coordinator), and the TC should retry until the TB (transaction buffer) has handled it successfully.
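
The retry idea in this comment could look roughly like the sketch below, which re-invokes an asynchronous operation until it succeeds or the attempt budget runs out. `EndTxnRetrySketch`, `retry`, and the simulated flaky buffer call are hypothetical; the sketch uses only `java.util.concurrent.CompletableFuture` and does not reflect how the coordinator is actually implemented.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class EndTxnRetrySketch {
    // Run op; on failure, run it again until attemptsLeft is used up.
    public static <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> op, int attemptsLeft) {
        CompletableFuture<T> result = new CompletableFuture<>();
        op.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (attemptsLeft > 1) {
                // Chain the outcome of the next attempt into this result.
                retry(op, attemptsLeft - 1).whenComplete((v, e) -> {
                    if (e == null) {
                        result.complete(v);
                    } else {
                        result.completeExceptionally(e);
                    }
                });
            } else {
                result.completeExceptionally(error);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Simulated buffer update that fails twice before succeeding.
        Supplier<CompletableFuture<String>> flakyBufferCommit = () -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            if (calls.incrementAndGet() < 3) {
                f.completeExceptionally(new IllegalStateException("buffer unavailable"));
            } else {
                f.complete("COMMITTED");
            }
            return f;
        };
        String status = retry(flakyBufferCommit, 5).join();
        System.out.println(status + " after " + calls.get() + " calls");
    }
}
```

A production version would likely add a backoff delay between attempts and make the buffer operation idempotent, since a retried commit marker may already have been written.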







[GitHub] [pulsar] codelipenghui merged pull request #7552: [Transaction] Produce transaction messages and commit

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #7552:
URL: https://github.com/apache/pulsar/pull/7552


   





[GitHub] [pulsar] gaoran10 commented on pull request #7552: [Transaction] Produce transaction messages and commit

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on pull request #7552:
URL: https://github.com/apache/pulsar/pull/7552#issuecomment-662268700


   /pulsarbot run-failure-checks





[GitHub] [pulsar] gaoran10 commented on pull request #7552: [Transaction] Produce transaction messages and commit

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on pull request #7552:
URL: https://github.com/apache/pulsar/pull/7552#issuecomment-664160181


   /pulsarbot run-failure-checks





[GitHub] [pulsar] gaoran10 commented on pull request #7552: [Transaction] Produce transaction messages and commit

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on pull request #7552:
URL: https://github.com/apache/pulsar/pull/7552#issuecomment-661554563


   /pulsarbot run-failure-checks





[GitHub] [pulsar] gaoran10 commented on pull request #7552: [Transaction] Produce transaction messages and commit

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on pull request #7552:
URL: https://github.com/apache/pulsar/pull/7552#issuecomment-669649488


   /pulsarbot run-failure-checks





[GitHub] [pulsar] gaoran10 commented on pull request #7552: [Transaction] Produce transaction messages and commit

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on pull request #7552:
URL: https://github.com/apache/pulsar/pull/7552#issuecomment-661485845


   /pulsarbot run-failure-checks





[GitHub] [pulsar] gaoran10 commented on pull request #7552: [Transaction] Produce transaction messages and commit

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on pull request #7552:
URL: https://github.com/apache/pulsar/pull/7552#issuecomment-665397027









[GitHub] [pulsar] sijie commented on pull request #7552: [Transaction] Produce transaction messages and commit

Posted by GitBox <gi...@apache.org>.
sijie commented on pull request #7552:
URL: https://github.com/apache/pulsar/pull/7552#issuecomment-669835362


   @gaoran10 Can you rebase?





[GitHub] [pulsar] gaoran10 commented on pull request #7552: [WIP] Produce transaction messages integration

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on pull request #7552:
URL: https://github.com/apache/pulsar/pull/7552#issuecomment-661102178


   /pulsarbot run-failure-checks

