You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/11/18 14:04:36 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #8612: [Transaction] Add the batch size in transaction ack command.

codelipenghui commented on a change in pull request #8612:
URL: https://github.com/apache/pulsar/pull/8612#discussion_r526092501



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
##########
@@ -391,7 +391,7 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
                                                                     List<MutablePair<PositionImpl, Long>> positions) {
         if (pendingAckHandle == null) {
             return FutureUtil.failedFuture(
-                    new TransactionConflictException("Broker does't support Transaction pending ack!"));
+                    new NotAllowedException("Broker does't support Transaction pending ack!"));

Review comment:
       ```suggestion
                       new NotAllowedException("The transaction is not enabled"));
   ```

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -435,9 +435,6 @@ void doUnsubscribe(final long requestId) {
     private CompletableFuture<Void> individualAckNormal(CommandAck ack, Map<String,Long> properties) {
         List<Position> positionsAcked = new ArrayList<>();
         List<PositionImpl> checkBatchPositions = null;

Review comment:
       This line can be deleted?

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -2416,15 +2418,16 @@ private void removeChunkMessage(String msgUUID, ChunkedMessageCtx chunkedMsgCtx,
                 bitSetRecyclable.set(0, batchMessageId.getBatchSize());
                 bitSetRecyclable.clear(batchMessageId.getBatchIndex());
             }
+            cmd = Commands.newAck(consumerId, ledgerId, entryId, bitSetRecyclable, ackType, validationError, properties,
+                    txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId, batchMessageId.getBatchSize());

Review comment:
       Only the txn is not null needs to set the batch size.

##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -60,6 +60,7 @@ message MessageIdData {
     optional int32 partition = 3 [default = -1];
     optional int32 batch_index = 4 [default = -1];
     repeated int64 ack_set = 5;
+    optional uint64 batch_size = 6;

Review comment:
       ```suggestion
       optional int32 batch_size = 6;
   ```
   
   int32 is enough for batch size.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org