Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/03/08 10:55:21 UTC

[GitHub] [pulsar] lhotari commented on a change in pull request #14602: Fix PartitionedProducerImpl flushAsync always fail when one partition send TimeOutException

lhotari commented on a change in pull request #14602:
URL: https://github.com/apache/pulsar/pull/14602#discussion_r821529738



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -116,6 +116,9 @@
 
     private final BatchMessageContainerBase batchMessageContainer;
     private CompletableFuture<MessageId> lastSendFuture = CompletableFuture.completedFuture(null);
+    private final CompletableFuture<MessageId> lastSendFutureEmpty = CompletableFuture.completedFuture(null);

Review comment:
       A CompletableFuture isn't immutable and shouldn't be shared.
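One way this concern can show up, as a minimal sketch: any holder of a reference to a shared, already-completed future can still change its state, for example through obtrudeException. The class name SharedFutureDemo and the String type parameter are stand-ins chosen for illustration (the PR uses MessageId), and the PR itself does not call obtrudeException; the sketch only shows that the shared instance is mutable.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of Pulsar.
final class SharedFutureDemo {
    // A single "empty" future shared by every caller, similar in spirit to lastSendFutureEmpty.
    static final CompletableFuture<String> SHARED = CompletableFuture.completedFuture(null);

    // Safer alternative: hand out a fresh completed future on every request.
    static CompletableFuture<String> fresh() {
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) {
        CompletableFuture<String> seenByCallerA = SHARED;
        CompletableFuture<String> seenByCallerB = SHARED;

        // Caller A forcibly overwrites the result of the shared instance.
        seenByCallerA.obtrudeException(new IllegalStateException("forced failure"));

        // Caller B now observes a failed future although it did nothing.
        System.out.println(seenByCallerB.isCompletedExceptionally()); // prints "true"
        // A freshly created future is unaffected.
        System.out.println(fresh().isCompletedExceptionally()); // prints "false"
    }
}
```

Returning a new completed future per call avoids handing the same mutable object to unrelated callers.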

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -116,6 +116,9 @@
 
     private final BatchMessageContainerBase batchMessageContainer;
     private CompletableFuture<MessageId> lastSendFuture = CompletableFuture.completedFuture(null);
+    private final CompletableFuture<MessageId> lastSendFutureEmpty = CompletableFuture.completedFuture(null);
+    private volatile boolean lastSendFutureResponse = false;

Review comment:
       Adding a new field isn't necessary. (I'll explain later)

##########
File path: pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
##########
@@ -264,4 +266,49 @@ public void testGetNumOfPartitions() throws Exception {
         assertEquals(producerImpl.getNumOfPartitions(), 0);
     }
 
+    @Test
+    public void testFlushWhenLastSendFutureFailed() {
+        ProducerConfigurationData producerConfData = new ProducerConfigurationData();
+        ProducerImpl<Object> producerImpl = client.newProducerImpl(TOPIC_NAME, 0, producerConfData,
+                null, null, null, Optional.empty());
+
+        // 1. When no data is sent to this producerImpl,
+        // its lastSendFuture is always in normal completion state
+        CompletableFuture<Void> lastSendFuture = producerImpl.getLastSendFuture();
+        assertTrue(lastSendFuture.isDone());
+        assertFalse(lastSendFuture.isCompletedExceptionally());
+
+        // 2. We set the lastSendFuture of this partition to an abnormal state,
+        // simulating that an exception occurred during the sending process
+        final String failedMessage = "failed last send future";
+        producerImpl.setLastSendFuture(FutureUtil.failedFuture(new Throwable(failedMessage)));
+
+        // 3. So when we get its lastSendFuture again,
+        // the future is already in an abnormal failure state
+        lastSendFuture = producerImpl.getLastSendFuture();
+        assertTrue(lastSendFuture.isDone());
+        assertTrue(lastSendFuture.isCompletedExceptionally());
+        // 4. The following simple simulation application captures the exception processing
+        lastSendFuture.exceptionally(throwable -> {
+            assertNotNull(throwable);
+            assertEquals(throwable.getMessage(), failedMessage);
+            return null;
+        });
+
+        // 5. We have already handled the exception in the step 4,
+        // and then PartitionedProducerImpl will continue to send data.
+        // It should be noted that when this partition is not selected for data transmission this time,
+        // its lastSendFuture is still the future that was in an abnormal state in the previous step.
+        // When the application calls the flush operation again, in the previous logic,
+        // its exception future will be returned to the application,
+        // causing the application to always execute the exception handling logic.
+        // In fact, we have handled the exception before,
+        // and we did not send data to this partition this time,
+        // it should not affect this transmission.
+        // So we want the lastSendFuture here to be in normal state.
+        lastSendFuture = producerImpl.getLastSendFuture();
+        assertTrue(lastSendFuture.isDone());

Review comment:
       This should be tested by observing the external behavior, for example by sending a message while the connection is broken. The flush should fail only once, and it should be possible to send more messages after the connection resumes.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -2209,5 +2224,26 @@ boolean isErrorStat() {
         return errorState;
     }
 
+    @VisibleForTesting
+    CompletableFuture<Void> getLastSendFuture() {
+        CompletableFuture<MessageId> lastSendFuture;
+        if (lastSendFutureResponse) {
+            lastSendFuture = this.lastSendFutureEmpty;
+        } else {
+            lastSendFuture = this.lastSendFuture;
+            lastSendFuture.exceptionally(ignored -> {
+                lastSendFutureResponse = true;
+                return null;
+            });
+        }
+
+        return lastSendFuture.thenApply(ignored -> null);
+    }
+
+    @VisibleForTesting
+    void setLastSendFuture(CompletableFuture<MessageId> lastSendFuture) {
+        this.lastSendFuture = lastSendFuture;
+    }
+

Review comment:
       It's better to focus on testing external behavior that can be observed.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1975,8 +1981,17 @@ private void failPendingBatchMessages(PulsarClientException ex) {
             if (isBatchMessagingEnabled()) {
                 batchMessageAndSend();
             }
-            lastSendFuture = this.lastSendFuture;
+            if (lastSendFutureResponse) {
+                lastSendFuture = this.lastSendFutureEmpty;
+            } else {
+                lastSendFuture = this.lastSendFuture;
+                lastSendFuture.exceptionally(ignored -> {
+                    lastSendFutureResponse = true;
+                    return null;
+                });
+            }
         }
+
         return lastSendFuture.thenApply(ignored -> null);

Review comment:
       Since the intention seems to be to ignore an exception once it has been delivered, it would be better to make that more explicit. It would be a breaking change if calling flushAsync didn't deliver a possible exception to the caller at all.
   
   To prevent race conditions, this would have to be implemented in a different way. The high-level solution would be to create a class that contains 2 fields: a CompletableFuture and an AtomicBoolean field (or a volatile boolean). The "lastSendFuture" field should reference this wrapper, and ProducerImpl should delegate actions to it. The wrapper would contain the behavior that makes it return the exception only for the "first call". Technically this could be implemented using CompletableFuture.handle. It's possible to re-throw the exception in handle when it is wrapped in a CompletionException. That would be done only on the "first call". AtomicBoolean's compareAndSet could be used to ensure that the exception is thrown only on the "first call". ("First call" isn't really a call in this case, but I found it easier to explain it that way.)
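A rough sketch of the wrapper idea described above, under stated assumptions: the names LastSendFutureWrapper, handleOnce and FirstCallDemo are hypothetical and do not exist in Pulsar, and the sketch is not the change that was eventually merged. It only illustrates combining CompletableFuture.handle with AtomicBoolean.compareAndSet so that a failure is re-thrown to the first observer only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical wrapper, not part of Pulsar: pairs a future with a "failure already delivered" flag.
final class LastSendFutureWrapper<T> {
    private final CompletableFuture<T> lastSendFuture;
    private final AtomicBoolean exceptionDelivered = new AtomicBoolean(false);

    LastSendFutureWrapper(CompletableFuture<T> lastSendFuture) {
        this.lastSendFuture = lastSendFuture;
    }

    // Returns a derived future that fails only for the first observer of a failure.
    CompletableFuture<Void> handleOnce() {
        return lastSendFuture.handle((ignored, t) -> {
            // compareAndSet flips the flag atomically, so exactly one derived future re-throws.
            if (t != null && exceptionDelivered.compareAndSet(false, true)) {
                throw t instanceof CompletionException
                        ? (CompletionException) t
                        : new CompletionException(t);
            }
            return null;
        });
    }
}

// Hypothetical usage demo.
final class FirstCallDemo {
    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("send timed out"));

        LastSendFutureWrapper<String> wrapper = new LastSendFutureWrapper<>(failed);
        System.out.println(wrapper.handleOnce().isCompletedExceptionally()); // prints "true"
        System.out.println(wrapper.handleOnce().isCompletedExceptionally()); // prints "false"
    }
}
```

In this sketch, ProducerImpl would presumably replace the wrapper with a new instance whenever a new send updates lastSendFuture, so that the flag starts out unset for every new send; that part is an assumption and is not shown.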
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org