You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/09/29 14:27:30 UTC

[GitHub] [pulsar] congbobo184 opened a new pull request #8161: Subscription response implement

congbobo184 opened a new pull request #8161:
URL: https://github.com/apache/pulsar/pull/8161


   ## Motivation
   - subscription response implement
   - add the tracker for response timeout
   
   ## implement
   
   ### Verifying this change
   Add the tests for it
   
   Does this pull request potentially affect one of the following parts:
   If yes was chosen, please highlight the changes
   
   Dependencies (does it add or upgrade a dependency): (no)
   The public API: (no)
   The schema: (no)
   The default values of configurations: (no)
   The wire protocol: (yes)
   The rest endpoints: (no)
   The admin cli options: (no)
   Anything that affects deployment: (no)
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8161: Ack response implement

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8161:
URL: https://github.com/apache/pulsar/pull/8161#issuecomment-705311576


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #8161: Ack response implement

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #8161:
URL: https://github.com/apache/pulsar/pull/8161#discussion_r504367205



##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -201,6 +201,8 @@ enum ServerError {
     TransactionCoordinatorNotFound = 20; // Transaction coordinator not found error
     InvalidTxnStatus = 21; // Invalid txn status error
     NotAllowedError = 22; // Not allowed error
+
+    TransactionConflictException = 23; // Ack with transaction conflict

Review comment:
       ```suggestion
       TransactionConflict = 23; // Transaction conflict
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #8161: Ack response implement

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #8161:
URL: https://github.com/apache/pulsar/pull/8161#issuecomment-707083639


   @congbobo184 is the new client able to work with Pulsar 2.6.1 ? it looks like it will expect an ack from the server now.
   Probably is just only a matter of documenting better your change.
   
   Can you please describe more this change in the description of the PR ?
   - What is the problem that the PR is solving
   - What is the design of the solution
   - Compatibility with older clients and servers
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #8161: Ack response implement

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #8161:
URL: https://github.com/apache/pulsar/pull/8161#issuecomment-707548201


   Thank you @congbobo184 
   Now it is clear to me.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8161: Ack response implement

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8161:
URL: https://github.com/apache/pulsar/pull/8161#issuecomment-708194163


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui merged pull request #8161: Ack response implement

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #8161:
URL: https://github.com/apache/pulsar/pull/8161


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8161: Ack response implement

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8161:
URL: https://github.com/apache/pulsar/pull/8161#issuecomment-701430174


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #8161: Ack response implement

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #8161:
URL: https://github.com/apache/pulsar/pull/8161#discussion_r502878102



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -2343,6 +2361,102 @@ private void removeChunkMessage(String msgUUID, ChunkedMessageCtx chunkedMsgCtx,
         pendingChunckedMessageCount--;
     }
 
+    private CompletableFuture<Void> doAcknowledgeForResponse(MessageId messageId, AckType ackType,
+                                                             ValidationError validationError,
+                                                             Map<String, Long> properties, TxnID txnID) {
+        CompletableFuture<Void> callBack = new CompletableFuture<>();
+        BitSetRecyclable bitSetRecyclable = null;
+        long ledgerId;
+        long entryId;
+        if (messageId instanceof BatchMessageIdImpl) {
+            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
+            bitSetRecyclable = BitSetRecyclable.create();
+            ledgerId = batchMessageId.getLedgerId();
+            entryId = batchMessageId.getEntryId();
+            if (ackType == AckType.Cumulative) {
+                batchMessageId.ackCumulative();
+                bitSetRecyclable.set(0, batchMessageId.getAcker().getBatchSize());

Review comment:
       The transaction message is a couple of batch messages, make sure can handle this well, @gaoran10 Please help double-check.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1037,6 +1049,12 @@ private void closeConsumerTasks() {
             possibleSendToDeadLetterTopicMessages.clear();
         }
 
+        if (!ackRequests.isEmpty()) {
+            ackRequests.forEach((key, value) -> value.callback
+                    .completeExceptionally(new TransactionConflictException("Consumer has closed!")));

Review comment:
       The TransactionConflictException does not make sense here. It's better to add a MessageAcknowledgeException.

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import lombok.Cleanup;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+
+import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.*;
+
+public class ConsumerAckResponseTest extends ProducerConsumerBase {
+
+    private final static TransactionImpl transaction = mock(TransactionImpl.class);
+
+    @BeforeClass
+    public void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+        doReturn(1L).when(transaction).getTxnIdLeastBits();
+        doReturn(1L).when(transaction).getTxnIdMostBits();
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        doReturn(completableFuture).when(transaction).registerAckOp(any());
+        doNothing().when(transaction).registerAckedTopic(any(), any());
+
+        Thread.sleep(1000 * 3);
+    }
+
+    @AfterClass
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testAckResponse() throws PulsarClientException, InterruptedException, ExecutionException {
+        String topic = "testAckResponse";
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+        @Cleanup
+        ConsumerImpl<Integer> consumer = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscribe();
+        producer.send(1);
+        producer.send(2);
+        try {
+            consumer.acknowledgeAsync(new MessageIdImpl(1, 1, 1), transaction).get();
+        } catch (InterruptedException | ExecutionException e) {
+            Assert.assertTrue(e.getCause() instanceof TransactionConflictException);

Review comment:
       Why message id (1,1,1) conflict? 

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -2343,6 +2361,102 @@ private void removeChunkMessage(String msgUUID, ChunkedMessageCtx chunkedMsgCtx,
         pendingChunckedMessageCount--;
     }
 
+    private CompletableFuture<Void> doAcknowledgeForResponse(MessageId messageId, AckType ackType,
+                                                             ValidationError validationError,
+                                                             Map<String, Long> properties, TxnID txnID) {
+        CompletableFuture<Void> callBack = new CompletableFuture<>();
+        BitSetRecyclable bitSetRecyclable = null;
+        long ledgerId;
+        long entryId;
+        if (messageId instanceof BatchMessageIdImpl) {
+            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
+            bitSetRecyclable = BitSetRecyclable.create();
+            ledgerId = batchMessageId.getLedgerId();
+            entryId = batchMessageId.getEntryId();
+            if (ackType == AckType.Cumulative) {
+                batchMessageId.ackCumulative();
+                bitSetRecyclable.set(0, batchMessageId.getAcker().getBatchSize());
+                bitSetRecyclable.clear(0, batchMessageId.getBatchIndex() + 1);
+            } else {
+                batchMessageId.ackIndividual();
+                bitSetRecyclable.set(0, batchMessageId.getAcker().getBatchSize());

Review comment:
       Same as above comment.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -2343,6 +2361,102 @@ private void removeChunkMessage(String msgUUID, ChunkedMessageCtx chunkedMsgCtx,
         pendingChunckedMessageCount--;
     }
 
+    private CompletableFuture<Void> doAcknowledgeForResponse(MessageId messageId, AckType ackType,
+                                                             ValidationError validationError,
+                                                             Map<String, Long> properties, TxnID txnID) {
+        CompletableFuture<Void> callBack = new CompletableFuture<>();
+        BitSetRecyclable bitSetRecyclable = null;
+        long ledgerId;
+        long entryId;
+        if (messageId instanceof BatchMessageIdImpl) {
+            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
+            bitSetRecyclable = BitSetRecyclable.create();
+            ledgerId = batchMessageId.getLedgerId();
+            entryId = batchMessageId.getEntryId();
+            if (ackType == AckType.Cumulative) {
+                batchMessageId.ackCumulative();
+                bitSetRecyclable.set(0, batchMessageId.getAcker().getBatchSize());
+                bitSetRecyclable.clear(0, batchMessageId.getBatchIndex() + 1);
+            } else {
+                batchMessageId.ackIndividual();
+                bitSetRecyclable.set(0, batchMessageId.getAcker().getBatchSize());
+                bitSetRecyclable.clear(batchMessageId.getBatchIndex());
+            }
+        } else {
+            MessageIdImpl singleMessage = (MessageIdImpl) messageId;
+            ledgerId = singleMessage.getLedgerId();
+            entryId = singleMessage.getEntryId();
+        }
+        long requestId = client.newRequestId();
+        ByteBuf cmd = Commands.newAck(consumerId, ledgerId, entryId,
+                bitSetRecyclable, ackType,
+                validationError, properties, txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId);
+        OpForAckCallBack op = OpForAckCallBack.create(cmd, callBack, messageId,
+                new TxnID(txnID.getMostSigBits(), txnID.getLeastSigBits()));
+        ackRequests.put(requestId, op);
+        unAckedMessageTracker.remove(messageId);
+        cmd.retain();
+        cnx().ctx().writeAndFlush(cmd, cnx().ctx().voidPromise());
+        return callBack;
+    }
+
+    protected void ackResponse(CommandAckResponse ackResponse) {
+        checkArgument(ackResponse.getRequestId() >= 0);
+        OpForAckCallBack callBackOp = ackRequests.remove(ackResponse.getRequestId());
+        if (callBackOp == null || callBackOp.callback.isDone()) {
+            log.error("Ack request has been handled requestId : {}", ackResponse.getRequestId());

Review comment:
       Use info log level here 

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -2343,6 +2361,102 @@ private void removeChunkMessage(String msgUUID, ChunkedMessageCtx chunkedMsgCtx,
         pendingChunckedMessageCount--;
     }
 
+    private CompletableFuture<Void> doAcknowledgeForResponse(MessageId messageId, AckType ackType,
+                                                             ValidationError validationError,
+                                                             Map<String, Long> properties, TxnID txnID) {
+        CompletableFuture<Void> callBack = new CompletableFuture<>();
+        BitSetRecyclable bitSetRecyclable = null;
+        long ledgerId;
+        long entryId;
+        if (messageId instanceof BatchMessageIdImpl) {
+            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
+            bitSetRecyclable = BitSetRecyclable.create();
+            ledgerId = batchMessageId.getLedgerId();
+            entryId = batchMessageId.getEntryId();
+            if (ackType == AckType.Cumulative) {
+                batchMessageId.ackCumulative();
+                bitSetRecyclable.set(0, batchMessageId.getAcker().getBatchSize());
+                bitSetRecyclable.clear(0, batchMessageId.getBatchIndex() + 1);
+            } else {
+                batchMessageId.ackIndividual();
+                bitSetRecyclable.set(0, batchMessageId.getAcker().getBatchSize());
+                bitSetRecyclable.clear(batchMessageId.getBatchIndex());
+            }
+        } else {
+            MessageIdImpl singleMessage = (MessageIdImpl) messageId;
+            ledgerId = singleMessage.getLedgerId();
+            entryId = singleMessage.getEntryId();
+        }
+        long requestId = client.newRequestId();
+        ByteBuf cmd = Commands.newAck(consumerId, ledgerId, entryId,
+                bitSetRecyclable, ackType,
+                validationError, properties, txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId);
+        OpForAckCallBack op = OpForAckCallBack.create(cmd, callBack, messageId,
+                new TxnID(txnID.getMostSigBits(), txnID.getLeastSigBits()));
+        ackRequests.put(requestId, op);
+        unAckedMessageTracker.remove(messageId);
+        cmd.retain();
+        cnx().ctx().writeAndFlush(cmd, cnx().ctx().voidPromise());
+        return callBack;
+    }
+
+    protected void ackResponse(CommandAckResponse ackResponse) {
+        checkArgument(ackResponse.getRequestId() >= 0);
+        OpForAckCallBack callBackOp = ackRequests.remove(ackResponse.getRequestId());
+        if (callBackOp == null || callBackOp.callback.isDone()) {
+            log.error("Ack request has been handled requestId : {}", ackResponse.getRequestId());
+        } else if (!ackResponse.hasError()) {
+            callBackOp.callback.complete(null);
+            if (log.isDebugEnabled()) {
+                log.debug("MessageId : {} has ack by TxnId : {}", callBackOp.messageId.getLedgerId() + ":"
+                        + callBackOp.messageId.getEntryId(), callBackOp.txnID.toString());
+            }
+            callBackOp.recycle();
+        } else {
+            callBackOp.callback.completeExceptionally(new TransactionConflictException(ackResponse.getMessage()));

Review comment:
       We should get the specific exception from the `serverError`, should not be throw TransactionConflictException for all errors.
   

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import lombok.Cleanup;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+
+import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.*;

Review comment:
       Avoid use import .*




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on a change in pull request #8161: Ack response implement

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #8161:
URL: https://github.com/apache/pulsar/pull/8161#discussion_r502905370



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import lombok.Cleanup;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+
+import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.*;
+
+public class ConsumerAckResponseTest extends ProducerConsumerBase {
+
+    private final static TransactionImpl transaction = mock(TransactionImpl.class);
+
+    @BeforeClass
+    public void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+        doReturn(1L).when(transaction).getTxnIdLeastBits();
+        doReturn(1L).when(transaction).getTxnIdMostBits();
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        doReturn(completableFuture).when(transaction).registerAckOp(any());
+        doNothing().when(transaction).registerAckedTopic(any(), any());
+
+        Thread.sleep(1000 * 3);
+    }
+
+    @AfterClass
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testAckResponse() throws PulsarClientException, InterruptedException, ExecutionException {
+        String topic = "testAckResponse";
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+        @Cleanup
+        ConsumerImpl<Integer> consumer = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscribe();
+        producer.send(1);
+        producer.send(2);
+        try {
+            consumer.acknowledgeAsync(new MessageIdImpl(1, 1, 1), transaction).get();
+        } catch (InterruptedException | ExecutionException e) {
+            Assert.assertTrue(e.getCause() instanceof TransactionConflictException);

Review comment:
       ledgerId start with 3.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8161: Ack response implement

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8161:
URL: https://github.com/apache/pulsar/pull/8161#issuecomment-708193021


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 removed a comment on pull request #8161: Ack response implement

Posted by GitBox <gi...@apache.org>.
congbobo184 removed a comment on pull request #8161:
URL: https://github.com/apache/pulsar/pull/8161#issuecomment-708193021


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8161: Ack response implement

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8161:
URL: https://github.com/apache/pulsar/pull/8161#issuecomment-707074946


   @eolivelli I think added protocol fields are all optional type, so it doesn't need to think about compatibility. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8161: Ack response implement

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8161:
URL: https://github.com/apache/pulsar/pull/8161#issuecomment-706857576


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8161: Ack response implement

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8161:
URL: https://github.com/apache/pulsar/pull/8161#issuecomment-707447784


   @eolivelli This PR is for transaction ack and don't change any client API. In normal ack, we don't change any logical. I change the description of the PR. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8161: Ack response implement

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8161:
URL: https://github.com/apache/pulsar/pull/8161#issuecomment-701229292


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 edited a comment on pull request #8161: Ack response implement

Posted by GitBox <gi...@apache.org>.
congbobo184 edited a comment on pull request #8161:
URL: https://github.com/apache/pulsar/pull/8161#issuecomment-707074946


   @eolivelli I think the added protocol fields are all optional type, so it doesn't need to think about compatibility. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8161: Ack response implement

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8161:
URL: https://github.com/apache/pulsar/pull/8161#issuecomment-704866079


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org