You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/02/08 14:57:43 UTC

[GitHub] [pulsar] congbobo184 commented on a change in pull request #14170: [Transaction] Fix subscription ack transaction marker.

congbobo184 commented on a change in pull request #14170:
URL: https://github.com/apache/pulsar/pull/14170#discussion_r801724176



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
##########
@@ -97,84 +91,66 @@ public void testMarkerDeleteTimes() throws Exception {
 
     @Test
     public void testMarkerDelete() throws Exception {
-
-        MessageMetadata msgMetadata = new MessageMetadata().clear()
-                .setPublishTime(1)
-                .setProducerName("test")
-                .setSequenceId(0);
-
-        ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(0);
-
-        payload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
-                msgMetadata, payload);
-
-        ManagedLedger managedLedger = pulsar.getManagedLedgerFactory().open("test");
-        PersistentTopic topic = mock(PersistentTopic.class);
-        doReturn(pulsar.getBrokerService()).when(topic).getBrokerService();
-        doReturn(managedLedger).when(topic).getManagedLedger();
-        doReturn("test").when(topic).getName();
-        ManagedCursor cursor = managedLedger.openCursor("test");
-        PersistentSubscription persistentSubscription = new PersistentSubscription(topic, "test",
-                managedLedger.openCursor("test"), false);
-
-        byte[] payloadBytes = toBytes(payload);
-        Position position1 = managedLedger.addEntry(payloadBytes);
-        Position markerPosition1 = managedLedger.addEntry(toBytes(Markers
-                .newTxnCommitMarker(1, 1, 1)));
-
-        Position position2 = managedLedger.addEntry(payloadBytes);
-        Position markerPosition2 = managedLedger.addEntry(toBytes(Markers
-                .newTxnAbortMarker(1, 1, 1)));
-
-        Position position3 = managedLedger.addEntry(payloadBytes);
-
-        assertEquals(cursor.getNumberOfEntriesInBacklog(true), 5);
-        assertTrue(((PositionImpl) cursor.getMarkDeletedPosition()).compareTo((PositionImpl) position1) < 0);
-
-        // ack position1, markerDeletePosition to markerPosition1
-        persistentSubscription.acknowledgeMessage(Collections.singletonList(position1),
-                AckType.Individual, Collections.emptyMap());
-
-        // ack position1, markerDeletePosition to markerPosition1
-        Awaitility.await().during(1, TimeUnit.SECONDS).until(() ->
-                ((PositionImpl) persistentSubscription.getCursor().getMarkDeletedPosition())
-                        .compareTo((PositionImpl) markerPosition1) == 0);
-
-        // ack position2, markerDeletePosition to markerPosition2
-        persistentSubscription.acknowledgeMessage(Collections.singletonList(position2),
-                AckType.Individual, Collections.emptyMap());
-
-        Awaitility.await().until(() ->
-                ((PositionImpl) persistentSubscription.getCursor().getMarkDeletedPosition())
-                        .compareTo((PositionImpl) markerPosition2) == 0);
-
-        // add consequent marker
-        managedLedger.addEntry(toBytes(Markers
-                .newTxnCommitMarker(1, 1, 1)));
-
-        managedLedger.addEntry(toBytes(Markers
-                .newTxnAbortMarker(1, 1, 1)));
-
-        Position markerPosition3 = managedLedger.addEntry(toBytes(Markers
-                .newTxnAbortMarker(1, 1, 1)));
-
-        // ack with transaction, then commit this transaction
-        persistentSubscription.transactionIndividualAcknowledge(new TxnID(0, 0),
-                Collections.singletonList(MutablePair.of((PositionImpl) position3, 0))).get();
-
-        persistentSubscription.endTxn(0, 0, 0, 0).get();
-
-        // ack with transaction, then commit this transaction
-        Awaitility.await().until(() ->
-                ((PositionImpl) persistentSubscription.getCursor().getMarkDeletedPosition())
-                        .compareTo((PositionImpl) markerPosition3) == 0);
-
+        final String subName = "testMarkerDelete";
+        final String topicName = NAMESPACE1 + "/testMarkerDelete";
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient
+                .newConsumer()
+                .topic(topicName)
+                .subscriptionName(subName)
+                .isAckReceiptEnabled(true)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient
+                .newProducer()
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .topic(topicName)
+                .create();
+
+        Transaction txn1 = getTxn();
+        Transaction txn2 = getTxn();
+        Transaction txn3 = getTxn();
+
+        MessageIdImpl msgId1 = (MessageIdImpl) producer.newMessage(txn1).send();
+        MessageIdImpl msgId2 = (MessageIdImpl) producer.newMessage(txn2).send();
+        assertNull(consumer.receive(1, TimeUnit.SECONDS));
+        txn1.commit().get();
+
+        consumer.acknowledgeAsync(consumer.receive()).get();
+        assertNull(consumer.receive(1, TimeUnit.SECONDS));
+
+        // maxReadPosition move to msgId1, msgId2 have not be committed
+        assertEquals(admin.topics().getInternalStats(topicName).cursors.get(subName).markDeletePosition,
+                PositionImpl.get(msgId1.getLedgerId(), msgId1.getEntryId()).toString());
+
+        MessageIdImpl msgId3 = (MessageIdImpl) producer.newMessage(txn3).send();
+        txn2.commit().get();
+
+        consumer.acknowledgeAsync(consumer.receive()).get();
+        assertNull(consumer.receive(1, TimeUnit.SECONDS));
+
+        // maxReadPosition move to txn1 marker, so entryId is msgId2.getEntryId() + 1,
+        // because send msgId2 before commit txn1
+        assertEquals(admin.topics().getInternalStats(topicName).cursors.get(subName).markDeletePosition,
+                PositionImpl.get(msgId2.getLedgerId(), msgId2.getEntryId() + 1).toString());
+
+        txn3.commit().get();
+
+        consumer.acknowledgeAsync(consumer.receive()).get();
+        assertNull(consumer.receive(1, TimeUnit.SECONDS));
+
+        // maxReadPosition move to txn3 marker, so entryId is msgId3.getEntryId() + 2, this is txn3 marker position
+        // send msgId2 before txn2 commit
+        assertEquals(admin.topics().getInternalStats(topicName).cursors.get(subName).markDeletePosition,
+                PositionImpl.get(msgId3.getLedgerId(), msgId3.getEntryId() + 2).toString());

Review comment:
       yes




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org