You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 09:38:56 UTC

[pulsar] branch branch-2.6 updated (63d4078 -> f0363e3)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 63d4078  Fix conflict
     new 828eb43  [C++] Reduce log level for ack-grouping tracker (#7373)
     new b64e02a  update for pr:Restore clusterDispatchRate policy for compatibility (#7380)
     new 0a2df5a  [Issue 7347] Avoid the NPE occurs in method `ManagedLedgerImpl.isOffloadedNeedsDelete` (#7389)
     new 5b3889c  Catch NPE and detect state doesn't move (#7401)
     new 30a5221  Avoid NPEs at ledger creation when DNS failures happen (#7403)
     new f62bc5b  [Issue 7407] NPE with tombstones (#7408)
     new eb343fd  Fix batch ackset recycled multiple times. (#7409)
     new ae246fe  Decompression payload if needed in KeyShared subscription (#7416)
     new c86fe38  [client authentication] add authentication client with oauth2 support (#7420)
     new cd19d3b  [pulsar-doc] Add document for Pulasr SNI routing with ATS (#7421)
     new 2a65167  [pulsar-cli] fix update-cluster cli updates proxy-url (#7422)
     new 1fd55cf  [Doc]--Add labels to function statefulsets and services (#7428)
     new 754b864  Handle NotAllowed Exception at the client side. (#7430)
     new f0363e3  shaded jclouds to avoid gson conflict

The 14 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .gitignore                                         |   1 +
 jclouds-shaded/pom.xml                             | 140 ++++++++++++++++++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |   4 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  53 ++++---
 .../mledger/impl/OffloadLedgerDeleteTest.java      |  62 +++++++-
 .../bookkeeper/mledger/impl/OffloadPrefixTest.java |   2 +-
 pom.xml                                            |  13 +-
 .../broker/service/AbstractBaseDispatcher.java     |  47 ++----
 .../broker/service/BrokerServiceException.java     |   2 +
 .../service/schema/BookkeeperSchemaStorage.java    |  33 +++--
 .../pulsar/compaction/TwoPhaseCompactor.java       |  31 ++--
 .../apache/pulsar/broker/admin/AdminApiTest2.java  |  14 ++
 .../client/api/KeySharedSubscriptionTest.java      |  59 +++++++-
 .../TokenAuthenticatedProducerConsumerTest.java    | 143 ++++++++++++++++++
 ...kenOauth2AuthenticatedProducerConsumerTest.java | 160 +++++++++++++++++++++
 .../client/impl/BatchMessageIndexAckTest.java      |  31 ++++
 .../authentication/token/credentials_file.json     |   4 +
 .../pulsar/client/api/PulsarClientException.java   |  22 +++
 pulsar-client-cpp/include/pulsar/Result.h          |   1 +
 pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc |   6 +-
 pulsar-client-cpp/lib/ClientConnection.cc          |   3 +
 pulsar-client-cpp/lib/Result.cc                    |   3 +
 .../org/apache/pulsar/admin/cli/CmdClusters.java   |  10 +-
 .../client/impl/BatchMessageKeyBasedContainer.java |   9 --
 .../org/apache/pulsar/client/impl/ClientCnx.java   |   2 +
 .../client/impl/ConsumerStatsRecorderImpl.java     |   2 +-
 .../org/apache/pulsar/client/impl/MessageImpl.java |  10 +-
 .../PersistentAcknowledgmentsGroupingTracker.java  |   1 +
 .../AuthenticationDataOAuth2.java}                 |  31 ++--
 .../auth/oauth2/AuthenticationFactoryOAuth2.java}  |  43 +++---
 .../impl/auth/oauth2/AuthenticationOAuth2.java     | 131 +++++++++++++++++
 .../impl/auth/oauth2/ClientCredentialsFlow.java    | 146 +++++++++++++++++++
 .../pulsar/client/impl/auth/oauth2/Flow.java       |  45 +++---
 .../pulsar/client/impl/auth/oauth2/FlowBase.java   |  80 +++++++++++
 .../pulsar/client/impl/auth/oauth2/KeyFile.java    |  66 +++++++++
 .../pulsar/client/impl/auth/oauth2/Readme.md       |  94 ++++++++++++
 .../client/impl/auth/oauth2}/package-info.java     |   2 +-
 .../protocol/ClientCredentialsExchangeRequest.java |  23 +--
 .../protocol/ClientCredentialsExchanger.java       |  32 ++---
 .../oauth2/protocol/DefaultMetadataResolver.java   | 105 ++++++++++++++
 .../client/impl/auth/oauth2/protocol/Metadata.java |  37 +++--
 .../auth/oauth2/protocol/MetadataResolver.java     |  17 +--
 .../impl/auth/oauth2/protocol/TokenClient.java     | 121 ++++++++++++++++
 .../impl/auth/oauth2/protocol/TokenError.java      |  24 ++--
 .../oauth2/protocol/TokenExchangeException.java    |  20 ++-
 .../impl/auth/oauth2/protocol/TokenResult.java     |  28 ++--
 .../impl/auth/oauth2/protocol}/package-info.java   |   2 +-
 .../impl/auth/oauth2/AuthenticationOAuth2Test.java | 122 ++++++++++++++++
 .../pulsar/client/impl/auth/oauth2/MockClock.java  |  97 +++++++++++++
 .../apache/pulsar/common/api/proto/PulsarApi.java  |   3 +
 .../pulsar/common/policies/data/ClusterData.java   |   2 +
 .../apache/pulsar/common/protocol/Commands.java    |  11 +-
 pulsar-common/src/main/proto/PulsarApi.proto       |   1 +
 site2/docs/admin-api-namespaces.md                 |  11 +-
 site2/docs/assets/pulsar-sni-client.png            | Bin 0 -> 232801 bytes
 site2/docs/assets/pulsar-sni-geo.png               | Bin 0 -> 201549 bytes
 site2/docs/concepts-proxy-sni-routing.md           | 122 ++++++++++++++++
 site2/docs/functions-runtime.md                    |   3 +
 .../version-2.4.0/functions-runtime.md             |   3 +
 .../version-2.4.1/functions-runtime.md             |   3 +
 .../version-2.4.2/functions-runtime.md             |   3 +
 .../version-2.5.0/functions-runtime.md             |   3 +
 .../version-2.5.1/functions-runtime.md             |   3 +
 .../version-2.5.2/functions-runtime.md             |   3 +
 tiered-storage/jcloud/pom.xml                      |  36 ++++-
 .../impl/BlobStoreManagedLedgerOffloader.java      |   6 +-
 .../impl/BlobStoreManagedLedgerOffloaderTest.java  |   2 +-
 tiered-storage/pom.xml                             |   4 +
 68 files changed, 2070 insertions(+), 283 deletions(-)
 create mode 100644 jclouds-shaded/pom.xml
 create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
 create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
 create mode 100644 pulsar-broker/src/test/resources/authentication/token/credentials_file.json
 copy pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/{AuthenticationDataToken.java => oauth2/AuthenticationDataOAuth2.java} (63%)
 copy pulsar-client/src/{test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java => main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java} (50%)
 create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
 create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
 copy pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java => pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Flow.java (55%)
 create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
 create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/KeyFile.java
 create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Readme.md
 copy {managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util => pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2}/package-info.java (94%)
 copy pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/IsCompatibilityResponse.java => pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java (62%)
 copy pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java => pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java (57%)
 create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java
 copy pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionState.java => pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/Metadata.java (58%)
 copy pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/Category.java => pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/MetadataResolver.java (77%)
 create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
 copy pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionState.java => pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenError.java (71%)
 copy pulsar-common/src/main/java/org/apache/pulsar/common/partition/PartitionedTopicMetadata.java => pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenExchangeException.java (66%)
 copy pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionState.java => pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenResult.java (66%)
 copy {managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util => pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol}/package-info.java (93%)
 create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
 create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/MockClock.java
 create mode 100644 site2/docs/assets/pulsar-sni-client.png
 create mode 100644 site2/docs/assets/pulsar-sni-geo.png
 create mode 100644 site2/docs/concepts-proxy-sni-routing.md


[pulsar] 10/14: [pulsar-doc] Add document for Pulasr SNI routing with ATS (#7421)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit cd19d3bf46efb88b235c7fa29118fbc38746908f
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Jul 7 01:06:51 2020 -0700

    [pulsar-doc] Add document for Pulasr SNI routing with ATS (#7421)
    
    * [pulsar-doc] Add document for Pulasr SNI routing with ATS
    
    * address comments
    
    * add image
    
    * Update site2/docs/concepts-proxy-sni-routing.md
    
    Co-authored-by: Yu Liu <50...@users.noreply.github.com>
    
    * Update site2/docs/concepts-proxy-sni-routing.md
    
    Co-authored-by: Yu Liu <50...@users.noreply.github.com>
    
    * Update site2/docs/concepts-proxy-sni-routing.md
    
    Co-authored-by: Yu Liu <50...@users.noreply.github.com>
    
    * Update site2/docs/concepts-proxy-sni-routing.md
    
    Co-authored-by: Yu Liu <50...@users.noreply.github.com>
    
    * Update site2/docs/concepts-proxy-sni-routing.md
    
    Co-authored-by: Yu Liu <50...@users.noreply.github.com>
    
    Co-authored-by: Yu Liu <50...@users.noreply.github.com>
    (cherry picked from commit ee1b81030f0b6fe6f67fc6db89b4b55a1ef2f033)
---
 site2/docs/assets/pulsar-sni-client.png  | Bin 0 -> 232801 bytes
 site2/docs/assets/pulsar-sni-geo.png     | Bin 0 -> 201549 bytes
 site2/docs/concepts-proxy-sni-routing.md | 122 +++++++++++++++++++++++++++++++
 3 files changed, 122 insertions(+)

diff --git a/site2/docs/assets/pulsar-sni-client.png b/site2/docs/assets/pulsar-sni-client.png
new file mode 100644
index 0000000..c02e046
Binary files /dev/null and b/site2/docs/assets/pulsar-sni-client.png differ
diff --git a/site2/docs/assets/pulsar-sni-geo.png b/site2/docs/assets/pulsar-sni-geo.png
new file mode 100644
index 0000000..bff8482
Binary files /dev/null and b/site2/docs/assets/pulsar-sni-geo.png differ
diff --git a/site2/docs/concepts-proxy-sni-routing.md b/site2/docs/concepts-proxy-sni-routing.md
new file mode 100644
index 0000000..fc1ae9b
--- /dev/null
+++ b/site2/docs/concepts-proxy-sni-routing.md
@@ -0,0 +1,122 @@
+---
+id: concepts-proxy-sni-routing
+title: Proxy support with SNI routing
+sidebar_label: Proxy support with SNI routing
+---
+
+## Pulsar Proxy with SNI routing
+
+A proxy server is an intermediary server that forwards requests from multiple clients to different servers across the Internet. The proxy server acts as a "traffic cop" in both forward and reverse proxy scenarios, and brings various benefits to your system such as load balancing, performance, security, auto-scaling, etc. There are already many proxy servers available in the market which are fast and scalable. More importantly, these proxy servers cover various essential security aspects  [...]
+
+[PIP-60](https://github.com/apache/pulsar/wiki/PIP-60:-Support-Proxy-server-with-SNI-routing) explains the SNI routing protocol and how Pulsar clients support SNI routing protocol to connect to brokers through the proxy. This document explains how to set up the ATS proxy and the Pulsar client to enable SNI routing and connect Pulsar client to the broker through the ATS proxy.
+
+### ATS-SNI Routing in Pulsar
+[ATS supports layer-4 SNI routing](https://docs.trafficserver.apache.org/en/latest/admin-guide/layer-4-routing.en.html) with the requirement that inbound connection must be a TLS connection. The Pulsar client also supports SNI routing protocol on TLS connection and that allows Pulsar to use ATS as a reverse proxy when Pulsar client wants to connect to broker through ATS proxy. Therefore, this section explains how to set up and use ATS as a reverse proxy so pulsar clients can connect to b [...]
+
+
+#### ATS Proxy setup for layer-4 SNI routing
+
+This section explains how to set up the ATS proxy to enable layer-4 SNI routing, which allows Pulsar to use ATS as a reverse proxy.
+
+
+![Pulsar client SNI](assets/pulsar-sni-client.png)
+
+To support SNI routing, you need to configure two files: `records.config` and `ssl_server_name.config`.
+
+
+- `records.config`:
+The [records.config file](https://docs.trafficserver.apache.org/en/latest/admin-guide/files/records.config.en.html) (located in `/usr/local/etc/trafficserver/` by default) is a list of configurable variables used by the Apache Traffic Server. We have to update this file with the TLS port (`http.server_ports`) on which the proxy can listen and the proxy certs (`ssl.client.cert.path` and `ssl.client.cert.filename`) for secure TLS tunneling. We also have to configure a range of server ports ( [...]
+
+**Example:**
+
+```
+# PROXY TLS PORT
+CONFIG proxy.config.http.server_ports STRING 4443:ssl 4080
+# PROXY CERTS FILE PATH
+CONFIG proxy.config.ssl.client.cert.path STRING /proxy-cert.pem
+# PROXY KEY FILE PATH
+CONFIG proxy.config.ssl.client.cert.filename STRING /proxy-key.pem
+
+
+# The range of origin server ports that can be used for tunneling via CONNECT. Traffic Server allows tunnels only to the specified ports. Supports both wildcards (*) and ranges (e.g. 0-1023).
+CONFIG proxy.config.http.connect_ports STRING 4443 6651
+```
+
+- `ssl_server_name.config`:
+The [ssl_server_name file](https://docs.trafficserver.apache.org/en/8.0.x/admin-guide/files/ssl_server_name.yaml.en.html) is used to configure aspects of TLS connection handling for both inbound and outbound connections. The configuration is driven by the SNI values provided by the inbound connection. The file consists of a set of configuration items, each identified by an SNI value (`fqdn`). When an inbound TLS connection is made, the SNI value from the TLS negotiation is matched agains [...]
+
+The following example shows the mapping between the inbound SNI hostname sent by the client and the actual broker service URL to which the request should be redirected. For example, if the client sends the SNI header `pulsar-broker1`, the proxy creates a TLS tunnel by redirecting the request to the service URL `pulsar-broker1:6651`.
+
+**Example:**
+
+```
+server_config = {
+  {
+     fqdn = 'pulsar-broker-vip',
+     # Forward to Pulsar broker which is listening on 6651
+     tunnel_route = 'pulsar-broker-vip:6651'
+  },
+  {
+     fqdn = 'pulsar-broker1',
+     # Forward to Pulsar broker-1 which is listening on 6651
+     tunnel_route = 'pulsar-broker1:6651'
+  },
+  {
+     fqdn = 'pulsar-broker2',
+     # Forward to Pulsar broker-2 which is listening on 6651
+     tunnel_route = 'pulsar-broker2:6651'
+  },
+}
+```
+Once `ssl_server_name.config` and `records.config` are configured, the ATS proxy server is ready to handle SNI routing and can create a TCP tunnel between the client and the broker.
+
+#### Pulsar-client Configuration with SNI routing
+
+Now, the ATS proxy server is configured and ready to handle SNI routing and create the TCP tunnel between the client and the broker. Here, we have to note that ATS SNI-routing works only with TLS. Therefore, the ATS proxy and brokers must have TLS enabled before the Pulsar client configures the SNI routing protocol to connect to the broker through ATS proxy. With [PIP-60](https://github.com/apache/pulsar/wiki/PIP-60:-Support-Proxy-server-with-SNI-routing), the pulsar client supports SNI  [...]
+
+```
+String brokerServiceUrl = "pulsar+ssl://pulsar-broker-vip:6651/";
+String proxyUrl = "pulsar+ssl://ats-proxy:443";
+ClientBuilder clientBuilder = PulsarClient.builder()
+        .serviceUrl(brokerServiceUrl)
+        .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
+        .enableTls(true)
+        .allowTlsInsecureConnection(false)
+        .proxyServiceUrl(proxyUrl, ProxyProtocol.SNI)
+        .operationTimeout(1000, TimeUnit.MILLISECONDS);
+
+Map<String, String> authParams = new HashMap<>();
+authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+clientBuilder.authentication(AuthenticationTls.class.getName(), authParams);
+
+PulsarClient pulsarClient = clientBuilder.build();
+```
+
+#### Pulsar geo-replication with SNI routing
+
+We can also use the ATS proxy for geo-replication. The Pulsar broker can connect to brokers in other clusters (colos) for geo-replication using SNI routing. To enable SNI routing for cross-cluster broker connections, we have to configure the SNI proxy URL in the cluster metadata. If the cluster metadata has an SNI proxy URL configured, the broker connects to the brokers of the other cluster through the proxy over SNI routing.
+
+![Pulsar geo-replication SNI](assets/pulsar-sni-geo.png)
+
+In this example, we have a Pulsar cluster deployed into two separate regions, us-west and us-east. We have also configured ATS proxy in both regions and brokers in each region run behind this ATS proxy. Now, we configure the cluster metadata for both the clusters, so brokers in one cluster can use SNI routing and connect to brokers in other clusters through the ATS proxy.
+
+(a) Configure the cluster metadata for us-east with us-east broker service URL and us-east ATS proxy URL with SNI proxy-protocol.
+
+```
+./pulsar-admin clusters update \
+--broker-url-secure pulsar+ssl://east-broker-vip:6651 \
+--url http://east-broker-vip:8080 \
+--proxy-protocol SNI \
+--proxy-url pulsar+ssl://east-ats-proxy:443
+```
+
+(b) Configure the cluster metadata for us-west with us-west broker service URL and us-west ATS proxy URL with SNI proxy-protocol.
+
+```
+./pulsar-admin clusters update \
+--broker-url-secure pulsar+ssl://west-broker-vip:6651 \
+--url http://west-broker-vip:8080 \
+--proxy-protocol SNI \
+--proxy-url pulsar+ssl://west-ats-proxy:443
+```


[pulsar] 11/14: [pulsar-cli] fix update-cluster cli updates proxy-url (#7422)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2a65167529e02d9f76ecef673430d04357d2ddfc
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Wed Jul 1 23:21:16 2020 -0700

    [pulsar-cli] fix update-cluster cli updates proxy-url (#7422)
    
    ### Motivation
    Pulsar-admin CLI doesn't update cluster-metadata with proxy-url and proxy-protocol.
    
    ### Modification
    Fix CLI to update proxy metadata.
    
    (cherry picked from commit 1e152bd1ef1d3c4b48de0179c66efa899ea34330)
---
 .../java/org/apache/pulsar/broker/admin/AdminApiTest2.java | 14 ++++++++++++++
 .../main/java/org/apache/pulsar/admin/cli/CmdClusters.java | 10 ++++++++--
 .../apache/pulsar/common/policies/data/ClusterData.java    |  2 ++
 3 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 1fad0f8..ba246ea 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -62,6 +62,7 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProxyProtocol;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -1269,4 +1270,17 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test
+    public void testUpdateClusterWithProxyUrl() throws Exception {
+        ClusterData cluster = new ClusterData(pulsar.getWebServiceAddress());
+        String clusterName = "test2";
+        admin.clusters().createCluster(clusterName, cluster);
+        Assert.assertEquals(admin.clusters().getCluster(clusterName), cluster);
+
+        // update
+        cluster.setProxyServiceUrl("proxy");
+        cluster.setProxyProtocol(ProxyProtocol.SNI);
+        admin.clusters().updateCluster(clusterName, cluster);
+        Assert.assertEquals(admin.clusters().getCluster(clusterName), cluster);
+    }
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
index 75f80ee..8ea2e36 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
@@ -101,10 +101,16 @@ public class CmdClusters extends CmdBase {
         @Parameter(names = "--broker-url-secure", description = "broker-service-url for secure connection", required = false)
         private String brokerServiceUrlTls;
 
+        @Parameter(names = "--proxy-url", description = "Proxy-service url when client would like to connect to broker via proxy.", required = false)
+        private String proxyServiceUrl;
+
+        @Parameter(names = "--proxy-protocol", description = "protocol to decide type of proxy routing eg: SNI", required = false)
+        private ProxyProtocol proxyProtocol;
+
         void run() throws PulsarAdminException {
             String cluster = getOneArgument(params);
-            admin.clusters().updateCluster(cluster,
-                    new ClusterData(serviceUrl, serviceUrlTls, brokerServiceUrl, brokerServiceUrlTls));
+            admin.clusters().updateCluster(cluster, new ClusterData(serviceUrl, serviceUrlTls, brokerServiceUrl,
+                    brokerServiceUrlTls, proxyServiceUrl, proxyProtocol));
         }
     }
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
index 35cecbb..4f17811 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
@@ -117,6 +117,8 @@ public class ClusterData {
         this.serviceUrlTls = other.serviceUrlTls;
         this.brokerServiceUrl = other.brokerServiceUrl;
         this.brokerServiceUrlTls = other.brokerServiceUrlTls;
+        this.proxyServiceUrl = other.proxyServiceUrl;
+        this.proxyProtocol = other.proxyProtocol;
     }
 
     public String getServiceUrl() {


[pulsar] 01/14: [C++] Reduce log level for ack-grouping tracker (#7373)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 828eb43a10989ae327a7af35723d98d0a835c849
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Jun 26 21:12:10 2020 -0700

    [C++] Reduce log level for ack-grouping tracker (#7373)
    
    
    (cherry picked from commit 4138e712a389ff0885e4dddcc92d2176de8076ac)
---
 pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc b/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc
index 6666a29..5c61cca 100644
--- a/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc
+++ b/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc
@@ -49,8 +49,8 @@ AckGroupingTrackerEnabled::AckGroupingTrackerEnabled(ClientImplPtr clientPtr, Ha
       executor_(clientPtr->getIOExecutorProvider()->get()),
       timer_(),
       mutexTimer_() {
-    LOG_INFO("ACK grouping is enabled, grouping time " << ackGroupingTimeMs << "ms, grouping max size "
-                                                       << ackGroupingMaxSize);
+    LOG_DEBUG("ACK grouping is enabled, grouping time " << ackGroupingTimeMs << "ms, grouping max size "
+                                                        << ackGroupingMaxSize);
     this->scheduleTimer();
 }
 
@@ -96,7 +96,7 @@ void AckGroupingTrackerEnabled::close() {
 void AckGroupingTrackerEnabled::flush() {
     auto cnx = this->handler_.getCnx().lock();
     if (cnx == nullptr) {
-        LOG_WARN("Connection is not ready, grouping ACK failed.");
+        LOG_DEBUG("Connection is not ready, grouping ACK failed.");
         return;
     }
 


[pulsar] 07/14: Fix batch ackset recycled multiple times. (#7409)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit eb343fda471244dc22f0ae5cb764484d855a6954
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Jul 7 00:40:33 2020 +0800

    Fix batch ackset recycled multiple times. (#7409)
    
    * Fix batch ackset recycled multiple times.
    
    * Apply comments.
    
    * Update pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/BitSetRecyclable.java
    
    (cherry picked from commit 25a690734f278299bfdaae118a2c02ecc25c125e)
---
 .../client/impl/BatchMessageIndexAckTest.java      | 31 ++++++++++++++++++++++
 .../PersistentAcknowledgmentsGroupingTracker.java  |  1 +
 .../apache/pulsar/common/protocol/Commands.java    |  3 ---
 3 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
index 114b9ae..3150f10 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
@@ -181,4 +181,35 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase {
         // broker also need to handle the available permits.
         Assert.assertEquals(received.size(), 100);
     }
+
+    @Test
+    public void testDoNotRecycleAckSetMultipleTimes() throws Exception  {
+        final String topic = "persistent://my-property/my-ns/testSafeAckSetRecycle";
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .batchingMaxMessages(10)
+                .blockIfQueueFull(true).topic(topic)
+                .create();
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .acknowledgmentGroupTime(1, TimeUnit.MILLISECONDS)
+                .topic(topic)
+                .subscriptionName("test")
+                .subscribe();
+
+        final int messages = 100;
+        for (int i = 0; i < messages; i++) {
+            producer.sendAsync("Hello Pulsar".getBytes());
+        }
+
+        // Should not throw an exception.
+        for (int i = 0; i < messages; i++) {
+            consumer.acknowledgeCumulative(consumer.receive());
+            // make sure the group ack flushed.
+            Thread.sleep(2);
+        }
+
+        producer.close();
+        consumer.close();
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 937f005..6908979 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -205,6 +205,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
         }
 
         final ByteBuf cmd = Commands.newAck(consumer.consumerId, msgId.ledgerId, msgId.entryId, bitSet, ackType, null, properties);
+        bitSet.recycle();
         cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
         return true;
     }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 0222f3c..7ae21e7 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -961,9 +961,6 @@ public class Commands {
 
         ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.ACK).setAck(ack));
         ack.recycle();
-        if (ackSet != null) {
-            ackSet.recycle();
-        }
         ackBuilder.recycle();
         messageIdDataBuilder.recycle();
         messageIdData.recycle();


[pulsar] 09/14: [client authentication] add authentication client with oauth2 support (#7420)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c86fe38266eb3181a9942993cecff65fdee1a511
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Thu Jul 2 01:41:31 2020 -0500

    [client authentication] add authentication client with oauth2 support (#7420)
    
    ### Motivation
    
    Pulsar supports authenticating clients using OAuth 2.0 access tokens. You can use tokens to identify a Pulsar client and associate with some "principal" (or "role") that is permitted to do some actions (eg: publish to a topic or consume from a topic).
    
    This module adds a Pulsar client authentication plugin that supports OAuth 2.0 directly. The client communicates with the OAuth 2.0 server, obtains an `access token` from it, and passes this `access token` to the Pulsar broker for authentication.
    
    The broker side can therefore keep using `org.apache.pulsar.broker.authentication.AuthenticationProviderToken`;
    users can also plug in their own `AuthenticationProvider` to work with this module.
    
    ### Modifications
    
    - add related code;
    - add related test;
    - add related doc.
    
    The init of this client authentication module would be like:
    ```java
    Authentication oauth2Authentication = AuthenticationFactoryOAuth2.clientCredentials(
                    new URL("https://dev-kt-aa9ne.us.auth0.com/oauth/token"),
                    new URL("file:///path/to/credential/file.json"),  // key file path
                    "https://dev-kt-aa9ne.us.auth0.com/api/v2/"
            );
    PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://broker.example.com:6650/")
        .authentication(oauth2Authentication)
        .build();
    ```
    
    ### Verifying this change
    
    tests passed.
    
    (cherry picked from commit 768813ea595697e317a0ac5af53191e6cf832766)
---
 .../TokenAuthenticatedProducerConsumerTest.java    | 143 ++++++++++++++++++
 ...kenOauth2AuthenticatedProducerConsumerTest.java | 160 +++++++++++++++++++++
 .../authentication/token/credentials_file.json     |   4 +
 .../impl/auth/oauth2/AuthenticationDataOAuth2.java |  60 ++++++++
 .../auth/oauth2/AuthenticationFactoryOAuth2.java   |  47 ++++++
 .../impl/auth/oauth2/AuthenticationOAuth2.java     | 131 +++++++++++++++++
 .../impl/auth/oauth2/ClientCredentialsFlow.java    | 146 +++++++++++++++++++
 .../pulsar/client/impl/auth/oauth2/Flow.java       |  47 ++++++
 .../pulsar/client/impl/auth/oauth2/FlowBase.java   |  80 +++++++++++
 .../pulsar/client/impl/auth/oauth2/KeyFile.java    |  66 +++++++++
 .../pulsar/client/impl/auth/oauth2/Readme.md       |  94 ++++++++++++
 .../client/impl/auth/oauth2/package-info.java      |  19 +++
 .../protocol/ClientCredentialsExchangeRequest.java |  42 ++++++
 .../protocol/ClientCredentialsExchanger.java       |  41 ++++++
 .../oauth2/protocol/DefaultMetadataResolver.java   | 105 ++++++++++++++
 .../client/impl/auth/oauth2/protocol/Metadata.java |  54 +++++++
 .../auth/oauth2/protocol/MetadataResolver.java     |  28 ++++
 .../impl/auth/oauth2/protocol/TokenClient.java     | 121 ++++++++++++++++
 .../impl/auth/oauth2/protocol/TokenError.java      |  41 ++++++
 .../oauth2/protocol/TokenExchangeException.java    |  35 +++++
 .../impl/auth/oauth2/protocol/TokenResult.java     |  51 +++++++
 .../impl/auth/oauth2/protocol/package-info.java    |  19 +++
 .../impl/auth/oauth2/AuthenticationOAuth2Test.java | 122 ++++++++++++++++
 .../pulsar/client/impl/auth/oauth2/MockClock.java  |  97 +++++++++++++
 24 files changed, 1753 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
new file mode 100644
index 0000000..874a34d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.mockito.Mockito.spy;
+
+import com.google.common.collect.Sets;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Test Token authentication with:
+ *    client: org.apache.pulsar.client.impl.auth.AuthenticationToken
+ *    broker: org.apache.pulsar.broker.authentication.AuthenticationProviderToken
+ */
+public class TokenAuthenticatedProducerConsumerTest extends ProducerConsumerBase {
+    private static final Logger log = LoggerFactory.getLogger(TokenAuthenticatedProducerConsumerTest.class);
+
+    // pre-create a public/private_key pair.  Public key used for broker to verify client passed in token
+    private final String TOKEN_PUBLIC_KEY = "data:;base64,MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAhHKgdY6arG7eE75bUPtznN5WjMu0sxLq7pI5Aaiw2Ijerbz33iO/Fdd2yJVuAZNDZPD/AVSaeliEh/BP+s2rN8KNuiywD+SlL1NGf2JDS5BvGT4Q8eHfDDRd/iY5zkK58wYwlke6C8fKCx10MTH9iYAJpzaaxs+Tu1RaatK+691aYSiMkYIfgbqAKmSCpK+48al/PkmENfuhzaTBPhCnEblhNvUhS5MjzBcAcGzecpEuVSxUzDtm8rU8DEQR6kkdXS1QnGHVNis/vgk8QzctkJKbtgDIaGzNUmDvTCyPZ8WLWSWJWb1oPxRZwpfXVP69ijU0Rme4/YkuHt6IEw6ANQIDAQAB";
+    // admin token created based on private_key.
+    private final String ADMIN_TOKEN = "eyJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImV4cCI6MTYyNTEzNjQyMn0.DAfbUPZwQURgGvor4scO0NoqoyHkCulKZkhP7kksCWFvgx6B22iKuXGX42EFlFSRMWYYgIJXV7UZATCLCjJpn_ijrO6AWBmooib3f94OPoLDdkF3qXnqaLnvJtl8_sCoLCSghR_O3hQFgQW2GRjMDdfJgl2_HXCWuzedtI5cQJdbpfU0NU10nzo7RtrpCmUdgQYQEHegYOawLqQVvr53ZGjrZilBXY9HHz1mSlnwZGNGVNNdvRthBuGtXtfKgtfSDF5jLqABvK8TUpdNJybibeiOspdzuY19-wVt4eVXzNAGsP4V4Zs91MgIUYV5lWKnBUuVWalppkMWhRF4Jf-KWQ";
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(true);
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("admin");
+        conf.setSuperUserRoles(superUserRoles);
+
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderToken.class.getName());
+        conf.setAuthenticationProviders(providers);
+
+        conf.setClusterName("test");
+
+        // Pass the public key used to verify client tokens via the "tokenPublicKey" property
+        Properties properties = new Properties();
+        properties.setProperty("tokenPublicKey", TOKEN_PUBLIC_KEY);
+
+        conf.setProperties(properties);
+        super.init();
+    }
+
+    // setup both admin and pulsar client
+    protected final void clientSetup() throws Exception {
+        admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
+                .authentication(AuthenticationFactory.token(ADMIN_TOKEN))
+                .build());
+
+        pulsarClient = PulsarClient.builder().serviceUrl(new URI(pulsar.getBrokerServiceUrl()).toString())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .authentication(AuthenticationFactory.token(ADMIN_TOKEN))
+                .build();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @DataProvider(name = "batch")
+    public Object[][] codecProvider() {
+        return new Object[][] { { 0 }, { 1000 } };
+    }
+
+    /**
+     * Publishes ten messages and consumes them back from the same topic, checking each one with
+     * {@code testMessageOrderAndDuplicates}, then acknowledges them cumulatively.
+     *
+     * <p>Not annotated with {@code @Test}: it is invoked from
+     * {@link #testTokenProducerAndConsumer()} once the authenticated clients are set up.
+     *
+     * @throws Exception if producing, consuming or acknowledging fails
+     */
+    public void testSyncProducerAndConsumer() throws Exception {
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic")
+                .subscriptionName("my-subscriber-name").subscribe();
+
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic");
+
+        Producer<byte[]> producer = producerBuilder.create();
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        Message<byte[]> msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            // A timed receive yields null when nothing arrives in time; fail with a clear
+            // message instead of a NullPointerException on msg.getData().
+            org.testng.Assert.assertNotNull(msg, "Timed out waiting for message " + i);
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+        // The producer was previously left open until the whole client was torn down.
+        producer.close();
+    }
+
+    @Test
+    public void testTokenProducerAndConsumer() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        clientSetup();
+
+        // test rest by admin
+        admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
+
+        // test protocol by producer/consumer
+        testSyncProducerAndConsumer();
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
new file mode 100644
index 0000000..b54d8c9
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.mockito.Mockito.spy;
+
+import com.google.common.collect.Sets;
+import java.net.URI;
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Test Token authentication with:
+ *    client: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
+ *    broker: org.apache.pulsar.broker.authentication.AuthenticationProviderToken
+ */
+public class TokenOauth2AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
+    private static final Logger log = LoggerFactory.getLogger(TokenOauth2AuthenticatedProducerConsumerTest.class);
+
+    // public key in oauth2 server to verify the client passed in token. get from https://jwt.io/
+    private final String TOKEN_TEST_PUBLIC_KEY = "data:;base64,MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA2tZd/4gJda3U2Pc3tpgRAN7JPGWx/Gn17v/0IiZlNNRbP/Mmf0Vc6G1qsnaRaWNWOR+t6/a6ekFHJMikQ1N2X6yfz4UjMc8/G2FDPRmWjA+GURzARjVhxc/BBEYGoD0Kwvbq/u9CZm2QjlKrYaLfg3AeB09j0btNrDJ8rBsNzU6AuzChRvXj9IdcE/A/4N/UQ+S9cJ4UXP6NJbToLwajQ5km+CnxdGE6nfB7LWHvOFHjn9C2Rb9e37CFlmeKmIVFkagFM0gbmGOb6bnGI8Bp/VNGV0APef4YaBvBTqwoZ1Z4aDHy5eRxXfAMdtBkBupmBXqL6bpd15XRYUbu/7ck9QIDAQAB";
+
+    private final String ADMIN_ROLE = "Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x@clients";
+
+    // Credentials File, which contains "client_id" and "client_secret"
+    private final String CREDENTIALS_FILE = "./src/test/resources/authentication/token/credentials_file.json";
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(true);
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add(ADMIN_ROLE);
+        conf.setSuperUserRoles(superUserRoles);
+
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderToken.class.getName());
+        conf.setAuthenticationProviders(providers);
+
+        conf.setClusterName("test");
+
+        // Pass the public key used to verify client tokens via the "tokenPublicKey" property
+        Properties properties = new Properties();
+        properties.setProperty("tokenPublicKey", TOKEN_TEST_PUBLIC_KEY);
+
+        conf.setProperties(properties);
+        super.init();
+    }
+
+    /**
+     * Sets up both the admin client and the Pulsar client, authenticating each through the
+     * OAuth 2.0 client-credentials flow with the credentials file from the test resources.
+     *
+     * @throws Exception if a URL cannot be built or a client cannot be created
+     */
+    protected final void clientSetup() throws Exception {
+        Path path = Paths.get(CREDENTIALS_FILE).toAbsolutePath();
+        log.info("Credentials File path: {}", path);
+
+        // AuthenticationOAuth2
+        Authentication authentication = AuthenticationFactoryOAuth2.clientCredentials(
+                new URL("https://dev-kt-aa9ne.us.auth0.com/oauth/token"),
+                // Path.toUri() produces a well-formed, escaped file: URL; concatenating "file://"
+                // with the raw path leaves spaces unescaped and mangles Windows paths.
+                path.toUri().toURL(),  // key file path
+                "https://dev-kt-aa9ne.us.auth0.com/api/v2/"
+        );
+
+        admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
+                .authentication(authentication)
+                .build());
+
+        pulsarClient = PulsarClient.builder().serviceUrl(new URI(pulsar.getBrokerServiceUrl()).toString())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .authentication(authentication)
+                .build();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @DataProvider(name = "batch")
+    public Object[][] codecProvider() {
+        return new Object[][] { { 0 }, { 1000 } };
+    }
+
+    /**
+     * Publishes ten messages and consumes them back from the same topic, checking each one with
+     * {@code testMessageOrderAndDuplicates}, then acknowledges them cumulatively.
+     *
+     * <p>Not annotated with {@code @Test}: it is invoked from
+     * {@link #testTokenProducerAndConsumer()} once the authenticated clients are set up.
+     *
+     * @throws Exception if producing, consuming or acknowledging fails
+     */
+    public void testSyncProducerAndConsumer() throws Exception {
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic")
+                .subscriptionName("my-subscriber-name").subscribe();
+
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic");
+
+        Producer<byte[]> producer = producerBuilder.create();
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        Message<byte[]> msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            // A timed receive yields null when nothing arrives in time; fail with a clear
+            // message instead of a NullPointerException on msg.getData().
+            org.testng.Assert.assertNotNull(msg, "Timed out waiting for message " + i);
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+        // The producer was previously left open until the whole client was torn down.
+        producer.close();
+    }
+
+    @Test
+    public void testTokenProducerAndConsumer() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        clientSetup();
+
+        // test rest by admin
+        admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
+
+        // test protocol by producer/consumer
+        testSyncProducerAndConsumer();
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+}
diff --git a/pulsar-broker/src/test/resources/authentication/token/credentials_file.json b/pulsar-broker/src/test/resources/authentication/token/credentials_file.json
new file mode 100644
index 0000000..db1eccd
--- /dev/null
+++ b/pulsar-broker/src/test/resources/authentication/token/credentials_file.json
@@ -0,0 +1,4 @@
+{
+  "client_id":"Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x",
+  "client_secret":"rT7ps7WY8uhdVuBTKWZkttwLdQotmdEliaM5rLfmgNibvqziZ-g07ZH52N_poGAb"
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationDataOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationDataOAuth2.java
new file mode 100644
index 0000000..59810f5
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationDataOAuth2.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+
+/**
+ * Provide OAuth 2.0 authentication data.
+ */
+class AuthenticationDataOAuth2 implements AuthenticationDataProvider {
+    public static final String HTTP_HEADER_NAME = "Authorization";
+
+    private final String accessToken;
+    private final Set<Map.Entry<String, String>> headers;
+
+    public AuthenticationDataOAuth2(String accessToken) {
+        this.accessToken = accessToken;
+        this.headers = Collections.singletonMap(HTTP_HEADER_NAME, "Bearer " + accessToken).entrySet();
+    }
+
+    @Override
+    public boolean hasDataForHttp() {
+        return true;
+    }
+
+    @Override
+    public Set<Map.Entry<String, String>> getHttpHeaders() {
+        return this.headers;
+    }
+
+    @Override
+    public boolean hasDataFromCommand() {
+        return true;
+    }
+
+    @Override
+    public String getCommandData() {
+        return this.accessToken;
+    }
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
new file mode 100644
index 0000000..54da5287d
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import java.net.URL;
+import java.time.Clock;
+import org.apache.pulsar.client.api.Authentication;
+
+/**
+ * Factory class that allows to create {@link Authentication} instances
+ * for OAuth 2.0 authentication methods.
+ */
+public final class AuthenticationFactoryOAuth2 {
+
+    /**
+     * Authenticate with client credentials.
+     *
+     * @param issuerUrl the issuer URL
+     * @param credentialsUrl the credentials URL
+     * @param audience the audience identifier
+     * @return an Authentication object
+     */
+    public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience) {
+        ClientCredentialsFlow flow = ClientCredentialsFlow.builder()
+                .issuerUrl(issuerUrl)
+                .privateKey(credentialsUrl.toExternalForm())
+                .audience(audience)
+                .build();
+        return new AuthenticationOAuth2(flow, Clock.systemDefaultZone());
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
new file mode 100644
index 0000000..f7f41d0
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult;
+import java.io.IOException;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Map;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.AuthenticationUtil;
+
+/**
+ * Pulsar client authentication provider based on OAuth 2.0.
+ */
+@Slf4j
+public class AuthenticationOAuth2 implements Authentication, EncodedAuthenticationParameterSupport {
+
+    public static final String CONFIG_PARAM_TYPE = "type";
+    public static final String TYPE_CLIENT_CREDENTIALS = "client_credentials";
+    public static final String AUTH_METHOD_NAME = "token";
+    public static final double EXPIRY_ADJUSTMENT = 0.9;
+    private static final long serialVersionUID = 1L;
+
+    final Clock clock;
+    Flow flow;
+    transient CachedToken cachedToken;
+
+    public AuthenticationOAuth2() {
+        this.clock = Clock.systemDefaultZone();
+    }
+
+    AuthenticationOAuth2(Flow flow, Clock clock) {
+        this.flow = flow;
+        this.clock = clock;
+    }
+
+    @Override
+    public String getAuthMethodName() {
+        return AUTH_METHOD_NAME;
+    }
+
+    @Override
+    public void configure(String encodedAuthParamString) {
+        if (StringUtils.isBlank(encodedAuthParamString)) {
+            throw new IllegalArgumentException("No authentication parameters were provided");
+        }
+        Map<String, String> params;
+        try {
+            params = AuthenticationUtil.configureFromJsonString(encodedAuthParamString);
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Malformed authentication parameters", e);
+        }
+
+        String type = params.getOrDefault(CONFIG_PARAM_TYPE, TYPE_CLIENT_CREDENTIALS);
+        switch(type) {
+            case TYPE_CLIENT_CREDENTIALS:
+                this.flow = ClientCredentialsFlow.fromParameters(params);
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported authentication type: " + type);
+        }
+    }
+
+    @Override
+    @Deprecated
+    public void configure(Map<String, String> authParams) {
+        throw new NotImplementedException("Deprecated; use EncodedAuthenticationParameterSupport");
+    }
+
+    @Override
+    public void start() throws PulsarClientException {
+        flow.initialize();
+    }
+
+    @Override
+    public synchronized AuthenticationDataProvider getAuthData() throws PulsarClientException {
+        if (this.cachedToken == null || this.cachedToken.isExpired()) {
+            TokenResult tr = this.flow.authenticate();
+            this.cachedToken = new CachedToken(tr);
+        }
+        return this.cachedToken.getAuthData();
+    }
+
+    /**
+     * Releases the resources held by the configured flow.
+     *
+     * <p>{@code flow} is only assigned by the two-argument constructor or by
+     * {@link #configure(String)}, so an instance created with the no-arg constructor and
+     * closed before being configured has nothing to release.
+     *
+     * @throws IOException never thrown by this implementation; kept so the signature is unchanged
+     */
+    @Override
+    public void close() throws IOException {
+        if (flow != null) {
+            flow.close();
+        }
+    }
+
+    @Data
+    class CachedToken {
+        private final TokenResult latest;
+        private final Instant expiresAt;
+        private final AuthenticationDataOAuth2 authData;
+
+        public CachedToken(TokenResult latest) {
+            this.latest = latest;
+            int adjustedExpiresIn = (int) (latest.getExpiresIn() * EXPIRY_ADJUSTMENT);
+            this.expiresAt = AuthenticationOAuth2.this.clock.instant().plusSeconds(adjustedExpiresIn);
+            this.authData = new AuthenticationDataOAuth2(latest.getAccessToken());
+        }
+
+        public boolean isExpired() {
+            return AuthenticationOAuth2.this.clock.instant().isAfter(this.expiresAt);
+        }
+    }
+}
+
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
new file mode 100644
index 0000000..13bf0f5
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.ClientCredentialsExchangeRequest;
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.ClientCredentialsExchanger;
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenClient;
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenExchangeException;
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import lombok.Builder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.entity.ContentType;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+/**
+ * Implementation of OAuth 2.0 Client Credentials flow.
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc6749#section-4.4">OAuth 2.0 RFC 6749, section 4.4</a>
+ */
+@Slf4j
+class ClientCredentialsFlow extends FlowBase {
+    public static final String CONFIG_PARAM_ISSUER_URL = "issuerUrl";
+    public static final String CONFIG_PARAM_AUDIENCE = "audience";
+    public static final String CONFIG_PARAM_KEY_FILE = "privateKey";
+
+    private static final long serialVersionUID = 1L;
+
+    private final String audience;
+    private final String privateKey;
+
+    private transient ClientCredentialsExchanger exchanger;
+
+    @Builder
+    public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey) {
+        super(issuerUrl);
+        this.audience = audience;
+        this.privateKey = privateKey;
+    }
+
+    @Override
+    public void initialize() throws PulsarClientException {
+        super.initialize();
+        assert this.metadata != null;
+
+        URL tokenUrl = this.metadata.getTokenEndpoint();
+        this.exchanger = new TokenClient(tokenUrl);
+    }
+
+    public TokenResult authenticate() throws PulsarClientException {
+        // read the private key from storage
+        KeyFile keyFile;
+        try {
+            keyFile = loadPrivateKey(this.privateKey);
+        } catch (IOException e) {
+            throw new PulsarClientException.AuthenticationException("Unable to read private key: " + e.getMessage());
+        }
+
+        // request an access token using client credentials
+        ClientCredentialsExchangeRequest req = ClientCredentialsExchangeRequest.builder()
+                .clientId(keyFile.getClientId())
+                .clientSecret(keyFile.getClientSecret())
+                .audience(this.audience)
+                .build();
+        TokenResult tr;
+        try {
+            tr = this.exchanger.exchangeClientCredentials(req);
+        } catch (TokenExchangeException | IOException e) {
+            throw new PulsarClientException.AuthenticationException("Unable to obtain an access token: "
+                                                                    + e.getMessage());
+        }
+
+        return tr;
+    }
+
+    /**
+     * Closes the token exchanger, if one was created.
+     *
+     * <p>The exchanger is only assigned at the end of {@link #initialize()}, so it is still
+     * {@code null} when the flow is closed without having been initialized, or after
+     * initialization failed.
+     */
+    @Override
+    public void close() {
+        if (exchanger != null) {
+            exchanger.close();
+        }
+    }
+
+    /**
+     * Constructs a {@link ClientCredentialsFlow} from configuration parameters.
+     * @param params configuration map, read with the keys {@code issuerUrl}, {@code audience} and {@code privateKey}
+     * @return a flow built from the issuer URL, audience and private-key URL found in {@code params}
+     */
+    public static ClientCredentialsFlow fromParameters(Map<String, String> params) {
+        URL issuerUrl = parseParameterUrl(params, CONFIG_PARAM_ISSUER_URL);
+        String audience = parseParameterString(params, CONFIG_PARAM_AUDIENCE);
+        String privateKeyUrl = parseParameterString(params, CONFIG_PARAM_KEY_FILE);
+        return ClientCredentialsFlow.builder()
+                .issuerUrl(issuerUrl)
+                .audience(audience)
+                .privateKey(privateKeyUrl)
+                .build();
+    }
+
+    /**
+     * Loads the private key from the given URL; a {@code data:} URL must carry the JSON MIME type.
+     * @param privateKeyURL location of the key file, given as a URL string
+     * @return the {@link KeyFile} parsed from the URL content, which is decoded as UTF-8
+     * @throws IOException if reading fails, or wrapping the URISyntaxException, InstantiationException or IllegalAccessException caught while opening the URL
+     */
+    private static KeyFile loadPrivateKey(String privateKeyURL) throws IOException {
+        try {
+            URLConnection urlConnection = new org.apache.pulsar.client.api.url.URL(privateKeyURL).openConnection();
+
+            String protocol = urlConnection.getURL().getProtocol();
+            String contentType = urlConnection.getContentType();
+            if ("data".equals(protocol) && !ContentType.APPLICATION_JSON.getMimeType().equals(contentType)) {
+                throw new IllegalArgumentException(
+                        "Unsupported media type or encoding format: " + urlConnection.getContentType());
+            }
+            KeyFile privateKey;
+            try (Reader r = new InputStreamReader((InputStream) urlConnection.getContent(), StandardCharsets.UTF_8)) {
+                privateKey = KeyFile.fromJson(r);
+            }
+            return privateKey;
+        } catch (URISyntaxException | InstantiationException | IllegalAccessException e) {
+            throw new IOException("Invalid privateKey format", e);
+        }
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Flow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Flow.java
new file mode 100644
index 0000000..b572325
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Flow.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult;
+import java.io.Serializable;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+/**
+ * An OAuth 2.0 authorization flow.
+ *
+ * <p>The interface is package-private and extends {@link Serializable}, so every
+ * implementation is a serializable type.
+ */
+interface Flow extends Serializable {
+
+    /**
+     * Initializes the authorization flow.
+     * @throws PulsarClientException if the flow could not be initialized.
+     */
+    void initialize() throws PulsarClientException;
+
+    /**
+     * Acquires an access token from the OAuth 2.0 authorization server.
+     * @return a token result including an access token and optionally a refresh token.
+     * @throws PulsarClientException if authentication failed.
+     */
+    TokenResult authenticate() throws PulsarClientException;
+
+    /**
+     * Closes the authorization flow. Declares no checked exceptions.
+     */
+    void close();
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
new file mode 100644
index 0000000..0f47121
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.DefaultMetadataResolver;
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.Metadata;
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.MetadataResolver;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+/**
+ * An abstract OAuth 2.0 authorization flow.
+ *
+ * <p>Holds the issuer URL and, once {@link #initialize()} has run, the server metadata resolved
+ * from it. Also offers static helpers for reading required configuration parameters.
+ */
+@Slf4j
+abstract class FlowBase implements Flow {
+
+    private static final long serialVersionUID = 1L;
+
+    protected final URL issuerUrl;
+
+    // transient: not written when the flow is serialized; assigned by initialize().
+    protected transient Metadata metadata;
+
+    protected FlowBase(URL issuerUrl) {
+        this.issuerUrl = issuerUrl;
+    }
+
+    /**
+     * Resolves the authorization server metadata and stores it in {@link #metadata}.
+     * @throws PulsarClientException if the metadata could not be retrieved; the underlying
+     *         I/O error is logged before the exception is thrown.
+     */
+    @Override
+    public void initialize() throws PulsarClientException {
+        try {
+            this.metadata = createMetadataResolver().resolve();
+        } catch (IOException e) {
+            log.error("Unable to retrieve OAuth 2.0 server metadata", e);
+            throw new PulsarClientException.AuthenticationException("Unable to retrieve OAuth 2.0 server metadata");
+        }
+    }
+
+    /**
+     * Creates the resolver used by {@link #initialize()}; subclasses may override it.
+     * @return a resolver built from the issuer URL.
+     */
+    protected MetadataResolver createMetadataResolver() {
+        return DefaultMetadataResolver.fromIssuerUrl(issuerUrl);
+    }
+
+    /**
+     * Reads a required string parameter.
+     * @param params the configuration parameters.
+     * @param name the parameter name.
+     * @return the non-empty parameter value.
+     * @throws IllegalArgumentException if the parameter is missing or empty.
+     */
+    static String parseParameterString(Map<String, String> params, String name) {
+        String s = params.get(name);
+        if (StringUtils.isEmpty(s)) {
+            throw new IllegalArgumentException("Required configuration parameter: " + name);
+        }
+        return s;
+    }
+
+    /**
+     * Reads a required parameter and parses it as a URL.
+     * @param params the configuration parameters.
+     * @param name the parameter name.
+     * @return the parsed URL.
+     * @throws IllegalArgumentException if the parameter is missing, empty, or not a valid URL.
+     */
+    static URL parseParameterUrl(Map<String, String> params, String name) {
+        // Reuse the string helper so the "required" check and its message live in one place.
+        String s = parseParameterString(params, name);
+        try {
+            return new URL(s);
+        } catch (MalformedURLException e) {
+            // Keep the parser's exception as the cause so the reason for the rejection is not lost.
+            throw new IllegalArgumentException("Malformed configuration parameter: " + name, e);
+        }
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/KeyFile.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/KeyFile.java
new file mode 100644
index 0000000..b4a6510
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/KeyFile.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.io.Reader;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+/**
+ * A JSON object representing a credentials file.
+ *
+ * <p>Unknown JSON properties are ignored when a key file is parsed.
+ */
+@Data
+@NoArgsConstructor
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class KeyFile {
+
+    // Shared by all instances and never reassigned, hence a constant.
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    /** JSON property {@code type}. */
+    @JsonProperty("type")
+    private String type;
+
+    /** JSON property {@code client_id}. */
+    @JsonProperty("client_id")
+    private String clientId;
+
+    /** JSON property {@code client_secret}. */
+    @JsonProperty("client_secret")
+    private String clientSecret;
+
+    /** JSON property {@code client_email}. */
+    @JsonProperty("client_email")
+    private String clientEmail;
+
+    /** JSON property {@code issuer_url}. */
+    @JsonProperty("issuer_url")
+    private String issuerUrl;
+
+    /**
+     * Serializes this key file to a JSON string.
+     * @return the JSON representation.
+     * @throws IOException if serialization fails.
+     */
+    public String toJson() throws IOException {
+        return OBJECT_MAPPER.writeValueAsString(this);
+    }
+
+    /**
+     * Parses a key file from a JSON string.
+     * @param value the JSON text.
+     * @return the parsed key file.
+     * @throws IOException if the text cannot be parsed.
+     */
+    public static KeyFile fromJson(String value) throws IOException {
+        return OBJECT_MAPPER.readValue(value, KeyFile.class);
+    }
+
+    /**
+     * Parses a key file from a reader supplying JSON text. The reader is not closed here.
+     * @param value the source of the JSON text.
+     * @return the parsed key file.
+     * @throws IOException if the content cannot be read or parsed.
+     */
+    public static KeyFile fromJson(Reader value) throws IOException {
+        return OBJECT_MAPPER.readValue(value, KeyFile.class);
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Readme.md b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Readme.md
new file mode 100644
index 0000000..cca7973
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Readme.md
@@ -0,0 +1,94 @@
+# Pulsar Client Authentication Plugin for OAuth 2.0
+
+Pulsar supports authenticating clients using OAuth 2.0 access tokens.
+
+You can use tokens to identify a Pulsar client and associate it with a "principal" (or "role") that is permitted
+to perform certain actions (e.g., publish to a topic or consume from a topic).
+
+This module provides the Pulsar client authentication plugin for OAuth 2.0. The client first communicates with the
+OAuth 2.0 server to obtain an `access token`, and then passes this `access token` to the Pulsar broker for authentication.
+The broker side can therefore still use `org.apache.pulsar.broker.authentication.AuthenticationProviderToken`;
+users can also add their own `AuthenticationProvider` to work with this module.
+
+## Provider Configuration
+This library allows you to authenticate using an access token that is obtained from an OAuth 2.0 authorization service,
+which acts as a _token issuer_.
+
+### Authentication Types
+The authentication type determines how to obtain an access token via an OAuth 2.0 authorization flow.
+
+#### Client Credentials
+The following parameters are supported:
+
+| Parameter  | Description  | Example |
+|---|---|---|
+| `type` | Oauth 2.0 auth type. Optional. | default: `client_credentials`  |
+| `issuerUrl` | URL of the provider which allows Pulsar to obtain an access token. Required. | `https://accounts.google.com` |
+| `privateKey` | URL to a JSON credentials file (in JSON format; see below). Required. | See "Supported Pattern Formats" |
+| `audience`  | An OAuth 2.0 "resource server" identifier for the Pulsar cluster. Required. | `https://broker.example.com` |
+
+### Supported Pattern Formats of `privateKey`
+The `privateKey` parameter, which points to the client credentials, supports the following three pattern formats:
+
+- `file:///path/to/file`
+- `file:/path/to/file`
+- `data:application/json;base64,<base64-encoded value>`
+
+The credentials file contains service account credentials for use with the Client Credentials authentication type.
+
+Example of a credentials file, `credentials_file.json`:
+```json
+{
+  "type": "client_credentials",
+  "client_id": "d9ZyX97q1ef8Cr81WHVC4hFQ64vSlDK3",
+  "client_secret": "on1uJ...k6F6R",
+  "client_email": "1234567890-abcdefghijklmnopqrstuvwxyz@developer.gserviceaccount.com",
+  "issuer_url": "https://accounts.google.com"
+}
+```
+
+The default type is `client_credentials`; for this type, the fields "client_id" and "client_secret" are required.
+
+### Example for a typical original Oauth2 request mapping
+
+A typical original OAuth2 request, which is used to obtain an access token from the OAuth2 server, looks like this:
+
+```bash
+curl --request POST \
+  --url https://dev-kt-aa9ne.us.auth0.com/oauth/token \
+  --header 'content-type: application/json' \
+  --data '{
+  "client_id":"Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x",
+  "client_secret":"rT7ps7WY8uhdVuBTKWZkttwLdQotmdEliaM5rLfmgNibvqziZ-g07ZH52N_poGAb",
+  "audience":"https://dev-kt-aa9ne.us.auth0.com/api/v2/",
+  "grant_type":"client_credentials"}'
+```
+
+In which,
+- `issuerUrl` parameter in this plugin is mapped to `--url https://dev-kt-aa9ne.us.auth0.com/oauth/token`
+- `privateKey` file parameter in this plugin should contain at least the fields `client_id` and `client_secret`.
+- `audience` parameter in this plugin is mapped to  `"audience":"https://dev-kt-aa9ne.us.auth0.com/api/v2/"`
+
+## Pulsar Client Config
+You can use the provider with the following Pulsar clients.
+
+### Java
+You can use the factory method:
+```java
+PulsarClient client = PulsarClient.builder()
+    .serviceUrl("pulsar://broker.example.com:6650/")
+    .authentication(
+        AuthenticationFactoryOAuth2.clientCredentials(this.issuerUrl, this.credentialsUrl, this.audience))
+    .build();
+```
+
+Similarly, you can use encoded parameters:
+```java
+Authentication auth = AuthenticationFactory
+    .create(AuthenticationOAuth2.class.getName(), "{\"type\":\"client_credentials\",\"privateKey\":\"...\",\"issuerUrl\":\"...\",\"audience\":\"...\"}");
+
+PulsarClient client = PulsarClient.builder()
+    .serviceUrl("pulsar://broker.example.com:6650/")
+    .authentication(auth)
+    .build();
+```
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/package-info.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/package-info.java
new file mode 100644
index 0000000..3beeda0
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
new file mode 100644
index 0000000..7c14296
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * A token request based on the exchange of client credentials.
+ *
+ * <p>A plain value holder: accessors and the builder are generated by Lombok.
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc6749#section-4.4">OAuth 2.0 RFC 6749, section 4.4</a>
+ */
+@Data
+@Builder
+public class ClientCredentialsExchangeRequest {
+
+    /** The client identifier; JSON property {@code client_id}. */
+    @JsonProperty("client_id")
+    private String clientId;
+
+    /** The client secret; JSON property {@code client_secret}. */
+    @JsonProperty("client_secret")
+    private String clientSecret;
+
+    /** The audience the token is requested for; JSON property {@code audience}. */
+    @JsonProperty("audience")
+    private String audience;
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java
new file mode 100644
index 0000000..e6a956a
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import java.io.IOException;
+
+/**
+ * An interface for exchanging client credentials for an access token.
+ */
+public interface ClientCredentialsExchanger {
+    /**
+     * Requests an exchange of client credentials for an access token.
+     * @param req the request details.
+     * @return the token result produced by the exchange.
+     * @throws TokenExchangeException if the OAuth server returned a detailed error.
+     * @throws IOException if a general IO error occurred.
+     */
+    TokenResult exchangeClientCredentials(ClientCredentialsExchangeRequest req)
+            throws TokenExchangeException, IOException;
+
+    /**
+     * Closes the exchanger. Declares no checked exceptions.
+     */
+    void close();
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java
new file mode 100644
index 0000000..d16ce8b
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.time.Duration;
+
+/**
+ * Resolves OAuth 2.0 authorization server metadata as described in RFC 8414.
+ */
+public class DefaultMetadataResolver implements MetadataResolver {
+
+    private final URL metadataUrl;
+    private final ObjectReader objectReader;
+    private Duration connectTimeout;
+    private Duration readTimeout;
+
+    public DefaultMetadataResolver(URL metadataUrl) {
+        this.metadataUrl = metadataUrl;
+        this.objectReader = new ObjectMapper().readerFor(Metadata.class);
+    }
+
+    public DefaultMetadataResolver withConnectTimeout(Duration connectTimeout) {
+        this.connectTimeout = connectTimeout;
+        return this;
+    }
+
+    public DefaultMetadataResolver withReadTimeout(Duration readTimeout) {
+        this.readTimeout = readTimeout;
+        return this;
+    }
+
+    /**
+     * Resolves the authorization metadata.
+     * @return metadata
+     * @throws IOException if the metadata could not be resolved.
+     */
+    public Metadata resolve() throws IOException {
+        try {
+            URLConnection c = this.metadataUrl.openConnection();
+            if (connectTimeout != null) {
+                c.setConnectTimeout((int) connectTimeout.toMillis());
+            }
+            if (readTimeout != null) {
+                c.setReadTimeout((int) readTimeout.toMillis());
+            }
+            c.setRequestProperty("Accept", "application/json");
+
+            Metadata metadata;
+            try (InputStream inputStream = c.getInputStream()) {
+                metadata = this.objectReader.readValue(inputStream);
+            }
+            return metadata;
+
+        } catch (IOException e) {
+            throw new IOException("Cannot obtain authorization metadata from " + metadataUrl.toString(), e);
+        }
+    }
+
+    /**
+     * Gets a well-known metadata URL for the given OAuth issuer URL.
+     * @param issuerUrl The authorization server's issuer identifier
+     * @return a resolver
+     */
+    public static DefaultMetadataResolver fromIssuerUrl(URL issuerUrl) {
+        return new DefaultMetadataResolver(getWellKnownMetadataUrl(issuerUrl));
+    }
+
+    /**
+     * Gets a well-known metadata URL for the given OAuth issuer URL.
+     * @see <a href="https://tools.ietf.org/id/draft-ietf-oauth-discovery-08.html#ASConfig">
+     *     OAuth Discovery: Obtaining Authorization Server Metadata</a>
+     * @param issuerUrl The authorization server's issuer identifier
+     * @return a URL
+     */
+    public static URL getWellKnownMetadataUrl(URL issuerUrl) {
+        try {
+            return new URL(issuerUrl, "/.well-known/openid-configuration");
+        } catch (MalformedURLException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/Metadata.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/Metadata.java
new file mode 100644
index 0000000..93f65be
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/Metadata.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URL;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Represents OAuth 2.0 Server Metadata.
+ *
+ * <p>Each field is bound to the JSON property named in its annotation; properties not listed
+ * here are ignored during parsing.
+ */
+@Data
+@NoArgsConstructor
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Metadata {
+    /** JSON property {@code issuer}. */
+    @JsonProperty("issuer")
+    private URL issuer;
+
+    /** JSON property {@code authorization_endpoint}. */
+    @JsonProperty("authorization_endpoint")
+    private URL authorizationEndpoint;
+
+    /** JSON property {@code token_endpoint}. */
+    @JsonProperty("token_endpoint")
+    private URL tokenEndpoint;
+
+    /** JSON property {@code userinfo_endpoint}. */
+    @JsonProperty("userinfo_endpoint")
+    private URL userInfoEndpoint;
+
+    /** JSON property {@code revocation_endpoint}. */
+    @JsonProperty("revocation_endpoint")
+    private URL revocationEndpoint;
+
+    /** JSON property {@code jwks_uri}. */
+    @JsonProperty("jwks_uri")
+    private URL jwksUri;
+
+    /** JSON property {@code device_authorization_endpoint}. */
+    @JsonProperty("device_authorization_endpoint")
+    private URL deviceAuthorizationEndpoint;
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/MetadataResolver.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/MetadataResolver.java
new file mode 100644
index 0000000..85a6a0b
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/MetadataResolver.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import java.io.IOException;
+
+/**
+ * Resolves OAuth 2.0 authorization server metadata as described in RFC 8414.
+ */
+public interface MetadataResolver {
+    /**
+     * Resolves the authorization server metadata.
+     * @return the resolved metadata.
+     * @throws IOException if the metadata could not be resolved.
+     */
+    Metadata resolve() throws IOException;
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
new file mode 100644
index 0000000..715579d
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.http.Consts;
+import org.apache.http.HttpEntity;
+import org.apache.http.NameValuePair;
+import org.apache.http.StatusLine;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.HttpResponseException;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+
+/**
+ * A client for an OAuth 2.0 token endpoint.
+ *
+ * <p>Each instance owns an HTTP client that is released by {@link #close()}.
+ */
+public class TokenClient implements AutoCloseable, ClientCredentialsExchanger {
+
+    // One mapper is enough: the readers derived from it are independent of each other.
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final ObjectReader resultReader = OBJECT_MAPPER.readerFor(TokenResult.class);
+    private static final ObjectReader errorReader = OBJECT_MAPPER.readerFor(TokenError.class);
+
+    private final URL tokenUrl;
+    private final CloseableHttpClient httpclient;
+
+    /**
+     * Creates a client for the given token endpoint.
+     * @param tokenUrl the URL of the token endpoint.
+     */
+    public TokenClient(URL tokenUrl) {
+        this.tokenUrl = tokenUrl;
+        this.httpclient = HttpClientBuilder.create().useSystemProperties().disableCookieManagement().build();
+    }
+
+    /**
+     * Releases the underlying HTTP client. The token client must not be used afterwards.
+     */
+    @Override
+    public void close() {
+        try {
+            httpclient.close();
+        } catch (IOException ignored) {
+            // Best-effort: close() declares no checked exceptions, and a failure while
+            // releasing the HTTP client leaves nothing for the caller to recover.
+        }
+    }
+
+    /**
+     * Performs a token exchange using client credentials.
+     *
+     * <p>The request is sent as a URL-encoded form POST. A 200 response is parsed as a token result;
+     * a 400 or 401 response is parsed as an error and reported via {@link TokenExchangeException}.
+     *
+     * @param req the client credentials request details.
+     * @return a token result
+     * @throws TokenExchangeException if the server answered 400 or 401 with an error document.
+     * @throws IOException if the request fails, the status code is unexpected, or the response
+     *         is not JSON.
+     */
+    @Override
+    public TokenResult exchangeClientCredentials(ClientCredentialsExchangeRequest req)
+            throws TokenExchangeException, IOException {
+        List<NameValuePair> params = new ArrayList<>(4);
+        params.add(new BasicNameValuePair("grant_type", "client_credentials"));
+        params.add(new BasicNameValuePair("client_id", req.getClientId()));
+        params.add(new BasicNameValuePair("client_secret", req.getClientSecret()));
+        params.add(new BasicNameValuePair("audience", req.getAudience()));
+        HttpPost post = new HttpPost(tokenUrl.toString());
+        post.setHeader("Accept", ContentType.APPLICATION_JSON.getMimeType());
+        post.setEntity(new UrlEncodedFormEntity(params, Consts.UTF_8));
+
+        try (CloseableHttpResponse response = httpclient.execute(post)) {
+            StatusLine status = response.getStatusLine();
+            HttpEntity entity = response.getEntity();
+            try {
+                switch(status.getStatusCode()) {
+                    case HttpURLConnection.HTTP_OK:
+                        return readResponse(entity, resultReader);
+                    case HttpURLConnection.HTTP_BAD_REQUEST:
+                    case HttpURLConnection.HTTP_UNAUTHORIZED:
+                        throw new TokenExchangeException(readResponse(entity, errorReader));
+                    default:
+                        throw new HttpResponseException(status.getStatusCode(), status.getReasonPhrase());
+                }
+            } finally {
+                // Always drain the entity, whichever branch was taken above.
+                EntityUtils.consume(entity);
+            }
+        }
+    }
+
+    /**
+     * Parses a JSON response body with the given reader.
+     * @throws ClientProtocolException if the response is not declared as JSON.
+     */
+    private static <T> T readResponse(HttpEntity entity, ObjectReader objectReader) throws IOException {
+        ContentType contentType = ContentType.getOrDefault(entity);
+        if (!ContentType.APPLICATION_JSON.getMimeType().equals(contentType.getMimeType())) {
+            throw new ClientProtocolException("Unsupported content type: " + contentType.getMimeType());
+        }
+        // Fall back to UTF-8 when the server does not declare a charset.
+        Charset charset = contentType.getCharset();
+        if (charset == null) {
+            charset = StandardCharsets.UTF_8;
+        }
+        try (Reader reader = new InputStreamReader(entity.getContent(), charset)) {
+            @SuppressWarnings("unchecked") T obj = (T) objectReader.readValue(reader);
+            return obj;
+        }
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenError.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenError.java
new file mode 100644
index 0000000..5f050a1
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenError.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Represents an error returned from an OAuth 2.0 token endpoint (bound from JSON; unknown properties are ignored).
+ */
+@Data
+@NoArgsConstructor
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class TokenError {
+    @JsonProperty("error")
+    private String error; // short error code, e.g. "invalid_request" (see RFC 6749, section 5.2)
+
+    @JsonProperty("error_description")
+    private String errorDescription; // human-readable explanation of the error; may be absent
+
+    @JsonProperty("error_uri")
+    private String errorUri; // link to a page with more information about the error; may be absent
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenExchangeException.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenExchangeException.java
new file mode 100644
index 0000000..286051f
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenExchangeException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+/**
+ * Indicates a token exchange failure; the message is rendered as "description (error)" from the given error.
+ */
+public class TokenExchangeException extends Exception {
+    private final TokenError error; // assigned once in the constructor, so it can be final
+
+    public TokenExchangeException(TokenError error) {
+        super(String.format("%s (%s)", error.getErrorDescription(), error.getError())); // error must be non-null
+        this.error = error;
+    }
+
+    public TokenError getError() {
+        return error;
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenResult.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenResult.java
new file mode 100644
index 0000000..8b333c0
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenResult.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.io.Serializable;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * The result of a token exchange request, bound from the JSON response (unknown properties are ignored).
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class TokenResult implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    @JsonProperty("access_token")
+    private String accessToken; // the token handed to callers as authentication data
+
+    @JsonProperty("id_token")
+    private String idToken; // presumably an OpenID Connect ID token - confirm against the issuer
+
+    @JsonProperty("refresh_token")
+    private String refreshToken; // presumably usable to obtain a new access token - not used in this chunk
+
+    @JsonProperty("expires_in")
+    private int expiresIn; // access token lifetime; assumed to be seconds as in OAuth 2.0 - TODO confirm
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/package-info.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/package-info.java
new file mode 100644
index 0000000..2068111
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
new file mode 100644
index 0000000..f45c2c0
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Tests {@link AuthenticationOAuth2} against a mocked {@link Flow} and a manually driven clock.
+ */
+public class AuthenticationOAuth2Test {
+    private static final String TEST_ACCESS_TOKEN = "x.y.z";
+    private static final int TEST_EXPIRES_IN = 60; // used below as a number of seconds
+
+    private MockClock clock;
+    private Flow flow;
+    private AuthenticationOAuth2 auth;
+
+    @BeforeMethod
+    public void before() {
+        this.clock = new MockClock(Instant.EPOCH, ZoneOffset.UTC); // time only moves when a test advances it
+        this.flow = mock(Flow.class);
+        this.auth = new AuthenticationOAuth2(flow, this.clock);
+    }
+
+    @Test
+    public void testGetAuthMethodName() {
+        assertEquals(this.auth.getAuthMethodName(), "token");
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = ".*No.*")
+    public void testConfigureNoParams() throws Exception {
+        this.auth.configure(""); // empty parameter string
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = ".*Malformed.*")
+    public void testConfigureMalformed() throws Exception {
+        this.auth.configure("{garbage}"); // not valid JSON
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = ".*Required.*")
+    public void testConfigureRequired() throws Exception {
+        this.auth.configure("{}"); // valid JSON, but none of the expected keys
+    }
+
+    @Test
+    public void testConfigure() throws Exception {
+        Map<String, String> params = new HashMap<>();
+        params.put("type", "client_credentials");
+        params.put("privateKey", "data:base64,e30="); // "e30=" is the base64 encoding of "{}"
+        params.put("issuerUrl", "http://localhost");
+        params.put("audience", "http://localhost");
+        ObjectMapper mapper = new ObjectMapper();
+        String authParams = mapper.writeValueAsString(params);
+        this.auth.configure(authParams);
+        assertNotNull(this.auth.flow);
+    }
+
+    @Test
+    public void testStart() throws Exception {
+        this.auth.start();
+        verify(this.flow).initialize();
+    }
+
+    @Test
+    public void testGetAuthData() throws Exception {
+        AuthenticationDataProvider data;
+        TokenResult tr = TokenResult.builder().accessToken(TEST_ACCESS_TOKEN).expiresIn(TEST_EXPIRES_IN).build();
+        doReturn(tr).when(this.flow).authenticate();
+        data = this.auth.getAuthData();
+        verify(this.flow, times(1)).authenticate();
+        assertEquals(data.getCommandData(), tr.getAccessToken());
+
+        // cache hit: a second call at the same instant must not authenticate again
+        data = this.auth.getAuthData();
+        verify(this.flow, times(1)).authenticate();
+        assertEquals(data.getCommandData(), tr.getAccessToken());
+
+        // cache miss: once TEST_EXPIRES_IN seconds have elapsed, the flow is asked for a new token
+        clock.advance(Duration.ofSeconds(TEST_EXPIRES_IN));
+        data = this.auth.getAuthData();
+        verify(this.flow, times(2)).authenticate();
+        assertEquals(data.getCommandData(), tr.getAccessToken());
+    }
+
+    @Test
+    public void testClose() throws Exception {
+        this.auth.close();
+        verify(this.flow).close();
+    }
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/MockClock.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/MockClock.java
new file mode 100644
index 0000000..1e23311
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/MockClock.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import java.io.Serializable;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+
+/**
+ * A {@link Clock} for tests: time stands still until it is moved explicitly.
+ */
+public class MockClock extends Clock implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private Instant instant;
+    private final ZoneId zone;
+
+    public MockClock(Instant fixedInstant, ZoneId zone) {
+        this.zone = zone;
+        this.instant = fixedInstant;
+    }
+
+    @Override
+    public ZoneId getZone() {
+        return this.zone;
+    }
+
+    @Override
+    public Clock withZone(ZoneId zone) {
+        // An unchanged zone reuses this clock; a different zone yields a separate clock that
+        // starts at the current instant and is not moved by later changes to this one.
+        boolean sameZone = zone.equals(this.zone);
+        return sameZone ? this : new MockClock(this.instant, zone);
+    }
+
+    @Override
+    public long millis() {
+        return this.instant.toEpochMilli();
+    }
+
+    @Override
+    public Instant instant() {
+        return this.instant;
+    }
+
+    /**
+     * Moves the clock to an absolute point in time.
+     * @param fixedInstant the instant the clock reports from now on
+     */
+    public void setInstant(Instant fixedInstant) {
+        instant = fixedInstant;
+    }
+
+    /**
+     * Moves the clock relative to its current instant.
+     * @param duration the amount of time to add to the current instant
+     */
+    public void advance(Duration duration) {
+        instant = instant.plus(duration);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof MockClock)) {
+            return false;
+        }
+        MockClock that = (MockClock) obj;
+        return instant.equals(that.instant) && zone.equals(that.zone);
+    }
+
+    @Override
+    public int hashCode() {
+        return zone.hashCode() ^ instant.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "MockClock[" + instant + "," + zone + "]";
+    }
+}


[pulsar] 08/14: Decompression payload if needed in KeyShared subscription (#7416)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ae246fe7d71495b440a2dc83afe9930f7756ab90
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Jul 16 09:25:01 2020 +0800

    Decompression payload if needed in KeyShared subscription (#7416)
    
    Decompression payload if needed in KeyShared subscription
    
    (cherry picked from commit ed3583a5bd750661f8643617fc618151f87019b2)
---
 .../broker/service/AbstractBaseDispatcher.java     | 47 +++++-------------
 .../client/api/KeySharedSubscriptionTest.java      | 57 ++++++++++++++++++++++
 .../client/impl/BatchMessageKeyBasedContainer.java |  9 ----
 .../org/apache/pulsar/client/impl/MessageImpl.java | 10 +++-
 .../apache/pulsar/common/protocol/Commands.java    |  8 +++
 5 files changed, 85 insertions(+), 46 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 18b29f2..7cf9793 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -33,6 +33,8 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
@@ -149,43 +151,16 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
     public static final String NONE_KEY = "NONE_KEY";
 
     protected byte[] peekStickyKey(ByteBuf metadataAndPayload) {
-        int readerIndex = metadataAndPayload.readerIndex();
+        metadataAndPayload.markReaderIndex(); // remember the position so the parse below can be undone
         PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
-
-        try {
-            if (metadata.hasNumMessagesInBatch()) {
-                // If the message was part of a batch (eg: a batch of 1 message), we need
-                // to read the key from the first single-message-metadata entry
-                PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata
-                        .newBuilder();
-                ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(metadataAndPayload,
-                        singleMessageMetadataBuilder, 0, metadata.getNumMessagesInBatch());
-                try {
-                    if (singleMessageMetadataBuilder.hasOrderingKey()) {
-                        return singleMessageMetadataBuilder.getOrderingKey().toByteArray();
-                    } else if (singleMessageMetadataBuilder.hasPartitionKey()) {
-                        return singleMessageMetadataBuilder.getPartitionKey().getBytes();
-                    }
-                } finally {
-                    singleMessagePayload.release();
-                    singleMessageMetadataBuilder.recycle();
-                }
-            } else {
-                // Message was not part of a batch
-                if (metadata.hasOrderingKey()) {
-                    return metadata.getOrderingKey().toByteArray();
-                } else if (metadata.hasPartitionKey()) {
-                    return metadata.getPartitionKey().getBytes();
-                }
-            }
-
-            return NONE_KEY.getBytes();
-        } catch (IOException e) {
-            // If we fail to deserialize medata, return null key
-            return NONE_KEY.getBytes();
-        } finally {
-            metadataAndPayload.readerIndex(readerIndex);
-            metadata.recycle();
+        metadataAndPayload.resetReaderIndex(); // restore the reader index moved by the metadata parse
+        byte[] key = NONE_KEY.getBytes(); // fallback when the message carries neither key
+        if (metadata.hasOrderingKey()) {
+            key = metadata.getOrderingKey().toByteArray(); // ordering key takes precedence
+        } else if (metadata.hasPartitionKey()) {
+            key = metadata.getPartitionKey().getBytes();
         }
+        metadata.recycle(); // single exit: recycle on every path, not only when no key was found
+        return key;
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 511f8c2..2a7a20b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -710,6 +710,63 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         consumer4.close();
     }
 
+    @Test
+    public void testWithMessageCompression() throws Exception { // Key_Shared consumer must receive LZ4-compressed messages
+        final String topic = "testWithMessageCompression" + UUID.randomUUID().toString();
+        @Cleanup // closed even if a send, receive or assertion below fails
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .compressionType(CompressionType.LZ4)
+                .create();
+        @Cleanup // same cleanup style as the other tests in this class
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("test")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+        final int messages = 10;
+        for (int i = 0; i < messages; i++) {
+            producer.send(("Hello Pulsar > " + i).getBytes());
+        }
+        List<Message<byte[]>> receives = new ArrayList<>();
+        for (int i = 0; i < messages; i++) {
+            Message<byte[]> received = consumer.receive();
+            receives.add(received);
+            consumer.acknowledge(received);
+        }
+        Assert.assertEquals(receives.size(), messages);
+    }
+
+    @Test
+    public void testAttachKeyToMessageMetadata()
+            throws PulsarClientException {
+        this.conf.setSubscriptionKeySharedEnable(true); // make sure Key_Shared subscriptions are allowed
+        String topic = "persistent://public/default/key_shared-" + UUID.randomUUID();
+
+        @Cleanup
+        Consumer<Integer> consumer1 = createConsumer(topic);
+
+        @Cleanup
+        Consumer<Integer> consumer2 = createConsumer(topic);
+
+        @Cleanup
+        Consumer<Integer> consumer3 = createConsumer(topic);
+
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < 1000; i++) { // 1000 keyed messages, keys drawn from [0, NUMBER_OF_KEYS)
+            producer.newMessage()
+                    .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+                    .value(i)
+                    .send();
+        }
+
+        receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3)); // helper defined elsewhere in this class
+    }
+
     private Consumer<String> createFixedHashRangesConsumer(String topic, String subscription, Range... ranges) throws PulsarClientException {
         return pulsarClient.newConsumer(Schema.STRING)
                 .topic(topic)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
index c9328c8..d9c1c6c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
@@ -208,15 +208,6 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
         private void addMsg(MessageImpl<?> msg, SendCallback callback) {
             if (messages.size() == 0) {
                 sequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());
-                if (msg.hasKey()) {
-                    messageMetadata.setPartitionKey(msg.getKey());
-                    if (msg.hasBase64EncodedKey()) {
-                        messageMetadata.setPartitionKeyB64Encoded(true);
-                    }
-                }
-                if (msg.hasOrderingKey()) {
-                    messageMetadata.setOrderingKey(ByteString.copyFrom(msg.getOrderingKey()));
-                }
                 batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT
                         .buffer(Math.min(maxBatchSize, ClientCnx.getMaxMessageSize()));
                 firstCallback = callback;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index a07834f..ac387a4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -140,10 +140,18 @@ public class MessageImpl<T> implements Message<T> {
         } else {
             properties = Collections.emptyMap();
         }
-
         if (singleMessageMetadata.hasPartitionKey()) {
             msgMetadataBuilder.setPartitionKeyB64Encoded(singleMessageMetadata.getPartitionKeyB64Encoded());
             msgMetadataBuilder.setPartitionKey(singleMessageMetadata.getPartitionKey());
+        } else if (msgMetadataBuilder.hasPartitionKey()) {
+            msgMetadataBuilder.clearPartitionKey();
+            msgMetadataBuilder.clearPartitionKeyB64Encoded();
+        }
+
+        if (singleMessageMetadata.hasOrderingKey()) {
+            msgMetadataBuilder.setOrderingKey(singleMessageMetadata.getOrderingKey());
+        } else if (msgMetadataBuilder.hasOrderingKey()) {
+            msgMetadataBuilder.clearOrderingKey();
         }
 
         if (singleMessageMetadata.hasEventTime()) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 7ae21e7..d867139 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1622,6 +1622,14 @@ public class Commands {
         messageMetadata.setPublishTime(builder.getPublishTime());
         messageMetadata.setProducerName(builder.getProducerName());
         messageMetadata.setSequenceId(builder.getSequenceId());
+        // Attach the key to the message metadata.
+        if (builder.hasPartitionKey()) {
+            messageMetadata.setPartitionKey(builder.getPartitionKey());
+            messageMetadata.setPartitionKeyB64Encoded(builder.getPartitionKeyB64Encoded());
+        }
+        if (builder.hasOrderingKey()) {
+            messageMetadata.setOrderingKey(builder.getOrderingKey());
+        }
         if (builder.hasReplicatedFrom()) {
             messageMetadata.setReplicatedFrom(builder.getReplicatedFrom());
         }


[pulsar] 12/14: [Doc]--Add labels to function statefulsets and services (#7428)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1fd55cf6cbf8b190e995477c4000559c57eaad63
Author: HuanliMeng <48...@users.noreply.github.com>
AuthorDate: Fri Jul 3 13:54:27 2020 +0800

    [Doc]--Add labels to function statefulsets and services (#7428)
    
    ### Motivation
    
    Pulsar release 2.4.0 supports Adding labels to function statefulsets and services. The code is implemented but the doc is not updated accordingly.
    
    Here is the code update PR: https://github.com/apache/pulsar/pull/4038
    
    Therefore, I made this PR to update the docs for releases:
    master/2.5.2/2.5.1/2.5.0/2.4.2/2.4.1/2.4.0
    
    ### Modifications
    
    Add a note in the doc "Setup: Configure Functions runtime" under the Pulsar Functions section
    
    
    
    (cherry picked from commit 4a041698dae58ddc851a7eccdd31e0a350328a24)
---
 site2/docs/functions-runtime.md                                 | 3 +++
 site2/website/versioned_docs/version-2.4.0/functions-runtime.md | 3 +++
 site2/website/versioned_docs/version-2.4.1/functions-runtime.md | 3 +++
 site2/website/versioned_docs/version-2.4.2/functions-runtime.md | 3 +++
 site2/website/versioned_docs/version-2.5.0/functions-runtime.md | 3 +++
 site2/website/versioned_docs/version-2.5.1/functions-runtime.md | 3 +++
 site2/website/versioned_docs/version-2.5.2/functions-runtime.md | 3 +++
 7 files changed, 21 insertions(+)

diff --git a/site2/docs/functions-runtime.md b/site2/docs/functions-runtime.md
index 3b4ce79..5134eb7 100644
--- a/site2/docs/functions-runtime.md
+++ b/site2/docs/functions-runtime.md
@@ -10,6 +10,9 @@ Pulsar Functions support the following methods to run functions.
 - *Process*: Invoke functions in processes forked by Functions Worker.
 - *Kubernetes*: Submit functions as Kubernetes StatefulSets by Functions Worker.
 
+#### Note
+> Pulsar supports adding labels to the Kubernetes StatefulSets and services while launching functions, which facilitates selecting the target Kubernetes objects.
+
 The differences of the thread and process modes are:
 - Thread mode: when a function runs in thread mode, it runs on the same Java virtual machine (JVM) with Functions worker.
 - Process mode: when a function runs in process mode, it runs on the same machine that Functions worker runs.
diff --git a/site2/website/versioned_docs/version-2.4.0/functions-runtime.md b/site2/website/versioned_docs/version-2.4.0/functions-runtime.md
index 7ce699b..e7934e4 100644
--- a/site2/website/versioned_docs/version-2.4.0/functions-runtime.md
+++ b/site2/website/versioned_docs/version-2.4.0/functions-runtime.md
@@ -12,6 +12,9 @@ Pulsar Functions support the following methods to run functions.
 - *Process*: Invoke functions in processes forked by Functions Worker.
 - *Kubernetes*: Submit functions as Kubernetes StatefulSets by Functions Worker.
 
+#### Note
+> Pulsar supports adding labels to the Kubernetes StatefulSets and services while launching functions, which facilitates selecting the target Kubernetes objects.
+
 ## Configure thread runtime
 It is easy to configure *Thread* runtime. In most cases, you do not need to configure anything. You can customize the thread group name with the following settings:
 
diff --git a/site2/website/versioned_docs/version-2.4.1/functions-runtime.md b/site2/website/versioned_docs/version-2.4.1/functions-runtime.md
index ab26cd7..200ade2 100644
--- a/site2/website/versioned_docs/version-2.4.1/functions-runtime.md
+++ b/site2/website/versioned_docs/version-2.4.1/functions-runtime.md
@@ -11,6 +11,9 @@ Pulsar Functions support the following methods to run functions.
 - *Process*: Invoke functions in processes forked by Functions Worker.
 - *Kubernetes*: Submit functions as Kubernetes StatefulSets by Functions Worker.
 
+#### Note
+> Pulsar supports adding labels to the Kubernetes StatefulSets and services while launching functions, which facilitates selecting the target Kubernetes objects.
+
 The differences of the thread and process modes are:   
 - Thread mode: when a function runs in thread mode, it runs on the same Java virtual machine (JVM) with Functions worker.   
 - Process mode: when a function runs in process mode, it runs on the same machine that Functions worker runs.
diff --git a/site2/website/versioned_docs/version-2.4.2/functions-runtime.md b/site2/website/versioned_docs/version-2.4.2/functions-runtime.md
index 4fb98ad..2d2d7a0 100644
--- a/site2/website/versioned_docs/version-2.4.2/functions-runtime.md
+++ b/site2/website/versioned_docs/version-2.4.2/functions-runtime.md
@@ -11,6 +11,9 @@ Pulsar Functions support the following methods to run functions.
 - *Process*: Invoke functions in processes forked by Functions Worker.
 - *Kubernetes*: Submit functions as Kubernetes StatefulSets by Functions Worker.
 
+#### Note
+> Pulsar supports adding labels to the Kubernetes StatefulSets and services while launching functions, which facilitates selecting the target Kubernetes objects.
+
 The differences of the thread and process modes are:   
 - Thread mode: when a function runs in thread mode, it runs on the same Java virtual machine (JVM) with Functions worker.   
 - Process mode: when a function runs in process mode, it runs on the same machine that Functions worker runs.
diff --git a/site2/website/versioned_docs/version-2.5.0/functions-runtime.md b/site2/website/versioned_docs/version-2.5.0/functions-runtime.md
index bf83fef..2a1e6cb 100644
--- a/site2/website/versioned_docs/version-2.5.0/functions-runtime.md
+++ b/site2/website/versioned_docs/version-2.5.0/functions-runtime.md
@@ -11,6 +11,9 @@ Pulsar Functions support the following methods to run functions.
 - *Process*: Invoke functions in processes forked by Functions Worker.
 - *Kubernetes*: Submit functions as Kubernetes StatefulSets by Functions Worker.
 
+#### Note
+> Pulsar supports adding labels to the Kubernetes StatefulSets and services while launching functions, which facilitates selecting the target Kubernetes objects.
+
 The differences of the thread and process modes are:
 - Thread mode: when a function runs in thread mode, it runs on the same Java virtual machine (JVM) with Functions worker.
 - Process mode: when a function runs in process mode, it runs on the same machine that Functions worker runs.
diff --git a/site2/website/versioned_docs/version-2.5.1/functions-runtime.md b/site2/website/versioned_docs/version-2.5.1/functions-runtime.md
index 6912fdf..06e18c9 100644
--- a/site2/website/versioned_docs/version-2.5.1/functions-runtime.md
+++ b/site2/website/versioned_docs/version-2.5.1/functions-runtime.md
@@ -11,6 +11,9 @@ Pulsar Functions support the following methods to run functions.
 - *Process*: Invoke functions in processes forked by Functions Worker.
 - *Kubernetes*: Submit functions as Kubernetes StatefulSets by Functions Worker.
 
+#### Note
+> Pulsar supports adding labels to the Kubernetes StatefulSets and services while launching functions, which facilitates selecting the target Kubernetes objects.
+
 The differences of the thread and process modes are:
 - Thread mode: when a function runs in thread mode, it runs on the same Java virtual machine (JVM) with Functions worker.
 - Process mode: when a function runs in process mode, it runs on the same machine that Functions worker runs.
diff --git a/site2/website/versioned_docs/version-2.5.2/functions-runtime.md b/site2/website/versioned_docs/version-2.5.2/functions-runtime.md
index 7e28409..edddae6 100644
--- a/site2/website/versioned_docs/version-2.5.2/functions-runtime.md
+++ b/site2/website/versioned_docs/version-2.5.2/functions-runtime.md
@@ -11,6 +11,9 @@ Pulsar Functions support the following methods to run functions.
 - *Process*: Invoke functions in processes forked by Functions Worker.
 - *Kubernetes*: Submit functions as Kubernetes StatefulSets by Functions Worker.
 
+#### Note
+> Pulsar supports adding labels to the Kubernetes StatefulSets and services while launching functions, which facilitates selecting the target Kubernetes objects.
+
 The differences of the thread and process modes are:
 - Thread mode: when a function runs in thread mode, it runs on the same Java virtual machine (JVM) with Functions worker.
 - Process mode: when a function runs in process mode, it runs on the same machine that Functions worker runs.


[pulsar] 02/14: update for pr:Restore clusterDispatchRate policy for compatibility (#7380)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b64e02a0939fbcb7056a04d68cee04377a8af743
Author: HuanliMeng <48...@users.noreply.github.com>
AuthorDate: Mon Jun 29 12:10:13 2020 +0800

    update for pr:Restore clusterDispatchRate policy for compatibility (#7380)
    
    
    (cherry picked from commit 717deb85b88d080c4c629920bff5a0104906b9f4)
---
 site2/docs/admin-api-namespaces.md | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/site2/docs/admin-api-namespaces.md b/site2/docs/admin-api-namespaces.md
index 7cd0c1d..0d19e7e 100644
--- a/site2/docs/admin-api-namespaces.md
+++ b/site2/docs/admin-api-namespaces.md
@@ -554,11 +554,18 @@ admin.namespaces().getRetention(namespace)
 
 #### set dispatch throttling
 
-It sets message dispatch rate for all the topics under a given namespace.
+It sets message dispatch rate for all the topics under a given namespace. 
 Dispatch rate can be restricted by number of message per X seconds (`msg-dispatch-rate`) or by number of message-bytes per X second (`byte-dispatch-rate`).
 dispatch rate is in second and it can be configured with `dispatch-rate-period`. Default value of `msg-dispatch-rate` and `byte-dispatch-rate` is -1 which
 disables the throttling.
 
+#### Note
+> - If neither `clusterDispatchRate` nor `topicDispatchRate` is configured, dispatch throttling is disabled.
+> >
+> - If `topicDispatchRate` is not configured, `clusterDispatchRate` takes effect.
+> > 
+> - If `topicDispatchRate` is configured, `topicDispatchRate` takes effect.
+
 ###### CLI
 
 ```
@@ -669,7 +676,7 @@ $ pulsar-admin namespaces get-subscription-dispatch-rate test-tenant/ns1
 admin.namespaces().getSubscriptionDispatchRate(namespace)
 ```
 
-#### set dispatch throttling for subscription
+#### set dispatch throttling for replicator
 
 It sets message dispatch rate for all the replicator between replication clusters under a given namespace.
 Dispatch rate can be restricted by number of message per X seconds (`msg-dispatch-rate`) or by number of message-bytes per X second (`byte-dispatch-rate`).


[pulsar] 04/14: Catch NPE and detect state doesn't move (#7401)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5b3889c114b1e95275f5e1998d21c527b0f24c29
Author: Sijie Guo <si...@apache.org>
AuthorDate: Tue Jun 30 15:32:53 2020 -0700

    Catch NPE and detect state doesn't move (#7401)
    
    
    (cherry picked from commit 86e2610bebb46f479ea221a69782cd8575a04b16)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 49 +++++++++++++++-------
 1 file changed, 33 insertions(+), 16 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index ea0614d..810b3f8 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -404,7 +404,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         };
 
         // Create a new ledger to start writing
-        this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
+        this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis();
         mbean.startDataLedgerCreateOp();
 
         asyncCreateLedger(bookKeeper, config, digestType, (rc, lh, ctx) -> {
@@ -596,13 +596,19 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Queue addEntry request", name);
             }
+            if (State.CreatingLedger == state) {
+                long elapsedMs = System.currentTimeMillis() - this.lastLedgerCreationInitiationTimestamp;
+                if (elapsedMs > TimeUnit.SECONDS.toMillis(2 * config.getMetadataOperationsTimeoutSeconds())) {
+                    log.info("[{}] Ledger creation was initiated {} ms ago but it never completed" +
+                        " and creation timeout task didn't kick in as well. Force to fail the create ledger operation ...", name, elapsedMs);
+                    this.createComplete(Code.TimeoutException, null, null);
+                }
+            }
         } else if (state == State.ClosedLedger) {
             // No ledger and no pending operations. Create a new ledger
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Creating a new ledger", name);
-            }
+            log.info("[{}] Creating a new ledger", name);
             if (STATE_UPDATER.compareAndSet(this, State.ClosedLedger, State.CreatingLedger)) {
-                this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
+                this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis();
                 mbean.startDataLedgerCreateOp();
                 asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap());
             }
@@ -1229,8 +1235,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     metadataMutex.unlock();
                     updateLedgersIdsComplete(stat);
                     synchronized (ManagedLedgerImpl.this) {
-                        mbean.addLedgerSwitchLatencySample(System.nanoTime() - lastLedgerCreationInitiationTimestamp,
-                                TimeUnit.NANOSECONDS);
+                        mbean.addLedgerSwitchLatencySample(System.currentTimeMillis() - lastLedgerCreationInitiationTimestamp,
+                                TimeUnit.MILLISECONDS);
                     }
                 }
 
@@ -1380,11 +1386,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
         if (!pendingAddEntries.isEmpty()) {
             // Need to create a new ledger to write pending entries
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Creating a new ledger", name);
-            }
+            log.info("[{}] Creating a new ledger", name);
             STATE_UPDATER.set(this, State.CreatingLedger);
-            this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
+            this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis();
             mbean.startDataLedgerCreateOp();
             asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap());
         }
@@ -3172,15 +3176,28 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         Map<String, byte[]> finalMetadata = new HashMap<>();
         finalMetadata.putAll(ledgerMetadata);
         finalMetadata.putAll(metadata);
-        if (log.isDebugEnabled()) {
-            log.debug("creating ledger, metadata: "+finalMetadata);
-        }
-        bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(),
+        log.info("[{}] Creating ledger, metadata: {} - metadata ops timeout : {} seconds",
+            name, finalMetadata, config.getMetadataOperationsTimeoutSeconds());
+        try {
+            bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(),
                 digestType, config.getPassword(), cb, ledgerCreated, finalMetadata);
+        } catch (Throwable cause) {
+            log.error("[{}] Encountered unexpected error when creating ledger",
+                name, cause);
+            cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
+            return;
+        }
         scheduledExecutor.schedule(() -> {
             if (!ledgerCreated.get()) {
-                cb.createComplete(BKException.Code.TimeoutException, null, ledgerCreated);
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Timeout creating ledger", name);
+                }
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Ledger already created when timeout task is triggered", name);
+                }
             }
+            cb.createComplete(BKException.Code.TimeoutException, null, ledgerCreated);
         }, config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS);
     }
 


[pulsar] 05/14: Avoid NPEs at ledger creation when DNS failures happen (#7403)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 30a52215f832309cbc1913090cae56ace87feaa2
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Jun 30 18:39:01 2020 -0700

    Avoid NPEs at ledger creation when DNS failures happen (#7403)
    
    * Avoid NPEs at ledger creation when DNS failures happen
    
    * Removed unnecessary try/catch
    
    (cherry picked from commit a230427f78e7cc844b2a22ababd05d217d9164bf)
---
 .gitignore                                         |  1 +
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  4 +--
 .../service/schema/BookkeeperSchemaStorage.java    | 33 +++++++++++++---------
 .../pulsar/compaction/TwoPhaseCompactor.java       | 31 ++++++++++++--------
 4 files changed, 41 insertions(+), 28 deletions(-)

diff --git a/.gitignore b/.gitignore
index f6c7c3a..297f31d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -87,3 +87,4 @@ docker.debug-info
 examples/flink/src/main/java/org/apache/flink/avro/generated
 pulsar-flink/src/test/java/org/apache/flink/avro/generated
 pulsar-client/src/test/java/org/apache/pulsar/client/avro/generated
+/build/
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 66e977f..3e0d583 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2283,6 +2283,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     void createNewMetadataLedger(final VoidCallback callback) {
         ledger.mbean.startCursorLedgerCreateOp();
+
         ledger.asyncCreateLedger(bookkeeper, config, digestType, (rc, lh, ctx) -> {
 
             if (ledger.checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
@@ -2349,7 +2350,6 @@ public class ManagedCursorImpl implements ManagedCursor {
                 });
             }));
         }, LedgerMetadataUtils.buildAdditionalMetadataForCursor(name));
-
     }
 
     private List<LongProperty> buildPropertiesMap(Map<String, Long> properties) {
@@ -2818,7 +2818,7 @@ public class ManagedCursorImpl implements ManagedCursor {
             return null;
         }
     }
-  
+
     void updateReadStats(int readEntriesCount, long readEntriesSize) {
         this.entriesReadCount += readEntriesCount;
         this.entriesReadSize += readEntriesSize;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 05a9911..8b31358 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.common.protocol.schema.SchemaStorage;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.LongSchemaVersion;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.protocol.schema.StoredSchema;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.zookeeper.CreateMode;
@@ -500,20 +501,24 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
     private CompletableFuture<LedgerHandle> createLedger(String schemaId) {
         Map<String, byte[]> metadata = LedgerMetadataUtils.buildMetadataForSchema(schemaId);
         final CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
-        bookKeeper.asyncCreateLedger(
-            config.getManagedLedgerDefaultEnsembleSize(),
-            config.getManagedLedgerDefaultWriteQuorum(),
-            config.getManagedLedgerDefaultAckQuorum(),
-            BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
-            LedgerPassword,
-            (rc, handle, ctx) -> {
-                if (rc != BKException.Code.OK) {
-                    future.completeExceptionally(bkException("Failed to create ledger", rc, -1, -1));
-                } else {
-                    future.complete(handle);
-                }
-            }, null, metadata
-        );
+        try {
+            bookKeeper.asyncCreateLedger(
+                    config.getManagedLedgerDefaultEnsembleSize(),
+                    config.getManagedLedgerDefaultWriteQuorum(),
+                    config.getManagedLedgerDefaultAckQuorum(),
+                    BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
+                    LedgerPassword,
+                    (rc, handle, ctx) -> {
+                        if (rc != BKException.Code.OK) {
+                            future.completeExceptionally(bkException("Failed to create ledger", rc, -1, -1));
+                        } else {
+                            future.complete(handle);
+                        }
+                    }, null, metadata);
+        } catch (Throwable t) {
+            log.error("[{}] Encountered unexpected error when creating schema ledger", schemaId, t);
+            return FutureUtil.failedFuture(t);
+        }
         return future;
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index df7c79b..4b7c8bd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.RawBatchConverter;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -310,18 +311,24 @@ public class TwoPhaseCompactor extends Compactor {
 
     private CompletableFuture<LedgerHandle> createLedger(BookKeeper bk, Map<String,byte[]> metadata) {
         CompletableFuture<LedgerHandle> bkf = new CompletableFuture<>();
-        bk.asyncCreateLedger(conf.getManagedLedgerDefaultEnsembleSize(),
-                             conf.getManagedLedgerDefaultWriteQuorum(),
-                             conf.getManagedLedgerDefaultAckQuorum(),
-                             Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
-                             Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD,
-                             (rc, ledger, ctx) -> {
-                                 if (rc != BKException.Code.OK) {
-                                     bkf.completeExceptionally(BKException.create(rc));
-                                 } else {
-                                     bkf.complete(ledger);
-                                 }
-                             }, null, metadata);
+
+        try {
+            bk.asyncCreateLedger(conf.getManagedLedgerDefaultEnsembleSize(),
+                    conf.getManagedLedgerDefaultWriteQuorum(),
+                    conf.getManagedLedgerDefaultAckQuorum(),
+                    Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+                    Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD,
+                    (rc, ledger, ctx) -> {
+                        if (rc != BKException.Code.OK) {
+                            bkf.completeExceptionally(BKException.create(rc));
+                        } else {
+                            bkf.complete(ledger);
+                        }
+                    }, null, metadata);
+        } catch (Throwable t) {
+            log.error("Encountered unexpected error when creating compaction ledger", t);
+            return FutureUtil.failedFuture(t);
+        }
         return bkf;
     }
 


[pulsar] 14/14: shaded jclouds to avoid gson conflict

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f0363e3847c1affe4f1ff92b40336c03a74cbf69
Author: ran <ga...@126.com>
AuthorDate: Mon Jul 6 07:24:13 2020 +0800

    shaded jclouds to avoid gson conflict
    
    Signed-off-by: xiaolong.ran <rx...@apache.org>
---
 jclouds-shaded/pom.xml                             | 140 +++++++++++++++++++++
 pom.xml                                            |  13 +-
 tiered-storage/jcloud/pom.xml                      |  36 ++++--
 .../impl/BlobStoreManagedLedgerOffloader.java      |   6 +-
 .../impl/BlobStoreManagedLedgerOffloaderTest.java  |   2 +-
 tiered-storage/pom.xml                             |   4 +
 6 files changed, 178 insertions(+), 23 deletions(-)

diff --git a/jclouds-shaded/pom.xml b/jclouds-shaded/pom.xml
new file mode 100644
index 0000000..efd8ff9
--- /dev/null
+++ b/jclouds-shaded/pom.xml
@@ -0,0 +1,140 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+    xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar</artifactId>
+    <version>2.7.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>jclouds-shaded</artifactId>
+  <name>Apache Pulsar :: Jclouds shaded</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.jclouds</groupId>
+      <artifactId>jclouds-allblobstore</artifactId>
+      <version>${jclouds.version}</version>
+    </dependency>
+  </dependencies>
+
+  <dependencyManagement>
+    <dependencies>
+      <!-- JClouds is still using Guava 18.0 and it won't work with newer versions -->
+      <dependency>
+        <groupId>com.google.guava</groupId>
+        <artifactId>guava</artifactId>
+        <version>18.0</version>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <createDependencyReducedPom>true</createDependencyReducedPom>
+              <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+              <minimizeJar>false</minimizeJar>
+
+              <artifactSet>
+                <includes>
+                  <include>com.google.guava:guava</include>
+                  <include>org.apache.jclouds:*</include>
+                  <include>org.apache.jclouds.api:*</include>
+                  <include>org.apache.jclouds.common:*</include>
+                  <include>org.apache.jclouds.provider:*</include>
+                  <include>com.google.inject.extensions:guice-assistedinject</include>
+                  <include>com.google.inject:guice</include>
+                  <include>com.google.inject.extensions:guice-multibindings</include>
+                  <include>javax.ws.rs:*</include>
+                  <include>com.jamesmurty.utils:*</include>
+                  <include>net.iharder:*</include>
+                  <include>aopalliance:*</include>
+                  <include>javax.inject:*</include>
+                  <include>javax.annotation:*</include>
+                  <include>com.google.errorprone:*</include>
+                </includes>
+              </artifactSet>
+
+              <relocations>
+                <relocation>
+                  <pattern>com.google.gson.internal</pattern>
+                  <shadedPattern>org.jclouds.json.gson.internal</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.google</pattern>
+                  <shadedPattern>org.apache.pulsar.jcloud.shade.com.google</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>javax.ws</pattern>
+                  <shadedPattern>org.apache.pulsar.jcloud.shade.javax.ws</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.jamesmurty.utils</pattern>
+                  <shadedPattern>org.apache.pulsar.jcloud.shade.com.jamesmurty.utils</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>aopalliance</pattern>
+                  <shadedPattern>org.apache.pulsar.jcloud.shade.aopalliance</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>net.iharder</pattern>
+                  <shadedPattern>org.apache.pulsar.jcloud.shade.net.iharder</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>javax.inject</pattern>
+                  <shadedPattern>org.apache.pulsar.jcloud.shade.javax.inject</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>javax.annotation</pattern>
+                  <shadedPattern>org.apache.pulsar.jcloud.shade.javax.annotation</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.google.errorprone</pattern>
+                  <shadedPattern>org.apache.pulsar.jcloud.shade.com.google.errorprone</shadedPattern>
+                </relocation>
+
+              </relocations>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer"/>
+              </transformers>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/pom.xml b/pom.xml
index bac783a..822dd65 100644
--- a/pom.xml
+++ b/pom.xml
@@ -133,6 +133,7 @@ flexible messaging model and an intuitive client API.</description>
     <module>docker</module>
     <module>tests</module>
     <module>pulsar-metadata</module>
+    <module>jclouds-shaded</module>
   </modules>
 
   <issueManagement>
@@ -1005,18 +1006,6 @@ flexible messaging model and an intuitive client API.</description>
         <version>${commons.collections.version}</version>
       </dependency>
 
-      <dependency>
-        <groupId>org.apache.jclouds</groupId>
-        <artifactId>jclouds-allblobstore</artifactId>
-        <version>${jclouds.version}</version>
-      </dependency>
-
-      <dependency>
-        <groupId>org.apache.jclouds</groupId>
-        <artifactId>jclouds-blobstore</artifactId>
-        <version>${jclouds.version}</version>
-      </dependency>
-
       <!-- test dependencies -->
       <dependency>
         <groupId>com.lmax</groupId>
diff --git a/tiered-storage/jcloud/pom.xml b/tiered-storage/jcloud/pom.xml
index 9e68965..051e16e 100644
--- a/tiered-storage/jcloud/pom.xml
+++ b/tiered-storage/jcloud/pom.xml
@@ -40,13 +40,35 @@
     </dependency>
 
     <dependency>
-      <groupId>org.apache.jclouds</groupId>
-      <artifactId>jclouds-allblobstore</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.jclouds</groupId>
-      <artifactId>jclouds-blobstore</artifactId>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>jclouds-shaded</artifactId>
+      <version>${pulsar.jclouds.shaded.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.code.gson</groupId>
+          <artifactId>gson</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.jclouds</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.jclouds.api</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.jclouds.common</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.jclouds.provider</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <dependency>
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
index 6bb5b21..76d039a 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
@@ -23,7 +23,6 @@ import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.AWSSessionCredentials;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
-import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -50,6 +49,7 @@ import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.jcloud.shade.com.google.common.base.Supplier;
 import org.jclouds.Constants;
 import org.jclouds.ContextBuilder;
 import org.jclouds.aws.domain.SessionCredentials;
@@ -69,10 +69,10 @@ import org.jclouds.googlecloud.GoogleCredentialsFromJson;
 import org.jclouds.googlecloudstorage.GoogleCloudStorageProviderMetadata;
 import org.jclouds.io.Payload;
 import org.jclouds.io.Payloads;
-import org.jclouds.osgi.ProviderRegistry;
-import org.jclouds.s3.reference.S3Constants;
 import org.jclouds.osgi.ApiRegistry;
+import org.jclouds.osgi.ProviderRegistry;
 import org.jclouds.s3.S3ApiMetadata;
+import org.jclouds.s3.reference.S3Constants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index ed46a01..8aa977a 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -28,7 +28,6 @@ import static org.mockito.Mockito.mock;
 import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.AWSSessionCredentials;
-import com.google.common.base.Supplier;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.io.File;
 import java.io.IOException;
@@ -56,6 +55,7 @@ import org.apache.bookkeeper.mledger.offload.jcloud.BlobStoreTestBase;
 import org.apache.bookkeeper.mledger.offload.jcloud.CredentialsUtil;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.jcloud.shade.com.google.common.base.Supplier;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.data.ACL;
diff --git a/tiered-storage/pom.xml b/tiered-storage/pom.xml
index 70a69db..ba0c3fd 100644
--- a/tiered-storage/pom.xml
+++ b/tiered-storage/pom.xml
@@ -32,6 +32,10 @@
   <artifactId>tiered-storage-parent</artifactId>
   <name>Apache Pulsar :: Tiered Storage :: Parent</name>
 
+  <properties>
+    <pulsar.jclouds.shaded.version>${project.version}</pulsar.jclouds.shaded.version>
+  </properties>
+
   <modules>
     <module>jcloud</module>
     <module>file-system</module>


[pulsar] 13/14: Handle NotAllowed Exception at the client side. (#7430)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 754b864cf5f8844881eb9d47f4eaba6b4fb6d5c2
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Jul 17 09:22:40 2020 +0800

    Handle NotAllowed Exception at the client side. (#7430)
    
    * Handle NotAllowed Exception at the client side.
    
    (cherry picked from commit f8b2a2334fb7d2dc5266242a6393c9cc434fba60)
---
 .../broker/service/BrokerServiceException.java     |  2 ++
 .../client/api/KeySharedSubscriptionTest.java      |  2 +-
 .../pulsar/client/api/PulsarClientException.java   | 22 ++++++++++++++++++++++
 pulsar-client-cpp/include/pulsar/Result.h          |  1 +
 pulsar-client-cpp/lib/ClientConnection.cc          |  3 +++
 pulsar-client-cpp/lib/Result.cc                    |  3 +++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  2 ++
 .../apache/pulsar/common/api/proto/PulsarApi.java  |  3 +++
 pulsar-common/src/main/proto/PulsarApi.proto       |  1 +
 9 files changed, 38 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index 6d0e50f..7ec97ff 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -212,6 +212,8 @@ public class BrokerServiceException extends Exception {
             return ServerError.TransactionCoordinatorNotFound;
         } else if (t instanceof CoordinatorException.InvalidTxnStatusException) {
             return ServerError.InvalidTxnStatus;
+        } else if (t instanceof NotAllowedException) {
+            return ServerError.NotAllowedError;
         } else {
             if (checkCauseIfUnknown) {
                 return getClientErrorCode(t.getCause(), false);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 2a7a20b..610c4d1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -397,7 +397,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         receiveAndCheck(checkList);
     }
 
-    @Test(expectedExceptions = PulsarClientException.class)
+    @Test(expectedExceptions = PulsarClientException.NotAllowedException.class)
     public void testDisableKeySharedSubscription() throws PulsarClientException {
         this.conf.setSubscriptionKeySharedEnable(false);
         String topic = "persistent://public/default/key_shared_disabled";
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 6a6e42a..597e0d5 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -640,6 +640,23 @@ public class PulsarClientException extends IOException {
     }
 
     /**
+     * Not allowed exception thrown by Pulsar client.
+     */
+    public static class NotAllowedException extends PulsarClientException {
+
+        /**
+         * Constructs an {@code NotAllowedException} with the specified detail message.
+         *
+         * @param msg
+         *        The detail message (which is saved for later retrieval
+         *        by the {@link #getMessage()} method)
+         */
+        public NotAllowedException(String msg) {
+            super(msg);
+        }
+    }
+
+    /**
      * Full producer queue error thrown by Pulsar client.
      */
     public static class ProducerQueueIsFullError extends PulsarClientException {
@@ -790,6 +807,8 @@ public class PulsarClientException extends IOException {
             return new InvalidTopicNameException(msg);
         } else if (t instanceof NotSupportedException) {
             return new NotSupportedException(msg);
+        } else if (t instanceof NotAllowedException) {
+            return new NotAllowedException(msg);
         } else if (t instanceof ProducerQueueIsFullError) {
             return new ProducerQueueIsFullError(msg);
         } else if (t instanceof ProducerBlockedQuotaExceededError) {
@@ -873,6 +892,8 @@ public class PulsarClientException extends IOException {
             return new InvalidTopicNameException(msg);
         } else if (cause instanceof NotSupportedException) {
             return new NotSupportedException(msg);
+        } else if (cause instanceof NotAllowedException) {
+            return new NotAllowedException(msg);
         } else if (cause instanceof ProducerQueueIsFullError) {
             return new ProducerQueueIsFullError(msg);
         } else if (cause instanceof ProducerBlockedQuotaExceededError) {
@@ -911,6 +932,7 @@ public class PulsarClientException extends IOException {
                 || t instanceof InvalidMessageException
                 || t instanceof InvalidTopicNameException
                 || t instanceof NotSupportedException
+                || t instanceof NotAllowedException
                 || t instanceof ChecksumException
                 || t instanceof CryptoException
                 || t instanceof ConsumerAssignException
diff --git a/pulsar-client-cpp/include/pulsar/Result.h b/pulsar-client-cpp/include/pulsar/Result.h
index b2a16c5..6dd7e45 100644
--- a/pulsar-client-cpp/include/pulsar/Result.h
+++ b/pulsar-client-cpp/include/pulsar/Result.h
@@ -82,6 +82,7 @@ enum Result
                                                      /// Shared and Key_Shared subscription mode
     ResultTransactionCoordinatorNotFoundError,       /// Transaction coordinator not found
     ResultInvalidTxnStatusError,                     /// Invalid txn status error
+    ResultNotAllowedError,                           /// Not allowed
 };
 
 // Return string representation of result code
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index fe43805..3628dd5 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -117,6 +117,9 @@ static Result getResult(ServerError serverError) {
 
         case InvalidTxnStatus:
             return ResultInvalidTxnStatusError;
+
+        case NotAllowedError:
+            return ResultNotAllowedError;
     }
     // NOTE : Do not add default case in the switch above. In future if we get new cases for
     // ServerError and miss them in the switch above we would like to get notified. Adding
diff --git a/pulsar-client-cpp/lib/Result.cc b/pulsar-client-cpp/lib/Result.cc
index fc4d81f..3c1c2a8 100644
--- a/pulsar-client-cpp/lib/Result.cc
+++ b/pulsar-client-cpp/lib/Result.cc
@@ -144,6 +144,9 @@ const char* strResult(Result result) {
 
         case ResultInvalidTxnStatusError:
             return "ResultInvalidTxnStatusError";
+
+        case ResultNotAllowedError:
+            return "ResultNotAllowedError";
     };
     // NOTE : Do not add default case in the switch above. In future if we get new cases for
     // ServerError and miss them in the switch above we would like to get notified. Adding
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index d63cbc1..ea1624b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -1003,6 +1003,8 @@ public class ClientCnx extends PulsarHandler {
             return new PulsarClientException.TopicDoesNotExistException(errorMsg);
         case ConsumerAssignError:
             return new PulsarClientException.ConsumerAssignException(errorMsg);
+        case NotAllowedError:
+            return new PulsarClientException.NotAllowedException(errorMsg);
         case UnknownError:
         default:
             return new PulsarClientException(errorMsg);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 02fed41..4d1babf 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -82,6 +82,7 @@ public final class PulsarApi {
     ConsumerAssignError(19, 19),
     TransactionCoordinatorNotFound(20, 20),
     InvalidTxnStatus(21, 21),
+    NotAllowedError(22, 22),
     ;
     
     public static final int UnknownError_VALUE = 0;
@@ -106,6 +107,7 @@ public final class PulsarApi {
     public static final int ConsumerAssignError_VALUE = 19;
     public static final int TransactionCoordinatorNotFound_VALUE = 20;
     public static final int InvalidTxnStatus_VALUE = 21;
+    public static final int NotAllowedError_VALUE = 22;
     
     
     public final int getNumber() { return value; }
@@ -134,6 +136,7 @@ public final class PulsarApi {
         case 19: return ConsumerAssignError;
         case 20: return TransactionCoordinatorNotFound;
         case 21: return InvalidTxnStatus;
+        case 22: return NotAllowedError;
         default: return null;
       }
     }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index be5e4d4..c4acca2 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -196,6 +196,7 @@ enum ServerError {
 
     TransactionCoordinatorNotFound = 20; // Transaction coordinator not found error
     InvalidTxnStatus = 21; // Invalid txn status error
+    NotAllowedError = 22; // Not allowed error
 }
 
 enum AuthMethod {


[pulsar] 06/14: [Issue 7407] NPE with tombstones (#7408)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f62bc5bc4ef17c1688149e874251fca1e5309059
Author: Fernando Miguélez Palomo <fe...@gmail.com>
AuthorDate: Mon Jul 6 03:26:47 2020 +0200

    [Issue 7407] NPE with tombstones (#7408)
    
    Added check to prevent NPE when a tombstone (null value) is produced.
    
    Fixes #7407
    
    (cherry picked from commit 90c2f4ae8bb8fa405e69f0b2cf99622223bae6ab)
---
 .../java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
index 80ee5f9..c1a2559 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
@@ -166,7 +166,7 @@ public class ConsumerStatsRecorderImpl implements ConsumerStatsRecorder {
     public void updateNumMsgsReceived(Message<?> message) {
         if (message != null) {
             numMsgsReceived.increment();
-            numBytesReceived.add(message.getData().length);
+            numBytesReceived.add(message.getData() == null ? 0 : message.getData().length);
         }
     }
 


[pulsar] 03/14: [Issue 7347] Avoid the NPE occurs in method `ManagedLedgerImpl.isOffloadedNeedsDelete` (#7389)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0a2df5a5c0309d561b2d15d1451ceaed951935ec
Author: ran <ga...@126.com>
AuthorDate: Thu Jul 9 00:43:31 2020 +0800

    [Issue 7347] Avoid the NPE occurs in method `ManagedLedgerImpl.isOffloadedNeedsDelete` (#7389)
    
    Fixes #7347
    
    ### Motivation
    
    The default value of the offload-deletion-lag is `null`, which causes an NPE.
    
    ### Modifications
    
    Add a null check to the method `ManagedLedgerImpl.isOffloadedNeedsDelete`.
    
    ### Verifying this change
    
    Add a unit test for the method `ManagedLedgerImpl.isOffloadedNeedsDelete`.
    
    (cherry picked from commit eaf268cb3717df242a8e17b5cadb6babc1a7099c)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  4 +-
 .../mledger/impl/OffloadLedgerDeleteTest.java      | 62 ++++++++++++++++++++--
 .../bookkeeper/mledger/impl/OffloadPrefixTest.java |  2 +-
 3 files changed, 63 insertions(+), 5 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index f45b341..ea0614d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1960,7 +1960,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         long elapsedMs = clock.millis() - offload.getTimestamp();
 
         if (config.getLedgerOffloader() != null && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null) {
+                && config.getLedgerOffloader().getOffloadPolicies() != null
+                && config.getLedgerOffloader().getOffloadPolicies()
+                    .getManagedLedgerOffloadDeletionLagInMillis() != null) {
             return offload.getComplete() && !offload.getBookkeeperDeleted()
                     && elapsedMs > config.getLedgerOffloader()
                     .getOffloadPolicies().getManagedLedgerOffloadDeletionLagInMillis();
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
index 8fbc588..736cd8b 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
@@ -20,15 +20,21 @@ package org.apache.bookkeeper.mledger.impl;
 
 import static org.apache.bookkeeper.mledger.impl.OffloadPrefixTest.assertEventuallyTrue;
 
+import java.lang.reflect.Method;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.util.MockClock;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,7 +54,7 @@ public class OffloadLedgerDeleteTest extends MockedBookKeeperTestCase {
         config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
         config.setRetentionTime(10, TimeUnit.MINUTES);
         config.setRetentionSizeInMB(10);
-        offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(new Long(300000));
+        offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(300000L);
         config.setLedgerOffloader(offloader);
         config.setClock(clock);
 
@@ -110,7 +116,7 @@ public class OffloadLedgerDeleteTest extends MockedBookKeeperTestCase {
         config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
         config.setRetentionTime(5, TimeUnit.MINUTES);
         config.setRetentionSizeInMB(10);
-        offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(new Long(600000));
+        offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(600000L);
         config.setLedgerOffloader(offloader);
         config.setClock(clock);
 
@@ -157,7 +163,7 @@ public class OffloadLedgerDeleteTest extends MockedBookKeeperTestCase {
         config.setMaxEntriesPerLedger(10);
         config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
         config.setRetentionTime(10, TimeUnit.MINUTES);
-        offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(new Long(300000));
+        offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(300000L);
         config.setLedgerOffloader(offloader);
         config.setClock(clock);
 
@@ -201,4 +207,54 @@ public class OffloadLedgerDeleteTest extends MockedBookKeeperTestCase {
                             .map(e -> e.getLedgerId()).collect(Collectors.toSet()),
                             offloader.offloadedLedgers());
     }
+
+    @Test
+    public void isOffloadedNeedsDeleteTest() throws Exception {
+        OffloadPolicies offloadPolicies = new OffloadPolicies();
+        LedgerOffloader ledgerOffloader = Mockito.mock(LedgerOffloader.class);
+        Mockito.when(ledgerOffloader.getOffloadPolicies()).thenReturn(offloadPolicies);
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        MockClock clock = new MockClock();
+        config.setLedgerOffloader(ledgerOffloader);
+        config.setClock(clock);
+
+        ManagedLedger managedLedger = factory.open("isOffloadedNeedsDeleteTest", config);
+        Class<ManagedLedgerImpl> clazz = ManagedLedgerImpl.class;
+        Method method = clazz.getDeclaredMethod("isOffloadedNeedsDelete", MLDataFormats.OffloadContext.class);
+        method.setAccessible(true);
+
+        MLDataFormats.OffloadContext offloadContext = MLDataFormats.OffloadContext.newBuilder()
+                .setTimestamp(config.getClock().millis() - 1000)
+                .setComplete(true)
+                .setBookkeeperDeleted(false)
+                .build();
+        Boolean needsDelete = (Boolean) method.invoke(managedLedger, offloadContext);
+        Assert.assertFalse(needsDelete);
+
+        offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(500L);
+        needsDelete = (Boolean) method.invoke(managedLedger, offloadContext);
+        Assert.assertTrue(needsDelete);
+
+        offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(1000L * 2);
+        needsDelete = (Boolean) method.invoke(managedLedger, offloadContext);
+        Assert.assertFalse(needsDelete);
+
+        offloadContext = MLDataFormats.OffloadContext.newBuilder()
+                .setTimestamp(config.getClock().millis() - 1000)
+                .setComplete(false)
+                .setBookkeeperDeleted(false)
+                .build();
+        needsDelete = (Boolean) method.invoke(managedLedger, offloadContext);
+        Assert.assertFalse(needsDelete);
+
+        offloadContext = MLDataFormats.OffloadContext.newBuilder()
+                .setTimestamp(config.getClock().millis() - 1000)
+                .setComplete(true)
+                .setBookkeeperDeleted(true)
+                .build();
+        needsDelete = (Boolean) method.invoke(managedLedger, offloadContext);
+        Assert.assertFalse(needsDelete);
+
+    }
 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index 97bab56..d2d4148 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -608,7 +608,7 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
         config.setMaxEntriesPerLedger(10);
         config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
         config.setRetentionTime(0, TimeUnit.MINUTES);
-        offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(new Long(100));
+        offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(100L);
         offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100);
         config.setLedgerOffloader(offloader);
         ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);