You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/18 09:01:59 UTC
[pulsar] 09/10: Fix partitionsAutoUpdateFuture never complete (#14625)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8fbb57d8ac4944a3221d71d52dfcd8907d95f1d3
Author: wenbingshen <ol...@gmail.com>
AuthorDate: Thu Mar 17 19:50:19 2022 +0800
Fix partitionsAutoUpdateFuture never complete (#14625)
(cherry picked from commit b06dac68700dfcdf701dfbccb98126db7a7b7ef3)
---
.../client/api/SimpleProducerConsumerTest.java | 49 ++++++++++++++++++++++
.../client/impl/PartitionedProducerImpl.java | 13 +++++-
2 files changed, 60 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 6302cbd..32b320cd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -39,6 +39,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.util.Timeout;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
@@ -4200,4 +4201,52 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
assertEquals(resultSet.size(), total);
});
}
+
+ @Test
+ public void testPartitionsAutoUpdate() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ int numPartitions = 3;
+ TopicName topicName = TopicName.get("persistent://my-property/my-ns/partitionsAutoUpdate-1");
+ admin.topics().createPartitionedTopic(topicName.toString(), numPartitions);
+
+ int operationTimeout = 2000; // MILLISECONDS
+ @Cleanup final PulsarClient client = PulsarClient.builder()
+ .serviceUrl(lookupUrl.toString())
+ .operationTimeout(operationTimeout, TimeUnit.MILLISECONDS)
+ .build();
+
+ ProducerBuilder<byte[]> producerBuilder = client.newProducer()
+ .topic(topicName.toString()).sendTimeout(1, TimeUnit.SECONDS);
+
+ @Cleanup
+ PartitionedProducerImpl<byte[]> partitionedProducer =
+ (PartitionedProducerImpl<byte[]>) producerBuilder.autoUpdatePartitions(true).create();
+
+ // Trigger the Connection refused exception
+ stopBroker();
+
+ log.info("trigger partitionsAutoUpdateTimerTask run failed for producer");
+ Timeout timeout = partitionedProducer.getPartitionsAutoUpdateTimeout();
+ timeout.task().run(timeout);
+ Awaitility.await().untilAsserted(() -> {
+ assertNotNull(partitionedProducer.getPartitionsAutoUpdateFuture());
+ assertTrue(partitionedProducer.getPartitionsAutoUpdateFuture().isCompletedExceptionally());
+ assertTrue(FutureUtil.getException(partitionedProducer.getPartitionsAutoUpdateFuture()).get().getMessage()
+ .contains("Connection refused:"));
+ });
+
+ startBroker();
+
+ log.info("trigger partitionsAutoUpdateTimerTask run successful for producer");
+ timeout = partitionedProducer.getPartitionsAutoUpdateTimeout();
+ timeout.task().run(timeout);
+ Awaitility.await().untilAsserted(() -> {
+ assertNotNull(partitionedProducer.getPartitionsAutoUpdateFuture());
+ assertTrue(partitionedProducer.getPartitionsAutoUpdateFuture().isDone());
+ assertFalse(partitionedProducer.getPartitionsAutoUpdateFuture().isCompletedExceptionally());
+ });
+
+ log.info("-- Exiting {} test --", methodName);
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 9ad0f14..c47eca3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -300,7 +300,7 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
if (log.isDebugEnabled()) {
log.debug("[{}] partitions number. old: {}, new: {}",
- topic, oldPartitionNumber, currentPartitionNumber);
+ topic, oldPartitionNumber, currentPartitionNumber);
}
if (oldPartitionNumber == currentPartitionNumber) {
@@ -343,10 +343,14 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
return null;
} else {
log.error("[{}] not support shrink topic partitions. old: {}, new: {}",
- topic, oldPartitionNumber, currentPartitionNumber);
+ topic, oldPartitionNumber, currentPartitionNumber);
future.completeExceptionally(new NotSupportedException("not support shrink topic partitions"));
}
return future;
+ }).exceptionally(throwable -> {
+ log.error("[{}] Auto getting partitions failed", topic, throwable);
+ future.completeExceptionally(throwable);
+ return null;
});
return future;
@@ -380,6 +384,11 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
};
@VisibleForTesting
+ public CompletableFuture<Void> getPartitionsAutoUpdateFuture() {
+ return partitionsAutoUpdateFuture;
+ }
+
+ @VisibleForTesting
public Timeout getPartitionsAutoUpdateTimeout() {
return partitionsAutoUpdateTimeout;
}