You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2020/06/10 07:47:30 UTC

[pulsar-dotpulsar] branch master updated: Added Pulsar Proxy support, now honors ProxyThroughServiceUrl by setting ProxyToBrokerUrl. (#37)

This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c8f10fe  Added Pulsar Proxy support, now honors ProxyThroughServiceUrl by setting ProxyToBrokerUrl. (#37)
c8f10fe is described below

commit c8f10feca3cbab1a8f947a7f5e97572c923a0a02
Author: Magne Helleborg <ma...@gmail.com>
AuthorDate: Wed Jun 10 09:44:12 2020 +0200

    Added Pulsar Proxy support, now honors ProxyThroughServiceUrl by setting ProxyToBrokerUrl. (#37)
    
    Co-authored-by: Magne Helleborg <ma...@gmail.com>
---
 README.md                                |   1 +
 src/DotPulsar/Internal/ConnectionPool.cs | 118 ++++++++++++++++++++++++++-----
 2 files changed, 101 insertions(+), 18 deletions(-)

diff --git a/README.md b/README.md
index e76d5c1..ac0bd99 100644
--- a/README.md
+++ b/README.md
@@ -56,6 +56,7 @@ For a more in-depth tour of the API, please visit the [Wiki](https://github.com/
 - [X] Consume compacted topics
 - [X] Reader API
 - [X] Read/Consume/Acknowledge batched messages
+- [X] Pulsar Proxy
 
 ## Roadmap
 
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index 7b5c627..c86cfc5 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -14,10 +14,10 @@
 
 namespace DotPulsar.Internal
 {
+    using Abstractions;
     using DotPulsar.Exceptions;
-    using DotPulsar.Internal.Abstractions;
-    using DotPulsar.Internal.Extensions;
-    using DotPulsar.Internal.PulsarApi;
+    using Extensions;
+    using PulsarApi;
     using System;
     using System.Collections.Concurrent;
     using System.Linq;
@@ -31,7 +31,7 @@ namespace DotPulsar.Internal
         private readonly Uri _serviceUrl;
         private readonly Connector _connector;
         private readonly EncryptionPolicy _encryptionPolicy;
-        private readonly ConcurrentDictionary<Uri, Connection> _connections;
+        private readonly ConcurrentDictionary<PulsarUrl, Connection> _connections;
 
         private readonly CancellationTokenSource _cancellationTokenSource;
         private readonly Task _closeInactiveConnections;
@@ -43,7 +43,7 @@ namespace DotPulsar.Internal
             _serviceUrl = serviceUrl;
             _connector = connector;
             _encryptionPolicy = encryptionPolicy;
-            _connections = new ConcurrentDictionary<Uri, Connection>();
+            _connections = new ConcurrentDictionary<PulsarUrl, Connection>();
             _cancellationTokenSource = new CancellationTokenSource();
             _closeInactiveConnections = CloseInactiveConnections(closeInactiveConnectionsInterval, _cancellationTokenSource.Token);
         }
@@ -69,11 +69,11 @@ namespace DotPulsar.Internal
                 Authoritative = false
             };
 
-            var serviceUrl = _serviceUrl;
+            var physicalUrl = _serviceUrl;
 
             while (true)
             {
-                var connection = await GetConnection(serviceUrl, cancellationToken).ConfigureAwait(false);
+                var connection = await GetConnection(physicalUrl, cancellationToken).ConfigureAwait(false);
                 var response = await connection.Send(lookup, cancellationToken).ConfigureAwait(false);
 
                 response.Expect(BaseCommand.Type.LookupResponse);
@@ -83,13 +83,24 @@ namespace DotPulsar.Internal
 
                 lookup.Authoritative = response.LookupTopicResponse.Authoritative;
 
-                serviceUrl = new Uri(GetBrokerServiceUrl(response.LookupTopicResponse));
+                var lookupResponseServiceUrl = new Uri(GetBrokerServiceUrl(response.LookupTopicResponse));
 
                 if (response.LookupTopicResponse.Response == CommandLookupTopicResponse.LookupType.Redirect || !response.LookupTopicResponse.Authoritative)
+                {
+                    physicalUrl = lookupResponseServiceUrl;
                     continue;
+                }
+
+                if (response.LookupTopicResponse.ProxyThroughServiceUrl)
+                {
+                    var url = new PulsarUrl(physicalUrl, lookupResponseServiceUrl);
+                    return await GetConnection(url, cancellationToken).ConfigureAwait(false);
+                }
 
                 // LookupType is 'Connect', ServiceUrl is local and response is authoritative. Assume the Pulsar server is a standalone docker.
-                return _serviceUrl.IsLoopback ? connection : await GetConnection(serviceUrl, cancellationToken).ConfigureAwait(false);
+                return lookupResponseServiceUrl.IsLoopback
+                    ? connection
+                    : await GetConnection(lookupResponseServiceUrl, cancellationToken).ConfigureAwait(false);
             }
         }
 
@@ -115,30 +126,42 @@ namespace DotPulsar.Internal
             }
         }
 
-        private async ValueTask<Connection> GetConnection(Uri serviceUrl, CancellationToken cancellationToken)
+        private ValueTask<Connection> GetConnection(Uri serviceUrl, CancellationToken cancellationToken)
+        {
+            return GetConnection(new PulsarUrl(serviceUrl,serviceUrl), cancellationToken);
+        }
+
+        private async ValueTask<Connection> GetConnection(PulsarUrl url, CancellationToken cancellationToken)
         {
             using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
             {
-                if (_connections.TryGetValue(serviceUrl, out Connection connection))
+                if (_connections.TryGetValue(url, out Connection connection))
                     return connection;
 
-                return await EstablishNewConnection(serviceUrl, cancellationToken).ConfigureAwait(false);
+                return await EstablishNewConnection(url, cancellationToken).ConfigureAwait(false);
             }
         }
 
-        private async Task<Connection> EstablishNewConnection(Uri serviceUrl, CancellationToken cancellationToken)
+        private async Task<Connection> EstablishNewConnection(PulsarUrl url, CancellationToken cancellationToken)
         {
-            var stream = await _connector.Connect(serviceUrl).ConfigureAwait(false);
+            var stream = await _connector.Connect(url.Physical).ConfigureAwait(false);
             var connection = new Connection(new PulsarStream(stream));
             DotPulsarEventSource.Log.ConnectionCreated();
-            _connections[serviceUrl] = connection;
-            _ = connection.ProcessIncommingFrames(cancellationToken).ContinueWith(t => DisposeConnection(serviceUrl));
-            var response = await connection.Send(_commandConnect, cancellationToken).ConfigureAwait(false);
+            _connections[url] = connection;
+            _ = connection.ProcessIncommingFrames(cancellationToken).ContinueWith(t => DisposeConnection(url));
+            var commandConnect = _commandConnect;
+
+            if (url.ProxyThroughServiceUrl)
+            {
+                commandConnect = WithProxyToBroker(_commandConnect, url.Logical);
+            }
+
+            var response = await connection.Send(commandConnect, cancellationToken).ConfigureAwait(false);
             response.Expect(BaseCommand.Type.Connected);
             return connection;
         }
 
-        private async ValueTask DisposeConnection(Uri serviceUrl)
+        private async ValueTask DisposeConnection(PulsarUrl serviceUrl)
         {
             if (_connections.TryRemove(serviceUrl, out Connection connection))
             {
@@ -147,6 +170,22 @@ namespace DotPulsar.Internal
             }
         }
 
+        private static CommandConnect WithProxyToBroker(CommandConnect commandConnect, Uri logicalUrl)
+        {
+            return new CommandConnect
+            {
+                AuthData = commandConnect.AuthData,
+                AuthMethod = commandConnect.AuthMethod,
+                AuthMethodName = commandConnect.AuthMethodName,
+                ClientVersion = commandConnect.ClientVersion,
+                OriginalPrincipal = commandConnect.OriginalPrincipal,
+                ProtocolVersion = commandConnect.ProtocolVersion,
+                OriginalAuthData = commandConnect.OriginalAuthData,
+                OriginalAuthMethod = commandConnect.OriginalAuthMethod,
+                ProxyToBrokerUrl = $"{logicalUrl.Host}:{logicalUrl.Port}"
+            };
+        }
+
         private async Task CloseInactiveConnections(TimeSpan interval, CancellationToken cancellationToken)
         {
             while (!cancellationToken.IsCancellationRequested)
@@ -175,5 +214,48 @@ namespace DotPulsar.Internal
                 }
             }
         }
+
+        private class PulsarUrl: IEquatable<PulsarUrl>
+        {
+            public PulsarUrl(Uri physical, Uri logical)
+            {
+                Physical = physical;
+                Logical = logical;
+                ProxyThroughServiceUrl = physical != logical;
+            }
+            public Uri Physical { get; }
+            public Uri Logical { get; }
+
+            public bool ProxyThroughServiceUrl { get; }
+
+            public bool Equals(PulsarUrl? other)
+            {
+                if (ReferenceEquals(null, other))
+                    return false;
+
+                if (ReferenceEquals(this, other))
+                    return true;
+                return Physical.Equals(other.Physical) && Logical.Equals(other.Logical);
+            }
+
+            public override bool Equals(object? obj)
+            {
+                if (ReferenceEquals(null, obj))
+                    return false;
+
+                if (ReferenceEquals(this, obj))
+                    return true;
+
+                if (obj.GetType() != this.GetType())
+                    return false;
+                return Equals((PulsarUrl) obj);
+            }
+
+            public override int GetHashCode()
+                => HashCode.Combine(Physical, Logical);
+
+            public override string ToString()
+                => $"{nameof(Physical)}: {Physical}, {nameof(Logical)}: {Logical}, {nameof(ProxyThroughServiceUrl)}: {ProxyThroughServiceUrl}";
+        }
     }
 }