Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/24 01:45:37 UTC

[GitHub] [pulsar] Technoboy- opened a new pull request, #16205: Fix compaction subscription acknowledge Marker msg issue.

Technoboy- opened a new pull request, #16205:
URL: https://github.com/apache/pulsar/pull/16205

   ### Motivation
   
   ```
   06:03:58.778 [broker-topic-workers-OrderedScheduler-4-0] ERROR org.apache.bookkeeper.common.util.SafeRunnable - Unexpected throwable caught
   java.lang.IllegalArgumentException: null
           at com.google.common.base.Preconditions.checkArgument(Preconditions.java:128)
           at org.apache.pulsar.broker.service.persistent.CompactorSubscription.acknowledgeMessage(CompactorSubscription.java:61) 
           at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:154) 
           at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:103)
           at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.internalReadEntriesComplete(PersistentDispatcherSingleActiveConsumer.java:203) 
           at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.lambda$readEntriesComplete$1(PersistentDispatcherSingleActiveConsumer.java:146) 
           at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) 
           at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) 
           at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
           at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
           at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
           at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) 
           at java.lang.Thread.run(Thread.java:829) [?:?]
   ```
   
   If a topic has transactions enabled or belongs to a replicated cluster, marker messages are written to the original topic. If compaction is then enabled as well, `CompactorSubscription` fails to acknowledge those markers because of an inconsistent `AckType`:
   
   Line 178 acknowledges the marker as `Individual`:
   https://github.com/apache/pulsar/blob/c871d2433acad7f6fc6fd37c32102fc7578b8c2d/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java#L168-L181
   
   But `CompactorSubscription` only supports `Cumulative`:
   https://github.com/apache/pulsar/blob/c871d2433acad7f6fc6fd37c32102fc7578b8c2d/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java#L60-L64
   
   That is the root cause.
   
   When this error occurs, it may cause a broker OOM, because some entries are not released.
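
The mismatch described above can be reproduced in miniature. The following is only a sketch with hypothetical stand-in types (`Subscription`, `CumulativeOnlySubscription`, `MarkerAckDemo`); it does not use the real Pulsar classes and assumes the precondition behaves like a plain argument check that rejects anything other than `Cumulative`.

```java
// Hypothetical stand-ins for illustration; these are not the Pulsar classes.
enum AckType { Individual, Cumulative }

interface Subscription {
    void acknowledgeMessage(long position, AckType ackType);
}

/** A subscription that accepts only cumulative acks, like the check in the stack trace. */
final class CumulativeOnlySubscription implements Subscription {
    long markDeletePosition = -1;

    @Override
    public void acknowledgeMessage(long position, AckType ackType) {
        if (ackType != AckType.Cumulative) {
            // Same observable effect as a failed argument precondition with no message.
            throw new IllegalArgumentException();
        }
        markDeletePosition = Math.max(markDeletePosition, position);
    }
}

final class MarkerAckDemo {
    /** Auto-acks a marker entry individually, the way the dispatcher path is described. */
    static boolean autoAckMarker(Subscription sub, long position) {
        try {
            sub.acknowledgeMessage(position, AckType.Individual);
            return true;
        } catch (IllegalArgumentException e) {
            // The ack never happened; anything scheduled after this call is skipped.
            return false;
        }
    }
}
```

Calling `MarkerAckDemo.autoAckMarker(new CumulativeOnlySubscription(), 5L)` returns `false` and leaves `markDeletePosition` untouched, which mirrors how the exception interrupts the dispatch path before the remaining work is done.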
   
   
   ### Modification
   
   - Check the subscription type when acknowledging.
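
One possible shape for such a check is sketched below. All names here (`RegularSubscription`, `CompactorLikeSubscription`, `GuardedDispatcher.individualAckIfSupported`) are hypothetical, and skipping the individual ack for a cumulative-only subscription is an assumption about the guard, not a copy of the merged change.

```java
// Hypothetical stand-ins for illustration; these are not the Pulsar classes.
enum AckType { Individual, Cumulative }

interface Subscription {
    void acknowledgeMessage(long position, AckType ackType);
}

/** Accepts both ack types and counts the individual ones. */
final class RegularSubscription implements Subscription {
    int individualAcks = 0;

    @Override
    public void acknowledgeMessage(long position, AckType ackType) {
        if (ackType == AckType.Individual) {
            individualAcks++;
        }
    }
}

/** Accepts cumulative acks only, like the subscription in the report. */
final class CompactorLikeSubscription implements Subscription {
    @Override
    public void acknowledgeMessage(long position, AckType ackType) {
        if (ackType != AckType.Cumulative) {
            throw new IllegalArgumentException();
        }
    }
}

final class GuardedDispatcher {
    /** Individually acks a marker unless the subscription cannot accept individual acks. */
    static boolean individualAckIfSupported(Subscription sub, long position) {
        if (sub instanceof CompactorLikeSubscription) {
            return false; // skip: this subscription type only understands cumulative acks
        }
        sub.acknowledgeMessage(position, AckType.Individual);
        return true;
    }
}
```

With this guard, a marker on a regular subscription is still acknowledged individually, while the cumulative-only subscription is never handed an `Individual` ack and so never throws.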
   
   ### Verifying this change
   
   - [x] Add a new test to cover this change.
   
   ### Documentation
   
   - [x] `doc-not-needed` 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on pull request #16205: [fix][broker] Fix compaction subscription acknowledge Marker msg issue.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #16205:
URL: https://github.com/apache/pulsar/pull/16205#issuecomment-1198874486

   @Technoboy- Could you help cherry-pick this PR to branch-2.8? It's weird that the `createNonPartitionedTopic` call failed in my local env.
   
   ```
   12:37:49.693 [AsyncHttpClient-43-1] WARN  org.apache.pulsar.client.admin.internal.BaseResource - [http://localhost:53870/admin/persistent/my-property/use/my-ns/testWriteMarker-caf36fc4-1e8c-45f8-a49f-674b1d145a19] Failed to perform http put request: javax.ws.rs.NotAllowedException: HTTP 405 Method Not Allowed
   
   org.apache.pulsar.client.admin.PulsarAdminException$NotAllowedException: HTTP 405 Method Not Allowed
   
   	at org.apache.pulsar.client.admin.internal.BaseResource.getApiException(BaseResource.java:232)
   	at org.apache.pulsar.client.admin.internal.BaseResource$1.failed(BaseResource.java:130)
   ```




[GitHub] [pulsar] HQebupt commented on pull request #16205: Fix compaction subscription acknowledge Marker msg issue.

Posted by GitBox <gi...@apache.org>.
HQebupt commented on PR #16205:
URL: https://github.com/apache/pulsar/pull/16205#issuecomment-1166438206

   Nice catch! 👍 




[GitHub] [pulsar] BewareMyPower commented on pull request #16205: [fix][broker] Fix compaction subscription acknowledge Marker msg issue.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #16205:
URL: https://github.com/apache/pulsar/pull/16205#issuecomment-1202575666

   Move the `release/2.8.4` label to https://github.com/apache/pulsar/pull/16918




[GitHub] [pulsar] mattisonchao commented on a diff in pull request #16205: Fix compaction subscription acknowledge Marker msg issue.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #16205:
URL: https://github.com/apache/pulsar/pull/16205#discussion_r905675920


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -234,6 +236,12 @@ && trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
         return totalEntries;
     }
 
+    private void doAcknowledgeMessage(List<Position> positions, AckType ackType, Map<String, Long> properties) {

Review Comment:
   Maybe we don't need `List<Position> positions` here; it looks like the filter has no case that auto-acks cumulatively.



##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -1678,4 +1689,62 @@ public void testHealthCheckTopicNotCompacted() {
         producer1.close();
         producer2.close();
     }
+
+    @Test(timeOut = 60000)
+    public void testCompactionWithMarker() throws Exception {
+        String namespace = "my-property/use/my-ns";
+        final TopicName dest = TopicName.get(
+                BrokerTestUtil.newUniqueName("persistent://" + namespace + "/testWriteMarker"));
+        admin.topics().createNonPartitionedTopic(dest.toString());
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(dest.toString())
+                .subscriptionName("test-compaction-sub")
+                .subscriptionType(SubscriptionType.Exclusive)
+                .readCompacted(true)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
+                .subscribe();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(dest.toString())
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+        producer.send("msg-1".getBytes(StandardCharsets.UTF_8));
+        Optional<Topic> topic = pulsar.getBrokerService().getTopic(dest.toString(), true).join();
+        Assert.assertTrue(topic.isPresent());
+        PersistentTopic persistentTopic = (PersistentTopic) topic.get();
+        Random random = new Random();
+        for (int i = 0; i < 100; i++) {
+            int rad = random.nextInt(3);
+            ByteBuf marker;
+            if (rad == 0) {
+                marker = Markers.newTxnCommitMarker(-1L, 0, i);
+            } else if (rad == 1) {
+                marker = Markers.newTxnAbortMarker(-1L, 0, i);
+            } else {
+                marker = Markers.newReplicatedSubscriptionsSnapshotRequest(UUID.randomUUID().toString(), "r1");
+            }
+            persistentTopic.getManagedLedger().asyncAddEntry(marker, new AsyncCallbacks.AddEntryCallback() {
+                @Override
+                public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+                    //
+                }
+
+                @Override
+                public void addFailed(ManagedLedgerException exception, Object ctx) {
+                    //
+                }
+            }, null);
+            marker.release();

Review Comment:
   Should the marker be released only after `asyncAddEntry` succeeds or fails?





[GitHub] [pulsar] Technoboy- commented on a diff in pull request #16205: Fix compaction subscription acknowledge Marker msg issue.

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #16205:
URL: https://github.com/apache/pulsar/pull/16205#discussion_r905695053


##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -1678,4 +1689,62 @@ public void testHealthCheckTopicNotCompacted() {
         producer1.close();
         producer2.close();
     }
+
+    @Test(timeOut = 60000)
+    public void testCompactionWithMarker() throws Exception {
+        String namespace = "my-property/use/my-ns";
+        final TopicName dest = TopicName.get(
+                BrokerTestUtil.newUniqueName("persistent://" + namespace + "/testWriteMarker"));
+        admin.topics().createNonPartitionedTopic(dest.toString());
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(dest.toString())
+                .subscriptionName("test-compaction-sub")
+                .subscriptionType(SubscriptionType.Exclusive)
+                .readCompacted(true)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
+                .subscribe();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(dest.toString())
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+        producer.send("msg-1".getBytes(StandardCharsets.UTF_8));
+        Optional<Topic> topic = pulsar.getBrokerService().getTopic(dest.toString(), true).join();
+        Assert.assertTrue(topic.isPresent());
+        PersistentTopic persistentTopic = (PersistentTopic) topic.get();
+        Random random = new Random();
+        for (int i = 0; i < 100; i++) {
+            int rad = random.nextInt(3);
+            ByteBuf marker;
+            if (rad == 0) {
+                marker = Markers.newTxnCommitMarker(-1L, 0, i);
+            } else if (rad == 1) {
+                marker = Markers.newTxnAbortMarker(-1L, 0, i);
+            } else {
+                marker = Markers.newReplicatedSubscriptionsSnapshotRequest(UUID.randomUUID().toString(), "r1");
+            }
+            persistentTopic.getManagedLedger().asyncAddEntry(marker, new AsyncCallbacks.AddEntryCallback() {
+                @Override
+                public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+                    //
+                }
+
+                @Override
+                public void addFailed(ManagedLedgerException exception, Object ctx) {
+                    //
+                }
+            }, null);
+            marker.release();

Review Comment:
   Ah, `asyncAddEntry` calls `retain` on the buffer, so releasing it here is OK.
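
The reference-counting argument in this reply can be illustrated with a small sketch. `CountedBuffer` and `AsyncWriter` are hypothetical stand-ins, not Netty's `ByteBuf` or the managed ledger; the sketch only assumes that the asynchronous writer takes its own reference before returning.

```java
/** Hypothetical reference-counted buffer; starts with one reference owned by its creator. */
final class CountedBuffer {
    private int refCnt = 1;

    CountedBuffer retain() {
        if (refCnt <= 0) {
            throw new IllegalStateException("buffer already freed");
        }
        refCnt++;
        return this;
    }

    /** Drops one reference; returns true when the last reference is gone. */
    boolean release() {
        if (refCnt <= 0) {
            throw new IllegalStateException("buffer already freed");
        }
        return --refCnt == 0;
    }

    int refCnt() {
        return refCnt;
    }
}

/** Hypothetical writer that keeps the buffer alive until its own work completes. */
final class AsyncWriter {
    private CountedBuffer pending;

    void asyncAdd(CountedBuffer buf) {
        pending = buf.retain(); // the writer now holds its own reference
    }

    /** Simulates completion of the write; returns true if this freed the buffer. */
    boolean complete() {
        CountedBuffer done = pending;
        pending = null;
        return done.release();
    }
}
```

Because the writer holds a separate reference, the caller's `release()` right after `asyncAdd` only drops the count from 2 to 1; the buffer is freed when the writer finishes and releases its own reference.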



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -234,6 +236,12 @@ && trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
         return totalEntries;
     }
 
+    private void doAcknowledgeMessage(List<Position> positions, AckType ackType, Map<String, Long> properties) {

Review Comment:
   Updated. Thanks for your review.





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #16205: Fix compaction subscription acknowledge Marker msg issue.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #16205:
URL: https://github.com/apache/pulsar/pull/16205#discussion_r905823708


##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -1678,4 +1689,62 @@ public void testHealthCheckTopicNotCompacted() {
         producer1.close();
         producer2.close();
     }
+
+    @Test(timeOut = 60000)
+    public void testCompactionWithMarker() throws Exception {
+        String namespace = "my-property/use/my-ns";
+        final TopicName dest = TopicName.get(
+                BrokerTestUtil.newUniqueName("persistent://" + namespace + "/testWriteMarker"));
+        admin.topics().createNonPartitionedTopic(dest.toString());
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(dest.toString())
+                .subscriptionName("test-compaction-sub")
+                .subscriptionType(SubscriptionType.Exclusive)
+                .readCompacted(true)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
+                .subscribe();
+        Producer<byte[]> producer = pulsarClient.newProducer()

Review Comment:
   Should we close the producer?





[GitHub] [pulsar] codelipenghui merged pull request #16205: Fix compaction subscription acknowledge Marker msg issue.

Posted by GitBox <gi...@apache.org>.
codelipenghui merged PR #16205:
URL: https://github.com/apache/pulsar/pull/16205




[GitHub] [pulsar] Technoboy- commented on a diff in pull request #16205: Fix compaction subscription acknowledge Marker msg issue.

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #16205:
URL: https://github.com/apache/pulsar/pull/16205#discussion_r906029239


##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -1678,4 +1689,62 @@ public void testHealthCheckTopicNotCompacted() {
         producer1.close();
         producer2.close();
     }
+
+    @Test(timeOut = 60000)
+    public void testCompactionWithMarker() throws Exception {
+        String namespace = "my-property/use/my-ns";
+        final TopicName dest = TopicName.get(
+                BrokerTestUtil.newUniqueName("persistent://" + namespace + "/testWriteMarker"));
+        admin.topics().createNonPartitionedTopic(dest.toString());
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(dest.toString())
+                .subscriptionName("test-compaction-sub")
+                .subscriptionType(SubscriptionType.Exclusive)
+                .readCompacted(true)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
+                .subscribe();
+        Producer<byte[]> producer = pulsarClient.newProducer()

Review Comment:
   Ah, yes. 
   Fixed.





[GitHub] [pulsar] github-actions[bot] commented on pull request #16205: Fix compaction subscription acknowledge Marker msg issue.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #16205:
URL: https://github.com/apache/pulsar/pull/16205#issuecomment-1165089379

   @Technoboy- Please provide a correct documentation label for your PR.
   For instructions, see the [Pulsar Documentation Label Guide](https://docs.google.com/document/d/1Qw7LHQdXWBW9t2-r-A7QdFDBwmZh6ytB4guwMoXHqc0).




[GitHub] [pulsar] github-actions[bot] commented on pull request #16205: Fix compaction subscription acknowledge Marker msg issue.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #16205:
URL: https://github.com/apache/pulsar/pull/16205#issuecomment-1165090894

   @Technoboy- Please provide a correct documentation label for your PR.
   For instructions, see the [Pulsar Documentation Label Guide](https://docs.google.com/document/d/1Qw7LHQdXWBW9t2-r-A7QdFDBwmZh6ytB4guwMoXHqc0).

