You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/09/28 19:19:07 UTC

[GitHub] merlimat closed pull request #2671: Simplify ServiceUrlProvider related APIs

merlimat closed pull request #2671: Simplify ServiceUrlProvider related APIs
URL: https://github.com/apache/pulsar/pull/2671
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
index ab86f125b7..4b73b9ef6d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
@@ -64,7 +64,7 @@ public void testCreateClientWithServiceUrlProvider() throws Exception {
         for (int i = 0; i < 100; i++) {
             producer.send("Hello Pulsar[" + i + "]");
         }
-        client.forceCloseConnection();
+        client.updateServiceUrl(pulsar.getBrokerServiceUrl());
         for (int i = 100; i < 200; i++) {
             producer.send("Hello Pulsar[" + i + "]");
         }
@@ -132,7 +132,7 @@ public String getServiceUrl() {
         }
 
         @Override
-        public void setClient(PulsarClient client) {
+        public void initialize(PulsarClient client) {
             this.pulsarClient = client;
         }
 
@@ -148,9 +148,7 @@ public AutoChangedServiceUrlProvider(String serviceUrl) {
         }
 
         public void onServiceUrlChanged(String newServiceUrl) throws PulsarClientException {
-            this.getPulsarClient().getConf().setServiceUrl(newServiceUrl);
-            this.getPulsarClient().reloadLookUp();
-            this.getPulsarClient().forceCloseConnection();
+            this.getPulsarClient().updateServiceUrl(newServiceUrl);
         }
     }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
index cba451bde1..ea80bc11e3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
@@ -338,6 +338,19 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t
     @Deprecated
     CompletableFuture<Reader<byte[]>> createReaderAsync(String topic, MessageId startMessageId, ReaderConfiguration conf);
 
+    /**
+     * Update the service URL this client is using.
+     *
+     * This will force the client to close all existing connections and to restart service discovery using the new
+     * service endpoint.
+     *
+     * @param serviceUrl
+     *            the new service URL this client should connect to
+     * @throws PulsarClientException
+     *             in case the serviceUrl is not valid
+     */
+    void updateServiceUrl(String serviceUrl) throws PulsarClientException;
+
     /**
      * Close the PulsarClient and release all the resources.
      *
@@ -368,26 +381,4 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t
      *             if the forceful shutdown fails
      */
     void shutdown() throws PulsarClientException;
-
-    /**
-     * Force close connection of pulsar client.
-     *
-     * close all producer connection and close all consumer producer.
-     *
-     */
-    void forceCloseConnection();
-
-    /**
-     * Reload lookup service in pulsar client.
-     *
-     * @throws PulsarClientException
-     */
-    void reloadLookUp() throws PulsarClientException;
-
-    /**
-     * Get client config data.
-     *
-     * @return
-     */
-    ClientConfigurationData getConf();
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
index 9b6e96347d..a668987c81 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
@@ -19,24 +19,31 @@
 package org.apache.pulsar.client.api;
 
 /**
- * The provider to provide the service url
- * It used by {@link ClientBuilder#serviceUrlProvider(ServiceUrlProvider)}
+ * The provider to provide the service url.
+ *
+ * This allows applications to retrieve the service URL from an external configuration provider and, more importantly,
+ * to force the Pulsar client to reconnect if the service URL has been changed.
+ *
+ * It can be passed with {@link ClientBuilder#serviceUrlProvider(ServiceUrlProvider)}
  */
 public interface ServiceUrlProvider {
 
     /**
-     * Get pulsar service url from ServiceUrlProvider.
+     * Initialize the service url provider with Pulsar client instance.
+     *
+     * This can be used by the provider to force the Pulsar client to reconnect whenever the service url might have
+     * changed. See {@link PulsarClient#updateServiceUrl(String)}.
      *
-     * @return pulsar service url.
+     * @param client
+     *            created pulsar client.
      */
-    String getServiceUrl();
+    void initialize(PulsarClient client);
 
     /**
-     * Set pulsar client to the provider for provider can control the pulsar client,
-     * such as {@link PulsarClient#forceCloseConnection()} or {@link PulsarClient#close()}.
+     * Get the current service URL the Pulsar client should connect to.
      *
-     * @param client created pulsar client.
+     * @return the pulsar service url.
      */
-    void setClient(PulsarClient client);
+    String getServiceUrl();
 
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index e4cb7bdac6..e373ed8319 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -38,7 +38,6 @@
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.api.Commands;
-import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
@@ -52,7 +51,7 @@
 public class BinaryProtoLookupService implements LookupService {
 
     private final PulsarClientImpl client;
-    protected final InetSocketAddress serviceAddress;
+    protected volatile InetSocketAddress serviceAddress;
     private final boolean useTls;
     private final ExecutorService executor;
 
@@ -61,6 +60,11 @@ public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, bool
         this.client = client;
         this.useTls = useTls;
         this.executor = executor;
+        updateServiceUrl(serviceUrl);
+    }
+
+    @Override
+    public void updateServiceUrl(String serviceUrl) throws PulsarClientException {
         URI uri;
         try {
             uri = new URI(serviceUrl);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 6a595209c5..3e87bc9352 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -53,7 +53,7 @@ public PulsarClient build() throws PulsarClientException {
         }
         PulsarClient client = new PulsarClientImpl(conf);
         if (conf.getServiceUrlProvider() != null) {
-            conf.getServiceUrlProvider().setClient(client);
+            conf.getServiceUrlProvider().initialize(client);
         }
         return client;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 3188b3562b..0c7751bd9a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -72,7 +72,7 @@
     public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
         this(conf, eventLoopGroup, () -> new ClientCnx(conf, eventLoopGroup));
     }
-    
+
     public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, Supplier<ClientCnx> clientCnxSupplier) {
         this.eventLoopGroup = eventLoopGroup;
         this.maxConnectionsPerHosts = conf.getConnectionsPerBroker();
@@ -119,6 +119,24 @@ public void initChannel(SocketChannel ch) throws Exception {
         return getConnection(address, address);
     }
 
+    void closeAllConnections() {
+        pool.values().forEach(map -> {
+            map.values().forEach(future -> {
+                if (future.isDone()) {
+                    if (!future.isCompletedExceptionally()) {
+                        // Connection was already created successfully, the join will not throw any exception
+                        future.join().close();
+                    } else {
+                        // If the future already failed, there's nothing we have to do
+                    }
+                } else {
+                    // The future is still pending: just register to make sure it gets closed if the operation succeeds
+                    future.thenAccept(ClientCnx::close);
+                }
+            });
+        });
+    }
+
     /**
      * Get a connection from the pool.
      * <p>
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index e5ef4c5abc..0e94e6b73f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -60,7 +60,7 @@
     protected final static int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30;
 
     protected final AsyncHttpClient httpClient;
-    protected final URL url;
+    protected volatile URL url;
     protected final Authentication authentication;
 
     protected HttpClient(String serviceUrl, Authentication authentication,
@@ -74,12 +74,7 @@ protected HttpClient(String serviceUrl, Authentication authentication,
             EventLoopGroup eventLoopGroup, boolean tlsAllowInsecureConnection, String tlsTrustCertsFilePath,
             int connectTimeoutInSeconds, int readTimeoutInSeconds) throws PulsarClientException {
         this.authentication = authentication;
-        try {
-            // Ensure trailing "/" on url
-            url = new URL(serviceUrl);
-        } catch (MalformedURLException e) {
-            throw new PulsarClientException.InvalidServiceURL(e);
-        }
+        setServiceUrl(serviceUrl);
 
         DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
         confBuilder.setFollowRedirect(true);
@@ -120,6 +115,15 @@ public boolean keepAlive(Request ahcRequest, HttpRequest request, HttpResponse r
         log.debug("Using HTTP url: {}", this.url);
     }
 
+    void setServiceUrl(String serviceUrl) throws PulsarClientException {
+        try {
+            // Ensure trailing "/" on url
+            url = new URL(serviceUrl);
+        } catch (MalformedURLException e) {
+            throw new PulsarClientException.InvalidServiceURL(e);
+        }
+    }
+
     @Override
     public void close() throws IOException {
         httpClient.close();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 3abdf2531c..97ef0e1c6c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -59,6 +59,11 @@ public HttpLookupService(ClientConfigurationData conf, EventLoopGroup eventLoopG
         this.useTls = conf.isUseTls();
     }
 
+    @Override
+    public void updateServiceUrl(String serviceUrl) throws PulsarClientException {
+        httpClient.setServiceUrl(serviceUrl);
+    }
+
     /**
      * Calls http-lookup api to find broker-service address which can serve a given topic.
      *
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
index a2af17c391..5d526a5d54 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
@@ -24,6 +24,7 @@
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
@@ -43,6 +44,11 @@
  */
 public interface LookupService extends AutoCloseable {
 
+    /**
+     * Instruct the LookupService to switch to a new service URL for all subsequent requests.
+     */
+    void updateServiceUrl(String serviceUrl) throws PulsarClientException;
+
     /**
      * Calls broker lookup-api to get broker {@link InetSocketAddress} which serves namespace bundle that contains given
      * topic.
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 5752486f1a..9640c09dd7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -34,7 +34,6 @@
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
@@ -712,16 +711,12 @@ public void shutdown() throws PulsarClientException {
     }
 
     @Override
-    public void forceCloseConnection() {
-        for (ConcurrentMap<Integer, CompletableFuture<ClientCnx>> cnxMap : cnxPool.pool.values()) {
-            for (CompletableFuture<ClientCnx> clientCnxCompletableFuture : cnxMap.values()) {
-                try {
-                    clientCnxCompletableFuture.get().close();
-                } catch (Exception e) {
-                    log.error("Force close connection exception ", e);
-                }
-            }
-        }
+    public synchronized void updateServiceUrl(String serviceUrl) throws PulsarClientException {
+        log.info("Updating service URL to {}", serviceUrl);
+
+        conf.setServiceUrl(serviceUrl);
+        lookup.updateServiceUrl(serviceUrl);
+        cnxPool.closeAllConnections();
     }
 
     protected CompletableFuture<ClientCnx> getConnection(final String topic) {
@@ -771,11 +766,6 @@ public void reloadLookUp() throws PulsarClientException {
         }
     }
 
-    @Override
-    public ClientConfigurationData getConf() {
-        return conf;
-    }
-
     public CompletableFuture<Integer> getNumberOfPartitions(String topic) {
         return getPartitionedTopicMetadata(topic).thenApply(metadata -> metadata.partitions);
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services