You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/18 09:01:59 UTC

[pulsar] 09/10: Fix partitionsAutoUpdateFuture never complete (#14625)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8fbb57d8ac4944a3221d71d52dfcd8907d95f1d3
Author: wenbingshen <ol...@gmail.com>
AuthorDate: Thu Mar 17 19:50:19 2022 +0800

    Fix partitionsAutoUpdateFuture never complete (#14625)
    
    (cherry picked from commit b06dac68700dfcdf701dfbccb98126db7a7b7ef3)
---
 .../client/api/SimpleProducerConsumerTest.java     | 49 ++++++++++++++++++++++
 .../client/impl/PartitionedProducerImpl.java       | 13 +++++-
 2 files changed, 60 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 6302cbd..32b320cd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -39,6 +39,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.util.Timeout;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
@@ -4200,4 +4201,52 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
             assertEquals(resultSet.size(), total);
         });
     }
+
+    @Test
+    public void testPartitionsAutoUpdate() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        int numPartitions = 3;
+        TopicName topicName = TopicName.get("persistent://my-property/my-ns/partitionsAutoUpdate-1");
+        admin.topics().createPartitionedTopic(topicName.toString(), numPartitions);
+
+        int operationTimeout = 2000; // client operation timeout, in milliseconds
+        @Cleanup final PulsarClient client = PulsarClient.builder()
+                .serviceUrl(lookupUrl.toString())
+                .operationTimeout(operationTimeout, TimeUnit.MILLISECONDS)
+                .build();
+
+        ProducerBuilder<byte[]> producerBuilder = client.newProducer()
+                .topic(topicName.toString()).sendTimeout(1, TimeUnit.SECONDS);
+
+        @Cleanup
+        PartitionedProducerImpl<byte[]> partitionedProducer =
+                (PartitionedProducerImpl<byte[]>) producerBuilder.autoUpdatePartitions(true).create();
+
+        // Stop the broker so the next auto-update run fails; the assertions below expect "Connection refused:"
+        stopBroker();
+
+        log.info("trigger partitionsAutoUpdateTimerTask run failed for producer");
+        Timeout timeout = partitionedProducer.getPartitionsAutoUpdateTimeout();
+        timeout.task().run(timeout); // invoke the timer task directly rather than waiting for it to fire
+        Awaitility.await().untilAsserted(() -> {
+            assertNotNull(partitionedProducer.getPartitionsAutoUpdateFuture());
+            assertTrue(partitionedProducer.getPartitionsAutoUpdateFuture().isCompletedExceptionally());
+            assertTrue(FutureUtil.getException(partitionedProducer.getPartitionsAutoUpdateFuture()).get().getMessage()
+                    .contains("Connection refused:"));
+        });
+
+        startBroker(); // with the broker back, the next run is expected to complete the future normally
+
+        log.info("trigger partitionsAutoUpdateTimerTask run successful for producer");
+        timeout = partitionedProducer.getPartitionsAutoUpdateTimeout(); // re-read the timeout before running it again
+        timeout.task().run(timeout);
+        Awaitility.await().untilAsserted(() -> {
+            assertNotNull(partitionedProducer.getPartitionsAutoUpdateFuture());
+            assertTrue(partitionedProducer.getPartitionsAutoUpdateFuture().isDone());
+            assertFalse(partitionedProducer.getPartitionsAutoUpdateFuture().isCompletedExceptionally());
+        });
+
+        log.info("-- Exiting {} test --", methodName);
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 9ad0f14..c47eca3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -300,7 +300,7 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
 
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] partitions number. old: {}, new: {}",
-                        topic, oldPartitionNumber, currentPartitionNumber);
+                            topic, oldPartitionNumber, currentPartitionNumber);
                 }
 
                 if (oldPartitionNumber == currentPartitionNumber) {
@@ -343,10 +343,14 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
                     return null;
                 } else {
                     log.error("[{}] not support shrink topic partitions. old: {}, new: {}",
-                        topic, oldPartitionNumber, currentPartitionNumber);
+                            topic, oldPartitionNumber, currentPartitionNumber);
                     future.completeExceptionally(new NotSupportedException("not support shrink topic partitions"));
                 }
                 return future;
+            }).exceptionally(throwable -> {
+                log.error("[{}] Auto getting partitions failed", topic, throwable);
+                future.completeExceptionally(throwable);
+                return null;
             });
 
             return future;
@@ -380,6 +384,11 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
     };
 
     @VisibleForTesting
+    public CompletableFuture<Void> getPartitionsAutoUpdateFuture() {
+        return partitionsAutoUpdateFuture; // field returned as-is; NOTE(review): may be null before a first run - confirm
+    }
+
+    @VisibleForTesting
     public Timeout getPartitionsAutoUpdateTimeout() {
         return partitionsAutoUpdateTimeout;
     }