You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/03/09 08:20:05 UTC

[GitHub] [pulsar] eolivelli commented on a change in pull request #14602: Fix PartitionedProducerImpl flushAsync always fail when one partition send TimeOutException

eolivelli commented on a change in pull request #14602:
URL: https://github.com/apache/pulsar/pull/14602#discussion_r822388287



##########
File path: pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
##########
@@ -264,4 +266,49 @@ public void testGetNumOfPartitions() throws Exception {
         assertEquals(producerImpl.getNumOfPartitions(), 0);
     }
 
+    @Test
+    public void testFlushWhenLastSendFutureFailed() {
+        ProducerConfigurationData producerConfData = new ProducerConfigurationData();
+        ProducerImpl<Object> producerImpl = client.newProducerImpl(TOPIC_NAME, 0, producerConfData,
+                null, null, null, Optional.empty());
+
+        // 1. When no data is sent to this producerImpl,
+        // its lastSendFuture is always in normal completion state
+        CompletableFuture<Void> lastSendFuture = producerImpl.getLastSendFuture();
+        assertTrue(lastSendFuture.isDone());
+        assertFalse(lastSendFuture.isCompletedExceptionally());
+
+        // 2. We set the lastSendFuture of this partition to an abnormal state,
+        // simulating that an exception occurred during the sending process
+        final String failedMessage = "failed last send future";
+        producerImpl.setLastSendFuture(FutureUtil.failedFuture(new Throwable(failedMessage)));
+
+        // 3. So when we get its lastSendFuture again,
+        // the future is already in an abnormal failure state
+        lastSendFuture = producerImpl.getLastSendFuture();
+        assertTrue(lastSendFuture.isDone());
+        assertTrue(lastSendFuture.isCompletedExceptionally());
+        // 4. The following simple simulation application captures the exception processing
+        lastSendFuture.exceptionally(throwable -> {
+            assertNotNull(throwable);
+            assertEquals(throwable.getMessage(), failedMessage);
+            return null;
+        });
+
+        // 5. We have already handled the exception in the step 4,
+        // and then PartitionedProducerImpl will continue to send data.
+        // It should be noted that when this partition is not selected for data transmission this time,
+        // its lastSendFuture is still the future that was in an abnormal state in the previous step.
+        // When the application calls the flush operation again, in the previous logic,
+        // its exception future will be returned to the application,
+        // causing the application to always execute the exception handling logic.
+        // In fact, we have handled the exception before,
+        // and we did not send data to this partition this time,
+        // it should not affect this transmission.
+        // So we want the lastSendFuture here to be in normal state.
+        lastSendFuture = producerImpl.getLastSendFuture();
+        assertTrue(lastSendFuture.isDone());

Review comment:
       > this should be tested by observing the external behavior
   
   +1 to this (thanks @lhotari )




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org