You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2020/06/10 07:47:30 UTC
[pulsar-dotpulsar] branch master updated: Added Pulsar Proxy support, now honors ProxyThroughServiceUrl by setting ProxyToBrokerUrl. (#37)
This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c8f10fe Added Pulsar Proxy support, now honors ProxyThroughServiceUrl by setting ProxyToBrokerUrl. (#37)
c8f10fe is described below
commit c8f10feca3cbab1a8f947a7f5e97572c923a0a02
Author: Magne Helleborg <ma...@gmail.com>
AuthorDate: Wed Jun 10 09:44:12 2020 +0200
Added Pulsar Proxy support, now honors ProxyThroughServiceUrl by setting ProxyToBrokerUrl. (#37)
Co-authored-by: Magne Helleborg <ma...@gmail.com>
---
README.md | 1 +
src/DotPulsar/Internal/ConnectionPool.cs | 118 ++++++++++++++++++++++++++-----
2 files changed, 101 insertions(+), 18 deletions(-)
diff --git a/README.md b/README.md
index e76d5c1..ac0bd99 100644
--- a/README.md
+++ b/README.md
@@ -56,6 +56,7 @@ For a more in-depth tour of the API, please visit the [Wiki](https://github.com/
- [X] Consume compacted topics
- [X] Reader API
- [X] Read/Consume/Acknowledge batched messages
+- [X] Pulsar Proxy
## Roadmap
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index 7b5c627..c86cfc5 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -14,10 +14,10 @@
namespace DotPulsar.Internal
{
+ using Abstractions;
using DotPulsar.Exceptions;
- using DotPulsar.Internal.Abstractions;
- using DotPulsar.Internal.Extensions;
- using DotPulsar.Internal.PulsarApi;
+ using Extensions;
+ using PulsarApi;
using System;
using System.Collections.Concurrent;
using System.Linq;
@@ -31,7 +31,7 @@ namespace DotPulsar.Internal
private readonly Uri _serviceUrl;
private readonly Connector _connector;
private readonly EncryptionPolicy _encryptionPolicy;
- private readonly ConcurrentDictionary<Uri, Connection> _connections;
+ private readonly ConcurrentDictionary<PulsarUrl, Connection> _connections;
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly Task _closeInactiveConnections;
@@ -43,7 +43,7 @@ namespace DotPulsar.Internal
_serviceUrl = serviceUrl;
_connector = connector;
_encryptionPolicy = encryptionPolicy;
- _connections = new ConcurrentDictionary<Uri, Connection>();
+ _connections = new ConcurrentDictionary<PulsarUrl, Connection>();
_cancellationTokenSource = new CancellationTokenSource();
_closeInactiveConnections = CloseInactiveConnections(closeInactiveConnectionsInterval, _cancellationTokenSource.Token);
}
@@ -69,11 +69,11 @@ namespace DotPulsar.Internal
Authoritative = false
};
- var serviceUrl = _serviceUrl;
+ var physicalUrl = _serviceUrl;
while (true)
{
- var connection = await GetConnection(serviceUrl, cancellationToken).ConfigureAwait(false);
+ var connection = await GetConnection(physicalUrl, cancellationToken).ConfigureAwait(false);
var response = await connection.Send(lookup, cancellationToken).ConfigureAwait(false);
response.Expect(BaseCommand.Type.LookupResponse);
@@ -83,13 +83,24 @@ namespace DotPulsar.Internal
lookup.Authoritative = response.LookupTopicResponse.Authoritative;
- serviceUrl = new Uri(GetBrokerServiceUrl(response.LookupTopicResponse));
+ var lookupResponseServiceUrl = new Uri(GetBrokerServiceUrl(response.LookupTopicResponse));
if (response.LookupTopicResponse.Response == CommandLookupTopicResponse.LookupType.Redirect || !response.LookupTopicResponse.Authoritative)
+ {
+ physicalUrl = lookupResponseServiceUrl;
continue;
+ }
+
+ if (response.LookupTopicResponse.ProxyThroughServiceUrl)
+ {
+ var url = new PulsarUrl(physicalUrl, lookupResponseServiceUrl);
+ return await GetConnection(url, cancellationToken).ConfigureAwait(false);
+ }
// LookupType is 'Connect', ServiceUrl is local and response is authoritative. Assume the Pulsar server is a standalone docker.
- return _serviceUrl.IsLoopback ? connection : await GetConnection(serviceUrl, cancellationToken).ConfigureAwait(false);
+ return lookupResponseServiceUrl.IsLoopback
+ ? connection
+ : await GetConnection(lookupResponseServiceUrl, cancellationToken).ConfigureAwait(false);
}
}
@@ -115,30 +126,42 @@ namespace DotPulsar.Internal
}
}
- private async ValueTask<Connection> GetConnection(Uri serviceUrl, CancellationToken cancellationToken)
+ private ValueTask<Connection> GetConnection(Uri serviceUrl, CancellationToken cancellationToken)
+ {
+ return GetConnection(new PulsarUrl(serviceUrl,serviceUrl), cancellationToken);
+ }
+
+ private async ValueTask<Connection> GetConnection(PulsarUrl url, CancellationToken cancellationToken)
{
using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
- if (_connections.TryGetValue(serviceUrl, out Connection connection))
+ if (_connections.TryGetValue(url, out Connection connection))
return connection;
- return await EstablishNewConnection(serviceUrl, cancellationToken).ConfigureAwait(false);
+ return await EstablishNewConnection(url, cancellationToken).ConfigureAwait(false);
}
}
- private async Task<Connection> EstablishNewConnection(Uri serviceUrl, CancellationToken cancellationToken)
+ private async Task<Connection> EstablishNewConnection(PulsarUrl url, CancellationToken cancellationToken)
{
- var stream = await _connector.Connect(serviceUrl).ConfigureAwait(false);
+ var stream = await _connector.Connect(url.Physical).ConfigureAwait(false);
var connection = new Connection(new PulsarStream(stream));
DotPulsarEventSource.Log.ConnectionCreated();
- _connections[serviceUrl] = connection;
- _ = connection.ProcessIncommingFrames(cancellationToken).ContinueWith(t => DisposeConnection(serviceUrl));
- var response = await connection.Send(_commandConnect, cancellationToken).ConfigureAwait(false);
+ _connections[url] = connection;
+ _ = connection.ProcessIncommingFrames(cancellationToken).ContinueWith(t => DisposeConnection(url));
+ var commandConnect = _commandConnect;
+
+ if (url.ProxyThroughServiceUrl)
+ {
+ commandConnect = WithProxyToBroker(_commandConnect, url.Logical);
+ }
+
+ var response = await connection.Send(commandConnect, cancellationToken).ConfigureAwait(false);
response.Expect(BaseCommand.Type.Connected);
return connection;
}
- private async ValueTask DisposeConnection(Uri serviceUrl)
+ private async ValueTask DisposeConnection(PulsarUrl serviceUrl)
{
if (_connections.TryRemove(serviceUrl, out Connection connection))
{
@@ -147,6 +170,22 @@ namespace DotPulsar.Internal
}
}
+ private static CommandConnect WithProxyToBroker(CommandConnect commandConnect, Uri logicalUrl)
+ {
+ return new CommandConnect
+ {
+ AuthData = commandConnect.AuthData,
+ AuthMethod = commandConnect.AuthMethod,
+ AuthMethodName = commandConnect.AuthMethodName,
+ ClientVersion = commandConnect.ClientVersion,
+ OriginalPrincipal = commandConnect.OriginalPrincipal,
+ ProtocolVersion = commandConnect.ProtocolVersion,
+ OriginalAuthData = commandConnect.OriginalAuthData,
+ OriginalAuthMethod = commandConnect.OriginalAuthMethod,
+ ProxyToBrokerUrl = $"{logicalUrl.Host}:{logicalUrl.Port}"
+ };
+ }
+
private async Task CloseInactiveConnections(TimeSpan interval, CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
@@ -175,5 +214,48 @@ namespace DotPulsar.Internal
}
}
}
+
+ private class PulsarUrl: IEquatable<PulsarUrl>
+ {
+ public PulsarUrl(Uri physical, Uri logical)
+ {
+ Physical = physical;
+ Logical = logical;
+ ProxyThroughServiceUrl = physical != logical;
+ }
+ public Uri Physical { get; }
+ public Uri Logical { get; }
+
+ public bool ProxyThroughServiceUrl { get; }
+
+ public bool Equals(PulsarUrl? other)
+ {
+ if (ReferenceEquals(null, other))
+ return false;
+
+ if (ReferenceEquals(this, other))
+ return true;
+ return Physical.Equals(other.Physical) && Logical.Equals(other.Logical);
+ }
+
+ public override bool Equals(object? obj)
+ {
+ if (ReferenceEquals(null, obj))
+ return false;
+
+ if (ReferenceEquals(this, obj))
+ return true;
+
+ if (obj.GetType() != this.GetType())
+ return false;
+ return Equals((PulsarUrl) obj);
+ }
+
+ public override int GetHashCode()
+ => HashCode.Combine(Physical, Logical);
+
+ public override string ToString()
+ => $"{nameof(Physical)}: {Physical}, {nameof(Logical)}: {Logical}, {nameof(ProxyThroughServiceUrl)}: {ProxyThroughServiceUrl}";
+ }
}
}