You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2019/11/23 12:43:12 UTC
[pulsar] 02/03: [Issue 5597][pulsar-client-java] retry when
getPartitionedTopicMetadata failed (#5603)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d22dfd36a07821a6812525ad49f7ae93a7e50fd9
Author: ltamber <lt...@gmail.com>
AuthorDate: Thu Nov 21 00:53:20 2019 +0800
[Issue 5597][pulsar-client-java] retry when getPartitionedTopicMetadata failed (#5603)
### Motivation
Fixes #5597
When a producer is created with a multi-broker service URL, creation fails if the connection to the first broker fails.
### Modification
Add backoff retries when getting partitioned topic metadata from brokers.
(cherry picked from commit ee11e100d8a05296f1ddf0da6c4e52f63ca02294)
---
.../stats/client/PulsarBrokerStatsClientTest.java | 63 ++++++++++++++++++++++
.../org/apache/pulsar/client/impl/HttpClient.java | 6 ++-
.../pulsar/client/impl/PulsarClientImpl.java | 32 ++++++++++-
3 files changed, 97 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
index fca1485..00d95d1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
@@ -31,6 +31,8 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
import org.slf4j.Logger;
@@ -47,6 +49,7 @@ import java.util.concurrent.TimeUnit;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
public class PulsarBrokerStatsClientTest extends ProducerConsumerBase {
@@ -132,5 +135,65 @@ public class PulsarBrokerStatsClientTest extends ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
+ @Test
+ public void testGetPartitionedTopicMetaData() throws Exception { // creates a consumer and a producer through a two-host service URL and fails on any PulsarClientException
+ log.info("-- Starting {} test --", methodName);
+
+ final String topicName = "persistent://my-property/my-ns/my-topic1";
+ final String subscriptionName = "my-subscriber-name";
+
+
+
+ try {
+ String url = "http://localhost:51000,localhost:" + BROKER_WEBSERVICE_PORT; // first host is port 51000 (presumably nothing listens there -- confirm); second uses BROKER_WEBSERVICE_PORT
+ if (isTcpLookup) {
+ url = "pulsar://localhost:51000,localhost:" + BROKER_PORT; // TCP lookup: same two-host layout over pulsar:// with BROKER_PORT
+ }
+ PulsarClient client = newPulsarClient(url, 0);
+
+ Consumer<byte[]> consumer = client.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+ .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
+ Producer<byte[]> producer = client.newProducer().topic(topicName).create();
+
+ consumer.close();
+ producer.close();
+ client.close();
+ } catch (PulsarClientException pce) { // NOTE(review): on this path the client/consumer/producer opened above are never closed
+ log.error("create producer or consumer error: ", pce);
+ fail(); // a client exception from subscribe/create/close fails the test
+ }
+
+ log.info("-- Exiting {} test --", methodName);
+ }
+
+ @Test (timeOut = 4000) // TestNG time limit in milliseconds; the client below is configured with a 3 s operation timeout
+ public void testGetPartitionedTopicDataTimeout() { // expects producer creation to throw PulsarClientException when both listed hosts are ports 51000/51001
+ log.info("-- Starting {} test --", methodName);
+
+ final String topicName = "persistent://my-property/my-ns/my-topic1";
+
+ String url = "http://localhost:51000,localhost:51001"; // presumably nothing listens on either port -- confirm
+ if (isTcpLookup) {
+ url = "pulsar://localhost:51000,localhost:51001";
+ }
+
+ PulsarClient client;
+ try {
+ client = PulsarClient.builder()
+ .serviceUrl(url)
+ .statsInterval(0, TimeUnit.SECONDS)
+ .operationTimeout(3, TimeUnit.SECONDS)
+ .build();
+
+ Producer<byte[]> producer = client.newProducer().topic(topicName).create(); // NOTE(review): 'producer' is never used afterwards
+
+ fail(); // reached only if create() returned without throwing
+ } catch (PulsarClientException pce) { // passing path: the exception is only logged
+ log.error("create producer error: ", pce); // NOTE(review): 'client' is never closed in this method
+ }
+
+ log.info("-- Exiting {} test --", methodName);
+ }
+
private static final Logger log = LoggerFactory.getLogger(PulsarBrokerStatsClientTest.class);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index 96c62d2..845c741 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
import java.io.Closeable;
import java.io.IOException;
import java.net.HttpURLConnection;
+import java.net.URI;
import java.net.URL;
import java.util.Map;
import java.util.Map.Entry;
@@ -127,8 +128,9 @@ public class HttpClient implements Closeable {
public <T> CompletableFuture<T> get(String path, Class<T> clazz) {
final CompletableFuture<T> future = new CompletableFuture<>();
try {
- String requestUrl = new URL(serviceNameResolver.resolveHostUri().toURL(), path).toString();
- String remoteHostName = serviceNameResolver.resolveHostUri().getHost();
+ URI hostUri = serviceNameResolver.resolveHostUri();
+ String requestUrl = new URL(hostUri.toURL(), path).toString();
+ String remoteHostName = hostUri.getHost();
AuthenticationDataProvider authData = authentication.getAuthData(remoteHostName);
CompletableFuture<Map<String, String>> authFuture = new CompletableFuture<>();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 53468d3..abad54b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -641,17 +641,45 @@ public class PulsarClientImpl implements PulsarClient {
public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(String topic) {
- CompletableFuture<PartitionedTopicMetadata> metadataFuture;
+ CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
try {
TopicName topicName = TopicName.get(topic);
- metadataFuture = lookup.getPartitionedTopicMetadata(topicName);
+ AtomicLong opTimeoutMs = new AtomicLong(conf.getOperationTimeoutMs());
+ Backoff backoff = new BackoffBuilder()
+ .setInitialTime(100, TimeUnit.MILLISECONDS)
+ .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
+ .setMax(0, TimeUnit.MILLISECONDS)
+ .useUserConfiguredIntervals(conf.getDefaultBackoffIntervalNanos(),
+ conf.getMaxBackoffIntervalNanos())
+ .create();
+ getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs, metadataFuture);
} catch (IllegalArgumentException e) {
return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(e.getMessage()));
}
return metadataFuture;
}
+ private void getPartitionedTopicMetadata(TopicName topicName, // looks up the metadata and, on failure, retries after a backoff delay until remainingTime is used up
+ Backoff backoff,
+ AtomicLong remainingTime,
+ CompletableFuture<PartitionedTopicMetadata> future) {
+ lookup.getPartitionedTopicMetadata(topicName).thenAccept(future::complete).exceptionally(e -> { // success completes 'future'; a failure runs the retry handler below
+ long nextDelay = Math.min(backoff.next(), remainingTime.get()); // next backoff delay, capped at the remaining time budget
+ if (nextDelay <= 0) { // no budget left (or a non-positive backoff delay): stop retrying
+ future.completeExceptionally(new PulsarClientException // NOTE(review): the triggering exception e is not attached to this TimeoutException
+ .TimeoutException("Could not getPartitionedTopicMetadata within configured timeout."));
+ return null;
+ }
+
+ timer.newTimeout( task -> { // run the retry after nextDelay milliseconds via timer.newTimeout
+ remainingTime.addAndGet(-nextDelay); // charge this delay against the budget before retrying
+ getPartitionedTopicMetadata(topicName, backoff, remainingTime, future); // retry with the same backoff, budget and result future
+ }, nextDelay, TimeUnit.MILLISECONDS);
+ return null;
+ });
+ }
+
@Override
public CompletableFuture<List<String>> getPartitionsForTopic(String topic) {
return getPartitionedTopicMetadata(topic).thenApply(metadata -> {