Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/23 18:44:35 UTC

[GitHub] [pulsar] nodece opened a new pull request, #17831: [fix][proxy] Fix refresh client auth

nodece opened a new pull request, #17831:
URL: https://github.com/apache/pulsar/pull/17831

   Fixes https://github.com/apache/pulsar/issues/10816
   
   ### Motivation
   
   The proxy client cannot refresh the original client's authentication data immediately. The user (original) client keeps working, but it experiences a network disconnection.
   
   New version:
   ```
   2022-09-23T11:18:36,607 - INFO  - [pulsar-proxy-io-38-2:ProxyConnection@341] - [/127.0.0.1:51556] complete connection, init proxy handler. authenticated with token role client, hasProxyToBrokerUrl: false
   2022-09-23T11:18:36,615 - INFO  - [pulsar-proxy-io-38-5:ConnectionPool@245] - [[id: 0x3e7cc63b, L:/127.0.0.1:51557 - R:localhost/127.0.0.1:51549]] Connected to server
   2022-09-23T11:18:36,635 - INFO  - [pulsar-io-6-1:ServerCnx@299] - New connection from /127.0.0.1:51557
   2022-09-23T11:18:36,691 - INFO  - [pulsar-client-io-40-1:ProducerStatsRecorderImpl@106] - Starting Pulsar producer perf with config: {"topicName":"persistent://my-tenant/my-ns/my-topic1","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
   2022-09-23T11:18:36,693 - INFO  - [pulsar-client-io-40-1:ProducerStatsRecorderImpl@107] - Pulsar client config: {"serviceUrl":"pulsar://localhost:51554/","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":3000,"lookupTimeoutMs":3000,"statsIntervalSeconds":60,"numIoThreads":1,"numListenerThreads":1,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null}
   2022-09-23T11:18:36,710 - INFO  - [metadata-store-12-1:NamespaceBundleFactory@186] - Policy updated for namespace Optional[my-tenant/my-ns], refreshing the bundle cache.
   2022-09-23T11:18:36,738 - INFO  - [pulsar-4-1:ModularLoadManagerImpl@857] - 1 brokers being considered for assignment of my-tenant/my-ns/0x00000000_0xffffffff
   2022-09-23T11:18:36,740 - INFO  - [pulsar-4-1:OwnershipCache@199] - Trying to acquire ownership of my-tenant/my-ns/0x00000000_0xffffffff
   2022-09-23T11:18:36,742 - INFO  - [metadata-store-12-1:ResourceLockImpl@166] - Acquired resource lock on /namespace/my-tenant/my-ns/0x00000000_0xffffffff
   2022-09-23T11:18:36,742 - INFO  - [metadata-store-12-1:OwnershipCache@205] - Successfully acquired ownership of OwnedBundle(bundle=my-tenant/my-ns/0x00000000_0xffffffff, isActive=1)
   2022-09-23T11:18:36,743 - INFO  - [pulsar-4-2:PulsarService@1187] - Loading all topics on bundle: my-tenant/my-ns/0x00000000_0xffffffff
   2022-09-23T11:18:36,750 - INFO  - [pulsar-client-io-40-1:ConnectionPool@245] - [[id: 0x399d1371, L:/127.0.0.1:51558 - R:localhost/127.0.0.1:51554]] Connected to server
   2022-09-23T11:18:36,750 - INFO  - [pulsar-client-io-40-1:ClientCnx@255] - [id: 0x399d1371, L:/127.0.0.1:51558 - R:localhost/127.0.0.1:51554] Connected through proxy to target broker at localhost:51549
   2022-09-23T11:18:36,751 - INFO  - [pulsar-proxy-io-38-7:ProxyConnection@186] - [/127.0.0.1:51558] New connection opened
   2022-09-23T11:18:36,762 - INFO  - [pulsar-proxy-io-38-7:ProxyConnection@341] - [/127.0.0.1:51558] complete connection, init proxy handler. authenticated with token role client, hasProxyToBrokerUrl: true
   2022-09-23T11:18:36,770 - INFO  - [pulsar-io-6-2:ServerCnx@299] - New connection from /127.0.0.1:51559
   2022-09-23T11:18:36,776 - INFO  - [pulsar-client-io-40-1:ProducerImpl@1639] - [persistent://my-tenant/my-ns/my-topic1] [null] Creating producer on cnx [id: 0x399d1371, L:/127.0.0.1:51558 - R:localhost/127.0.0.1:51554]
   2022-09-23T11:18:36,836 - INFO  - [pulsar-io-6-2:ManagedLedgerImpl@356] - Opening managed ledger my-tenant/my-ns/persistent/my-topic1
   2022-09-23T11:18:36,841 - INFO  - [bookkeeper-ml-scheduler-OrderedScheduler-4-0:MetaStoreImpl@113] - Creating '/managed-ledgers/my-tenant/my-ns/persistent/my-topic1'
   2022-09-23T11:18:36,882 - INFO  - [mock-pulsar-bk-OrderedExecutor-0-0:PulsarMockBookKeeper@122] - Creating ledger 3
   2022-09-23T11:18:36,914 - INFO  - [mock-pulsar-bk-OrderedExecutor-0-0:ManagedLedgerImpl@505] - [my-tenant/my-ns/persistent/my-topic1] Created ledger 3
   2022-09-23T11:18:36,942 - INFO  - [bookkeeper-ml-scheduler-OrderedScheduler-4-0:ManagedLedgerFactoryImpl$2@380] - [my-tenant/my-ns/persistent/my-topic1] Successfully initialize managed ledger
   2022-09-23T11:18:36,984 - INFO  - [bookkeeper-ml-scheduler-OrderedScheduler-4-0:BrokerService$2@1502] - Created topic persistent://my-tenant/my-ns/my-topic1 - dedup is disabled
   2022-09-23T11:18:37,000 - INFO  - [bookkeeper-ml-scheduler-OrderedScheduler-4-0:ServerCnx@1474] - [/127.0.0.1:51559] Created new producer: Producer{topic=PersistentTopic{topic=persistent://my-tenant/my-ns/my-topic1}, client=/127.0.0.1:51559, producerName=proxy-authorization-0-0, producerId=0}
   2022-09-23T11:18:37,006 - INFO  - [pulsar-client-io-40-1:ProducerImpl@1694] - [persistent://my-tenant/my-ns/my-topic1] [proxy-authorization-0-0] Created producer on cnx [id: 0x399d1371, L:/127.0.0.1:51558 - R:localhost/127.0.0.1:51554]
   2022-09-23T11:18:47,999 - INFO  - [pulsar-io-6-2:ServerCnx@769] - [/127.0.0.1:51559] Refreshing authentication credentials for originalPrincipal client and authRole Proxy
   2022-09-23T11:18:47,999 - INFO  - [pulsar-io-6-1:ServerCnx@769] - [/127.0.0.1:51557] Refreshing authentication credentials for originalPrincipal client and authRole Proxy
   2022-09-23T11:18:48,008 - INFO  - [pulsar-io-6-2:ServerCnx@730] - [/127.0.0.1:51559] Refreshed authentication credentials for role client
   2022-09-23T11:18:48,010 - INFO  - [pulsar-io-6-1:ServerCnx@730] - [/127.0.0.1:51557] Refreshed authentication credentials for role client
   ```
   Old version:
   ```
   2022-09-23T11:17:45,889 - INFO  - [pulsar-client-io-40-1:ConnectionPool@245] - [[id: 0x155081f5, L:/127.0.0.1:51513 - R:localhost/127.0.0.1:51508]] Connected to server
   2022-09-23T11:17:45,890 - INFO  - [pulsar-client-io-40-1:ClientCnx@255] - [id: 0x155081f5, L:/127.0.0.1:51513 - R:localhost/127.0.0.1:51508] Connected through proxy to target broker at localhost:51503
   2022-09-23T11:17:45,891 - INFO  - [pulsar-proxy-io-38-7:ProxyConnection@186] - [/127.0.0.1:51513] New connection opened
   2022-09-23T11:17:45,902 - INFO  - [pulsar-proxy-io-38-7:ProxyConnection@341] - [/127.0.0.1:51513] complete connection, init proxy handler. authenticated with token role client, hasProxyToBrokerUrl: true
   2022-09-23T11:17:45,911 - INFO  - [pulsar-io-6-2:ServerCnx@299] - New connection from /127.0.0.1:51514
   2022-09-23T11:17:45,917 - INFO  - [pulsar-client-io-40-1:ProducerImpl@1639] - [persistent://my-tenant/my-ns/my-topic1] [null] Creating producer on cnx [id: 0x155081f5, L:/127.0.0.1:51513 - R:localhost/127.0.0.1:51508]
   2022-09-23T11:17:45,993 - INFO  - [pulsar-io-6-2:ManagedLedgerImpl@356] - Opening managed ledger my-tenant/my-ns/persistent/my-topic1
   2022-09-23T11:17:45,996 - INFO  - [bookkeeper-ml-scheduler-OrderedScheduler-4-0:MetaStoreImpl@113] - Creating '/managed-ledgers/my-tenant/my-ns/persistent/my-topic1'
   2022-09-23T11:17:46,038 - INFO  - [mock-pulsar-bk-OrderedExecutor-0-0:PulsarMockBookKeeper@122] - Creating ledger 3
   2022-09-23T11:17:46,077 - INFO  - [mock-pulsar-bk-OrderedExecutor-0-0:ManagedLedgerImpl@505] - [my-tenant/my-ns/persistent/my-topic1] Created ledger 3
   2022-09-23T11:17:46,111 - INFO  - [bookkeeper-ml-scheduler-OrderedScheduler-4-0:ManagedLedgerFactoryImpl$2@380] - [my-tenant/my-ns/persistent/my-topic1] Successfully initialize managed ledger
   2022-09-23T11:17:46,160 - INFO  - [bookkeeper-ml-scheduler-OrderedScheduler-4-0:BrokerService$2@1502] - Created topic persistent://my-tenant/my-ns/my-topic1 - dedup is disabled
   2022-09-23T11:17:46,177 - INFO  - [bookkeeper-ml-scheduler-OrderedScheduler-4-0:ServerCnx@1474] - [/127.0.0.1:51514] Created new producer: Producer{topic=PersistentTopic{topic=persistent://my-tenant/my-ns/my-topic1}, client=/127.0.0.1:51514, producerName=proxy-authorization-0-0, producerId=0}
   2022-09-23T11:17:46,183 - INFO  - [pulsar-client-io-40-1:ProducerImpl@1694] - [persistent://my-tenant/my-ns/my-topic1] [proxy-authorization-0-0] Created producer on cnx [id: 0x155081f5, L:/127.0.0.1:51513 - R:localhost/127.0.0.1:51508]
   2022-09-23T11:17:56,653 - INFO  - [pulsar-io-6-1:ServerCnx@769] - [/127.0.0.1:51511] Refreshing authentication credentials for originalPrincipal client and authRole Proxy
   2022-09-23T11:17:56,653 - INFO  - [pulsar-io-6-2:ServerCnx@769] - [/127.0.0.1:51514] Refreshing authentication credentials for originalPrincipal client and authRole Proxy
   2022-09-23T11:17:56,660 - WARN  - [pulsar-io-6-1:ServerCnx@726] - [/127.0.0.1:51511] Principal cannot change during an authentication refresh expected=client got=Proxy
   2022-09-23T11:17:56,661 - INFO  - [pulsar-io-6-2:ServerCnx@730] - [/127.0.0.1:51514] Refreshed authentication credentials for role client
   2022-09-23T11:17:56,661 - INFO  - [pulsar-io-6-1:ServerCnx@311] - Closed connection from /127.0.0.1:51511
   2022-09-23T11:17:56,661 - INFO  - [pulsar-proxy-io-38-5:ClientCnx@292] - [id: 0x23766f8b, L:/127.0.0.1:51511 ! R:localhost/127.0.0.1:51503] Disconnected
   2022-09-23T11:18:02,580 - INFO  - [pulsar-proxy-io-38-10:ConnectionPool@245] - [[id: 0xe8cbf738, L:/127.0.0.1:51523 - R:localhost/127.0.0.1:51503]] Connected to server
   2022-09-23T11:18:02,582 - INFO  - [pulsar-io-6-4:ServerCnx@299] - New connection from /127.0.0.1:51523
   2022-09-23T11:18:02,588 - INFO  - [pulsar-io-6-4:ServerCnx@3082] - [/127.0.0.1:51523] Failed to authenticate: operation=connect, principal=Proxy, reason=Failed to authentication token: JWT expired at 2022-09-23T03:17:55Z. Current time: 2022-09-23T03:18:02Z, a difference of 7586 milliseconds.  Allowed clock skew: 0 milliseconds.
   2022-09-23T11:18:02,590 - INFO  - [pulsar-io-6-4:ServerCnx@311] - Closed connection from /127.0.0.1:51523
   2022-09-23T11:18:02,590 - WARN  - [pulsar-proxy-io-38-10:ClientCnx@732] - [id: 0xe8cbf738, L:/127.0.0.1:51523 - R:localhost/127.0.0.1:51503] Received error from server: Unable to authenticate
   2022-09-23T11:18:02,590 - WARN  - [pulsar-proxy-io-38-10:ConnectionPool@282] - [[id: 0xe8cbf738, L:/127.0.0.1:51523 - R:localhost/127.0.0.1:51503]] Connection handshake failed: org.apache.pulsar.client.api.PulsarClientException$AuthenticationException: Unable to authenticate
   2022-09-23T11:18:02,591 - ERROR - [pulsar-client-io-40-1:ClientCnx@1138] - [id: 0xb8d2d6d6, L:/127.0.0.1:51510 - R:localhost/127.0.0.1:51508] Close connection because received internal-server error {"errorMsg":"org.apache.pulsar.client.api.PulsarClientException$AuthenticationException: Unable to authenticate","reqId":676244056976135601, "remote":"localhost/127.0.0.1:51508", "local":"/127.0.0.1:51510"}
   2022-09-23T11:18:02,591 - ERROR - [pulsar-proxy-io-38-10:ClientCnx@741] - [id: 0xe8cbf738, L:/127.0.0.1:51523 ! R:localhost/127.0.0.1:51503] Failed to authenticate the client
   2022-09-23T11:18:02,591 - WARN  - [pulsar-proxy-io-38-10:ClientCnx@753] - [id: 0xe8cbf738, L:/127.0.0.1:51523 ! R:localhost/127.0.0.1:51503] Received unknown request id from server: -1
   2022-09-23T11:18:02,592 - INFO  - [pulsar-proxy-io-38-10:ClientCnx@292] - [id: 0xe8cbf738, L:/127.0.0.1:51523 ! R:localhost/127.0.0.1:51503] Disconnected
   2022-09-23T11:18:02,592 - INFO  - [pulsar-proxy-io-38-2:ProxyConnection@199] - [/127.0.0.1:51510] Connection closed
   2022-09-23T11:18:02,592 - WARN  - [pulsar-client-io-40-1:BinaryProtoLookupService@198] - [persistent://my-tenant/my-ns/my-topic1] failed to get Partitioned metadata : {"errorMsg":"org.apache.pulsar.client.api.PulsarClientException$AuthenticationException: Unable to authenticate","reqId":676244056976135601, "remote":"localhost/127.0.0.1:51508", "local":"/127.0.0.1:51510"}
   org.apache.pulsar.client.api.PulsarClientException$LookupException: {"errorMsg":"org.apache.pulsar.client.api.PulsarClientException$AuthenticationException: Unable to authenticate","reqId":676244056976135601, "remote":"localhost/127.0.0.1:51508", "local":"/127.0.0.1:51510"}
   	at org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:1224) ~[classes/:?]
   	at org.apache.pulsar.client.impl.ClientCnx.handlePartitionResponse(ClientCnx.java:640) ~[classes/:?]
   	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:134) ~[classes/:?]
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) ~[netty-codec-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314) ~[netty-codec-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435) ~[netty-codec-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279) ~[netty-codec-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152) ~[netty-handler-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
   	at java.lang.Thread.run(Thread.java:833) ~[?:?]
   2022-09-23T11:18:02,593 - ERROR - [pulsar-client-io-40-1:ClientCnx@1138] - [id: 0xb8d2d6d6, L:/127.0.0.1:51510 ! R:localhost/127.0.0.1:51508] Close connection because received internal-server error {"errorMsg":"org.apache.pulsar.client.api.PulsarClientException$AuthenticationException: Unable to authenticate","reqId":676244056976135600, "remote":"localhost/127.0.0.1:51508", "local":"/127.0.0.1:51510"}
   2022-09-23T11:18:02,593 - WARN  - [pulsar-client-io-40-1:BinaryProtoLookupService@198] - [persistent://my-tenant/my-ns/my-topic1] failed to get Partitioned metadata : {"errorMsg":"org.apache.pulsar.client.api.PulsarClientException$AuthenticationException: Unable to authenticate","reqId":676244056976135600, "remote":"localhost/127.0.0.1:51508", "local":"/127.0.0.1:51510"}
   org.apache.pulsar.client.api.PulsarClientException$LookupException: {"errorMsg":"org.apache.pulsar.client.api.PulsarClientException$AuthenticationException: Unable to authenticate","reqId":676244056976135600, "remote":"localhost/127.0.0.1:51508", "local":"/127.0.0.1:51510"}
   	at org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:1224) ~[classes/:?]
   	at org.apache.pulsar.client.impl.ClientCnx.handlePartitionResponse(ClientCnx.java:640) ~[classes/:?]
   	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:134) ~[classes/:?]
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) ~[netty-codec-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299) ~[netty-codec-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152) ~[netty-handler-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
   	at java.lang.Thread.run(Thread.java:833) ~[?:?]
   2022-09-23T11:18:02,594 - INFO  - [pulsar-client-io-40-1:ClientCnx@292] - [id: 0xb8d2d6d6, L:/127.0.0.1:51510 ! R:localhost/127.0.0.1:51508] Disconnected
   2022-09-23T11:18:02,594 - WARN  - [pulsar-client-io-40-1:BinaryProtoLookupService@198] - [persistent://my-tenant/my-ns/my-topic1] failed to get Partitioned metadata : Disconnected from server at localhost/127.0.0.1:51508
   org.apache.pulsar.client.api.PulsarClientException$ConnectException: Disconnected from server at localhost/127.0.0.1:51508
   	at org.apache.pulsar.client.impl.ClientCnx.channelInactive(ClientCnx.java:298) ~[classes/:?]
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:392) ~[netty-codec-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:357) ~[netty-codec-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
   	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
   	at java.lang.Thread.run(Thread.java:833) ~[?:?]
   ```
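
The WARN line in the old-version log (`Principal cannot change during an authentication refresh expected=client got=Proxy`) suggests the failure mode: on a connection opened on behalf of the original client, the broker expects the refreshed credentials to resolve to the same principal, but the refresh was answered with the proxy's own identity. The sketch below is a simplified model of that comparison under this reading of the log, not the broker's actual code; `RefreshPrincipalCheck` and `principalUnchanged` are hypothetical names.

```java
// Simplified model of the comparison implied by the WARN line in the old-version log.
// Hypothetical names; this is not the ServerCnx implementation.
final class RefreshPrincipalCheck {

    /** Returns true when a refresh may proceed, i.e. the principal is unchanged. */
    static boolean principalUnchanged(String expectedPrincipal, String refreshedPrincipal) {
        return expectedPrincipal != null && expectedPrincipal.equals(refreshedPrincipal);
    }

    public static void main(String[] args) {
        // Old behaviour: the refresh is answered with the proxy's own credentials.
        System.out.println(principalUnchanged("client", "Proxy"));   // false
        // New behaviour: the original client's refreshed credentials are used.
        System.out.println(principalUnchanged("client", "client"));  // true
    }
}
```

In the old-version log the failed check is followed by the broker closing the connection; in the new-version log both connections report `Refreshed authentication credentials for role client`.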
   ### Modifications
   
   - Rework `ClientCnx` and `ProxyClientCnx` so that `ProxyClientCnx` overrides `handleAuthChallenge` and passes refresh requests on to the original client.
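
The override pattern used in this PR's `ProxyClientCnx` diff can be sketched without any Pulsar types. This is a minimal illustration, assuming a base connection that answers challenges itself and a proxy subclass that forwards only refresh challenges; `BaseCnxSketch`, `ProxyCnxSketch`, the `actions` list and the `REFRESH_MARKER` value are all hypothetical stand-ins (the real code compares against `AuthData.REFRESH_AUTH_DATA_BYTES`).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for ClientCnx; records what it would do instead of doing I/O.
class BaseCnxSketch {
    // Placeholder marker bytes, not the real refresh marker value.
    static final byte[] REFRESH_MARKER = "refresh-marker".getBytes(StandardCharsets.UTF_8);

    final List<String> actions = new ArrayList<>();

    // Default behaviour: the connection answers the challenge with its own credentials.
    protected void handleAuthChallenge(byte[] challenge) {
        actions.add("answer-with-own-credentials");
    }
}

// Hypothetical stand-in for ProxyClientCnx.
class ProxyCnxSketch extends BaseCnxSketch {
    private final boolean forwardClientAuthData;

    ProxyCnxSketch(boolean forwardClientAuthData) {
        this.forwardClientAuthData = forwardClientAuthData;
    }

    @Override
    protected void handleAuthChallenge(byte[] challenge) {
        boolean isRefresh = Arrays.equals(REFRESH_MARKER, challenge);
        if (!forwardClientAuthData || !isRefresh) {
            // Not a forwarded-credentials refresh: keep the default handling.
            super.handleAuthChallenge(challenge);
            return;
        }
        // Pass the refresh request on to the original client instead of answering it here.
        actions.add("forward-refresh-to-original-client");
    }
}
```

Calling `handleAuthChallenge` with the marker on a `ProxyCnxSketch` created with `forwardClientAuthData = true` records the forwarding action; any other challenge, or a sketch created with `false`, falls through to the base behaviour.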
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
   
   - [x] `doc-not-needed` 
   (Please explain why)
   
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)
   
   ### Matching PR in forked repository
   
   PR in forked repository: <!-- ENTER URL HERE 
   
   After opening this PR, the build in apache/pulsar will fail and instructions will
   be provided for opening a PR in the PR author's forked repository.
   
   apache/pulsar pull requests should be first tested in your own fork since the 
   apache/pulsar CI based on GitHub Actions has constrained resources and quota.
   GitHub Actions provides separate quota for pull requests that are executed in 
   a forked repository.
   
   The tests will be run in the forked repository until all PR review comments have
   been handled, the tests pass and the PR is approved by a reviewer.
   
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] nodece commented on pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#issuecomment-1259319610

   @lin-zhao @codelipenghui  Thank you all! I have addressed the requested changes; please help review again.




[GitHub] [pulsar] nodece commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r983331693


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java:
##########
@@ -54,10 +59,54 @@ protected ByteBuf newConnectCommand() throws Exception {
 
         authenticationDataProvider = authentication.getAuthData(remoteHostName);
         AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
-        return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
-            PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
-            clientAuthMethod);
+        return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
+                PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
+                clientAuthMethod);
     }
 
-    private static final Logger log = LoggerFactory.getLogger(ProxyClientCnx.class);
+    @Override
+    protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
+        checkArgument(authChallenge.hasChallenge());
+        checkArgument(authChallenge.getChallenge().hasAuthData());
+
+        boolean isRefresh = Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData());
+        if (!forwardClientAuthData || !isRefresh) {
+            super.handleAuthChallenge(authChallenge);
+            return;
+        }
+
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug("Proxy {} request to refresh the original client authentication data for "
+                        + "the proxy client {}", proxyConnection.ctx().channel(), ctx.channel());
+            }
+
+            proxyConnection.ctx().writeAndFlush(Commands.newAuthChallenge(clientAuthMethod, AuthData.REFRESH_AUTH_DATA,

Review Comment:
   This looks like it could be improved, but as far as I remember there is only one connection here.





[GitHub] [pulsar] nodece commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r983008794


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java:
##########
@@ -54,10 +67,33 @@ protected ByteBuf newConnectCommand() throws Exception {
 
         authenticationDataProvider = authentication.getAuthData(remoteHostName);
         AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
-        return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
-            PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
-            clientAuthMethod);
+        return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
+                PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
+                clientAuthMethod);
     }
 
-    private static final Logger log = LoggerFactory.getLogger(ProxyClientCnx.class);
+    @Override
+    protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
+        checkArgument(authChallenge.hasChallenge());
+        checkArgument(authChallenge.getChallenge().hasAuthData());
+
+        boolean isRefresh = Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData());
+        if (!forwardClientAuthData || !isRefresh || refreshClientAuthDataNotifier == null) {

Review Comment:
   When there is no refresh notifier, we don't refresh the user client's authentication data.
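
The guard discussed in this comment can be read as a single predicate. The sketch below is only an illustration under stated assumptions: `RefreshGuardSketch`, `shouldNotifyUserClient`, and `REFRESH_MARKER` are hypothetical names, and the marker bytes are a placeholder for `AuthData.REFRESH_AUTH_DATA_BYTES`, not its real value.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper mirroring the condition in handleAuthChallenge from the diff.
final class RefreshGuardSketch {
    // Placeholder bytes (assumption); Pulsar defines the real marker in AuthData.REFRESH_AUTH_DATA_BYTES.
    static final byte[] REFRESH_MARKER = "refresh-marker".getBytes(StandardCharsets.UTF_8);

    // True only when the proxy should ask the user client for fresh credentials.
    // In every other case the diff falls back to super.handleAuthChallenge(...).
    static boolean shouldNotifyUserClient(boolean forwardClientAuthData, byte[] challenge, Runnable notifier) {
        boolean isRefresh = Arrays.equals(REFRESH_MARKER, challenge);
        return forwardClientAuthData && isRefresh && notifier != null;
    }
}
```

With this shape, a missing notifier simply routes the challenge to the default handling instead of failing.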





[GitHub] [pulsar] nodece commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r983008160


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java:
##########
@@ -18,30 +18,43 @@
  */
 package org.apache.pulsar.proxy.server;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
+import java.util.Arrays;
+import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.AuthData;
+import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
 import org.apache.pulsar.common.protocol.Commands;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+@Slf4j
 public class ProxyClientCnx extends ClientCnx {
+    private final boolean forwardClientAuthData;
+    private final String clientAuthMethod;
+    private final String clientAuthRole;
+    private final AuthData clientAuthData;

Review Comment:
   No, this value is only used to send the connect command.





[GitHub] [pulsar] nodece commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r980960357


##########
pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java:
##########
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import javax.crypto.SecretKey;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class ProxyRefreshAuthTest extends ProducerConsumerBase {
+    private final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+    private ProxyService proxyService;
+    private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+
+        // enable tls and auth&auth at broker
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        conf.setProxyRoles(Collections.singleton("Proxy"));
+        conf.setAdvertisedAddress(null);
+        conf.setAuthenticateOriginalAuthData(true);
+        conf.setBrokerServicePort(Optional.of(0));
+        conf.setWebServicePort(Optional.of(0));
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("superUser");
+        conf.setSuperUserRoles(superUserRoles);
+
+        conf.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        conf.setProperties(properties);
+
+        conf.setClusterName("proxy-authorization");
+        conf.setNumExecutorThreadPoolSize(5);
+
+        conf.setAuthenticationRefreshCheckSeconds(3);
+    }
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.init();
+
+        // start proxy service
+        proxyConfig.setAuthenticationEnabled(true);
+        proxyConfig.setAuthorizationEnabled(false);
+        proxyConfig.setForwardAuthorizationCredentials(true);
+        proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+        proxyConfig.setAdvertisedAddress(null);
+
+        proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+        proxyConfig.setWebServicePort(Optional.of(0));
+
+        proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        proxyConfig.setBrokerClientAuthenticationParameters(
+                AuthTokenUtils.createToken(SECRET_KEY, "Proxy", Optional.empty()));
+        proxyConfig.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        proxyConfig.setProperties(properties);
+
+        proxyService = Mockito.spy(new ProxyService(proxyConfig,
+                new AuthenticationService(
+                        PulsarConfigurationLoader.convertFrom(proxyConfig))));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+        proxyService.close();
+    }
+
+    private void startProxy(boolean forwardAuthData) throws Exception {
+        pulsar.getConfiguration().setAuthenticateOriginalAuthData(forwardAuthData);
+        proxyConfig.setForwardAuthorizationCredentials(forwardAuthData);
+        proxyService.start();
+    }
+
+    @DataProvider
+    Object[] forwardAuthDataProvider() {
+        return new Object[]{true,false};
+    }
+
+    @Test(dataProvider = "forwardAuthDataProvider")
+    public void testAuthDataRefresh(boolean forwardAuthData) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        startProxy(forwardAuthData);
+
+        AuthenticationToken authenticationToken = new AuthenticationToken(() -> {
+            Calendar calendar = Calendar.getInstance();
+            calendar.add(Calendar.SECOND, 10);
+            return AuthTokenUtils.createToken(SECRET_KEY, "client", Optional.of(calendar.getTime()));
+        });
+
+        pulsarClient = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl())
+                .authentication(authenticationToken)
+                .operationTimeout(3, TimeUnit.SECONDS)
+                .build();
+
+        admin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getWebServiceAddress())
+                .authentication(authenticationToken).build();
+
+        String namespaceName = "my-tenant/my-ns";
+        admin.clusters().createCluster("proxy-authorization",
+                ClusterData.builder().serviceUrlTls(brokerUrlTls.toString()).build());
+        admin.tenants().createTenant("my-tenant",
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("proxy-authorization")));
+        admin.namespaces().createNamespace(namespaceName);
+
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        Producer<byte[]> producer = spy(pulsarClient.newProducer()
+                .topic(topic).create());
+        int i = 0;
+        while (i <= 15) {
+            producer.send(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
+            i++;
+            Thread.sleep(1000);
+        }
+
+        List<CompletableFuture<List<String>>> futures = new ArrayList<>(10);
+        for (i = 0; i < 10; i++) {
+            futures.add(pulsarClient.getPartitionsForTopic(topic));
+        }

Review Comment:
   Ok, I've figured out how to test it.
   





[GitHub] [pulsar] nodece commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r980963890


##########
pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java:
##########
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import javax.crypto.SecretKey;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class ProxyRefreshAuthTest extends ProducerConsumerBase {
+    private final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+    private ProxyService proxyService;
+    private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+
+        // enable tls and auth&auth at broker
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        conf.setProxyRoles(Collections.singleton("Proxy"));
+        conf.setAdvertisedAddress(null);
+        conf.setAuthenticateOriginalAuthData(true);
+        conf.setBrokerServicePort(Optional.of(0));
+        conf.setWebServicePort(Optional.of(0));
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("superUser");
+        conf.setSuperUserRoles(superUserRoles);
+
+        conf.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        conf.setProperties(properties);
+
+        conf.setClusterName("proxy-authorization");
+        conf.setNumExecutorThreadPoolSize(5);
+
+        conf.setAuthenticationRefreshCheckSeconds(3);

Review Comment:
   This interval is too short.





[GitHub] [pulsar] nodece commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r980814437


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java:
##########
@@ -18,46 +18,61 @@
  */
 package org.apache.pulsar.proxy.server;
 
-import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
-import org.apache.pulsar.PulsarVersion;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import javax.naming.AuthenticationException;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.AuthData;
-import org.apache.pulsar.common.protocol.Commands;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
 
+@Slf4j
 public class ProxyClientCnx extends ClientCnx {
-
-    String clientAuthRole;
-    AuthData clientAuthData;
-    String clientAuthMethod;
-    int protocolVersion;
+    private final boolean forwardClientAuthData;
+    private final String clientAuthMethod;
+    private final String clientAuthRole;
+    private final Function<Boolean, CompletableFuture<AuthData>> clientAuthDataSupplier;
 
     public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,
-                          AuthData clientAuthData, String clientAuthMethod, int protocolVersion) {
-        super(conf, eventLoopGroup);
+                          Function<Boolean, CompletableFuture<AuthData>> clientAuthDataSupplier,
+                          String clientAuthMethod,
+                          int protocolVersion, boolean forwardClientAuthData) {
+        super(conf, eventLoopGroup, protocolVersion);
         this.clientAuthRole = clientAuthRole;
-        this.clientAuthData = clientAuthData;
+        this.clientAuthDataSupplier = clientAuthDataSupplier;
         this.clientAuthMethod = clientAuthMethod;
-        this.protocolVersion = protocolVersion;
+        this.forwardClientAuthData = forwardClientAuthData;
     }
 
     @Override
-    protected ByteBuf newConnectCommand() throws Exception {
-        if (log.isDebugEnabled()) {
-            log.debug("New Connection opened via ProxyClientCnx with params clientAuthRole = {},"
-                            + " clientAuthData = {}, clientAuthMethod = {}",
-                    clientAuthRole, clientAuthData, clientAuthMethod);
+    protected void completeActive() {
+        clientAuthDataSupplier.apply(false).thenAccept(clientAuthData -> {
+            try {
+                sendConnectCommand(clientAuthRole, clientAuthData, clientAuthMethod);
+            } catch (Exception e) {
+                log.error("{} Error during handshake", ctx.channel(), e);
+                close(e);
+            }
+        });
+    }
+
+    @Override
+    protected void prepareMutualAuth(CommandAuthChallenge authChallenge) throws AuthenticationException {
+        boolean isRefresh = Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData());
+        if (!forwardClientAuthData || !isRefresh) {
+            super.prepareMutualAuth(authChallenge);
+            return;
         }
 
-        authenticationDataProvider = authentication.getAuthData(remoteHostName);
-        AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
-        return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
-            PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
-            clientAuthMethod);
+        clientAuthDataSupplier.apply(true).thenAccept(originalClientAuthData -> {
+            sendMutualAuthCommand(clientAuthMethod, originalClientAuthData);

Review Comment:
   The `clientAuthMethod` is the user client's auth method, not the proxy client's, and the two can use different auth methods.





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r983029975


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java:
##########
@@ -453,5 +453,8 @@ public void doMarkAndReleaseUselessConnections(){
         // Do release idle connections.
         releaseIdleConnectionTaskList.forEach(Runnable::run);
     }
-}
 
+    public ConcurrentMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> getPool() {
+        return pool;
+    }

Review Comment:
   We should return a new or immutable structure. Otherwise, the caller can modify the map, which will introduce unpredictable behavior.
   
   My suggestion is to rename this to `getConnections()` and return only a `List`.
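
A minimal sketch of this suggestion, assuming a pool shaped like the one in the diff. `PoolSketch`, its `add` method, and the `String` stand-in for `ClientCnx` are illustrative assumptions, not the code in `ConnectionPool`.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for ConnectionPool; String replaces ClientCnx to keep the sketch self-contained.
final class PoolSketch {
    private final ConcurrentMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<String>>> pool =
            new ConcurrentHashMap<>();

    void add(InetSocketAddress address, int key, CompletableFuture<String> cnx) {
        pool.computeIfAbsent(address, a -> new ConcurrentHashMap<>()).put(key, cnx);
    }

    // Returns an immutable snapshot, so callers cannot mutate the pool's internal maps.
    List<CompletableFuture<String>> getConnections() {
        List<CompletableFuture<String>> snapshot = new ArrayList<>();
        pool.values().forEach(inner -> snapshot.addAll(inner.values()));
        return List.copyOf(snapshot);
    }
}
```

The snapshot is a copy, so connections added to the pool later do not appear in a list that was returned earlier.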



##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -535,6 +542,42 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
         try {
             AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
             doAuthentication(clientData);
+            if (connectionPool != null && state == State.ProxyLookupRequests) {
+                if (service.getConfiguration().isForwardAuthorizationCredentials()) {
+                    connectionPool.getPool().values().forEach(n -> n.values().forEach(cnxFuture -> {
+                        String clientVersion;
+                        if (authResponse.hasClientVersion()) {
+                            clientVersion = authResponse.getClientVersion();
+                        } else {
+                            clientVersion = PulsarVersion.getVersion();
+                        }
+                        int protocolVersion;
+                        if (authResponse.hasProtocolVersion()) {
+                            protocolVersion = authResponse.getProtocolVersion();
+                        } else {
+                            protocolVersion = Commands.getCurrentProtocolVersion();
+                        }
+
+                        ByteBuf cmd =
+                                Commands.newAuthResponse(clientAuthMethod, clientData, protocolVersion, clientVersion);
+                        cnxFuture.thenAccept(clientCnx -> clientCnx.ctx().writeAndFlush(cmd)
+                                .addListener(writeFuture -> {
+                                    if (writeFuture.isSuccess()) {
+                                        if (LOG.isDebugEnabled()) {
+                                            LOG.debug(
+                                                    "{} authentication is refreshed successfully by {}, auth method: {} ",
+                                                    clientCnx.ctx().channel(), ctx.channel(), clientAuthMethod);
+                                        }
+                                    } else {
+                                        LOG.error("{} Failed to refresh request for mutual auth to client {}",
+                                                clientCnx.ctx().channel(),
+                                                writeFuture.cause());
+                                        ctx.channel().pipeline().fireExceptionCaught(writeFuture.cause());

Review Comment:
   Should this be `toServerCnx` rather than `ctx`? The `ctx` is the connection from the client to the proxy. This branch means we failed to write data to the broker through `toServerCnx`, right?



##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -535,6 +542,42 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
         try {
             AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
             doAuthentication(clientData);
+            if (connectionPool != null && state == State.ProxyLookupRequests) {
+                if (service.getConfiguration().isForwardAuthorizationCredentials()) {
+                    connectionPool.getPool().values().forEach(n -> n.values().forEach(cnxFuture -> {
+                        String clientVersion;
+                        if (authResponse.hasClientVersion()) {
+                            clientVersion = authResponse.getClientVersion();
+                        } else {
+                            clientVersion = PulsarVersion.getVersion();
+                        }
+                        int protocolVersion;
+                        if (authResponse.hasProtocolVersion()) {
+                            protocolVersion = authResponse.getProtocolVersion();
+                        } else {
+                            protocolVersion = Commands.getCurrentProtocolVersion();
+                        }
+
+                        ByteBuf cmd =
+                                Commands.newAuthResponse(clientAuthMethod, clientData, protocolVersion, clientVersion);
+                        cnxFuture.thenAccept(clientCnx -> clientCnx.ctx().writeAndFlush(cmd)

Review Comment:
   The name is also confusing; should it be `toBrokerCnx`?



##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java:
##########
@@ -54,10 +66,33 @@ protected ByteBuf newConnectCommand() throws Exception {
 
         authenticationDataProvider = authentication.getAuthData(remoteHostName);
         AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
-        return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
-            PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
-            clientAuthMethod);
+        return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
+                PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
+                clientAuthMethod);
     }
 
-    private static final Logger log = LoggerFactory.getLogger(ProxyClientCnx.class);
+    @Override
+    protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
+        checkArgument(authChallenge.hasChallenge());
+        checkArgument(authChallenge.getChallenge().hasAuthData());
+
+        boolean isRefresh = Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData());
+        if (!forwardClientAuthData || !isRefresh || refreshClientAuthDataNotifier == null) {
+            super.handleAuthChallenge(authChallenge);
+            return;
+        }
+
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug("{} Request to refresh the original client authentication data", ctx.channel());
+            }
+            refreshClientAuthDataNotifier.run();

Review Comment:
   It looks like we can simply pass the `ctx` and `protocolVersion` from `ProxyConnection` to `ProxyClientCnx`, so we don't need a callback mechanism to complete the auth challenge handling.



##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -535,6 +542,42 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
         try {
             AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
             doAuthentication(clientData);
+            if (connectionPool != null && state == State.ProxyLookupRequests) {
+                if (service.getConfiguration().isForwardAuthorizationCredentials()) {
+                    connectionPool.getPool().values().forEach(n -> n.values().forEach(cnxFuture -> {
+                        String clientVersion;
+                        if (authResponse.hasClientVersion()) {
+                            clientVersion = authResponse.getClientVersion();
+                        } else {
+                            clientVersion = PulsarVersion.getVersion();
+                        }
+                        int protocolVersion;
+                        if (authResponse.hasProtocolVersion()) {
+                            protocolVersion = authResponse.getProtocolVersion();
+                        } else {
+                            protocolVersion = Commands.getCurrentProtocolVersion();
+                        }
+
+                        ByteBuf cmd =
+                                Commands.newAuthResponse(clientAuthMethod, clientData, protocolVersion, clientVersion);
+                        cnxFuture.thenAccept(clientCnx -> clientCnx.ctx().writeAndFlush(cmd)

Review Comment:
   I think we need to log something if the `cnxFuture` completes exceptionally.
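
One way to cover the failed-future case, sketched with plain `CompletableFuture` calls. `CnxFutureLogging` and `refreshOrLog` are hypothetical names, `String` stands in for `ClientCnx`, and the `Consumer<String>` stands in for a logger; none of this is taken from the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical helper; String stands in for ClientCnx and the Consumer for LOG.error.
final class CnxFutureLogging {
    static CompletableFuture<Void> refreshOrLog(CompletableFuture<String> cnxFuture,
                                                Consumer<String> onConnected,
                                                Consumer<String> errorLog) {
        return cnxFuture.thenAccept(onConnected)
                .exceptionally(ex -> {
                    // Reached when cnxFuture failed or onConnected threw;
                    // with thenAccept alone the failure would go unreported.
                    errorLog.accept("Failed to get connection for auth refresh: " + ex);
                    return null;
                });
    }
}
```

Because `exceptionally` is chained after `thenAccept`, the same handler also reports an exception thrown while sending the refresh.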



##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -423,16 +425,22 @@ public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnec
     }
 
     // According to auth result, send newConnected or newAuthChallenge command.
-    private void doAuthentication(AuthData clientData) throws Exception {
+    private void doAuthentication(AuthData clientData)
+            throws Exception {
         AuthData brokerData = authState.authenticate(clientData);
         // authentication has completed, will send newConnected command.
         if (authState.isComplete()) {
             clientAuthRole = authState.getAuthRole();
             if (LOG.isDebugEnabled()) {
                 LOG.debug("[{}] Client successfully authenticated with {} role {}",
-                    remoteAddress, authMethod, clientAuthRole);
+                        remoteAddress, authMethod, clientAuthRole);
+            }
+
+            // First connection
+            if (this.connectionPool == null || state == State.Connecting) {

Review Comment:
   Why do we need to check `this.connectionPool == null`?
   Is it to avoid calling `completeConnect(clientData);` multiple times?





[GitHub] [pulsar] nodece commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r983077155


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -535,6 +542,42 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
         try {
             AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
             doAuthentication(clientData);
+            if (connectionPool != null && state == State.ProxyLookupRequests) {
+                if (service.getConfiguration().isForwardAuthorizationCredentials()) {
+                    connectionPool.getPool().values().forEach(n -> n.values().forEach(cnxFuture -> {
+                        String clientVersion;
+                        if (authResponse.hasClientVersion()) {
+                            clientVersion = authResponse.getClientVersion();
+                        } else {
+                            clientVersion = PulsarVersion.getVersion();
+                        }
+                        int protocolVersion;
+                        if (authResponse.hasProtocolVersion()) {
+                            protocolVersion = authResponse.getProtocolVersion();
+                        } else {
+                            protocolVersion = Commands.getCurrentProtocolVersion();
+                        }
+
+                        ByteBuf cmd =
+                                Commands.newAuthResponse(clientAuthMethod, clientData, protocolVersion, clientVersion);
+                        cnxFuture.thenAccept(clientCnx -> clientCnx.ctx().writeAndFlush(cmd)
+                                .addListener(writeFuture -> {
+                                    if (writeFuture.isSuccess()) {
+                                        if (LOG.isDebugEnabled()) {
+                                            LOG.debug(
+                                                    "{} authentication is refreshed successfully by {}, auth method: {} ",
+                                                    clientCnx.ctx().channel(), ctx.channel(), clientAuthMethod);
+                                        }
+                                    } else {
+                                        LOG.error("{} Failed to refresh request for mutual auth to client {}",
+                                                clientCnx.ctx().channel(),
+                                                writeFuture.cause());
+                                        ctx.channel().pipeline().fireExceptionCaught(writeFuture.cause());

Review Comment:
   > If the write to the broker fails, should we retry or close the user client channel?
   
   Close the proxy client channel.
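
To illustrate the "close instead of retry" answer above, here is a minimal sketch in plain Java. It assumes the broker write can be modelled as a `CompletableFuture<Void>`; `FakeChannel` and `closeOnFailure` are hypothetical stand-ins, not classes from the PR or from Netty.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-ins for the broker write and the proxy client channel.
final class CloseOnWriteFailureSketch {
    static final class FakeChannel {
        final AtomicBoolean open = new AtomicBoolean(true);

        void close() {
            open.set(false);
        }
    }

    // Close the channel when the write completes exceptionally; never retry.
    static void closeOnFailure(CompletableFuture<Void> write, FakeChannel channel) {
        write.whenComplete((ignored, error) -> {
            if (error != null) {
                channel.close();
            }
        });
    }
}
```

In Netty itself, `ChannelFutureListener.CLOSE_ON_FAILURE` offers similar behaviour for the channel on which the write was issued.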





[GitHub] [pulsar] codelipenghui commented on pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#issuecomment-1261618380

   @nodece Please take a look at the checkstyle issue: https://github.com/nodece/pulsar/pull/5




[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r982977833


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -543,6 +577,35 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
         }
     }
 
+    private Void requestRefreshClientAuthData() {

Review Comment:
   Why return `Void`?
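
The question comes down to which functional interface the callback is passed as. A minimal sketch, assuming the method is invoked only for its side effect and is presumably handed over as a `Supplier<Void>`; the class, counter, and method names below are hypothetical, not the PR's code.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch, not code from the PR.
final class VoidCallbackSketch {
    static final AtomicInteger refreshRequests = new AtomicInteger();

    // Boxed Void return so the method fits Supplier<Void>.
    static Void requestRefreshAsSupplier() {
        refreshRequests.incrementAndGet();
        return null; // null is the only value a Void reference can hold
    }

    // Alternative: a plain void method, usable as a Runnable.
    static void requestRefreshAsRunnable() {
        refreshRequests.incrementAndGet();
    }

    // Invokes both shapes once and returns how many requests were recorded.
    static int invokeBoth() {
        int before = refreshRequests.get();
        Supplier<Void> supplier = VoidCallbackSketch::requestRefreshAsSupplier;
        Runnable runnable = VoidCallbackSketch::requestRefreshAsRunnable;
        supplier.get();
        runnable.run();
        return refreshRequests.get() - before;
    }
}
```

Both shapes behave the same at the call site; the `Runnable` form just avoids the artificial `return null`.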



##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -513,7 +548,7 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
 
             authState = authenticationProvider.newAuthState(clientData, remoteAddress, sslSession);
             authenticationData = authState.getAuthDataSource();
-            doAuthentication(clientData);
+            doAuthentication(clientData, Optional.empty());

Review Comment:
   I don't think passing an `Optional` as a parameter is a good approach.
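
One common alternative to an `Optional` parameter is a pair of overloads. A minimal sketch under that assumption; the class, the method names, and the `"default"` fallback are illustrative only and do not come from the PR.

```java
import java.util.Optional;

// Hypothetical sketch; names are illustrative, not the PR's code.
final class OptionalParameterSketch {
    // Shape under review: every caller has to wrap a value or pass Optional.empty().
    static String describeWithOptional(String authData, Optional<String> clientVersion) {
        return authData + "@" + clientVersion.orElse("default");
    }

    // Alternative: overloads, so the common call site stays simple.
    static String describe(String authData) {
        return describe(authData, "default");
    }

    static String describe(String authData, String clientVersion) {
        return authData + "@" + clientVersion;
    }
}
```

With overloads, the first-connection call site would not need to mention the absent value at all.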



##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -423,16 +430,44 @@ public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnec
     }
 
     // According to auth result, send newConnected or newAuthChallenge command.
-    private void doAuthentication(AuthData clientData) throws Exception {
+    private void doAuthentication(AuthData clientData, Optional<CommandAuthResponse> authResponseOptional)
+            throws Exception {
         AuthData brokerData = authState.authenticate(clientData);
         // authentication has completed, will send newConnected command.
         if (authState.isComplete()) {
             clientAuthRole = authState.getAuthRole();
             if (LOG.isDebugEnabled()) {
                 LOG.debug("[{}] Client successfully authenticated with {} role {}",
-                    remoteAddress, authMethod, clientAuthRole);
+                        remoteAddress, authMethod, clientAuthRole);
+            }
+
+            // First connection
+            if (this.connectionPool == null || state == State.Connecting) {
+                // authentication has completed, will send newConnected command.
+                completeConnect(clientData);
+            } else if (state == State.ProxyLookupRequests) {

Review Comment:
   Could we move this logic out of `doAuthentication` to make the method cleaner? Going by its name, it should only perform authentication.
   I think we can move this logic to `handleAuthResponse`; please let me know what you think.
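
A rough sketch of the separation suggested here, assuming the authentication step only reports whether the exchange is complete and the caller picks the follow-up. The enums, the `String` auth data, and the "non-empty means complete" rule are hypothetical simplifications, not the proxy's real types.

```java
// Hypothetical sketch; the enums and method bodies are illustrative only.
final class AuthFlowSketch {
    enum State { Connecting, ProxyLookupRequests }

    enum Action { COMPLETE_CONNECT, REFRESH_BROKER_AUTH, SEND_CHALLENGE }

    // Only authenticates: reports whether the exchange is complete.
    static boolean doAuthentication(String clientData) {
        return !clientData.isEmpty();
    }

    // The caller (the handleAuthResponse role) decides what happens next.
    static Action handleAuthResponse(String clientData, State state) {
        if (!doAuthentication(clientData)) {
            return Action.SEND_CHALLENGE;
        }
        return state == State.Connecting
                ? Action.COMPLETE_CONNECT
                : Action.REFRESH_BROKER_AUTH;
    }
}
```

Keeping the state dispatch in the caller leaves the authentication method with a single responsibility.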



##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java:
##########
@@ -54,10 +67,33 @@ protected ByteBuf newConnectCommand() throws Exception {
 
         authenticationDataProvider = authentication.getAuthData(remoteHostName);
         AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
-        return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
-            PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
-            clientAuthMethod);
+        return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
+                PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
+                clientAuthMethod);
     }
 
-    private static final Logger log = LoggerFactory.getLogger(ProxyClientCnx.class);
+    @Override
+    protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
+        checkArgument(authChallenge.hasChallenge());
+        checkArgument(authChallenge.getChallenge().hasAuthData());
+
+        boolean isRefresh = Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData());
+        if (!forwardClientAuthData || !isRefresh || refreshClientAuthDataNotifier == null) {

Review Comment:
   Question:
   Under which condition is `refreshClientAuthDataNotifier` null?
   Could we check it when we create the `ProxyClientCnx`?
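
A minimal sketch of checking at construction time, assuming the notifier is only mandatory when client auth data is forwarded. `NotifierHolderSketch` and its members are hypothetical and do not reproduce the actual `ProxyClientCnx` constructor.

```java
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical sketch; not the real ProxyClientCnx.
final class NotifierHolderSketch {
    private final boolean forwardClientAuthData;
    private final Supplier<Void> refreshNotifier;

    NotifierHolderSketch(boolean forwardClientAuthData, Supplier<Void> refreshNotifier) {
        this.forwardClientAuthData = forwardClientAuthData;
        // Fail fast: the notifier is required only when auth data is forwarded.
        this.refreshNotifier = forwardClientAuthData
                ? Objects.requireNonNull(refreshNotifier, "refreshNotifier")
                : refreshNotifier;
    }

    // The per-challenge check no longer needs a null test.
    boolean shouldDelegateToNotifier(boolean isRefresh) {
        return forwardClientAuthData && isRefresh;
    }
}
```

With the invariant enforced once, the condition in the challenge handler shrinks to the two flags.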



##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java:
##########
@@ -54,10 +67,33 @@ protected ByteBuf newConnectCommand() throws Exception {
 
         authenticationDataProvider = authentication.getAuthData(remoteHostName);
         AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
-        return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
-            PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
-            clientAuthMethod);
+        return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
+                PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
+                clientAuthMethod);
     }
 
-    private static final Logger log = LoggerFactory.getLogger(ProxyClientCnx.class);
+    @Override
+    protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
+        checkArgument(authChallenge.hasChallenge());
+        checkArgument(authChallenge.getChallenge().hasAuthData());
+
+        boolean isRefresh = Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData());
+        if (!forwardClientAuthData || !isRefresh || refreshClientAuthDataNotifier == null) {
+            super.handleAuthChallenge(authChallenge);
+            return;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Request to refresh the original client authentication data");
+        }
+        try {
+            refreshClientAuthDataNotifier.get();
+            if (state == State.SentConnectFrame) {
+                state = State.Connecting;

Review Comment:
   Does this state conflict with `handleAuthResponse`? I ask because I see this check in the logic:
   ```
     // First connection
   if (this.connectionPool == null || state == State.Connecting) {
        // authentication has completed, will send newConnected command.
         completeConnect(clientData);
   } 
   ```
   



##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java:
##########
@@ -18,30 +18,43 @@
  */
 package org.apache.pulsar.proxy.server;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
+import java.util.Arrays;
+import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.AuthData;
+import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
 import org.apache.pulsar.common.protocol.Commands;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+@Slf4j
 public class ProxyClientCnx extends ClientCnx {
+    private final boolean forwardClientAuthData;
+    private final String clientAuthMethod;
+    private final String clientAuthRole;
+    private final AuthData clientAuthData;

Review Comment:
   When we refresh the `authData`, should we update this value?



##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -543,6 +577,35 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
         }
     }
 
+    private Void requestRefreshClientAuthData() {
+        if (!service.getConfiguration().isForwardAuthorizationCredentials()) {

Review Comment:
   I'm not sure we need to check this again, because it looks like it is already checked in `ProxyClientCnx`. But it's fine.





[GitHub] [pulsar] nodece commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r983055072


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -535,6 +542,42 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
         try {
             AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
             doAuthentication(clientData);
+            if (connectionPool != null && state == State.ProxyLookupRequests) {
+                if (service.getConfiguration().isForwardAuthorizationCredentials()) {
+                    connectionPool.getPool().values().forEach(n -> n.values().forEach(cnxFuture -> {
+                        String clientVersion;
+                        if (authResponse.hasClientVersion()) {
+                            clientVersion = authResponse.getClientVersion();
+                        } else {
+                            clientVersion = PulsarVersion.getVersion();
+                        }
+                        int protocolVersion;
+                        if (authResponse.hasProtocolVersion()) {
+                            protocolVersion = authResponse.getProtocolVersion();
+                        } else {
+                            protocolVersion = Commands.getCurrentProtocolVersion();
+                        }
+
+                        ByteBuf cmd =
+                                Commands.newAuthResponse(clientAuthMethod, clientData, protocolVersion, clientVersion);
+                        cnxFuture.thenAccept(clientCnx -> clientCnx.ctx().writeAndFlush(cmd)
+                                .addListener(writeFuture -> {
+                                    if (writeFuture.isSuccess()) {
+                                        if (LOG.isDebugEnabled()) {
+                                            LOG.debug(
+                                                    "{} authentication is refreshed successfully by {}, auth method: {} ",
+                                                    clientCnx.ctx().channel(), ctx.channel(), clientAuthMethod);
+                                        }
+                                    } else {
+                                        LOG.error("{} Failed to refresh request for mutual auth to client {}",
+                                                clientCnx.ctx().channel(),
+                                                writeFuture.cause());
+                                        ctx.channel().pipeline().fireExceptionCaught(writeFuture.cause());

Review Comment:
   ~You are right!~
   
   It should be `toBrokerCnx`, disconnect the broker and proxy client.
   







[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r980699338


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -253,8 +254,25 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
         } else {
             log.info("{} Connected through proxy to target broker at {}", ctx.channel(), proxyToTargetBrokerAddress);
         }
-        // Send CONNECT command
-        ctx.writeAndFlush(newConnectCommand())
+        completeActive();
+    }
+
+    protected void completeActive() throws Exception {
+        sendConnectCommand(null, null, null);

Review Comment:
   It should not be `sendConnectCommand(null, null, null);`, right?
   Otherwise, how can we pass the client's auth data to the new connection command?
   
   https://github.com/apache/pulsar/pull/17831/files#diff-454fee306809542a71a0f3ce2cd97d7325afcfd7027dc3d156d303aff4c29902L275-L278





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r980669864


##########
pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java:
##########
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import javax.crypto.SecretKey;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class ProxyRefreshAuthTest extends ProducerConsumerBase {
+    private final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+    private ProxyService proxyService;
+    private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+
+        // enable tls and auth&auth at broker
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        conf.setProxyRoles(Collections.singleton("Proxy"));
+        conf.setAdvertisedAddress(null);
+        conf.setAuthenticateOriginalAuthData(true);
+        conf.setBrokerServicePort(Optional.of(0));
+        conf.setWebServicePort(Optional.of(0));
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("superUser");
+        conf.setSuperUserRoles(superUserRoles);
+
+        conf.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        conf.setProperties(properties);
+
+        conf.setClusterName("proxy-authorization");
+        conf.setNumExecutorThreadPoolSize(5);
+
+        conf.setAuthenticationRefreshCheckSeconds(3);
+    }
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.init();
+
+        // start proxy service
+        proxyConfig.setAuthenticationEnabled(true);
+        proxyConfig.setAuthorizationEnabled(false);
+        proxyConfig.setForwardAuthorizationCredentials(true);
+        proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+        proxyConfig.setAdvertisedAddress(null);
+
+        proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+        proxyConfig.setWebServicePort(Optional.of(0));
+
+        proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        proxyConfig.setBrokerClientAuthenticationParameters(
+                AuthTokenUtils.createToken(SECRET_KEY, "Proxy", Optional.empty()));
+        proxyConfig.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        proxyConfig.setProperties(properties);
+
+        proxyService = Mockito.spy(new ProxyService(proxyConfig,
+                new AuthenticationService(
+                        PulsarConfigurationLoader.convertFrom(proxyConfig))));
+    }
+
+    @AfterMethod(alwaysRun = true)

Review Comment:
   ```suggestion
       @AfterClass(alwaysRun = true)
   ```



##########
pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java:
##########
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import javax.crypto.SecretKey;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class ProxyRefreshAuthTest extends ProducerConsumerBase {
+    private final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+    private ProxyService proxyService;
+    private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+
+        // enable tls and auth&auth at broker
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        conf.setProxyRoles(Collections.singleton("Proxy"));
+        conf.setAdvertisedAddress(null);
+        conf.setAuthenticateOriginalAuthData(true);
+        conf.setBrokerServicePort(Optional.of(0));
+        conf.setWebServicePort(Optional.of(0));
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("superUser");
+        conf.setSuperUserRoles(superUserRoles);
+
+        conf.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        conf.setProperties(properties);
+
+        conf.setClusterName("proxy-authorization");
+        conf.setNumExecutorThreadPoolSize(5);
+
+        conf.setAuthenticationRefreshCheckSeconds(3);
+    }
+
+    @BeforeMethod

Review Comment:
   ```suggestion
       @BeforeClass
   ```



##########
pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java:
##########
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import javax.crypto.SecretKey;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class ProxyRefreshAuthTest extends ProducerConsumerBase {
+    private final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+    private ProxyService proxyService;
+    private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+
+        // enable tls and auth&auth at broker
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        conf.setProxyRoles(Collections.singleton("Proxy"));
+        conf.setAdvertisedAddress(null);
+        conf.setAuthenticateOriginalAuthData(true);
+        conf.setBrokerServicePort(Optional.of(0));
+        conf.setWebServicePort(Optional.of(0));
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("superUser");
+        conf.setSuperUserRoles(superUserRoles);
+
+        conf.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        conf.setProperties(properties);
+
+        conf.setClusterName("proxy-authorization");
+        conf.setNumExecutorThreadPoolSize(5);
+
+        conf.setAuthenticationRefreshCheckSeconds(3);
+    }
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.init();
+
+        // start proxy service
+        proxyConfig.setAuthenticationEnabled(true);
+        proxyConfig.setAuthorizationEnabled(false);
+        proxyConfig.setForwardAuthorizationCredentials(true);
+        proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+        proxyConfig.setAdvertisedAddress(null);
+
+        proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+        proxyConfig.setWebServicePort(Optional.of(0));
+
+        proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        proxyConfig.setBrokerClientAuthenticationParameters(
+                AuthTokenUtils.createToken(SECRET_KEY, "Proxy", Optional.empty()));
+        proxyConfig.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        proxyConfig.setProperties(properties);
+
+        proxyService = Mockito.spy(new ProxyService(proxyConfig,
+                new AuthenticationService(
+                        PulsarConfigurationLoader.convertFrom(proxyConfig))));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+        proxyService.close();
+    }
+
+    private void startProxy(boolean forwardAuthData) throws Exception {
+        pulsar.getConfiguration().setAuthenticateOriginalAuthData(forwardAuthData);
+        proxyConfig.setForwardAuthorizationCredentials(forwardAuthData);
+        proxyService.start();
+    }
+
+    @DataProvider
+    Object[] forwardAuthDataProvider() {
+        return new Object[]{true,false};
+    }
+
+    @Test(dataProvider = "forwardAuthDataProvider")
+    public void testAuthDataRefresh(boolean forwardAuthData) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        startProxy(forwardAuthData);
+
+        AuthenticationToken authenticationToken = new AuthenticationToken(() -> {
+            Calendar calendar = Calendar.getInstance();
+            calendar.add(Calendar.SECOND, 10);

Review Comment:
   It would be better to reduce the token timeout so that the test runs faster.
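
   To make the timing concrete, here is a rough sketch using only the JDK. The class and method names are hypothetical, the 3 s lifetime and 1 s check interval are illustrative values rather than anything decided in this PR, and the bound assumes the broker notices an expired token at its next periodic refresh check.

   ```java
   import java.time.Duration;
   import java.time.Instant;
   import java.util.Date;

   // Hypothetical helper class, not part of Pulsar.
   class TokenTimingSketch {
       // Expiry date for a token that should live `lifetime` starting at `now`.
       static Date expiryAfter(Instant now, Duration lifetime) {
           return Date.from(now.plus(lifetime));
       }

       // Rough lower bound on how long the send loop must run so that the token
       // expires once and the next periodic refresh check has had time to fire.
       static Duration minimumTestDuration(Duration tokenLifetime, Duration refreshCheckInterval) {
           return tokenLifetime.plus(refreshCheckInterval);
       }

       public static void main(String[] args) {
           Duration lifetime = Duration.ofSeconds(3);      // assumed value, shorter than the 10 s in the test
           Duration refreshCheck = Duration.ofSeconds(1);  // assumed value for the refresh check interval
           Date expiry = expiryAfter(Instant.now(), lifetime);
           // In the test, this Date would be wrapped in Optional.of(...) and passed
           // to AuthTokenUtils.createToken(SECRET_KEY, "client", ...).
           System.out.println("token expires at " + expiry);
           System.out.println("send loop should run at least "
                   + minimumTestDuration(lifetime, refreshCheck).getSeconds() + " s");
       }
   }
   ```

   With values like these the send loop would only need to cover a few seconds instead of the current 16 iterations of one second each.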



##########
pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java:
##########
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import javax.crypto.SecretKey;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class ProxyRefreshAuthTest extends ProducerConsumerBase {
+    private final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+    private ProxyService proxyService;
+    private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+
+        // enable tls and auth&auth at broker
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        conf.setProxyRoles(Collections.singleton("Proxy"));
+        conf.setAdvertisedAddress(null);
+        conf.setAuthenticateOriginalAuthData(true);
+        conf.setBrokerServicePort(Optional.of(0));
+        conf.setWebServicePort(Optional.of(0));
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("superUser");
+        conf.setSuperUserRoles(superUserRoles);
+
+        conf.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        conf.setProperties(properties);
+
+        conf.setClusterName("proxy-authorization");
+        conf.setNumExecutorThreadPoolSize(5);
+
+        conf.setAuthenticationRefreshCheckSeconds(3);

Review Comment:
   ```suggestion
           conf.setAuthenticationRefreshCheckSeconds(1);
   ```



##########
pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java:
##########
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import javax.crypto.SecretKey;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class ProxyRefreshAuthTest extends ProducerConsumerBase {
+    private final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+    private ProxyService proxyService;
+    private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+
+        // enable tls and auth&auth at broker
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        conf.setProxyRoles(Collections.singleton("Proxy"));
+        conf.setAdvertisedAddress(null);
+        conf.setAuthenticateOriginalAuthData(true);
+        conf.setBrokerServicePort(Optional.of(0));
+        conf.setWebServicePort(Optional.of(0));
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("superUser");
+        conf.setSuperUserRoles(superUserRoles);
+
+        conf.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        conf.setProperties(properties);
+
+        conf.setClusterName("proxy-authorization");
+        conf.setNumExecutorThreadPoolSize(5);
+
+        conf.setAuthenticationRefreshCheckSeconds(3);
+    }
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.init();
+
+        // start proxy service
+        proxyConfig.setAuthenticationEnabled(true);
+        proxyConfig.setAuthorizationEnabled(false);
+        proxyConfig.setForwardAuthorizationCredentials(true);
+        proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+        proxyConfig.setAdvertisedAddress(null);
+
+        proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+        proxyConfig.setWebServicePort(Optional.of(0));
+
+        proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        proxyConfig.setBrokerClientAuthenticationParameters(
+                AuthTokenUtils.createToken(SECRET_KEY, "Proxy", Optional.empty()));
+        proxyConfig.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        proxyConfig.setProperties(properties);
+
+        proxyService = Mockito.spy(new ProxyService(proxyConfig,
+                new AuthenticationService(
+                        PulsarConfigurationLoader.convertFrom(proxyConfig))));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+        proxyService.close();
+    }
+
+    private void startProxy(boolean forwardAuthData) throws Exception {
+        pulsar.getConfiguration().setAuthenticateOriginalAuthData(forwardAuthData);
+        proxyConfig.setForwardAuthorizationCredentials(forwardAuthData);
+        proxyService.start();
+    }
+
+    @DataProvider
+    Object[] forwardAuthDataProvider() {
+        return new Object[]{true,false};
+    }
+
+    @Test(dataProvider = "forwardAuthDataProvider")
+    public void testAuthDataRefresh(boolean forwardAuthData) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        startProxy(forwardAuthData);
+
+        AuthenticationToken authenticationToken = new AuthenticationToken(() -> {
+            Calendar calendar = Calendar.getInstance();
+            calendar.add(Calendar.SECOND, 10);
+            return AuthTokenUtils.createToken(SECRET_KEY, "client", Optional.of(calendar.getTime()));
+        });
+
+        pulsarClient = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl())
+                .authentication(authenticationToken)
+                .operationTimeout(3, TimeUnit.SECONDS)
+                .build();
+
+        admin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getWebServiceAddress())
+                .authentication(authenticationToken).build();
+
+        String namespaceName = "my-tenant/my-ns";
+        admin.clusters().createCluster("proxy-authorization",
+                ClusterData.builder().serviceUrlTls(brokerUrlTls.toString()).build());
+        admin.tenants().createTenant("my-tenant",
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("proxy-authorization")));
+        admin.namespaces().createNamespace(namespaceName);
+
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        Producer<byte[]> producer = spy(pulsarClient.newProducer()
+                .topic(topic).create());
+        int i = 0;
+        while (i <= 15) {
+            producer.send(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
+            i++;
+            Thread.sleep(1000);
+        }
+
+        List<CompletableFuture<List<String>>> futures = new ArrayList<>(10);
+        for (i = 0; i < 10; i++) {
+            futures.add(pulsarClient.getPartitionsForTopic(topic));
+        }

Review Comment:
   Is there a reason we need to call this 10 times?
   And without this fix, would we get an exception here?
   
   From the PR description, it looks like the client will eventually refresh the token even without this fix, so would this test actually fail without it?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -1243,6 +1254,13 @@ public void close() {
        }
     }
 
+    public void close(Throwable e) {

Review Comment:
   It should be `protected`; `ProxyClientCnx` will also call this method.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -361,51 +369,54 @@ protected void handleConnected(CommandConnected connected) {
         state = State.Ready;
     }
 
-    @Override
-    protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
-        checkArgument(authChallenge.hasChallenge());
-        checkArgument(authChallenge.getChallenge().hasAuthData());
+    protected final void sendMutualAuthCommand(String authMethod, AuthData authData) {
+        if (log.isDebugEnabled()) {
+            log.debug("{} Mutual auth {}", ctx.channel(), authMethod);
+        }
 
+        ByteBuf request = Commands.newAuthResponse(authMethod,
+                authData,
+                this.protocolVersion,
+                PulsarVersion.getVersion());
+
+        ctx.writeAndFlush(request).addListener(writeFuture -> {
+            if (!writeFuture.isSuccess()) {
+                log.warn("{} Failed to send request for mutual auth to broker: {}", ctx.channel(),
+                        writeFuture.cause().getMessage());
+                close(writeFuture.cause());
+            }
+        });
+    }
+
+    protected void prepareMutualAuth(CommandAuthChallenge authChallenge) throws AuthenticationException {
         if (Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData())) {
             try {
                 authenticationDataProvider = authentication.getAuthData(remoteHostName);
             } catch (PulsarClientException e) {
                 log.error("{} Error when refreshing authentication data provider: {}", ctx.channel(), e);
-                connectionFuture.completeExceptionally(e);
+                close(e);
                 return;
             }
         }
-
         // mutual authn. If auth not complete, continue auth; if auth complete, complete connectionFuture.
-        try {
-            AuthData authData = authenticationDataProvider
-                .authenticate(AuthData.of(authChallenge.getChallenge().getAuthData()));
-
-            checkState(!authData.isComplete());
-
-            ByteBuf request = Commands.newAuthResponse(authentication.getAuthMethodName(),
-                authData,
-                this.protocolVersion,
-                PulsarVersion.getVersion());
-
-            if (log.isDebugEnabled()) {
-                log.debug("{} Mutual auth {}", ctx.channel(), authentication.getAuthMethodName());
-            }
-
-            ctx.writeAndFlush(request).addListener(writeFuture -> {
-                if (!writeFuture.isSuccess()) {
-                    log.warn("{} Failed to send request for mutual auth to broker: {}", ctx.channel(),
-                        writeFuture.cause().getMessage());
-                    connectionFuture.completeExceptionally(writeFuture.cause());
-                }
-            });
+        AuthData authData =
+                authenticationDataProvider.authenticate(AuthData.of(authChallenge.getChallenge().getAuthData()));
+        checkState(!authData.isComplete());
+        sendMutualAuthCommand(authentication.getAuthMethodName(), authData);
+    }
 
+    @Override
+    protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
+        checkArgument(authChallenge.hasChallenge());
+        checkArgument(authChallenge.getChallenge().hasAuthData());
+        try {
+            prepareMutualAuth(authChallenge);

Review Comment:
   I think it would be better to call `sendMutualAuthCommand` alongside `prepareMutualAuth` here, rather than from inside it.
   My suggestion is:
   
   ```java
   AuthData authData = prepareMutualAuthData(authChallenge);
   sendMutualAuthCommand(authentication.getAuthMethodName(), authData);
   ```
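
   As a toy illustration of that split (all types here are hypothetical stand-ins, with `String` in place of `AuthData`; this is not the real `ClientCnx`), the preparing step can be the only overridable part while the sending step stays shared:

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical stand-ins for ClientCnx / ProxyClientCnx, for illustration only.
   class MutualAuthSplitSketch {
       static class BaseCnx {
           // Records what would have been written to the channel.
           final List<String> sent = new ArrayList<>();

           // Step 1: compute the response payload only; subclasses may override.
           protected String prepareMutualAuthData(String challenge) {
               return "response-to:" + challenge;
           }

           // Step 2: send the payload; identical for every subclass.
           protected final void sendMutualAuthCommand(String authMethod, String authData) {
               sent.add(authMethod + "/" + authData);
           }

           // The handler wires the two steps together at a single call site.
           final void handleAuthChallenge(String challenge) {
               String authData = prepareMutualAuthData(challenge);
               sendMutualAuthCommand("token", authData);
           }
       }

       static class ProxyCnx extends BaseCnx {
           @Override
           protected String prepareMutualAuthData(String challenge) {
               // A proxy connection would supply the original client's data instead.
               return "forwarded-client-data";
           }
       }

       public static void main(String[] args) {
           BaseCnx base = new BaseCnx();
           base.handleAuthChallenge("c1");
           ProxyCnx proxy = new ProxyCnx();
           proxy.handleAuthChallenge("c1");
           System.out.println(base.sent + " " + proxy.sent);
       }
   }
   ```

   Note that this sketch is synchronous; if the proxy has to wait for the client asynchronously, the prepare step would need to return a future instead.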



##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -543,6 +557,48 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
         }
     }
 
+    private CompletableFuture<AuthData> getOrRefreshOriginalClientAuthData(boolean isRefresh) {
+        if (!isRefresh) {
+            if (service.getConfiguration().isForwardAuthorizationCredentials()) {
+                return CompletableFuture.completedFuture(clientAuthData);
+            }
+            return CompletableFuture.completedFuture(null);
+        }
+
+        if (refreshAuthFuture != null && !refreshAuthFuture.isDone()) {
+            log.error("{} Mutual auth timeout", ctx.channel());
+            ctx.close();
+            return CompletableFuture.failedFuture(new AuthenticationException("Mutual auth timeout"));
+        }
+
+        refreshAuthFuture = new CompletableFuture<>();
+        try {
+            AuthData refreshAuthData = authState.refreshAuthentication();
+            ctx.writeAndFlush(Commands.newAuthChallenge(clientAuthMethod, refreshAuthData, protocolVersionToAdvertise))
+                    .addListener(writeFuture -> {
+                        if (writeFuture.isSuccess()) {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("{} Sent auth challenge to client to refresh credentials with method: {}",
+                                        ctx.channel(), clientAuthMethod);
+                            }
+                        } else {
+                            LOG.error("{} Failed to send request for mutual auth to client", ctx.channel(),
+                                    writeFuture.cause());
+                            refreshAuthFuture.completeExceptionally(writeFuture.cause());
+                            ctx.close();
+                        }
+                    });
+        } catch (AuthenticationException e) {
+            log.error("{} Failed to refresh authentication", ctx.channel(), e);
+            ctx.writeAndFlush(
+                            Commands.newError(-1, ServerError.AuthenticationError, "Failed to refresh authentication"))
+                    .addListener(ChannelFutureListener.CLOSE);
+            refreshAuthFuture.completeExceptionally(e);
+        }
+
+        return refreshAuthFuture;

Review Comment:
   Another thing I'm curious about is why we need this method.
   
   If we need the client (user) to refresh the auth data, we can just let the proxy forward the auth challenge command to the client and forward the client's auth response to the broker. Does this not work? Or maybe I missed something.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -253,8 +254,25 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
         } else {
             log.info("{} Connected through proxy to target broker at {}", ctx.channel(), proxyToTargetBrokerAddress);
         }
-        // Send CONNECT command
-        ctx.writeAndFlush(newConnectCommand())
+        completeActive();
+    }
+
+    protected void completeActive() throws Exception {
+        sendConnectCommand(null, null, null);

Review Comment:
   It should be `sendConnectCommand(null, null, null);` right?
   Otherwise, how can we pass the client's auth data to the new connection command?
   
   https://github.com/apache/pulsar/pull/17831/files#diff-454fee306809542a71a0f3ce2cd97d7325afcfd7027dc3d156d303aff4c29902L275-L278



##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -543,6 +557,48 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
         }
     }
 
+    private CompletableFuture<AuthData> getOrRefreshOriginalClientAuthData(boolean isRefresh) {
+        if (!isRefresh) {
+            if (service.getConfiguration().isForwardAuthorizationCredentials()) {
+                return CompletableFuture.completedFuture(clientAuthData);
+            }
+            return CompletableFuture.completedFuture(null);
+        }
+
+        if (refreshAuthFuture != null && !refreshAuthFuture.isDone()) {
+            log.error("{} Mutual auth timeout", ctx.channel());
+            ctx.close();
+            return CompletableFuture.failedFuture(new AuthenticationException("Mutual auth timeout"));
+        }
+
+        refreshAuthFuture = new CompletableFuture<>();
+        try {
+            AuthData refreshAuthData = authState.refreshAuthentication();
+            ctx.writeAndFlush(Commands.newAuthChallenge(clientAuthMethod, refreshAuthData, protocolVersionToAdvertise))
+                    .addListener(writeFuture -> {
+                        if (writeFuture.isSuccess()) {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("{} Sent auth challenge to client to refresh credentials with method: {}",
+                                        ctx.channel(), clientAuthMethod);
+                            }
+                        } else {
+                            LOG.error("{} Failed to send request for mutual auth to client", ctx.channel(),
+                                    writeFuture.cause());
+                            refreshAuthFuture.completeExceptionally(writeFuture.cause());
+                            ctx.close();
+                        }
+                    });
+        } catch (AuthenticationException e) {
+            log.error("{} Failed to refresh authentication", ctx.channel(), e);
+            ctx.writeAndFlush(
+                            Commands.newError(-1, ServerError.AuthenticationError, "Failed to refresh authentication"))
+                    .addListener(ChannelFutureListener.CLOSE);
+            refreshAuthFuture.completeExceptionally(e);
+        }
+
+        return refreshAuthFuture;

Review Comment:
   It looks like `refreshAuthFuture` is only ever completed exceptionally, never with the auth data. How does this work?
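
   For reference, a minimal sketch of the success path I would expect somewhere. The class and `onAuthResponse` are hypothetical names, and `String` stands in for `AuthData`; the quoted hunk only shows the failure paths, so the real completion may well live elsewhere (for example in `handleAuthResponse`).

   ```java
   import java.util.concurrent.CompletableFuture;

   // Hypothetical stand-in for the refresh bookkeeping in ProxyConnection.
   class RefreshFutureSketch {
       private CompletableFuture<String> refreshAuthFuture;

       // Called when fresh credentials are needed: returns a pending future.
       // (Sending the auth challenge to the client is elided here.)
       CompletableFuture<String> requestRefresh() {
           refreshAuthFuture = new CompletableFuture<>();
           return refreshAuthFuture;
       }

       // Called when the client's auth response arrives: this is the success
       // path that hands the new auth data to whoever is waiting on the future.
       void onAuthResponse(String newAuthData) {
           if (refreshAuthFuture != null && !refreshAuthFuture.isDone()) {
               refreshAuthFuture.complete(newAuthData);
           }
       }

       public static void main(String[] args) {
           RefreshFutureSketch cnx = new RefreshFutureSketch();
           CompletableFuture<String> pending = cnx.requestRefresh();
           pending.thenAccept(data -> System.out.println("forwarding " + data));
           cnx.onAuthResponse("refreshed-token");
       }
   }
   ```

   Without a call like `complete(...)` on the success path, anything chained with `thenAccept` on the returned future would never run.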



##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java:
##########
@@ -18,46 +18,61 @@
  */
 package org.apache.pulsar.proxy.server;
 
-import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
-import org.apache.pulsar.PulsarVersion;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import javax.naming.AuthenticationException;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.AuthData;
-import org.apache.pulsar.common.protocol.Commands;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
 
+@Slf4j
 public class ProxyClientCnx extends ClientCnx {
-
-    String clientAuthRole;
-    AuthData clientAuthData;
-    String clientAuthMethod;
-    int protocolVersion;
+    private final boolean forwardClientAuthData;
+    private final String clientAuthMethod;
+    private final String clientAuthRole;
+    private final Function<Boolean, CompletableFuture<AuthData>> clientAuthDataSupplier;
 
     public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,
-                          AuthData clientAuthData, String clientAuthMethod, int protocolVersion) {
-        super(conf, eventLoopGroup);
+                          Function<Boolean, CompletableFuture<AuthData>> clientAuthDataSupplier,
+                          String clientAuthMethod,
+                          int protocolVersion, boolean forwardClientAuthData) {
+        super(conf, eventLoopGroup, protocolVersion);
         this.clientAuthRole = clientAuthRole;
-        this.clientAuthData = clientAuthData;
+        this.clientAuthDataSupplier = clientAuthDataSupplier;
         this.clientAuthMethod = clientAuthMethod;
-        this.protocolVersion = protocolVersion;
+        this.forwardClientAuthData = forwardClientAuthData;
     }
 
     @Override
-    protected ByteBuf newConnectCommand() throws Exception {
-        if (log.isDebugEnabled()) {
-            log.debug("New Connection opened via ProxyClientCnx with params clientAuthRole = {},"
-                            + " clientAuthData = {}, clientAuthMethod = {}",
-                    clientAuthRole, clientAuthData, clientAuthMethod);
+    protected void completeActive() {
+        clientAuthDataSupplier.apply(false).thenAccept(clientAuthData -> {
+            try {
+                sendConnectCommand(clientAuthRole, clientAuthData, clientAuthMethod);
+            } catch (Exception e) {
+                log.error("{} Error during handshake", ctx.channel(), e);
+                close(e);
+            }
+        });
+    }
+
+    @Override
+    protected void prepareMutualAuth(CommandAuthChallenge authChallenge) throws AuthenticationException {
+        boolean isRefresh = Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData());
+        if (!forwardClientAuthData || !isRefresh) {
+            super.prepareMutualAuth(authChallenge);
+            return;
         }
 
-        authenticationDataProvider = authentication.getAuthData(remoteHostName);
-        AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
-        return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
-            PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
-            clientAuthMethod);
+        clientAuthDataSupplier.apply(true).thenAccept(originalClientAuthData -> {
+            sendMutualAuthCommand(clientAuthMethod, originalClientAuthData);

Review Comment:
   Can the `clientAuthMethod` differ between the client and the proxy client?



##########
pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java:
##########
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import javax.crypto.SecretKey;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class ProxyRefreshAuthTest extends ProducerConsumerBase {
+    private final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+    private ProxyService proxyService;
+    private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+
+        // enable tls and auth&auth at broker
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        conf.setProxyRoles(Collections.singleton("Proxy"));
+        conf.setAdvertisedAddress(null);
+        conf.setAuthenticateOriginalAuthData(true);
+        conf.setBrokerServicePort(Optional.of(0));
+        conf.setWebServicePort(Optional.of(0));
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("superUser");
+        conf.setSuperUserRoles(superUserRoles);
+
+        conf.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        conf.setProperties(properties);
+
+        conf.setClusterName("proxy-authorization");
+        conf.setNumExecutorThreadPoolSize(5);
+
+        conf.setAuthenticationRefreshCheckSeconds(3);
+    }
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.init();
+
+        // start proxy service
+        proxyConfig.setAuthenticationEnabled(true);
+        proxyConfig.setAuthorizationEnabled(false);
+        proxyConfig.setForwardAuthorizationCredentials(true);
+        proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+        proxyConfig.setAdvertisedAddress(null);
+
+        proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+        proxyConfig.setWebServicePort(Optional.of(0));
+
+        proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        proxyConfig.setBrokerClientAuthenticationParameters(
+                AuthTokenUtils.createToken(SECRET_KEY, "Proxy", Optional.empty()));
+        proxyConfig.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        proxyConfig.setProperties(properties);
+
+        proxyService = Mockito.spy(new ProxyService(proxyConfig,
+                new AuthenticationService(
+                        PulsarConfigurationLoader.convertFrom(proxyConfig))));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+        proxyService.close();
+    }
+
+    private void startProxy(boolean forwardAuthData) throws Exception {
+        pulsar.getConfiguration().setAuthenticateOriginalAuthData(forwardAuthData);
+        proxyConfig.setForwardAuthorizationCredentials(forwardAuthData);
+        proxyService.start();
+    }
+
+    @DataProvider
+    Object[] forwardAuthDataProvider() {
+        return new Object[]{true,false};
+    }
+
+    @Test(dataProvider = "forwardAuthDataProvider")
+    public void testAuthDataRefresh(boolean forwardAuthData) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        startProxy(forwardAuthData);
+
+        AuthenticationToken authenticationToken = new AuthenticationToken(() -> {
+            Calendar calendar = Calendar.getInstance();
+            calendar.add(Calendar.SECOND, 10);
+            return AuthTokenUtils.createToken(SECRET_KEY, "client", Optional.of(calendar.getTime()));
+        });
+
+        pulsarClient = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl())
+                .authentication(authenticationToken)
+                .operationTimeout(3, TimeUnit.SECONDS)

Review Comment:
   Is this setting related to the test?
   A short operation timeout might make the test flaky in the CI environment.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -253,8 +254,25 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
         } else {
             log.info("{} Connected through proxy to target broker at {}", ctx.channel(), proxyToTargetBrokerAddress);
         }
-        // Send CONNECT command
-        ctx.writeAndFlush(newConnectCommand())
+        completeActive();
+    }
+
+    protected void completeActive() throws Exception {

Review Comment:
   ```suggestion
       protected void handleChannelActive() throws Exception {
   ```



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -361,51 +369,54 @@ protected void handleConnected(CommandConnected connected) {
         state = State.Ready;
     }
 
-    @Override
-    protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
-        checkArgument(authChallenge.hasChallenge());
-        checkArgument(authChallenge.getChallenge().hasAuthData());
+    protected final void sendMutualAuthCommand(String authMethod, AuthData authData) {
+        if (log.isDebugEnabled()) {
+            log.debug("{} Mutual auth {}", ctx.channel(), authMethod);
+        }
 
+        ByteBuf request = Commands.newAuthResponse(authMethod,
+                authData,
+                this.protocolVersion,
+                PulsarVersion.getVersion());
+
+        ctx.writeAndFlush(request).addListener(writeFuture -> {
+            if (!writeFuture.isSuccess()) {
+                log.warn("{} Failed to send request for mutual auth to broker: {}", ctx.channel(),
+                        writeFuture.cause().getMessage());
+                close(writeFuture.cause());
+            }
+        });
+    }
+
+    protected void prepareMutualAuth(CommandAuthChallenge authChallenge) throws AuthenticationException {
         if (Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData())) {
             try {
                 authenticationDataProvider = authentication.getAuthData(remoteHostName);
             } catch (PulsarClientException e) {
                 log.error("{} Error when refreshing authentication data provider: {}", ctx.channel(), e);
-                connectionFuture.completeExceptionally(e);
+                close(e);
                 return;
             }
         }
-
         // mutual authn. If auth not complete, continue auth; if auth complete, complete connectionFuture.
-        try {
-            AuthData authData = authenticationDataProvider
-                .authenticate(AuthData.of(authChallenge.getChallenge().getAuthData()));
-
-            checkState(!authData.isComplete());
-
-            ByteBuf request = Commands.newAuthResponse(authentication.getAuthMethodName(),
-                authData,
-                this.protocolVersion,
-                PulsarVersion.getVersion());
-
-            if (log.isDebugEnabled()) {
-                log.debug("{} Mutual auth {}", ctx.channel(), authentication.getAuthMethodName());
-            }
-
-            ctx.writeAndFlush(request).addListener(writeFuture -> {
-                if (!writeFuture.isSuccess()) {
-                    log.warn("{} Failed to send request for mutual auth to broker: {}", ctx.channel(),
-                        writeFuture.cause().getMessage());
-                    connectionFuture.completeExceptionally(writeFuture.cause());
-                }
-            });
+        AuthData authData =
+                authenticationDataProvider.authenticate(AuthData.of(authChallenge.getChallenge().getAuthData()));
+        checkState(!authData.isComplete());
+        sendMutualAuthCommand(authentication.getAuthMethodName(), authData);
+    }
 
+    @Override
+    protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
+        checkArgument(authChallenge.hasChallenge());
+        checkArgument(authChallenge.getChallenge().hasAuthData());
+        try {
+            prepareMutualAuth(authChallenge);

Review Comment:
   Then `sendMutualAuthCommand` can be private, and `ProxyClientCnx` only needs to override the `prepareMutualAuthData(authChallenge)` method.
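
A minimal sketch of that split, assuming hypothetical stand-in classes (`BaseCnxSketch`, `ProxyCnxSketch`) and plain strings in place of the real `ClientCnx`, `AuthData`, and Netty types. The base class keeps the send step private and leaves only the preparation step open for overriding:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ClientCnx; strings replace AuthData and ByteBuf.
class BaseCnxSketch {
    // Records what would be written to the channel.
    final List<String> sent = new ArrayList<>();

    // Overridable step: build the auth data that answers a challenge.
    protected String prepareMutualAuthData(String challenge) {
        return "base-auth-for:" + challenge;
    }

    // Private step: subclasses cannot change how the response is written.
    private void sendMutualAuthCommand(String authData) {
        sent.add("AUTH_RESPONSE " + authData);
    }

    // Fixed skeleton: prepare first, then send.
    final void handleAuthChallenge(String challenge) {
        sendMutualAuthCommand(prepareMutualAuthData(challenge));
    }
}

// Hypothetical stand-in for ProxyClientCnx: only the preparation step differs.
class ProxyCnxSketch extends BaseCnxSketch {
    @Override
    protected String prepareMutualAuthData(String challenge) {
        return "forwarded-client-auth-for:" + challenge;
    }
}
```

With this shape, the subclass never touches the write path, so the failure handling around the send lives in one place.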



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r1080836860


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -523,18 +529,63 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
 
     @Override
     protected void handleAuthResponse(CommandAuthResponse authResponse) {
-        checkArgument(state == State.Connecting);
         checkArgument(authResponse.hasResponse());
         checkArgument(authResponse.getResponse().hasAuthData() && authResponse.getResponse().hasAuthMethodName());
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Received AuthResponse from {}, auth method: {}",
-                remoteAddress, authResponse.getResponse().getAuthMethodName());
+                    remoteAddress, authResponse.getResponse().getAuthMethodName());
         }
 
         try {
             AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
             doAuthentication(clientData);
+            if (service.getConfiguration().isForwardAuthorizationCredentials()

Review Comment:
   An interesting consequence of this change is that the `ProxyConnection` goes back into the `Connecting` state if the authentication takes multiple steps.





[GitHub] [pulsar] nodece commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r980803354


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java:
##########
@@ -18,46 +18,61 @@
  */
 package org.apache.pulsar.proxy.server;
 
-import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
-import org.apache.pulsar.PulsarVersion;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import javax.naming.AuthenticationException;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.AuthData;
-import org.apache.pulsar.common.protocol.Commands;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
 
+@Slf4j
 public class ProxyClientCnx extends ClientCnx {
-
-    String clientAuthRole;
-    AuthData clientAuthData;
-    String clientAuthMethod;
-    int protocolVersion;
+    private final boolean forwardClientAuthData;
+    private final String clientAuthMethod;
+    private final String clientAuthRole;
+    private final Function<Boolean, CompletableFuture<AuthData>> clientAuthDataSupplier;
 
     public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,

Review Comment:
   Good catch!





[GitHub] [pulsar] nodece commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r983169093


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java:
##########
@@ -18,46 +18,61 @@
  */
 package org.apache.pulsar.proxy.server;
 
-import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
-import org.apache.pulsar.PulsarVersion;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import javax.naming.AuthenticationException;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.AuthData;
-import org.apache.pulsar.common.protocol.Commands;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
 
+@Slf4j
 public class ProxyClientCnx extends ClientCnx {
-
-    String clientAuthRole;
-    AuthData clientAuthData;
-    String clientAuthMethod;
-    int protocolVersion;
+    private final boolean forwardClientAuthData;
+    private final String clientAuthMethod;
+    private final String clientAuthRole;
+    private final Function<Boolean, CompletableFuture<AuthData>> clientAuthDataSupplier;
 
     public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,

Review Comment:
   I still changed this method because this `ClientCnx` requires the `ProxyConnection`.





[GitHub] [pulsar] nodece commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r983009317


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -543,6 +577,35 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
         }
     }
 
+    private Void requestRefreshClientAuthData() {
+        if (!service.getConfiguration().isForwardAuthorizationCredentials()) {

Review Comment:
   Ok, I can remove this check.





[GitHub] [pulsar] nodece commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r983057350


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java:
##########
@@ -54,10 +66,33 @@ protected ByteBuf newConnectCommand() throws Exception {
 
         authenticationDataProvider = authentication.getAuthData(remoteHostName);
         AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
-        return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
-            PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
-            clientAuthMethod);
+        return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
+                PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
+                clientAuthMethod);
     }
 
-    private static final Logger log = LoggerFactory.getLogger(ProxyClientCnx.class);
+    @Override
+    protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
+        checkArgument(authChallenge.hasChallenge());
+        checkArgument(authChallenge.getChallenge().hasAuthData());
+
+        boolean isRefresh = Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData());
+        if (!forwardClientAuthData || !isRefresh || refreshClientAuthDataNotifier == null) {
+            super.handleAuthChallenge(authChallenge);
+            return;
+        }
+
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug("{} Request to refresh the original client authentication data", ctx.channel());
+            }
+            refreshClientAuthDataNotifier.run();

Review Comment:
   Yes, when the authentication data is refreshed, we send an `authResponse` command to the broker that the proxy client is connected to.
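
A rough sketch of that flow, assuming a hypothetical `RefreshFlowSketch` class with string payloads and a placeholder refresh marker. None of these names come from the Pulsar code base, and the real implementation works asynchronously on Netty channels:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for the proxy-to-broker connection.
class RefreshFlowSketch {
    // Placeholder for the payload that marks a challenge as a refresh request.
    static final byte[] REFRESH_MARKER = "refresh-marker".getBytes(StandardCharsets.UTF_8);

    private final boolean forwardClientAuthData;
    private final Supplier<String> clientAuthData; // asks the original client for fresh data
    private final Supplier<String> proxyAuthData;  // the proxy's own credentials
    final List<String> sentToBroker = new ArrayList<>();

    RefreshFlowSketch(boolean forwardClientAuthData,
                      Supplier<String> clientAuthData,
                      Supplier<String> proxyAuthData) {
        this.forwardClientAuthData = forwardClientAuthData;
        this.clientAuthData = clientAuthData;
        this.proxyAuthData = proxyAuthData;
    }

    // On a refresh challenge with forwarding enabled, answer with the client's
    // refreshed data; otherwise fall back to the proxy's own auth data.
    void handleAuthChallenge(byte[] challenge) {
        boolean isRefresh = Arrays.equals(REFRESH_MARKER, challenge);
        String payload = (forwardClientAuthData && isRefresh)
                ? "client:" + clientAuthData.get()
                : "proxy:" + proxyAuthData.get();
        sentToBroker.add("AUTH_RESPONSE " + payload);
    }
}
```

In both branches the proxy answers the broker with an auth response; the only difference is whose credentials it carries.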







[GitHub] [pulsar] nodece commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r983009856


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -423,16 +430,44 @@ public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnec
     }
 
     // According to auth result, send newConnected or newAuthChallenge command.
-    private void doAuthentication(AuthData clientData) throws Exception {
+    private void doAuthentication(AuthData clientData, Optional<CommandAuthResponse> authResponseOptional)
+            throws Exception {
         AuthData brokerData = authState.authenticate(clientData);
         // authentication has completed, will send newConnected command.
         if (authState.isComplete()) {
             clientAuthRole = authState.getAuthRole();
             if (LOG.isDebugEnabled()) {
                 LOG.debug("[{}] Client successfully authenticated with {} role {}",
-                    remoteAddress, authMethod, clientAuthRole);
+                        remoteAddress, authMethod, clientAuthRole);
+            }
+
+            // First connection
+            if (this.connectionPool == null || state == State.Connecting) {
+                // authentication has completed, will send newConnected command.
+                completeConnect(clientData);
+            } else if (state == State.ProxyLookupRequests) {

Review Comment:
   Both `handleAuthResponse` and `handleConnect` use this method.





[GitHub] [pulsar] nodece commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r983055843


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -423,16 +425,22 @@ public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnec
     }
 
     // According to auth result, send newConnected or newAuthChallenge command.
-    private void doAuthentication(AuthData clientData) throws Exception {
+    private void doAuthentication(AuthData clientData)
+            throws Exception {
         AuthData brokerData = authState.authenticate(clientData);
         // authentication has completed, will send newConnected command.
         if (authState.isComplete()) {
             clientAuthRole = authState.getAuthRole();
             if (LOG.isDebugEnabled()) {
                 LOG.debug("[{}] Client successfully authenticated with {} role {}",
-                    remoteAddress, authMethod, clientAuthRole);
+                        remoteAddress, authMethod, clientAuthRole);
+            }
+
+            // First connection
+            if (this.connectionPool == null || state == State.Connecting) {

Review Comment:
   The `doAuthentication()` method will be called multiple times, so we need to add a check.
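
A simplified sketch of such a guard, assuming a hypothetical `ProxyAuthGuardSketch` class that models only the state check (the real condition in the diff also looks at `connectionPool`):

```java
// Hypothetical stand-in for ProxyConnection; only the state guard is modelled.
class ProxyAuthGuardSketch {
    enum State { Connecting, ProxyLookupRequests }

    State state = State.Connecting;
    int connectsCompleted = 0;
    int credentialRefreshes = 0;

    // Called every time an authentication round completes.
    void onAuthenticationComplete() {
        if (state == State.Connecting) {
            // First connection: finish the handshake exactly once.
            connectsCompleted++;
            state = State.ProxyLookupRequests;
        } else {
            // Later rounds only replace the stored client credentials.
            credentialRefreshes++;
        }
    }
}
```

Without the guard, every refresh round would run the connect-completion path again.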





[GitHub] [pulsar] nodece commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r980807652


##########
pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java:
##########
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import javax.crypto.SecretKey;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class ProxyRefreshAuthTest extends ProducerConsumerBase {
+    private final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+    private ProxyService proxyService;
+    private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+
+        // enable tls and auth&auth at broker
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        conf.setProxyRoles(Collections.singleton("Proxy"));
+        conf.setAdvertisedAddress(null);
+        conf.setAuthenticateOriginalAuthData(true);
+        conf.setBrokerServicePort(Optional.of(0));
+        conf.setWebServicePort(Optional.of(0));
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("superUser");
+        conf.setSuperUserRoles(superUserRoles);
+
+        conf.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        conf.setProperties(properties);
+
+        conf.setClusterName("proxy-authorization");
+        conf.setNumExecutorThreadPoolSize(5);
+
+        conf.setAuthenticationRefreshCheckSeconds(3);
+    }
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.init();
+
+        // start proxy service
+        proxyConfig.setAuthenticationEnabled(true);
+        proxyConfig.setAuthorizationEnabled(false);
+        proxyConfig.setForwardAuthorizationCredentials(true);
+        proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+        proxyConfig.setAdvertisedAddress(null);
+
+        proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+        proxyConfig.setWebServicePort(Optional.of(0));
+
+        proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        proxyConfig.setBrokerClientAuthenticationParameters(
+                AuthTokenUtils.createToken(SECRET_KEY, "Proxy", Optional.empty()));
+        proxyConfig.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        proxyConfig.setProperties(properties);
+
+        proxyService = Mockito.spy(new ProxyService(proxyConfig,
+                new AuthenticationService(
+                        PulsarConfigurationLoader.convertFrom(proxyConfig))));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+        proxyService.close();
+    }
+
+    private void startProxy(boolean forwardAuthData) throws Exception {
+        pulsar.getConfiguration().setAuthenticateOriginalAuthData(forwardAuthData);
+        proxyConfig.setForwardAuthorizationCredentials(forwardAuthData);
+        proxyService.start();
+    }
+
+    @DataProvider
+    Object[] forwardAuthDataProvider() {
+        return new Object[]{true,false};
+    }
+
+    @Test(dataProvider = "forwardAuthDataProvider")
+    public void testAuthDataRefresh(boolean forwardAuthData) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        startProxy(forwardAuthData);
+
+        AuthenticationToken authenticationToken = new AuthenticationToken(() -> {
+            Calendar calendar = Calendar.getInstance();
+            calendar.add(Calendar.SECOND, 10);
+            return AuthTokenUtils.createToken(SECRET_KEY, "client", Optional.of(calendar.getTime()));
+        });
+
+        pulsarClient = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl())
+                .authentication(authenticationToken)
+                .operationTimeout(3, TimeUnit.SECONDS)
+                .build();
+
+        admin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getWebServiceAddress())
+                .authentication(authenticationToken).build();
+
+        String namespaceName = "my-tenant/my-ns";
+        admin.clusters().createCluster("proxy-authorization",
+                ClusterData.builder().serviceUrlTls(brokerUrlTls.toString()).build());
+        admin.tenants().createTenant("my-tenant",
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("proxy-authorization")));
+        admin.namespaces().createNamespace(namespaceName);
+
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        Producer<byte[]> producer = spy(pulsarClient.newProducer()
+                .topic(topic).create());
+        int i = 0;
+        while (i <= 15) {
+            producer.send(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
+            i++;
+            Thread.sleep(1000);
+        }
+
+        List<CompletableFuture<List<String>>> futures = new ArrayList<>(10);
+        for (i = 0; i < 10; i++) {
+            futures.add(pulsarClient.getPartitionsForTopic(topic));
+        }

Review Comment:
   Are you suggesting removing this test? This PR looks like an improvement, and we have a test that covers these changes.





[GitHub] [pulsar] nodece commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r980962997


##########
pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java:
##########
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import javax.crypto.SecretKey;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class ProxyRefreshAuthTest extends ProducerConsumerBase {
+    private final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+    private ProxyService proxyService;
+    private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+
+        // enable tls and auth&auth at broker
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        conf.setProxyRoles(Collections.singleton("Proxy"));
+        conf.setAdvertisedAddress(null);
+        conf.setAuthenticateOriginalAuthData(true);
+        conf.setBrokerServicePort(Optional.of(0));
+        conf.setWebServicePort(Optional.of(0));
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("superUser");
+        conf.setSuperUserRoles(superUserRoles);
+
+        conf.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        conf.setProperties(properties);
+
+        conf.setClusterName("proxy-authorization");
+        conf.setNumExecutorThreadPoolSize(5);
+
+        conf.setAuthenticationRefreshCheckSeconds(3);
+    }
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.init();
+
+        // start proxy service
+        proxyConfig.setAuthenticationEnabled(true);
+        proxyConfig.setAuthorizationEnabled(false);
+        proxyConfig.setForwardAuthorizationCredentials(true);
+        proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+        proxyConfig.setAdvertisedAddress(null);
+
+        proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+        proxyConfig.setWebServicePort(Optional.of(0));
+
+        proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        proxyConfig.setBrokerClientAuthenticationParameters(
+                AuthTokenUtils.createToken(SECRET_KEY, "Proxy", Optional.empty()));
+        proxyConfig.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        proxyConfig.setProperties(properties);
+
+        proxyService = Mockito.spy(new ProxyService(proxyConfig,
+                new AuthenticationService(
+                        PulsarConfigurationLoader.convertFrom(proxyConfig))));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+        proxyService.close();
+    }
+
+    private void startProxy(boolean forwardAuthData) throws Exception {
+        pulsar.getConfiguration().setAuthenticateOriginalAuthData(forwardAuthData);
+        proxyConfig.setForwardAuthorizationCredentials(forwardAuthData);
+        proxyService.start();
+    }
+
+    @DataProvider
+    Object[] forwardAuthDataProvider() {
+        return new Object[]{true,false};
+    }
+
+    @Test(dataProvider = "forwardAuthDataProvider")
+    public void testAuthDataRefresh(boolean forwardAuthData) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        startProxy(forwardAuthData);
+
+        AuthenticationToken authenticationToken = new AuthenticationToken(() -> {
+            Calendar calendar = Calendar.getInstance();
+            calendar.add(Calendar.SECOND, 10);
+            return AuthTokenUtils.createToken(SECRET_KEY, "client", Optional.of(calendar.getTime()));
+        });
+
+        pulsarClient = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl())
+                .authentication(authenticationToken)
+                .operationTimeout(3, TimeUnit.SECONDS)

Review Comment:
   How long do you suggest?
   





[GitHub] [pulsar] nodece commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r980801899


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -1243,6 +1254,13 @@ public void close() {
        }
     }
 
+    public void close(Throwable e) {

Review Comment:
   Yes, it should be `protected`.





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r980909168


##########
pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java:
##########
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import javax.crypto.SecretKey;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class ProxyRefreshAuthTest extends ProducerConsumerBase {
+    private final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+    private ProxyService proxyService;
+    private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+
+        // enable tls and auth&auth at broker
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        conf.setProxyRoles(Collections.singleton("Proxy"));
+        conf.setAdvertisedAddress(null);
+        conf.setAuthenticateOriginalAuthData(true);
+        conf.setBrokerServicePort(Optional.of(0));
+        conf.setWebServicePort(Optional.of(0));
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("superUser");
+        conf.setSuperUserRoles(superUserRoles);
+
+        conf.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        conf.setProperties(properties);
+
+        conf.setClusterName("proxy-authorization");
+        conf.setNumExecutorThreadPoolSize(5);
+
+        conf.setAuthenticationRefreshCheckSeconds(3);
+    }
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.init();
+
+        // start proxy service
+        proxyConfig.setAuthenticationEnabled(true);
+        proxyConfig.setAuthorizationEnabled(false);
+        proxyConfig.setForwardAuthorizationCredentials(true);
+        proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+        proxyConfig.setAdvertisedAddress(null);
+
+        proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+        proxyConfig.setWebServicePort(Optional.of(0));
+
+        proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        proxyConfig.setBrokerClientAuthenticationParameters(
+                AuthTokenUtils.createToken(SECRET_KEY, "Proxy", Optional.empty()));
+        proxyConfig.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        proxyConfig.setProperties(properties);
+
+        proxyService = Mockito.spy(new ProxyService(proxyConfig,
+                new AuthenticationService(
+                        PulsarConfigurationLoader.convertFrom(proxyConfig))));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+        proxyService.close();
+    }
+
+    private void startProxy(boolean forwardAuthData) throws Exception {
+        pulsar.getConfiguration().setAuthenticateOriginalAuthData(forwardAuthData);
+        proxyConfig.setForwardAuthorizationCredentials(forwardAuthData);
+        proxyService.start();
+    }
+
+    @DataProvider
+    Object[] forwardAuthDataProvider() {
+        return new Object[]{true,false};
+    }
+
+    @Test(dataProvider = "forwardAuthDataProvider")
+    public void testAuthDataRefresh(boolean forwardAuthData) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        startProxy(forwardAuthData);
+
+        AuthenticationToken authenticationToken = new AuthenticationToken(() -> {
+            Calendar calendar = Calendar.getInstance();
+            calendar.add(Calendar.SECOND, 10);
+            return AuthTokenUtils.createToken(SECRET_KEY, "client", Optional.of(calendar.getTime()));
+        });
+
+        pulsarClient = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl())
+                .authentication(authenticationToken)
+                .operationTimeout(3, TimeUnit.SECONDS)
+                .build();
+
+        admin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getWebServiceAddress())
+                .authentication(authenticationToken).build();
+
+        String namespaceName = "my-tenant/my-ns";
+        admin.clusters().createCluster("proxy-authorization",
+                ClusterData.builder().serviceUrlTls(brokerUrlTls.toString()).build());
+        admin.tenants().createTenant("my-tenant",
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("proxy-authorization")));
+        admin.namespaces().createNamespace(namespaceName);
+
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        Producer<byte[]> producer = spy(pulsarClient.newProducer()
+                .topic(topic).create());
+        int i = 0;
+        while (i <= 15) {
+            producer.send(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
+            i++;
+            Thread.sleep(1000);
+        }
+
+        List<CompletableFuture<List<String>>> futures = new ArrayList<>(10);
+        for (i = 0; i < 10; i++) {
+            futures.add(pulsarClient.getPartitionsForTopic(topic));
+        }

Review Comment:
   @nodece No, I mean: how does the test verify that the fix works, and how would the new test fail without this fix?
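
One way to make that expectation concrete is a plain-Java sketch with no Pulsar dependencies. The types `SketchToken` and `SketchConnection` are hypothetical stand-ins, not Pulsar classes, and the 10-second lifetime simply mirrors the token supplier in the quoted test. The idea is that a connection which never refreshes its cached credential is rejected once the first token expires, while one that refreshes keeps working.

```java
import java.time.Instant;
import java.util.function.Supplier;

final class RefreshSketchDemo {
    public static void main(String[] args) {
        // A controllable clock, so the sketch needs no Thread.sleep.
        Instant[] clock = {Instant.parse("2022-09-23T00:00:00Z")};
        Supplier<SketchToken> tenSecondTokens = () -> new SketchToken(clock[0].plusSeconds(10));

        SketchConnection withoutRefresh = new SketchConnection(tenSecondTokens);
        SketchConnection withRefresh = new SketchConnection(tenSecondTokens);

        clock[0] = clock[0].plusSeconds(8);
        withRefresh.refresh();               // new token, valid until t = 18 s
        clock[0] = clock[0].plusSeconds(8);  // t = 16 s, past the first token's lifetime

        System.out.println("without refresh accepted: " + withoutRefresh.isAcceptedAt(clock[0]));
        System.out.println("with refresh accepted: " + withRefresh.isAcceptedAt(clock[0]));
    }
}

// Hypothetical stand-in for an expiring credential; not part of Pulsar.
final class SketchToken {
    final Instant expiresAt;

    SketchToken(Instant expiresAt) {
        this.expiresAt = expiresAt;
    }
}

// Models a connection that authenticates once and caches the credential it used.
final class SketchConnection {
    private final Supplier<SketchToken> tokenSupplier;
    private SketchToken current;

    SketchConnection(Supplier<SketchToken> tokenSupplier) {
        this.tokenSupplier = tokenSupplier;
        this.current = tokenSupplier.get();
    }

    // The behavior a refresh challenge is meant to trigger: fetch a fresh credential.
    void refresh() {
        current = tokenSupplier.get();
    }

    // Server-side check: the connection is accepted only while its credential is unexpired.
    boolean isAcceptedAt(Instant now) {
        return now.isBefore(current.expiresAt);
    }
}
```

In the real test, the analogous check would presumably be that operations issued after the token lifetime still succeed with the fix and fail without it. Whether the quoted test asserts exactly that is the open question here.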





[GitHub] [pulsar] nodece commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r980827317


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -361,51 +369,54 @@ protected void handleConnected(CommandConnected connected) {
         state = State.Ready;
     }
 
-    @Override
-    protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
-        checkArgument(authChallenge.hasChallenge());
-        checkArgument(authChallenge.getChallenge().hasAuthData());
+    protected final void sendMutualAuthCommand(String authMethod, AuthData authData) {
+        if (log.isDebugEnabled()) {
+            log.debug("{} Mutual auth {}", ctx.channel(), authMethod);
+        }
 
+        ByteBuf request = Commands.newAuthResponse(authMethod,
+                authData,
+                this.protocolVersion,
+                PulsarVersion.getVersion());
+
+        ctx.writeAndFlush(request).addListener(writeFuture -> {
+            if (!writeFuture.isSuccess()) {
+                log.warn("{} Failed to send request for mutual auth to broker: {}", ctx.channel(),
+                        writeFuture.cause().getMessage());
+                close(writeFuture.cause());
+            }
+        });
+    }
+
+    protected void prepareMutualAuth(CommandAuthChallenge authChallenge) throws AuthenticationException {
         if (Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData())) {
             try {
                 authenticationDataProvider = authentication.getAuthData(remoteHostName);
             } catch (PulsarClientException e) {
                 log.error("{} Error when refreshing authentication data provider: {}", ctx.channel(), e);
-                connectionFuture.completeExceptionally(e);
+                close(e);
                 return;
             }
         }
-
         // mutual authn. If auth not complete, continue auth; if auth complete, complete connectionFuture.
-        try {
-            AuthData authData = authenticationDataProvider
-                .authenticate(AuthData.of(authChallenge.getChallenge().getAuthData()));
-
-            checkState(!authData.isComplete());
-
-            ByteBuf request = Commands.newAuthResponse(authentication.getAuthMethodName(),
-                authData,
-                this.protocolVersion,
-                PulsarVersion.getVersion());
-
-            if (log.isDebugEnabled()) {
-                log.debug("{} Mutual auth {}", ctx.channel(), authentication.getAuthMethodName());
-            }
-
-            ctx.writeAndFlush(request).addListener(writeFuture -> {
-                if (!writeFuture.isSuccess()) {
-                    log.warn("{} Failed to send request for mutual auth to broker: {}", ctx.channel(),
-                        writeFuture.cause().getMessage());
-                    connectionFuture.completeExceptionally(writeFuture.cause());
-                }
-            });
+        AuthData authData =
+                authenticationDataProvider.authenticate(AuthData.of(authChallenge.getChallenge().getAuthData()));
+        checkState(!authData.isComplete());
+        sendMutualAuthCommand(authentication.getAuthMethodName(), authData);
+    }
 
+    @Override
+    protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
+        checkArgument(authChallenge.hasChallenge());
+        checkArgument(authChallenge.getChallenge().hasAuthData());
+        try {
+            prepareMutualAuth(authChallenge);

Review Comment:
   I'll refactor this.





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r980910831


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -253,8 +254,25 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
         } else {
             log.info("{} Connected through proxy to target broker at {}", ctx.channel(), proxyToTargetBrokerAddress);
         }
-        // Send CONNECT command
-        ctx.writeAndFlush(newConnectCommand())
+        completeActive();
+    }
+
+    protected void completeActive() throws Exception {
+        sendConnectCommand(null, null, null);

Review Comment:
   @nodece Sorry, I forgot the `not` in my previous comment.
   If we use `null` here, how can the client's (user's) auth data be passed to the proxy/broker?





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r983161996


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java:
##########
@@ -54,10 +59,54 @@ protected ByteBuf newConnectCommand() throws Exception {
 
         authenticationDataProvider = authentication.getAuthData(remoteHostName);
         AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
-        return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
-            PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
-            clientAuthMethod);
+        return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
+                PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
+                clientAuthMethod);
     }
 
-    private static final Logger log = LoggerFactory.getLogger(ProxyClientCnx.class);
+    @Override
+    protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
+        checkArgument(authChallenge.hasChallenge());
+        checkArgument(authChallenge.getChallenge().hasAuthData());
+
+        boolean isRefresh = Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData());
+        if (!forwardClientAuthData || !isRefresh) {
+            super.handleAuthChallenge(authChallenge);
+            return;
+        }
+
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug("Proxy {} request to refresh the original client authentication data for "
+                        + "the proxy client {}", proxyConnection.ctx().channel(), ctx.channel());
+            }
+
+            proxyConnection.ctx().writeAndFlush(Commands.newAuthChallenge(clientAuthMethod, AuthData.REFRESH_AUTH_DATA,
+                            protocolVersion))
+                    .addListener(writeFuture -> {
+                        if (writeFuture.isSuccess()) {
+                            if (log.isDebugEnabled()) {
+                                log.debug("Proxy {} sent the auth challenge to original client to refresh credentials "
+                                                + "with method {} for the proxy client {}",
+                                        proxyConnection.ctx().channel(), clientAuthMethod, ctx.channel());
+                            }
+                        } else {
+                            log.error("Failed to send the auth challenge to original client by the proxy {} "
+                                            + "for the proxy client {}",
+                                    proxyConnection.ctx().channel(),
+                                    ctx.channel(),
+                                    writeFuture.cause());
+                            closeWithException(writeFuture.cause());

Review Comment:
   If the write to the user client fails, do we then close `proxyClientCnx`?





[GitHub] [pulsar] nodece commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r983030763


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -543,6 +577,35 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
         }
     }
 
+    private Void requestRefreshClientAuthData() {

Review Comment:
   Fixed.





[GitHub] [pulsar] nodece commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r983089312


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -535,6 +542,42 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
         try {
             AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
             doAuthentication(clientData);
+            if (connectionPool != null && state == State.ProxyLookupRequests) {

Review Comment:
   Done



##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java:
##########
@@ -54,10 +66,33 @@ protected ByteBuf newConnectCommand() throws Exception {
 
         authenticationDataProvider = authentication.getAuthData(remoteHostName);
         AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
-        return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
-            PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
-            clientAuthMethod);
+        return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
+                PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
+                clientAuthMethod);
     }
 
-    private static final Logger log = LoggerFactory.getLogger(ProxyClientCnx.class);
+    @Override
+    protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
+        checkArgument(authChallenge.hasChallenge());
+        checkArgument(authChallenge.getChallenge().hasAuthData());
+
+        boolean isRefresh = Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData());
+        if (!forwardClientAuthData || !isRefresh || refreshClientAuthDataNotifier == null) {
+            super.handleAuthChallenge(authChallenge);
+            return;
+        }
+
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug("{} Request to refresh the original client authentication data", ctx.channel());
+            }
+            refreshClientAuthDataNotifier.run();

Review Comment:
   Done.





[GitHub] [pulsar] nodece merged pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece merged PR #17831:
URL: https://github.com/apache/pulsar/pull/17831




[GitHub] [pulsar] nodece commented on pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#issuecomment-1380040321

   > I think that this change should be reverted. This can cause corruption of the traffic between the client and the proxy. I provided some explanation in this comment: [#10816 (comment)](https://github.com/apache/pulsar/issues/10816#issuecomment-1359198680) . When the proxy is passing bytes back and forth, it is not safe to inject any protocol messages to the connection. That might corrupt the traffic.
   
   I only fixed the ProxyLookupRequests connection, so this change doesn't break other connections.
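
A minimal sketch of that gating, under stated assumptions: the enum values `LOOKUP` and `PASS_THROUGH` and the class `SketchProxyConnection` are hypothetical and only model the distinction discussed here. `LOOKUP` stands for a connection on which the proxy itself parses and answers commands, and `PASS_THROUGH` for one where it only relays bytes. The point is that a refresh challenge is written to the client only in the first case.

```java
import java.util.ArrayList;
import java.util.List;

final class GatingSketchDemo {
    public static void main(String[] args) {
        SketchProxyConnection lookup = new SketchProxyConnection(SketchProxyState.LOOKUP);
        SketchProxyConnection passThrough = new SketchProxyConnection(SketchProxyState.PASS_THROUGH);
        System.out.println("lookup injected: " + lookup.requestClientAuthRefresh());
        System.out.println("pass-through injected: " + passThrough.requestClientAuthRefresh());
    }
}

// Hypothetical states, named for illustration only.
enum SketchProxyState { LOOKUP, PASS_THROUGH }

final class SketchProxyConnection {
    private final SketchProxyState state;
    // Records what would be written to the client channel.
    private final List<String> framesWrittenToClient = new ArrayList<>();

    SketchProxyConnection(SketchProxyState state) {
        this.state = state;
    }

    // Writes a refresh challenge only when the proxy owns the protocol conversation.
    // On a pass-through connection nothing is injected, so relayed traffic is untouched.
    boolean requestClientAuthRefresh() {
        if (state != SketchProxyState.LOOKUP) {
            return false;
        }
        framesWrittenToClient.add("AUTH_CHALLENGE(refresh)");
        return true;
    }

    List<String> framesWrittenToClient() {
        return framesWrittenToClient;
    }
}
```

The sketch says nothing about whether the merged change enforces this gate on every code path; it only illustrates the invariant being claimed.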




[GitHub] [pulsar] nodece commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r983007827


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -513,7 +548,7 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
 
             authState = authenticationProvider.newAuthState(clientData, remoteAddress, sslSession);
             authenticationData = authState.getAuthDataSource();
-            doAuthentication(clientData);
+            doAuthentication(clientData, Optional.empty());

Review Comment:
   Do you have any ideas?





[GitHub] [pulsar] nodece commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r983055072


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -535,6 +542,42 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
         try {
             AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
             doAuthentication(clientData);
+            if (connectionPool != null && state == State.ProxyLookupRequests) {
+                if (service.getConfiguration().isForwardAuthorizationCredentials()) {
+                    connectionPool.getPool().values().forEach(n -> n.values().forEach(cnxFuture -> {
+                        String clientVersion;
+                        if (authResponse.hasClientVersion()) {
+                            clientVersion = authResponse.getClientVersion();
+                        } else {
+                            clientVersion = PulsarVersion.getVersion();
+                        }
+                        int protocolVersion;
+                        if (authResponse.hasProtocolVersion()) {
+                            protocolVersion = authResponse.getProtocolVersion();
+                        } else {
+                            protocolVersion = Commands.getCurrentProtocolVersion();
+                        }
+
+                        ByteBuf cmd =
+                                Commands.newAuthResponse(clientAuthMethod, clientData, protocolVersion, clientVersion);
+                        cnxFuture.thenAccept(clientCnx -> clientCnx.ctx().writeAndFlush(cmd)
+                                .addListener(writeFuture -> {
+                                    if (writeFuture.isSuccess()) {
+                                        if (LOG.isDebugEnabled()) {
+                                            LOG.debug(
+                                                    "{} authentication is refreshed successfully by {}, auth method: {} ",
+                                                    clientCnx.ctx().channel(), ctx.channel(), clientAuthMethod);
+                                        }
+                                    } else {
+                                        LOG.error("{} Failed to refresh request for mutual auth to client {}",
+                                                clientCnx.ctx().channel(),
+                                                writeFuture.cause());
+                                        ctx.channel().pipeline().fireExceptionCaught(writeFuture.cause());

Review Comment:
   You are right!





[GitHub] [pulsar] nodece commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r983053826


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -535,6 +542,42 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
         try {
             AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
             doAuthentication(clientData);
+            if (connectionPool != null && state == State.ProxyLookupRequests) {
+                if (service.getConfiguration().isForwardAuthorizationCredentials()) {
+                    connectionPool.getPool().values().forEach(n -> n.values().forEach(cnxFuture -> {
+                        String clientVersion;
+                        if (authResponse.hasClientVersion()) {
+                            clientVersion = authResponse.getClientVersion();
+                        } else {
+                            clientVersion = PulsarVersion.getVersion();
+                        }
+                        int protocolVersion;
+                        if (authResponse.hasProtocolVersion()) {
+                            protocolVersion = authResponse.getProtocolVersion();
+                        } else {
+                            protocolVersion = Commands.getCurrentProtocolVersion();
+                        }
+
+                        ByteBuf cmd =
+                                Commands.newAuthResponse(clientAuthMethod, clientData, protocolVersion, clientVersion);
+                        cnxFuture.thenAccept(clientCnx -> clientCnx.ctx().writeAndFlush(cmd)

Review Comment:
   > And the name is also confusing, it should be `toBrokerCnx`?
   
   Yes.





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r983161996


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java:
##########
@@ -54,10 +59,54 @@ protected ByteBuf newConnectCommand() throws Exception {
 
         authenticationDataProvider = authentication.getAuthData(remoteHostName);
         AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
-        return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
-            PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
-            clientAuthMethod);
+        return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
+                PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
+                clientAuthMethod);
     }
 
-    private static final Logger log = LoggerFactory.getLogger(ProxyClientCnx.class);
+    @Override
+    protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
+        checkArgument(authChallenge.hasChallenge());
+        checkArgument(authChallenge.getChallenge().hasAuthData());
+
+        boolean isRefresh = Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData());
+        if (!forwardClientAuthData || !isRefresh) {
+            super.handleAuthChallenge(authChallenge);
+            return;
+        }
+
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug("Proxy {} request to refresh the original client authentication data for "
+                        + "the proxy client {}", proxyConnection.ctx().channel(), ctx.channel());
+            }
+
+            proxyConnection.ctx().writeAndFlush(Commands.newAuthChallenge(clientAuthMethod, AuthData.REFRESH_AUTH_DATA,
+                            protocolVersion))
+                    .addListener(writeFuture -> {
+                        if (writeFuture.isSuccess()) {
+                            if (log.isDebugEnabled()) {
+                                log.debug("Proxy {} sent the auth challenge to original client to refresh credentials "
+                                                + "with method {} for the proxy client {}",
+                                        proxyConnection.ctx().channel(), clientAuthMethod, ctx.channel());
+                            }
+                        } else {
+                            log.error("Failed to send the auth challenge to original client by the proxy {} "
+                                            + "for the proxy client {}",
+                                    proxyConnection.ctx().channel(),
+                                    ctx.channel(),
+                                    writeFuture.cause());
+                            closeWithException(writeFuture.cause());

Review Comment:
   So if the write to the user client fails, we close `proxyClientCnx`?





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r983073423


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java:
##########
@@ -54,10 +66,33 @@ protected ByteBuf newConnectCommand() throws Exception {
 
         authenticationDataProvider = authentication.getAuthData(remoteHostName);
         AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
-        return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
-            PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
-            clientAuthMethod);
+        return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
+                PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
+                clientAuthMethod);
     }
 
-    private static final Logger log = LoggerFactory.getLogger(ProxyClientCnx.class);
+    @Override
+    protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
+        checkArgument(authChallenge.hasChallenge());
+        checkArgument(authChallenge.getChallenge().hasAuthData());
+
+        boolean isRefresh = Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData());
+        if (!forwardClientAuthData || !isRefresh || refreshClientAuthDataNotifier == null) {
+            super.handleAuthChallenge(authChallenge);
+            return;
+        }
+
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug("{} Request to refresh the original client authentication data", ctx.channel());
+            }
+            refreshClientAuthDataNotifier.run();

Review Comment:
   @nodece I mean that we can remove `refreshClientAuthDataNotifier` and instead add `ctx` and `protocolVersion` to `ProxyClientCnx`.





[GitHub] [pulsar] lhotari commented on pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
lhotari commented on PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#issuecomment-1359207195

   I think that this change should be reverted. It can corrupt the traffic between the client and the proxy.
   I provided some explanation in this comment: https://github.com/apache/pulsar/issues/10816#issuecomment-1359198680 . When the proxy is passing bytes back and forth, it is not safe to inject any protocol messages into the connection. That might corrupt the traffic.
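
To make the concern above concrete, here is a minimal sketch of how an extra message written into the middle of a relayed byte stream can break framing. It assumes a toy protocol where every frame is a 4-byte length followed by the payload; this is a simplification and not the actual Pulsar wire format, and `FrameInjectionSketch`, `frame`, `decode`, and `concat` are hypothetical names introduced only for this illustration.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only: a toy length-prefixed protocol, not the Pulsar wire format.
final class FrameInjectionSketch {

    // Encode one frame as [4-byte big-endian length][payload bytes].
    static byte[] frame(String payload) {
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }

    // Decode frames until the stream ends or a length field makes no sense.
    static List<String> decode(byte[] stream) {
        List<String> out = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.wrap(stream);
        while (buf.remaining() >= 4) {
            int len = buf.getInt();
            if (len < 0 || len > buf.remaining()) {
                out.add("<corrupt length " + len + ">");
                break;
            }
            byte[] body = new byte[len];
            buf.get(body);
            out.add(new String(body, StandardCharsets.UTF_8));
        }
        return out;
    }

    // Concatenate byte arrays in the order they would hit the socket.
    static byte[] concat(byte[]... parts) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        for (byte[] p : parts) {
            bos.write(p, 0, p.length);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) {
        byte[] relayed = frame("MESSAGE-PAYLOAD");
        byte[] injected = frame("AUTH_CHALLENGE");

        // Injection on a frame boundary: both frames decode intact.
        System.out.println(decode(concat(relayed, injected)));

        // Injection after only 6 bytes of the relayed frame were forwarded:
        // the receiver reads the injected bytes as part of the relayed payload.
        byte[] head = Arrays.copyOfRange(relayed, 0, 6);
        byte[] tail = Arrays.copyOfRange(relayed, 6, relayed.length);
        System.out.println(decode(concat(head, injected, tail)));
    }
}
```

A proxy that only relays raw bytes does not know where frame boundaries are, so in this sketch it has no way to choose the safe case over the unsafe one.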




[GitHub] [pulsar] nodece commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r980802759


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java:
##########
@@ -18,46 +18,61 @@
  */
 package org.apache.pulsar.proxy.server;
 
-import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
-import org.apache.pulsar.PulsarVersion;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import javax.naming.AuthenticationException;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.AuthData;
-import org.apache.pulsar.common.protocol.Commands;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
 
+@Slf4j
 public class ProxyClientCnx extends ClientCnx {
-
-    String clientAuthRole;
-    AuthData clientAuthData;
-    String clientAuthMethod;
-    int protocolVersion;
+    private final boolean forwardClientAuthData;
+    private final String clientAuthMethod;
+    private final String clientAuthRole;
+    private final Function<Boolean, CompletableFuture<AuthData>> clientAuthDataSupplier;
 
     public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,
-                          AuthData clientAuthData, String clientAuthMethod, int protocolVersion) {
-        super(conf, eventLoopGroup);
+                          Function<Boolean, CompletableFuture<AuthData>> clientAuthDataSupplier,
+                          String clientAuthMethod,
+                          int protocolVersion, boolean forwardClientAuthData) {
+        super(conf, eventLoopGroup, protocolVersion);
         this.clientAuthRole = clientAuthRole;
-        this.clientAuthData = clientAuthData;
+        this.clientAuthDataSupplier = clientAuthDataSupplier;
         this.clientAuthMethod = clientAuthMethod;
-        this.protocolVersion = protocolVersion;
+        this.forwardClientAuthData = forwardClientAuthData;
     }
 
     @Override
-    protected ByteBuf newConnectCommand() throws Exception {
-        if (log.isDebugEnabled()) {
-            log.debug("New Connection opened via ProxyClientCnx with params clientAuthRole = {},"
-                            + " clientAuthData = {}, clientAuthMethod = {}",
-                    clientAuthRole, clientAuthData, clientAuthMethod);
+    protected void completeActive() {
+        clientAuthDataSupplier.apply(false).thenAccept(clientAuthData -> {
+            try {
+                sendConnectCommand(clientAuthRole, clientAuthData, clientAuthMethod);
+            } catch (Exception e) {
+                log.error("{} Error during handshake", ctx.channel(), e);
+                close(e);
+            }
+        });
+    }
+
+    @Override
+    protected void prepareMutualAuth(CommandAuthChallenge authChallenge) throws AuthenticationException {
+        boolean isRefresh = Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData());
+        if (!forwardClientAuthData || !isRefresh) {
+            super.prepareMutualAuth(authChallenge);
+            return;
         }
 
-        authenticationDataProvider = authentication.getAuthData(remoteHostName);
-        AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
-        return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
-            PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
-            clientAuthMethod);
+        clientAuthDataSupplier.apply(true).thenAccept(originalClientAuthData -> {
+            sendMutualAuthCommand(clientAuthMethod, originalClientAuthData);
+        }).exceptionally(e -> {
+            log.error("{} Error mutual verify", ctx.channel(), e);

Review Comment:
   Ok, I'll refactor this.





[GitHub] [pulsar] lin-zhao commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
lin-zhao commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r980609442


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -253,8 +254,25 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
         } else {
             log.info("{} Connected through proxy to target broker at {}", ctx.channel(), proxyToTargetBrokerAddress);
         }
-        // Send CONNECT command
-        ctx.writeAndFlush(newConnectCommand())
+        completeActive();
+    }
+
+    protected void completeActive() throws Exception {
+        sendConnectCommand(null, null, null);
+    }
+
+    protected final void sendConnectCommand(String originalPrincipal, AuthData originalAuthData,
+                                            String originalAuthMethod) throws Exception {
+        // mutual authentication is to auth between `remoteHostName` and this client for this channel.
+        // each channel will have a mutual client/server pair, mutual client evaluateChallenge with init data,
+        // and return authData to server.
+        authenticationDataProvider = authentication.getAuthData(remoteHostName);

Review Comment:
   Nitpick:
   ```suggestion
           this.authenticationDataProvider = authentication.getAuthData(remoteHostName);
   ```



##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java:
##########
@@ -18,46 +18,61 @@
  */
 package org.apache.pulsar.proxy.server;
 
-import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
-import org.apache.pulsar.PulsarVersion;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import javax.naming.AuthenticationException;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.AuthData;
-import org.apache.pulsar.common.protocol.Commands;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
 
+@Slf4j
 public class ProxyClientCnx extends ClientCnx {
-
-    String clientAuthRole;
-    AuthData clientAuthData;
-    String clientAuthMethod;
-    int protocolVersion;
+    private final boolean forwardClientAuthData;
+    private final String clientAuthMethod;
+    private final String clientAuthRole;
+    private final Function<Boolean, CompletableFuture<AuthData>> clientAuthDataSupplier;
 
     public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,
-                          AuthData clientAuthData, String clientAuthMethod, int protocolVersion) {
-        super(conf, eventLoopGroup);
+                          Function<Boolean, CompletableFuture<AuthData>> clientAuthDataSupplier,
+                          String clientAuthMethod,
+                          int protocolVersion, boolean forwardClientAuthData) {
+        super(conf, eventLoopGroup, protocolVersion);
         this.clientAuthRole = clientAuthRole;
-        this.clientAuthData = clientAuthData;
+        this.clientAuthDataSupplier = clientAuthDataSupplier;
         this.clientAuthMethod = clientAuthMethod;
-        this.protocolVersion = protocolVersion;
+        this.forwardClientAuthData = forwardClientAuthData;
     }
 
     @Override
-    protected ByteBuf newConnectCommand() throws Exception {
-        if (log.isDebugEnabled()) {
-            log.debug("New Connection opened via ProxyClientCnx with params clientAuthRole = {},"
-                            + " clientAuthData = {}, clientAuthMethod = {}",
-                    clientAuthRole, clientAuthData, clientAuthMethod);
+    protected void completeActive() {
+        clientAuthDataSupplier.apply(false).thenAccept(clientAuthData -> {
+            try {
+                sendConnectCommand(clientAuthRole, clientAuthData, clientAuthMethod);
+            } catch (Exception e) {
+                log.error("{} Error during handshake", ctx.channel(), e);
+                close(e);
+            }
+        });
+    }
+
+    @Override
+    protected void prepareMutualAuth(CommandAuthChallenge authChallenge) throws AuthenticationException {
+        boolean isRefresh = Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData());
+        if (!forwardClientAuthData || !isRefresh) {
+            super.prepareMutualAuth(authChallenge);
+            return;
         }
 
-        authenticationDataProvider = authentication.getAuthData(remoteHostName);
-        AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
-        return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
-            PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
-            clientAuthMethod);
+        clientAuthDataSupplier.apply(true).thenAccept(originalClientAuthData -> {
+            sendMutualAuthCommand(clientAuthMethod, originalClientAuthData);
+        }).exceptionally(e -> {
+            log.error("{} Error mutual verify", ctx.channel(), e);

Review Comment:
   What's the reason for swallowing the exception instead of propagating it to the caller? This method declares `AuthenticationException`. Is it supposed to be thrown by this line? If so, you probably don't want to catch and ignore all exceptions here.
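
As a rough illustration of the point above, the sketch below contrasts a chain that logs and swallows a failure with `exceptionally` against one that logs with `whenComplete` and leaves the failure visible to whoever holds the returned future. `ExceptionallySketch` and its method names are hypothetical, and `String` stands in for `AuthData`. Note that a failure raised inside an asynchronous callback cannot surface through the `throws` clause of the method that registered the callback; it can only travel through the future.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the callback chain in the diff; names are illustrative.
final class ExceptionallySketch {

    // Same shape as the code under review: the failure is logged and then
    // replaced by a normal (null) result, so downstream stages see success.
    static CompletableFuture<Void> logAndSwallow(CompletableFuture<String> authData) {
        return authData
                .thenAccept(data -> { /* would send the auth response here */ })
                .exceptionally(e -> {
                    System.err.println("Error mutual verify: " + e);
                    return null;
                });
    }

    // Alternative: log the failure but keep it in the returned future,
    // so the caller can still react (for example by closing the connection).
    static CompletableFuture<Void> logAndPropagate(CompletableFuture<String> authData) {
        return authData
                .thenAccept(data -> { /* would send the auth response here */ })
                .whenComplete((ignored, e) -> {
                    if (e != null) {
                        System.err.println("Error mutual verify: " + e);
                    }
                });
    }
}
```

Which variant is appropriate depends on whether any caller inspects the returned future; if nothing does, the failure has to be handled (not just logged) inside the callback itself.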



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -1243,6 +1254,13 @@ public void close() {
        }
     }
 
+    public void close(Throwable e) {

Review Comment:
   Is there a particular reason to make this public? `close` as a method name is very generic.
   
   How about 
   ```suggestion
       private void closeWithException(Throwable e) {
   ```



##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java:
##########
@@ -18,46 +18,61 @@
  */
 package org.apache.pulsar.proxy.server;
 
-import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
-import org.apache.pulsar.PulsarVersion;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import javax.naming.AuthenticationException;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.AuthData;
-import org.apache.pulsar.common.protocol.Commands;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
 
+@Slf4j
 public class ProxyClientCnx extends ClientCnx {
-
-    String clientAuthRole;
-    AuthData clientAuthData;
-    String clientAuthMethod;
-    int protocolVersion;
+    private final boolean forwardClientAuthData;
+    private final String clientAuthMethod;
+    private final String clientAuthRole;
+    private final Function<Boolean, CompletableFuture<AuthData>> clientAuthDataSupplier;
 
     public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,

Review Comment:
   This is a backward-incompatible change. Would you rather add a new overloaded constructor instead of changing the existing one? Otherwise there's an argument that this needs a major version bump.
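
One possible shape of the overload suggested above, as a sketch only: the old four-argument signature is kept and delegates to the new one by wrapping the fixed auth data in a supplier. `CnxSketch` is a hypothetical stand-in for `ProxyClientCnx`, `String` replaces `AuthData`, the `conf` and `eventLoopGroup` parameters are left out to keep the example self-contained, and defaulting `forwardClientAuthData` to `false` for legacy callers is an assumption, not something the PR specifies.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical stand-in for ProxyClientCnx; String replaces AuthData.
class CnxSketch {
    final String clientAuthRole;
    final String clientAuthMethod;
    final int protocolVersion;
    final boolean forwardClientAuthData;
    final Function<Boolean, CompletableFuture<String>> clientAuthDataSupplier;

    // New signature: auth data comes from a supplier that can also refresh it.
    CnxSketch(String clientAuthRole,
              Function<Boolean, CompletableFuture<String>> clientAuthDataSupplier,
              String clientAuthMethod,
              int protocolVersion,
              boolean forwardClientAuthData) {
        this.clientAuthRole = clientAuthRole;
        this.clientAuthDataSupplier = clientAuthDataSupplier;
        this.clientAuthMethod = clientAuthMethod;
        this.protocolVersion = protocolVersion;
        this.forwardClientAuthData = forwardClientAuthData;
    }

    // Old signature kept as an overload so existing callers still compile.
    // Assumption: legacy callers never refresh, so the supplier always
    // returns the fixed data and forwarding is disabled.
    CnxSketch(String clientAuthRole, String clientAuthData,
              String clientAuthMethod, int protocolVersion) {
        this(clientAuthRole,
             refresh -> CompletableFuture.completedFuture(clientAuthData),
             clientAuthMethod, protocolVersion, false);
    }
}
```

A caller written against the old signature, such as `new CnxSketch("role", "token-1", "token", 19)`, keeps compiling unchanged.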





[GitHub] [pulsar] nodece commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r980827134


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -253,8 +254,25 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
         } else {
             log.info("{} Connected through proxy to target broker at {}", ctx.channel(), proxyToTargetBrokerAddress);
         }
-        // Send CONNECT command
-        ctx.writeAndFlush(newConnectCommand())
+        completeActive();
+    }
+
+    protected void completeActive() throws Exception {
+        sendConnectCommand(null, null, null);

Review Comment:
   That is right: these parameters carry the original (real user) auth.





[GitHub] [pulsar] nodece commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r980825915


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -543,6 +557,48 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
         }
     }
 
+    private CompletableFuture<AuthData> getOrRefreshOriginalClientAuthData(boolean isRefresh) {
+        if (!isRefresh) {
+            if (service.getConfiguration().isForwardAuthorizationCredentials()) {
+                return CompletableFuture.completedFuture(clientAuthData);
+            }
+            return CompletableFuture.completedFuture(null);
+        }
+
+        if (refreshAuthFuture != null && !refreshAuthFuture.isDone()) {
+            log.error("{} Mutual auth timeout", ctx.channel());
+            ctx.close();
+            return CompletableFuture.failedFuture(new AuthenticationException("Mutual auth timeout"));
+        }
+
+        refreshAuthFuture = new CompletableFuture<>();
+        try {
+            AuthData refreshAuthData = authState.refreshAuthentication();
+            ctx.writeAndFlush(Commands.newAuthChallenge(clientAuthMethod, refreshAuthData, protocolVersionToAdvertise))
+                    .addListener(writeFuture -> {
+                        if (writeFuture.isSuccess()) {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("{} Sent auth challenge to client to refresh credentials with method: {}",
+                                        ctx.channel(), clientAuthMethod);
+                            }
+                        } else {
+                            LOG.error("{} Failed to send request for mutual auth to client", ctx.channel(),
+                                    writeFuture.cause());
+                            refreshAuthFuture.completeExceptionally(writeFuture.cause());
+                            ctx.close();
+                        }
+                    });
+        } catch (AuthenticationException e) {
+            log.error("{} Failed to refresh authentication", ctx.channel(), e);
+            ctx.writeAndFlush(
+                            Commands.newError(-1, ServerError.AuthenticationError, "Failed to refresh authentication"))
+                    .addListener(ChannelFutureListener.CLOSE);
+            refreshAuthFuture.completeExceptionally(e);
+        }
+
+        return refreshAuthFuture;

Review Comment:
   > Looks like the `refreshAuthFuture` will only be completed with the exception, not the auth data. How does it work?
   
   See `org.apache.pulsar.proxy.server.ProxyConnection#doAuthentication`; we complete it in that method.
   
   > If we need the client(user) to refresh the auth data, we can just let the proxy forward the auth challenge command to the client and forward the client auth response to the broker. Does this not work? or maybe I missed something.
   
   That is what `org.apache.pulsar.proxy.server.DirectProxyHandler` does: through it, the client can communicate directly with the broker.
   
   You are missing the first connection that the client makes to the proxy, which is used for lookup operations. Only in the next step does the client communicate directly with the broker through the proxy.
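
The pattern described in the reply above, where one code path creates a pending future and a different path completes it when the client's response arrives, can be sketched as follows. `RefreshCoordinatorSketch`, `requestRefresh`, and `onAuthResponse` are hypothetical names, `String` stands in for `AuthData`, and the real code writes an auth challenge to the channel where this sketch only has a comment.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of a request/response handoff through a pending future.
final class RefreshCoordinatorSketch {
    private CompletableFuture<String> pending;

    // Called when fresh credentials are needed. Returns a future that stays
    // incomplete until onAuthResponse is invoked.
    synchronized CompletableFuture<String> requestRefresh() {
        if (pending != null && !pending.isDone()) {
            // A previous refresh never finished; fail fast, as the diff does.
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                    new IllegalStateException("previous refresh still pending"));
            return failed;
        }
        pending = new CompletableFuture<>();
        // Real code would send an auth challenge to the client here.
        return pending;
    }

    // Called from the handler that receives the client's auth response.
    synchronized void onAuthResponse(String refreshedAuthData) {
        if (pending != null) {
            pending.complete(refreshedAuthData);
        }
    }
}
```

Because the only visible completions in the requesting method are the failure paths, a reader has to follow the response handler to find where the success value is set, which is why the question in the quoted comment comes up.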
   
   
   





[GitHub] [pulsar] nodece commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r983011081


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java:
##########
@@ -54,10 +67,33 @@ protected ByteBuf newConnectCommand() throws Exception {
 
         authenticationDataProvider = authentication.getAuthData(remoteHostName);
         AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
-        return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
-            PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
-            clientAuthMethod);
+        return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
+                PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
+                clientAuthMethod);
     }
 
-    private static final Logger log = LoggerFactory.getLogger(ProxyClientCnx.class);
+    @Override
+    protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
+        checkArgument(authChallenge.hasChallenge());
+        checkArgument(authChallenge.getChallenge().hasAuthData());
+
+        boolean isRefresh = Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData());
+        if (!forwardClientAuthData || !isRefresh || refreshClientAuthDataNotifier == null) {
+            super.handleAuthChallenge(authChallenge);
+            return;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Request to refresh the original client authentication data");
+        }
+        try {
+            refreshClientAuthDataNotifier.get();
+            if (state == State.SentConnectFrame) {
+                state = State.Connecting;

Review Comment:
   No conflict: one is from `ProxyConnection`, and the other is from `ProxyClientCnx`.





[GitHub] [pulsar] nodece commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r980801591


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -253,8 +254,25 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
         } else {
             log.info("{} Connected through proxy to target broker at {}", ctx.channel(), proxyToTargetBrokerAddress);
         }
-        // Send CONNECT command
-        ctx.writeAndFlush(newConnectCommand())
+        completeActive();
+    }
+
+    protected void completeActive() throws Exception {
+        sendConnectCommand(null, null, null);
+    }
+
+    protected final void sendConnectCommand(String originalPrincipal, AuthData originalAuthData,
+                                            String originalAuthMethod) throws Exception {
+        // mutual authentication is to auth between `remoteHostName` and this client for this channel.
+        // each channel will have a mutual client/server pair, mutual client evaluateChallenge with init data,
+        // and return authData to server.
+        authenticationDataProvider = authentication.getAuthData(remoteHostName);

Review Comment:
   Most field accesses in this class do not use the `this.` prefix.





[GitHub] [pulsar] nodece commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r980961703


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -253,8 +254,25 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
         } else {
             log.info("{} Connected through proxy to target broker at {}", ctx.channel(), proxyToTargetBrokerAddress);
         }
-        // Send CONNECT command
-        ctx.writeAndFlush(newConnectCommand())
+        completeActive();
+    }
+
+    protected void completeActive() throws Exception {
+        sendConnectCommand(null, null, null);

Review Comment:
   We override `completeActive()` in the `ProxyClientCnx` class.
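
   A minimal sketch of this override pattern, using simplified stand-in classes. `BaseCnx`, `ProxyCnx`, `connectArgs`, and the recorded argument list are hypothetical names for illustration only; the real `ClientCnx` writes a CONNECT frame to a Netty channel, and the auth data is an `AuthData`, not a `String`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CompleteActiveSketch {

    /** Hypothetical stand-in for ClientCnx; records CONNECT arguments instead of writing to a channel. */
    public static class BaseCnx {
        final List<String> sent = new ArrayList<>();

        // Hook invoked once the channel is active. The default sends no original credentials.
        protected void completeActive() {
            sendConnectCommand(null, null, null);
        }

        // Final, so subclasses can only choose the arguments, not change how CONNECT is sent.
        protected final void sendConnectCommand(String originalPrincipal, String originalAuthData,
                                                String originalAuthMethod) {
            sent.addAll(Arrays.asList(originalPrincipal, originalAuthData, originalAuthMethod));
        }
    }

    /** Hypothetical stand-in for ProxyClientCnx; forwards the original client's credentials. */
    public static class ProxyCnx extends BaseCnx {
        private final String role;
        private final String authData;
        private final String authMethod;

        public ProxyCnx(String role, String authData, String authMethod) {
            this.role = role;
            this.authData = authData;
            this.authMethod = authMethod;
        }

        @Override
        protected void completeActive() {
            sendConnectCommand(role, authData, authMethod);
        }
    }

    // Simulates channelActive(): triggers the hook and returns what would have been sent.
    public static List<String> connectArgs(BaseCnx cnx) {
        cnx.completeActive();
        return cnx.sent;
    }

    public static void main(String[] args) {
        System.out.println(connectArgs(new BaseCnx()));
        System.out.println(connectArgs(new ProxyCnx("client", "token-bytes", "token")));
    }
}
```

   Keeping `sendConnectCommand` final while leaving `completeActive()` overridable means a subclass decides which credentials go into CONNECT without duplicating the send logic.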





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r983309654


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java:
##########
@@ -54,10 +59,54 @@ protected ByteBuf newConnectCommand() throws Exception {
 
         authenticationDataProvider = authentication.getAuthData(remoteHostName);
         AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
-        return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
-            PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
-            clientAuthMethod);
+        return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
+                PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
+                clientAuthMethod);
     }
 
-    private static final Logger log = LoggerFactory.getLogger(ProxyClientCnx.class);
+    @Override
+    protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
+        checkArgument(authChallenge.hasChallenge());
+        checkArgument(authChallenge.getChallenge().hasAuthData());
+
+        boolean isRefresh = Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData());
+        if (!forwardClientAuthData || !isRefresh) {
+            super.handleAuthChallenge(authChallenge);
+            return;
+        }
+
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug("Proxy {} request to refresh the original client authentication data for "
+                        + "the proxy client {}", proxyConnection.ctx().channel(), ctx.channel());
+            }
+
+            proxyConnection.ctx().writeAndFlush(Commands.newAuthChallenge(clientAuthMethod, AuthData.REFRESH_AUTH_DATA,

Review Comment:
   It looks like we could improve this by not sending the auth challenge command to the client if the proxy has just completed an auth challenge, because every broker that interacts with this client connection will send its own auth challenge command.
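
   One possible way to do this, sketched under assumptions: a small gate that lets only one refresh challenge through per time window, so several brokers asking at about the same time cause a single challenge to the client. `RefreshChallengeGate`, `tryAcquire`, and the interval parameter are hypothetical names and are not part of this PR or of Pulsar.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper: allows at most one refresh challenge per interval. */
public class RefreshChallengeGate {
    private static final long NEVER = Long.MIN_VALUE;

    private final long minIntervalMillis;
    private final AtomicLong lastChallengeAt = new AtomicLong(NEVER);

    public RefreshChallengeGate(long minIntervalMillis) {
        this.minIntervalMillis = minIntervalMillis;
    }

    /**
     * Returns true if the caller should send a challenge to the client now,
     * false if one was already sent less than minIntervalMillis ago.
     */
    public boolean tryAcquire(long nowMillis) {
        while (true) {
            long last = lastChallengeAt.get();
            if (last != NEVER && nowMillis - last < minIntervalMillis) {
                return false; // a recent challenge already covers this request
            }
            // CAS so that only one of several concurrent callers wins the window.
            if (lastChallengeAt.compareAndSet(last, nowMillis)) {
                return true;
            }
        }
    }

    public static void main(String[] args) {
        RefreshChallengeGate gate = new RefreshChallengeGate(1000);
        System.out.println(gate.tryAcquire(0));
        System.out.println(gate.tryAcquire(500));
        System.out.println(gate.tryAcquire(1000));
    }
}
```

   The timestamp is passed in rather than read from the system clock so the behaviour can be tested deterministically. A real implementation would also have to decide what to do with the broker requests that are skipped, for example answering them once the single client response arrives.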





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17831: [fix][proxy] Fix refresh client auth

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r983064900


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -535,6 +542,42 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
         try {
             AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
             doAuthentication(clientData);
+            if (connectionPool != null && state == State.ProxyLookupRequests) {

Review Comment:
   Can we merge the two `if` conditions (line 546)?
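
   As a small illustration of the suggestion, the sketch below checks that a nested `if` with no `else` branch and no statements between the two checks behaves the same as a single combined condition. The class, method, and parameter names are hypothetical and stand in for `connectionPool != null`, `state == State.ProxyLookupRequests`, and `isForwardAuthorizationCredentials()`.

```java
public class MergedConditionSketch {

    // Shape of the code under review: an outer check wrapping an inner check.
    public static boolean nested(boolean hasPool, boolean inLookupState, boolean forwardCredentials) {
        if (hasPool && inLookupState) {
            if (forwardCredentials) {
                return true; // the refresh would be forwarded here
            }
        }
        return false;
    }

    // Suggested shape: one condition, one level of indentation less.
    public static boolean merged(boolean hasPool, boolean inLookupState, boolean forwardCredentials) {
        if (hasPool && inLookupState && forwardCredentials) {
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        boolean[] values = {false, true};
        for (boolean a : values) {
            for (boolean b : values) {
                for (boolean c : values) {
                    System.out.println(nested(a, b, c) == merged(a, b, c));
                }
            }
        }
    }
}
```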



##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -535,6 +542,42 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
         try {
             AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
             doAuthentication(clientData);
+            if (connectionPool != null && state == State.ProxyLookupRequests) {
+                if (service.getConfiguration().isForwardAuthorizationCredentials()) {
+                    connectionPool.getPool().values().forEach(n -> n.values().forEach(cnxFuture -> {
+                        String clientVersion;
+                        if (authResponse.hasClientVersion()) {
+                            clientVersion = authResponse.getClientVersion();
+                        } else {
+                            clientVersion = PulsarVersion.getVersion();
+                        }
+                        int protocolVersion;
+                        if (authResponse.hasProtocolVersion()) {
+                            protocolVersion = authResponse.getProtocolVersion();
+                        } else {
+                            protocolVersion = Commands.getCurrentProtocolVersion();
+                        }
+
+                        ByteBuf cmd =
+                                Commands.newAuthResponse(clientAuthMethod, clientData, protocolVersion, clientVersion);
+                        cnxFuture.thenAccept(clientCnx -> clientCnx.ctx().writeAndFlush(cmd)
+                                .addListener(writeFuture -> {
+                                    if (writeFuture.isSuccess()) {
+                                        if (LOG.isDebugEnabled()) {
+                                            LOG.debug(
+                                                    "{} authentication is refreshed successfully by {}, auth method: {} ",
+                                                    clientCnx.ctx().channel(), ctx.channel(), clientAuthMethod);
+                                        }
+                                    } else {
+                                        LOG.error("{} Failed to refresh request for mutual auth to client {}",
+                                                clientCnx.ctx().channel(),
+                                                writeFuture.cause());
+                                        ctx.channel().pipeline().fireExceptionCaught(writeFuture.cause());

Review Comment:
   If the write to the broker fails, should we retry or close the user client channel?
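
   To make the "close" option concrete, here is a minimal sketch assuming a simplified channel abstraction. `FakeChannel` and `closeClientOnWriteFailure` are hypothetical names, and a `CompletableFuture` stands in for the Netty write future used in the diff. It shows only one of the two options raised in the question, not what the PR does.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class RefreshWriteFailureSketch {

    /** Hypothetical stand-in for the user client's channel. */
    public static class FakeChannel {
        private final AtomicBoolean open = new AtomicBoolean(true);

        public void close() {
            open.set(false);
        }

        public boolean isOpen() {
            return open.get();
        }
    }

    // Closes the user client channel if forwarding the refreshed auth data to the broker fails.
    public static void closeClientOnWriteFailure(CompletableFuture<Void> brokerWrite,
                                                 FakeChannel clientChannel) {
        brokerWrite.whenComplete((ignored, error) -> {
            if (error != null) {
                // The broker never received the new credentials, so drop the
                // client connection and let the client reconnect.
                clientChannel.close();
            }
        });
    }

    public static void main(String[] args) {
        FakeChannel client = new FakeChannel();
        CompletableFuture<Void> write = new CompletableFuture<>();
        closeClientOnWriteFailure(write, client);
        write.completeExceptionally(new RuntimeException("write failed"));
        System.out.println("client channel open: " + client.isOpen());
    }
}
```

   Closing is the simpler choice because the client's reconnect logic then starts again from a known state, whereas a retry would need a bound on attempts and a decision about what the broker does with the stale credentials in the meantime.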


