Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 09:53:06 UTC

[pulsar] branch branch-2.6 updated (f0363e3 -> 375fc00)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from f0363e3  shaded jclouds to avoid gson conflict
     new fc1a8ee  Consumer is registered on dispatcher even if hash range conflicts on Key_Shared subscription (#7444)
     new e0de399  add pulsar-client-messagecrypto-bc into pulsar-client module (#7447)
     new 22647cd  fix update partitions error for non-persistent topic (#7459)
     new af93301  [Doc]--add authentication client with oauth2 support  (#7462)
     new 7905aca  Cpp oauth2 auth client (#7467)
     new 5f05fd2  Use CGroup CPU usage when present (#7475)
     new 76c0939  Fix ArrayIndexOutOfBoundsException in batch index ack. (#7483)
     new 9eee486  Get last entry is trying to read entry -1 (#7495)
     new 825fdd4  [Broker] Timeout opening managed ledger operation … (#7506)
     new 375fc00  fix typo

The 10 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 conf/broker.conf                                   |   2 +-
 conf/standalone.conf                               |   2 +-
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  16 +-
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |  70 +++++-
 .../apache/pulsar/broker/admin/AdminResource.java  |   3 +-
 .../loadbalance/impl/LinuxBrokerHostUsageImpl.java | 116 +++++----
 .../AbstractDispatcherSingleActiveConsumer.java    |   4 +-
 .../broker/service/EntryBatchIndexesAcks.java      |   9 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |   1 +
 ...istentStickyKeyDispatcherMultipleConsumers.java |  10 +-
 .../PersistentDispatcherMultipleConsumers.java     |   2 +-
 .../PersistentDispatcherSingleActiveConsumer.java  |   2 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java |  12 +-
 .../pulsar/broker/admin/AdminResourceTest.java     |   2 +
 .../client/api/KeySharedSubscriptionTest.java      |  10 +-
 pulsar-client-cpp/include/pulsar/Authentication.h  | 107 +++++++-
 .../include/pulsar/ClientConfiguration.h           |   2 +-
 pulsar-client-cpp/lib/Authentication.cc            |   7 +
 pulsar-client-cpp/lib/ClientConfiguration.cc       |   2 +-
 pulsar-client-cpp/lib/auth/AuthAthenz.cc           |   2 +-
 pulsar-client-cpp/lib/auth/AuthOauth2.cc           | 271 +++++++++++++++++++++
 .../lib/auth/{AuthToken.h => AuthOauth2.h}         |  42 +++-
 pulsar-client-cpp/lib/auth/AuthTls.cc              |   2 +-
 pulsar-client-cpp/lib/auth/AuthToken.cc            |   2 +-
 pulsar-client-cpp/tests/AuthPluginTest.cc          |  45 ++++
 pulsar-client-shaded/pom.xml                       |   6 +
 site2/docs/client-libraries-java.md                |  27 +-
 site2/docs/reference-configuration.md              |   1 +
 site2/docs/security-oauth.md                       |  93 +++++++
 site2/website/sidebars.json                        |   1 +
 30 files changed, 783 insertions(+), 88 deletions(-)
 create mode 100644 pulsar-client-cpp/lib/auth/AuthOauth2.cc
 copy pulsar-client-cpp/lib/auth/{AuthToken.h => AuthOauth2.h} (50%)
 create mode 100644 site2/docs/security-oauth.md


[pulsar] 04/10: [Doc]--add authentication client with oauth2 support (#7462)

Posted by rx...@apache.org.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit af93301a47fe202c7e597056fb180bccb94da906
Author: HuanliMeng <48...@users.noreply.github.com>
AuthorDate: Thu Jul 9 12:19:06 2020 +0800

    [Doc]--add authentication client with oauth2 support  (#7462)
    
    fixes #7425
    
    ### Motivation
    Pulsar supports authenticating clients using OAuth 2.0 access tokens. The related code was updated in this PR: https://github.com/apache/pulsar/pull/7420.
    
    Update the related Pulsar documentation.
    
    
    
    ### Modifications
    Security section: add "Authentication using OAuth 2.0 access tokens".
    Java client: add an example of OAuth 2.0 authentication.
    Sidebar: add the new document to the sidebar.
    
    (cherry picked from commit 6726072a867c7a9f23d1a3cecfc2085c84cbdece)
---
 site2/docs/client-libraries-java.md | 27 ++++++++++-
 site2/docs/security-oauth.md        | 93 +++++++++++++++++++++++++++++++++++++
 site2/website/sidebars.json         |  1 +
 3 files changed, 120 insertions(+), 1 deletion(-)

diff --git a/site2/docs/client-libraries-java.md b/site2/docs/client-libraries-java.md
index 68a9256..bbcf8f2 100644
--- a/site2/docs/client-libraries-java.md
+++ b/site2/docs/client-libraries-java.md
@@ -781,7 +781,7 @@ The following schema formats are currently available for Java:
 
 ## Authentication
 
-Pulsar currently supports two authentication schemes: [TLS](security-tls-authentication.md) and [Athenz](security-athenz.md). You can use the Pulsar Java client with both.
+Pulsar currently supports three authentication schemes: [TLS](security-tls-authentication.md), [Athenz](security-athenz.md), and [OAuth2](security-oauth.md). You can use the Pulsar Java client with all of them.
 
 ### TLS Authentication
 
@@ -840,3 +840,28 @@ PulsarClient client = PulsarClient.builder()
 > * `file:///path/to/file`
 > * `file:/path/to/file`
 > * `data:application/x-pem-file;base64,<base64-encoded value>`
+
+### OAuth2
+
+The following example shows how to use [OAuth2](security-oauth.md) as an authentication provider for the Pulsar Java client.
+
+You can use the factory method to configure authentication for the Pulsar Java client.
+
+```java
+PulsarClient client = PulsarClient.builder()
+    .serviceUrl("pulsar://broker.example.com:6650/")
+    .authentication(
+        AuthenticationFactoryOAuth2.clientCredentials(this.issuerUrl, this.credentialsUrl, this.audience))
+    .build();
+```
+
+Alternatively, you can use the encoded parameters to configure authentication for the Pulsar Java client.
+
+```java
+Authentication auth = AuthenticationFactory
+    .create(AuthenticationOAuth2.class.getName(), "{\"type\":\"client_credentials\",\"privateKey\":\"...\",\"issuerUrl\":\"...\",\"audience\":\"...\"}");
+PulsarClient client = PulsarClient.builder()
+    .serviceUrl("pulsar://broker.example.com:6650/")
+    .authentication(auth)
+    .build();
+```
\ No newline at end of file
diff --git a/site2/docs/security-oauth.md b/site2/docs/security-oauth.md
new file mode 100644
index 0000000..b399dea
--- /dev/null
+++ b/site2/docs/security-oauth.md
@@ -0,0 +1,93 @@
+---
+id: security-oauth
+title: Client authentication using OAuth 2.0 access tokens
+sidebar_label: Authentication using OAuth 2.0 access tokens
+---
+
+Pulsar supports authenticating clients using OAuth 2.0 access tokens. You can use OAuth 2.0 access tokens to identify a Pulsar client and associate the Pulsar client with a "principal" (or "role"), which is permitted to perform certain actions, such as publishing messages to a topic or consuming messages from a topic.
+
+This module supports the Pulsar client authentication plugin for OAuth 2.0. After communicating with the OAuth 2.0 server, the Pulsar client gets an `access token` from the OAuth 2.0 server and passes this `access token` to the Pulsar broker for authentication. The broker can use the `org.apache.pulsar.broker.authentication.AuthenticationProviderToken` provider, or you can add your own `AuthenticationProvider` to work with this module.
+
+## Authentication provider configuration
+
+This library allows you to authenticate the Pulsar client by using an access token that is obtained from an OAuth 2.0 authorization service, which acts as a _token issuer_.
+
+### Authentication types
+
+The authentication type determines how to obtain an access token through an OAuth 2.0 authorization flow.
+
+#### Note
+> Currently, the Pulsar Java client only supports the `client_credentials` authentication type.
+
+#### Client credentials
+
+The following table lists the parameters supported for the `client_credentials` authentication type.
+
+| Parameter | Description | Example | Required or not |
+| --- | --- | --- | --- |
+| `type` | OAuth 2.0 authentication type. |  `client_credentials` (default) | Optional |
+| `issuerUrl` | URL of the authentication provider that allows the Pulsar client to obtain an access token. | `https://accounts.google.com` | Required |
+| `privateKey` | URL to a JSON credentials file. | Supports the following formats: <br> <li> `file:///path/to/file` <li>`file:/path/to/file` <li> `data:application/json;base64,<base64-encoded value>` | Required |
+| `audience`  | An OAuth 2.0 "resource server" identifier for the Pulsar cluster. | `https://broker.example.com` | Required |
+
+The credentials file contains service account credentials used with the client authentication type. The following shows an example of a credentials file `credentials_file.json`.
+
+```json
+{
+  "type": "client_credentials",
+  "client_id": "d9ZyX97q1ef8Cr81WHVC4hFQ64vSlDK3",
+  "client_secret": "on1uJ...k6F6R",
+  "client_email": "1234567890-abcdefghijklmnopqrstuvwxyz@developer.gserviceaccount.com",
+  "issuer_url": "https://accounts.google.com"
+}
+```
+
+In the example above, the `type` field is set to `client_credentials`, which is also the default. The `client_id` and `client_secret` fields are required.
+
+### Typical original OAuth2 request mapping
+
+The following shows a typical original OAuth2 request, which is used to obtain the access token from the OAuth2 server.
+
+```bash
+curl --request POST \
+  --url https://dev-kt-aa9ne.us.auth0.com/oauth/token \
+  --header 'content-type: application/json' \
+  --data '{
+  "client_id":"Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x",
+  "client_secret":"rT7ps7WY8uhdVuBTKWZkttwLdQotmdEliaM5rLfmgNibvqziZ-g07ZH52N_poGAb",
+  "audience":"https://dev-kt-aa9ne.us.auth0.com/api/v2/",
+  "grant_type":"client_credentials"}'
+```
+
+In the example above, the request maps to the plugin parameters as follows.
+
+- The `issuerUrl` parameter in this plugin is mapped to `--url https://dev-kt-aa9ne.us.auth0.com/oauth/token`.
+- The file referenced by the `privateKey` parameter in this plugin should contain at least the `client_id` and `client_secret` fields.
+- The `audience` parameter in this plugin is mapped to `"audience":"https://dev-kt-aa9ne.us.auth0.com/api/v2/"`.
+
+## Client Configuration
+
+You can use the OAuth2 authentication provider with the following Pulsar clients.
+
+### Java
+
+You can use the factory method to configure authentication for the Pulsar Java client.
+
+```java
+PulsarClient client = PulsarClient.builder()
+    .serviceUrl("pulsar://broker.example.com:6650/")
+    .authentication(
+        AuthenticationFactoryOAuth2.clientCredentials(this.issuerUrl, this.credentialsUrl, this.audience))
+    .build();
+```
+
+Alternatively, you can use the encoded parameters to configure authentication for the Pulsar Java client.
+
+```java
+Authentication auth = AuthenticationFactory
+    .create(AuthenticationOAuth2.class.getName(), "{\"type\":\"client_credentials\",\"privateKey\":\"...\",\"issuerUrl\":\"...\",\"audience\":\"...\"}");
+PulsarClient client = PulsarClient.builder()
+    .serviceUrl("pulsar://broker.example.com:6650/")
+    .authentication(auth)
+    .build();
+```
\ No newline at end of file
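
The request mapping described in security-oauth.md above can be sketched in Java. This is only an illustration of which plugin parameter feeds which field of the token request; `TokenRequestMapping` and `requestBody` are hypothetical names, not part of the Pulsar client, and the sketch assumes the values contain no characters that need JSON escaping.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not Pulsar code: it only shows how the plugin
// parameters line up with the fields of a client_credentials token request.
class TokenRequestMapping {

    /** Builds the JSON body of the token request sent to the issuer's token endpoint. */
    static String requestBody(String clientId, String clientSecret, String audience) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("client_id", clientId);         // read from the privateKey credentials file
        fields.put("client_secret", clientSecret); // read from the privateKey credentials file
        fields.put("audience", audience);          // the audience parameter
        fields.put("grant_type", "client_credentials");
        // Assumption: values need no JSON escaping, so plain concatenation is enough here.
        return fields.entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\":\"" + e.getValue() + "\"")
                .collect(Collectors.joining(",", "{", "}"));
    }
}
```

A real client would use a JSON library for the body and take the request URL from the `issuerUrl` parameter, as the mapping above describes.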
diff --git a/site2/website/sidebars.json b/site2/website/sidebars.json
index 3799845..42356fd 100644
--- a/site2/website/sidebars.json
+++ b/site2/website/sidebars.json
@@ -82,6 +82,7 @@
       "security-jwt",
       "security-athenz",
       "security-kerberos",
+      "security-oauth",
       "security-authorization",
       "security-encryption",
       "security-extending",


[pulsar] 09/10: [Broker] Timeout opening managed ledger operation … (#7506)

Posted by rx...@apache.org.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 825fdd4222dd65ef3099f1a975a1555226297379
Author: Sijie Guo <si...@apache.org>
AuthorDate: Thu Jul 16 06:20:16 2020 -0700

    [Broker] Timeout opening managed ledger operation … (#7506)
    
    *Motivation*
    
    Currently, the broker has a timeout mechanism for loading topics. However, the underlying managed ledger library
    doesn't provide one. As a result, a TopicLoad operation can time out after 30 seconds while the
    CompletableFuture for opening the managed ledger is still kept in the cache of the managed ledger factory.
    That CompletableFuture never completes, so any subsequent topic lookups fail because attempts to load
    the topic never try to re-open the managed ledger.
    
    *Modification*
    
    Introduce a timeout mechanism in the managed ledger factory. If a managed ledger is not opened within a given timeout
    period, the CompletableFuture is removed from the cache. This allows subsequent attempts to load the topic to try
    opening the managed ledger again.
    
    *Tests*
    
    This problem can be consistently reproduced in a chaos test in Kubernetes by killing k8s worker nodes. It can cause
    a producer to be stuck forever until the owner broker pod is restarted. The change has been verified in a chaos testing environment.
    
    (cherry picked from commit 14e3b7ae05e84ca13eefa16026288a384a961e45)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 16 +++--
 .../mledger/impl/ManagedLedgerFactoryImpl.java     | 70 ++++++++++++++++++----
 2 files changed, 72 insertions(+), 14 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 3e0d583..26f1bf3 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -64,6 +64,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
+import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
@@ -323,9 +324,9 @@ public class ManagedCursorImpl implements ManagedCursor {
         // a new ledger and write the position into it
         ledger.mbean.startCursorLedgerOpenOp();
         long ledgerId = info.getCursorsLedgerId();
-        bookkeeper.asyncOpenLedger(ledgerId, digestType, config.getPassword(), (rc, lh, ctx) -> {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Opened ledger {} for consumer {}. rc={}", ledger.getName(), ledgerId, name, rc);
+        OpenCallback openCallback = (rc, lh, ctx) -> {
+            if (log.isInfoEnabled()) {
+                log.info("[{}] Opened ledger {} for consumer {}. rc={}", ledger.getName(), ledgerId, name, rc);
             }
             if (isBkErrorNotRecoverable(rc)) {
                 log.error("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name,
@@ -399,7 +400,14 @@ public class ManagedCursorImpl implements ManagedCursor {
                 recoveredCursor(position, recoveredProperties, lh);
                 callback.operationComplete();
             }, null);
-        }, null);
+        };
+        try {
+            bookkeeper.asyncOpenLedger(ledgerId, digestType, config.getPassword(), openCallback, null);
+        } catch (Throwable t) {
+            log.error("[{}] Encountered error on opening cursor ledger {} for cursor {}",
+                ledger.getName(), ledgerId, name, t);
+            openCallback.openComplete(BKException.Code.UnexpectedConditionException, null, null);
+        }
     }
 
     private void recoverIndividualDeletedMessages(List<MLDataFormats.MessageRange> individualDeletedMessagesList) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 517c62c..8b470c2 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -50,6 +50,7 @@ import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
@@ -101,6 +102,8 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
     protected final ManagedLedgerFactoryMBeanImpl mbean;
 
     protected final ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> ledgers = new ConcurrentHashMap<>();
+    protected final ConcurrentHashMap<String, PendingInitializeManagedLedger> pendingInitializeLedgers =
+        new ConcurrentHashMap<>();
     private final EntryCacheManager entryCacheManager;
 
     private long lastStatTimestamp = System.nanoTime();
@@ -111,6 +114,18 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
 
     private static final int StatsPeriodSeconds = 60;
 
+    private static class PendingInitializeManagedLedger {
+
+        private final ManagedLedgerImpl ledger;
+        private final long createTimeMs;
+
+        PendingInitializeManagedLedger(ManagedLedgerImpl ledger) {
+            this.ledger = ledger;
+            this.createTimeMs = System.currentTimeMillis();
+        }
+
+    }
+
     public ManagedLedgerFactoryImpl(ClientConfiguration bkClientConfiguration, String zkConnection) throws Exception {
         this(bkClientConfiguration, zkConnection, new ManagedLedgerFactoryConfig());
     }
@@ -320,18 +335,32 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
 
         // If the ledger state is bad, remove it from the map.
         CompletableFuture<ManagedLedgerImpl> existingFuture = ledgers.get(name);
-        if (existingFuture != null && existingFuture.isDone()) {
-            try {
-                ManagedLedgerImpl l = existingFuture.get();
-                if (l.getState().equals(State.Fenced.toString()) || l.getState().equals(State.Closed.toString())) {
-                    // Managed ledger is in unusable state. Recreate it.
-                    log.warn("[{}] Attempted to open ledger in {} state. Removing from the map to recreate it", name,
+        if (existingFuture != null) {
+            if (existingFuture.isDone()) {
+                try {
+                    ManagedLedgerImpl l = existingFuture.get();
+                    if (l.getState().equals(State.Fenced.toString()) || l.getState().equals(State.Closed.toString())) {
+                        // Managed ledger is in unusable state. Recreate it.
+                        log.warn("[{}] Attempted to open ledger in {} state. Removing from the map to recreate it", name,
                             l.getState());
-                    ledgers.remove(name, existingFuture);
+                        ledgers.remove(name, existingFuture);
+                    }
+                } catch (Exception e) {
+                    // Unable to get the future
+                    log.warn("[{}] Got exception while trying to retrieve ledger", name, e);
                 }
-            } catch (Exception e) {
-                // Unable to get the future
-                log.warn("[{}] Got exception while trying to retrieve ledger", name, e);
+            } else {
+                PendingInitializeManagedLedger pendingLedger = pendingInitializeLedgers.get(name);
+                if (null != pendingLedger) {
+                    long pendingMs = System.currentTimeMillis() - pendingLedger.createTimeMs;
+                    if (pendingMs > TimeUnit.SECONDS.toMillis(config.getMetadataOperationsTimeoutSeconds())) {
+                        log.warn("[{}] Managed ledger has been pending in initialize state more than {} milliseconds,"
+                            + " remove it from cache to retry ...", name, pendingMs);
+                        ledgers.remove(name, existingFuture);
+                        pendingInitializeLedgers.remove(name, pendingLedger);
+                    }
+                }
+
             }
         }
 
@@ -345,16 +374,37 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
                                     config.getBookKeeperEnsemblePlacementPolicyProperties())),
                     store, config, scheduledExecutor,
                     orderedExecutor, name, mlOwnershipChecker);
+            PendingInitializeManagedLedger pendingLedger = new PendingInitializeManagedLedger(newledger);
+            pendingInitializeLedgers.put(name, pendingLedger);
             newledger.initialize(new ManagedLedgerInitializeLedgerCallback() {
                 @Override
                 public void initializeComplete() {
+                    log.info("[{}] Successfully initialized managed ledger", name);
+                    pendingInitializeLedgers.remove(name, pendingLedger);
                     future.complete(newledger);
                 }
 
                 @Override
                 public void initializeFailed(ManagedLedgerException e) {
+                    log.error("[{}] Failed to initialize managed ledger: {}", name, e.getMessage());
+
                     // Clean the map if initialization fails
                     ledgers.remove(name, future);
+
+                    if (pendingInitializeLedgers.remove(name, pendingLedger)) {
+                        pendingLedger.ledger.asyncClose(new CloseCallback() {
+                            @Override
+                            public void closeComplete(Object ctx) {
+                                // no-op
+                            }
+
+                            @Override
+                            public void closeFailed(ManagedLedgerException exception, Object ctx) {
+                                log.warn("[{}] Failed to close a pending initialization managed ledger", name, exception);
+                            }
+                        }, null);
+                    }
+
                     future.completeExceptionally(e);
                 }
             }, null);
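
The timeout eviction described in the commit message for #7506 can be sketched in isolation. This is a simplified stand-in for the factory's cache of futures, not the Pulsar implementation: `PendingOpenCache`, its `open` method, and the injectable clock are hypothetical names introduced for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical, simplified model of a future cache with timeout eviction.
class PendingOpenCache<V> {

    // Records when an open attempt started so a stalled attempt can be detected.
    private static final class Pending {
        final long createTimeMs;

        Pending(long createTimeMs) {
            this.createTimeMs = createTimeMs;
        }
    }

    private final ConcurrentHashMap<String, CompletableFuture<V>> futures = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Pending> pending = new ConcurrentHashMap<>();
    private final long timeoutMs;
    private final LongSupplier clock; // injectable so the timeout can be exercised in tests

    PendingOpenCache(long timeoutMs, LongSupplier clock) {
        this.timeoutMs = timeoutMs;
        this.clock = clock;
    }

    /** Returns the cached future for name, first evicting it if it has been pending too long. */
    CompletableFuture<V> open(String name) {
        CompletableFuture<V> existing = futures.get(name);
        if (existing != null && !existing.isDone()) {
            Pending p = pending.get(name);
            if (p != null && clock.getAsLong() - p.createTimeMs > timeoutMs) {
                // Stalled attempt: drop both entries so a fresh attempt is started below.
                futures.remove(name, existing);
                pending.remove(name, p);
            }
        }
        return futures.computeIfAbsent(name, n -> {
            CompletableFuture<V> future = new CompletableFuture<>();
            Pending p = new Pending(clock.getAsLong());
            pending.put(n, p);
            // Once the attempt finishes either way, it is no longer pending.
            future.whenComplete((value, error) -> pending.remove(n, p));
            return future;
        });
    }
}
```

With a 1000 ms timeout, a second `open("topic")` call within the window returns the same pending future, while a call made after the window gets a new one. Completed futures are never evicted by this check.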


[pulsar] 10/10: fix typo

Posted by rx...@apache.org.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 375fc00ef95f03d0e02f581bb7949a8ff40d164b
Author: mikecaat <35...@users.noreply.github.com>
AuthorDate: Tue Jul 14 06:11:45 2020 +0900

    fix typo
    
    Signed-off-by: xiaolong.ran <rx...@apache.org>
---
 conf/broker.conf                      | 2 +-
 conf/standalone.conf                  | 2 +-
 site2/docs/reference-configuration.md | 1 +
 3 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 05c2747..a74ab3b 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -937,7 +937,7 @@ replicatorPrefix=pulsar.repl
 
 # Duration to check replication policy to avoid replicator inconsistency
 # due to missing ZooKeeper watch (disable with value 0)
-replicatioPolicyCheckDurationSeconds=600
+replicationPolicyCheckDurationSeconds=600
 
 # Default message retention time
 defaultRetentionTimeInMinutes=0
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 2efc651..fb653f0 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -684,7 +684,7 @@ replicationProducerQueueSize=1000
 
 # Duration to check replication policy to avoid replicator inconsistency
 # due to missing ZooKeeper watch (disable with value 0)
-replicatioPolicyCheckDurationSeconds=600
+replicationPolicyCheckDurationSeconds=600
 
 # Default message retention time
 defaultRetentionTimeInMinutes=0
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index a1015a1..3d32d3c 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -438,6 +438,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) CLI tool can be used
 |replicationMetricsEnabled|   |true|
 |replicationConnectionsPerBroker|   |16|
 |replicationProducerQueueSize|    |1000|
+| replicationPolicyCheckDurationSeconds | Duration to check replication policy to avoid replicator inconsistency due to missing ZooKeeper watch. When the value is set to 0, disable checking replication policy. | 600 |
 |defaultRetentionTimeInMinutes|   |0|
 |defaultRetentionSizeInMB|    |0|
 |keepAliveIntervalSeconds|    |30|


[pulsar] 06/10: Use CGroup CPU usage when present (#7475)

Posted by rx...@apache.org.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5f05fd28841ad4f0f7b2d18de329530d6b5edf3e
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Jul 9 22:17:54 2020 -0700

    Use CGroup CPU usage when present (#7475)
    
    * Use CGroup CPU usage when present
    
    * Also read cpu limits from cgroups
    
    * Fixed test
    
    * Fixed string trimming
    
    * Addressed comments
    
    (cherry picked from commit 8bc4880044cd80bc2975dcf18ef21e8018595f3d)
---
 .../loadbalance/impl/LinuxBrokerHostUsageImpl.java | 116 +++++++++++++--------
 1 file changed, 75 insertions(+), 41 deletions(-)
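
The cgroup arithmetic used in the diff that follows can be worked through on its own. This sketch only reproduces the two formulas; `CGroupCpuMath` and its method names are hypothetical, and the inputs are passed in directly rather than read from the cgroup files.

```java
import java.util.concurrent.TimeUnit;

// Worked arithmetic only; the class in the diff reads these values from cgroup files.
class CGroupCpuMath {

    /** CPU limit as a percentage, where 100 means one full core. */
    static double cpuLimitPercent(long quotaUs, long periodUs, int availableProcessors) {
        if (quotaUs > 0) {
            return 100.0 * quotaUs / periodUs;
        }
        // No positive quota (cgroup v1 reports -1 when unlimited): fall back to the processor count.
        return 100.0 * availableProcessors;
    }

    /** CPU usage percentage from two samples of cumulative CPU time in nanoseconds. */
    static double cpuUsagePercent(long prevUsageNanos, long currUsageNanos, double elapsedSeconds) {
        double usedNanos = currUsageNanos - prevUsageNanos;
        return 100 * usedNanos / elapsedSeconds / TimeUnit.SECONDS.toNanos(1);
    }
}
```

For example, a quota of 200000 µs over a 100000 µs period gives a limit of 200 (two cores), and 1.5 seconds of CPU time consumed during one wall-clock second gives a usage of 150.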

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
index 2d67f10..992b743 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.broker.loadbalance.impl;
 
+import com.google.common.base.Charsets;
 import com.sun.management.OperatingSystemMXBean;
+
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.nio.file.Files;
@@ -33,28 +35,33 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
 import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
 import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
  * Class that will return the broker host usage.
  */
+@Slf4j
 public class LinuxBrokerHostUsageImpl implements BrokerHostUsage {
     private long lastCollection;
     private double lastTotalNicUsageTx;
     private double lastTotalNicUsageRx;
-    private CpuStat lastCpuStat;
+    private double lastCpuUsage;
+    private double lastCpuTotalTime;
     private OperatingSystemMXBean systemBean;
     private SystemResourceUsage usage;
 
     private final Optional<Double> overrideBrokerNicSpeedGbps;
+    private final boolean isCGroupsEnabled;
 
-    private static final Logger LOG = LoggerFactory.getLogger(LinuxBrokerHostUsageImpl.class);
+    private static final String CGROUPS_CPU_USAGE_PATH = "/sys/fs/cgroup/cpu/cpuacct.usage";
+    private static final String CGROUPS_CPU_LIMIT_QUOTA_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us";
+    private static final String CGROUPS_CPU_LIMIT_PERIOD_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_period_us";
 
     public LinuxBrokerHostUsageImpl(PulsarService pulsar) {
         this(
@@ -73,6 +80,16 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage {
         this.overrideBrokerNicSpeedGbps = overrideBrokerNicSpeedGbps;
         executorService.scheduleAtFixedRate(this::calculateBrokerHostUsage, 0,
                 hostUsageCheckIntervalMin, TimeUnit.MINUTES);
+
+        boolean isCGroupsEnabled = false;
+        try {
+             isCGroupsEnabled = Files.exists(Paths.get(CGROUPS_CPU_USAGE_PATH));
+        } catch (Exception e) {
+            log.warn("Failed to check cgroup CPU usage file: {}", e.getMessage());
+        }
+
+        this.isCGroupsEnabled = isCGroupsEnabled;
+        calculateBrokerHostUsage();
     }
 
     @Override
@@ -87,29 +104,20 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage {
         double totalNicUsageTx = getTotalNicUsageTxKb(nics);
         double totalNicUsageRx = getTotalNicUsageRxKb(nics);
         double totalCpuLimit = getTotalCpuLimit();
-        CpuStat cpuStat = getTotalCpuUsage();
 
         SystemResourceUsage usage = new SystemResourceUsage();
         long now = System.currentTimeMillis();
+        double elapsedSeconds = (now - lastCollection) / 1000d;
+        double cpuUsage = getTotalCpuUsage(elapsedSeconds);
 
         if (lastCollection == 0L) {
             usage.setMemory(getMemUsage());
             usage.setBandwidthIn(new ResourceUsage(0d, totalNicLimit));
             usage.setBandwidthOut(new ResourceUsage(0d, totalNicLimit));
-            usage.setCpu(new ResourceUsage(0d, totalCpuLimit));
         } else {
-            double elapsedSeconds = (now - lastCollection) / 1000d;
             double nicUsageTx = (totalNicUsageTx - lastTotalNicUsageTx) / elapsedSeconds;
             double nicUsageRx = (totalNicUsageRx - lastTotalNicUsageRx) / elapsedSeconds;
 
-            if (cpuStat != null && lastCpuStat != null) {
-                // we need two non null stats to get a usage report
-                long cpuTimeDiff = cpuStat.getTotalTime() - lastCpuStat.getTotalTime();
-                long cpuUsageDiff = cpuStat.getUsage() - lastCpuStat.getUsage();
-                double cpuUsage = ((double) cpuUsageDiff / (double) cpuTimeDiff) * totalCpuLimit;
-                usage.setCpu(new ResourceUsage(cpuUsage, totalCpuLimit));
-            }
-
             usage.setMemory(getMemUsage());
             usage.setBandwidthIn(new ResourceUsage(nicUsageRx, totalNicLimit));
             usage.setBandwidthOut(new ResourceUsage(nicUsageTx, totalNicLimit));
@@ -117,15 +125,37 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage {
 
         lastTotalNicUsageTx = totalNicUsageTx;
         lastTotalNicUsageRx = totalNicUsageRx;
-        lastCpuStat = cpuStat;
         lastCollection = System.currentTimeMillis();
         this.usage = usage;
+        usage.setCpu(new ResourceUsage(cpuUsage, totalCpuLimit));
     }
 
     private double getTotalCpuLimit() {
+        if (isCGroupsEnabled) {
+            try {
+                long quota = readLongFromFile(CGROUPS_CPU_LIMIT_QUOTA_PATH);
+                long period = readLongFromFile(CGROUPS_CPU_LIMIT_PERIOD_PATH);
+                if (quota > 0) {
+                    return 100.0 * quota / period;
+                }
+            } catch (IOException e) {
+                log.warn("Failed to read CPU quotas from cgroups", e);
+                // Fallback to availableProcessors
+            }
+        }
+
+        // Fallback to JVM reported CPU quota
         return 100 * Runtime.getRuntime().availableProcessors();
     }
 
+    private double getTotalCpuUsage(double elapsedTimeSeconds) {
+        if (isCGroupsEnabled) {
+            return getTotalCpuUsageForCGroup(elapsedTimeSeconds);
+        } else {
+            return getTotalCpuUsageForEntireHost();
+        }
+    }
+
     /**
      * Reads first line of /proc/stat to get total cpu usage.
      *
@@ -137,18 +167,36 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage {
      * Line is split in "words", filtering the first. The sum of all numbers give the amount of cpu cycles used this
      * far. Real CPU usage should equal the sum substracting the idle cycles, this would include iowait, irq and steal.
      */
-    private CpuStat getTotalCpuUsage() {
+    private double getTotalCpuUsageForEntireHost() {
         try (Stream<String> stream = Files.lines(Paths.get("/proc/stat"))) {
             String[] words = stream.findFirst().get().split("\\s+");
 
             long total = Arrays.stream(words).filter(s -> !s.contains("cpu")).mapToLong(Long::parseLong).sum();
-
             long idle = Long.parseLong(words[4]);
+            long usage = total - idle;
+
+            double currentUsage = (usage - lastCpuUsage)  / (total - lastCpuTotalTime) * getTotalCpuLimit();
+
+            lastCpuUsage = usage;
+            lastCpuTotalTime = total;
 
-            return new CpuStat(total, total - idle);
+            return currentUsage;
         } catch (IOException e) {
-            LOG.error("Failed to read CPU usage from /proc/stat", e);
-            return null;
+            log.error("Failed to read CPU usage from /proc/stat", e);
+            return -1;
+        }
+    }
+
+    private double getTotalCpuUsageForCGroup(double elapsedTimeSeconds) {
+        try {
+            long usage = readLongFromFile(CGROUPS_CPU_USAGE_PATH);
+            double currentUsage = usage - lastCpuUsage;
+            lastCpuUsage = usage;
+
+            return 100 * currentUsage / elapsedTimeSeconds / TimeUnit.SECONDS.toNanos(1);
+        } catch (IOException e) {
+            log.error("Failed to read CPU usage from {}", CGROUPS_CPU_USAGE_PATH, e);
+            return -1;
         }
     }
 
@@ -163,7 +211,7 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage {
             return stream.filter(this::isPhysicalNic).map(path -> path.getFileName().toString())
                     .collect(Collectors.toList());
         } catch (IOException e) {
-            LOG.error("Failed to find NICs", e);
+            log.error("Failed to find NICs", e);
             return Collections.emptyList();
         }
     }
@@ -194,7 +242,7 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage {
                     try {
                         return Double.parseDouble(new String(Files.readAllBytes(getNicSpeedPath(s))));
                     } catch (IOException e) {
-                        LOG.error("Failed to read speed for nic " + s, e);
+                        log.error("Failed to read speed for nic " + s, e);
                         return 0d;
                     }
                 }).sum() * 1024);
@@ -213,7 +261,7 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage {
             try {
                 return Double.parseDouble(new String(Files.readAllBytes(getNicRxPath(s))));
             } catch (IOException e) {
-                LOG.error("Failed to read rx_bytes for NIC " + s, e);
+                log.error("Failed to read rx_bytes for NIC " + s, e);
                 return 0d;
             }
         }).sum() * 8 / 1024;
@@ -224,27 +272,13 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage {
             try {
                 return Double.parseDouble(new String(Files.readAllBytes(getNicTxPath(s))));
             } catch (IOException e) {
-                LOG.error("Failed to read tx_bytes for NIC " + s, e);
+                log.error("Failed to read tx_bytes for NIC " + s, e);
                 return 0d;
             }
         }).sum() * 8 / 1024;
     }
 
-    private class CpuStat {
-        private long totalTime;
-        private long usage;
-
-        CpuStat(long totalTime, long usage) {
-            this.totalTime = totalTime;
-            this.usage = usage;
-        }
-
-        long getTotalTime() {
-            return totalTime;
-        }
-
-        long getUsage() {
-            return usage;
-        }
+    private static long readLongFromFile(String path) throws IOException {
+        return Long.parseLong(new String(Files.readAllBytes(Paths.get(path)), Charsets.UTF_8).trim());
     }
 }
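
The cgroup arithmetic in the diff above can be restated in isolation. This is a minimal sketch, not the broker code: the class and method names are hypothetical, and the arguments stand in for values the broker reads from the cgroup files (a cumulative CPU time counter in nanoseconds, plus the quota and period). It assumes, as the diff does, that a non-positive quota means no limit is configured.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper restating the arithmetic from the diff; not part of Pulsar.
final class CgroupCpuSketch {

    // CPU limit as a percentage: 100 * quota / period when a quota is set,
    // otherwise 100 per processor reported by the JVM.
    static double cpuLimitPercent(long quota, long period, int availableProcessors) {
        if (quota > 0) {
            return 100.0 * quota / period;
        }
        return 100.0 * availableProcessors;
    }

    // CPU usage as a percentage, where 100 means one fully used core.
    // usageNanos and lastUsageNanos are two samples of a cumulative counter.
    static double cpuUsagePercent(long usageNanos, long lastUsageNanos, double elapsedSeconds) {
        double deltaNanos = usageNanos - lastUsageNanos;
        return 100 * deltaNanos / elapsedSeconds / TimeUnit.SECONDS.toNanos(1);
    }
}
```

For example, a container that consumed 2 seconds of CPU time over a 2 second window reports 100, and a quota of twice the period yields a limit of 200.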


[pulsar] 08/10: Get last entry is trying to read entry -1 (#7495)

Posted by rx...@apache.org.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9eee486de035f27de7b38670affc90698d7087fa
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Jul 10 10:41:30 2020 -0700

    Get last entry is trying to read entry -1 (#7495)
    
    
    (cherry picked from commit d8fd8eaaa2e2dcb93ac69d80981365048b76c0b2)
---
 .../src/main/java/org/apache/pulsar/broker/service/ServerCnx.java        | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 3131a76..0b7499d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1480,6 +1480,7 @@ public class ServerCnx extends PulsarHandler {
                     .setPartition(partitionIndex).build();
 
             ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId));
+            return;
         }
 
         // For a valid position, we read the entry out and parse the batch size from its metadata.


[pulsar] 03/10: fix update partitions error for non-persistent topic (#7459)

Posted by rx...@apache.org.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 22647cd0cc65aed9c697a4dbb41334efa9030f2a
Author: Aloys <lo...@gmail.com>
AuthorDate: Tue Jul 7 00:42:32 2020 +0800

    fix update partitions error for non-persistent topic (#7459)
    
    
    (cherry picked from commit f8beb7876974cb5c69f74d740926d00fa0cc5a2e)
---
 .../src/main/java/org/apache/pulsar/broker/admin/AdminResource.java    | 3 +--
 .../test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java    | 2 ++
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 1d765f5..ecf926c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -358,10 +358,9 @@ public abstract class AdminResource extends PulsarWebResource {
     }
 
     protected void validatePartitionedTopicMetadata(String tenant, String namespace, String encodedTopic) {
-        String completeTopicName = tenant + "/" + namespace + "/" +  Codec.decode(encodedTopic);
         try {
             PartitionedTopicMetadata partitionedTopicMetadata =
-                    pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(TopicName.get(completeTopicName)).get();
+                    pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).get();
             if (partitionedTopicMetadata.partitions < 1) {
                 throw new RestException(Status.CONFLICT, "Topic is not partitioned topic");
             }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java
index 757b775..509374d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java
@@ -98,8 +98,10 @@ public class AdminResourceTest extends BrokerTestBase {
         AdminResource resource = mockResource();
         resource.setPulsar(pulsar);
         // validate should pass when topic is partitioned topic
+        resource.validatePartitionedTopicName(tenant, namespace, Codec.encode(partitionedTopic));
         resource.validatePartitionedTopicMetadata(tenant, namespace, Codec.encode(partitionedTopic));
         // validate should failed when topic is non-partitioned topic
+        resource.validatePartitionedTopicName(tenant, namespace, Codec.encode(nonPartitionedTopic));
         try {
             resource.validatePartitionedTopicMetadata(tenant, namespace, Codec.encode(nonPartitionedTopic));
             fail("Should fail validation on non-partitioned topic");


[pulsar] 07/10: Fix ArrayIndexOutOfBoundsException in batch index ack. (#7483)

Posted by rx...@apache.org.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 76c0939c09b8baa25924b52eb2078a6aa1112728
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Jul 9 12:58:50 2020 +0800

    Fix ArrayIndexOutOfBoundsException in batch index ack. (#7483)
    
    
    (cherry picked from commit beb9e3be60513bdfbd0e412a68747b97714af1d7)
---
 .../org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java  | 9 +++++++--
 .../persistent/PersistentDispatcherMultipleConsumers.java        | 2 +-
 .../persistent/PersistentDispatcherSingleActiveConsumer.java     | 2 +-
 .../PersistentStickyKeyDispatcherMultipleConsumers.java          | 2 +-
 4 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java
index e41a2909..175f5ac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java
@@ -53,8 +53,13 @@ public class EntryBatchIndexesAcks {
         handle.recycle(this);
     }
 
-    public static EntryBatchIndexesAcks get() {
-        return RECYCLER.get();
+    public static EntryBatchIndexesAcks get(int entriesListSize) {
+        EntryBatchIndexesAcks ebi = RECYCLER.get();
+
+        if (ebi.indexesAcks.length < entriesListSize) {
+            ebi.indexesAcks = new Pair[entriesListSize];
+        }
+        return ebi;
     }
 
     private EntryBatchIndexesAcks(Recycler.Handle<EntryBatchIndexesAcks> handle) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index b8255ba..c24e762 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -525,7 +525,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 List<Entry> entriesForThisConsumer = entries.subList(start, start + messagesForC);
 
                 EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
-                EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get();
+                EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
                 filterEntriesForConsumer(entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor);
 
                 c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 5bb7629..58a63bd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -237,7 +237,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
         } else {
             EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
             SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
-            EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get();
+            EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entries.size());
             filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, batchIndexesAcks, cursor);
 
             int totalMessages = sendMessageInfo.getTotalMessages();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 420552c..35d645a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -188,7 +188,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
 
                 SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
                 EntryBatchSizes batchSizes = EntryBatchSizes.get(messagesForC);
-                EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get();
+                EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(messagesForC);
                 filterEntriesForConsumer(entriesWithSameKey, batchSizes, sendMessageInfo, batchIndexesAcks, cursor);
 
                 consumer.sendMessages(entriesWithSameKey, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
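
The fix in this commit follows a common pattern for pooled objects: a recycled instance may carry an array sized for an earlier, smaller request, so the accessor grows it before handing the instance out. The sketch below illustrates that pattern only. The class name, the `long[]` payload, and the default capacity of 4 are assumptions, and a plain `ArrayDeque` stands in for Netty's `Recycler`, so this version is not thread-safe.

```java
import java.util.ArrayDeque;

// Hypothetical stand-in for a recycled holder; not the Pulsar class.
final class PooledAcksSketch {
    private static final ArrayDeque<PooledAcksSketch> POOL = new ArrayDeque<>();

    // Small default capacity chosen for illustration.
    long[] slots = new long[4];

    // Returns a pooled instance whose array holds at least requiredSize slots.
    static PooledAcksSketch get(int requiredSize) {
        PooledAcksSketch acks = POOL.poll();
        if (acks == null) {
            acks = new PooledAcksSketch();
        }
        if (acks.slots.length < requiredSize) {
            // Without this check, writing slot requiredSize - 1 on a reused
            // instance would throw ArrayIndexOutOfBoundsException.
            acks.slots = new long[requiredSize];
        }
        return acks;
    }

    // Returns this instance to the pool for reuse.
    void recycle() {
        POOL.push(this);
    }
}
```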


[pulsar] 02/10: add pulsar-client-messagecrypto-bc into pulsar-client module (#7447)

Posted by rx...@apache.org.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e0de3992468d9fa83c3abfd1ab40b90524fe4624
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Sun Jul 5 09:39:07 2020 -0500

    add pulsar-client-messagecrypto-bc into pulsar-client module (#7447)
    
    Fixes #6834
    
    ### Motivation
    
    `MessageCryptoBc` implements the `MessageCrypto` interface. Because `pulsar-client` shades `ByteBuf`, the method parameter type differs between `org.apache.pulsar.shade.io.netty.buffer.ByteBuf` and `io.netty.buffer.ByteBuf`, which causes the following error:
    ```
    Caused by: java.lang.NoSuchMethodError: 'org.apache.pulsar.shade.io.netty.buffer.ByteBuf org.apache.pulsar.client.api.MessageCrypto.encrypt(java.util.Set, org.apache.pulsar.client.api.CryptoKeyReader, java.util.function.Supplier, org.apache.pulsar.shade.io.netty.buffer.ByteBuf)
    ```
    
    ### Modifications
    
    Add `pulsar-client-messagecrypto-bc` as a dependency of `pulsar-client` to avoid the method-not-found issue.
    
    (cherry picked from commit d83133ae562e09bb597c53eb1748a1ce5256adf4)
---
 pulsar-client-shaded/pom.xml | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml
index ed41bfa..3e5b3f3 100644
--- a/pulsar-client-shaded/pom.xml
+++ b/pulsar-client-shaded/pom.xml
@@ -38,6 +38,12 @@
       <artifactId>pulsar-client-original</artifactId>
       <version>${project.parent.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-client-messagecrypto-bc</artifactId>
+      <version>${project.parent.version}</version>
+      <optional>true</optional>
+    </dependency>
   </dependencies>
 
   <build>


[pulsar] 01/10: Consumer is registered on dispatcher even if hash range conflicts on Key_Shared subscription (#7444)

Posted by rx...@apache.org.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fc1a8ee4c829f663091b3d0daf91b14661539cf5
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Tue Jul 7 01:25:07 2020 +0900

    Consumer is registered on dispatcher even if hash range conflicts on Key_Shared subscription (#7444)
    
    
    (cherry picked from commit 97ee82ed2dc337f81d7059c5d8980191d16dbfe3)
---
 .../broker/service/AbstractDispatcherSingleActiveConsumer.java |  4 ++--
 .../NonPersistentStickyKeyDispatcherMultipleConsumers.java     | 10 ++++++++--
 .../PersistentStickyKeyDispatcherMultipleConsumers.java        | 10 ++++++++--
 .../apache/pulsar/client/api/KeySharedSubscriptionTest.java    | 10 +++++++++-
 4 files changed, 27 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 6c5f8a7..9948dcc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -155,8 +155,6 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
             throw new ConsumerBusyException("Subscription reached max consumers limit");
         }
 
-        consumers.add(consumer);
-
         if (subscriptionType == SubType.Exclusive
                 && consumer.getKeySharedMeta() != null
                 && consumer.getKeySharedMeta().getHashRangesList() != null
@@ -168,6 +166,8 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
             isKeyHashRangeFiltered = false;
         }
 
+        consumers.add(consumer);
+
         if (!pickAndScheduleActiveConsumer()) {
             // the active consumer is not changed
             Consumer currentActiveConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index 32cce87..37b29da 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -47,7 +47,13 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
     @Override
     public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
         super.addConsumer(consumer);
-        selector.addConsumer(consumer);
+        try {
+            selector.addConsumer(consumer);
+        } catch (BrokerServiceException e) {
+            consumerSet.removeAll(consumer);
+            consumerList.remove(consumer);
+            throw e;
+        }
     }
 
     @Override
@@ -99,4 +105,4 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
             TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
         }
     }
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index a4a532f..420552c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -93,7 +93,13 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
     @Override
     public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
         super.addConsumer(consumer);
-        selector.addConsumer(consumer);
+        try {
+            selector.addConsumer(consumer);
+        } catch (BrokerServiceException e) {
+            consumerSet.removeAll(consumer);
+            consumerList.remove(consumer);
+            throw e;
+        }
 
         // If this was the 1st consumer, or if all the messages are already acked, then we
         // don't need to do anything special
@@ -294,4 +300,4 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
 
     private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);
 
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 610c4d1..02073ea 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -658,7 +658,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
     @Test
     public void testHashRangeConflict() throws PulsarClientException {
         this.conf.setSubscriptionKeySharedEnable(true);
-        final String topic = "testHashRangeConflict-" + UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/testHashRangeConflict-" + UUID.randomUUID().toString();
         final String sub = "test";
 
         Consumer<String> consumer1 = createFixedHashRangesConsumer(topic, sub, Range.of(0,99), Range.of(400, 65535));
@@ -667,6 +667,10 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         Consumer<String> consumer2 = createFixedHashRangesConsumer(topic, sub, Range.of(100,399));
         Assert.assertTrue(consumer2.isConnected());
 
+        PersistentStickyKeyDispatcherMultipleConsumers dispatcher = (PersistentStickyKeyDispatcherMultipleConsumers) pulsar
+                .getBrokerService().getTopicReference(topic).get().getSubscription(sub).getDispatcher();
+        Assert.assertEquals(dispatcher.getConsumers().size(), 2);
+
         try {
             createFixedHashRangesConsumer(topic, sub, Range.of(0, 65535));
             Assert.fail("Should failed with conflict range.");
@@ -679,7 +683,9 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         } catch (PulsarClientException.ConsumerAssignException ignore) {
         }
 
+        Assert.assertEquals(dispatcher.getConsumers().size(), 2);
         consumer1.close();
+        Assert.assertEquals(dispatcher.getConsumers().size(), 1);
 
         try {
             createFixedHashRangesConsumer(topic, sub, Range.of(0, 65535));
@@ -705,9 +711,11 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         Consumer<String> consumer4 = createFixedHashRangesConsumer(topic, sub, Range.of(50,99));
         Assert.assertTrue(consumer4.isConnected());
 
+        Assert.assertEquals(dispatcher.getConsumers().size(), 3);
         consumer2.close();
         consumer3.close();
         consumer4.close();
+        Assert.assertFalse(dispatcher.isConsumerConnected());
     }
 
     @Test
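
The sticky-key dispatcher changes in this commit amount to "register, then roll back if the selector rejects the hash range". The following is a minimal sketch of that idea under stated assumptions: every name here is hypothetical, consumers are represented by plain strings, and an `IllegalStateException` stands in for `BrokerServiceException`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of rollback on hash range conflict; not Pulsar code.
final class RollbackOnConflictSketch {

    // Rejects any inclusive [start, end] range that overlaps one already added.
    static final class RangeSelector {
        private final List<int[]> ranges = new ArrayList<>();

        void add(int start, int end) {
            for (int[] r : ranges) {
                if (start <= r[1] && r[0] <= end) {
                    throw new IllegalStateException("Range conflict");
                }
            }
            ranges.add(new int[] {start, end});
        }
    }

    final List<String> consumers = new ArrayList<>();
    final RangeSelector selector = new RangeSelector();

    void addConsumer(String name, int start, int end) {
        consumers.add(name);            // step 1: register the consumer
        try {
            selector.add(start, end);   // step 2: may reject the hash range
        } catch (IllegalStateException e) {
            consumers.remove(name);     // undo step 1 so no stale consumer remains
            throw e;
        }
    }
}
```

With the rollback in place, a rejected consumer leaves the consumer list unchanged, which is the property the added test assertions check through `dispatcher.getConsumers().size()`.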


[pulsar] 05/10: Cpp oauth2 auth client (#7467)

Posted by rx...@apache.org.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7905aca413f3f16d3e4ffece6a51af4162b23f56
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Wed Jul 8 21:25:19 2020 -0500

    Cpp oauth2 auth client (#7467)
    
    ### Motivation
    
    #7420 provides an OAuth2 auth client for Java. This PR adds support for it in the C++ client.
    
    ### Modifications
    
    - add implementation
    - add related tests.
    
    (cherry picked from commit 2d0ccebab1dce4c657821fb78a08f4a0bfca2454)
---
 pulsar-client-cpp/include/pulsar/Authentication.h  | 107 +++++++-
 .../include/pulsar/ClientConfiguration.h           |   2 +-
 pulsar-client-cpp/lib/Authentication.cc            |   7 +
 pulsar-client-cpp/lib/ClientConfiguration.cc       |   2 +-
 pulsar-client-cpp/lib/auth/AuthAthenz.cc           |   2 +-
 pulsar-client-cpp/lib/auth/AuthOauth2.cc           | 271 +++++++++++++++++++++
 pulsar-client-cpp/lib/auth/AuthOauth2.h            |  74 ++++++
 pulsar-client-cpp/lib/auth/AuthTls.cc              |   2 +-
 pulsar-client-cpp/lib/auth/AuthToken.cc            |   2 +-
 pulsar-client-cpp/tests/AuthPluginTest.cc          |  45 ++++
 10 files changed, 505 insertions(+), 9 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/Authentication.h b/pulsar-client-cpp/include/pulsar/Authentication.h
index efa8d9f..57a3e70 100644
--- a/pulsar-client-cpp/include/pulsar/Authentication.h
+++ b/pulsar-client-cpp/include/pulsar/Authentication.h
@@ -56,7 +56,7 @@ class PULSAR_PUBLIC Authentication {
    public:
     virtual ~Authentication();
     virtual const std::string getAuthMethodName() const = 0;
-    virtual Result getAuthData(AuthenticationDataPtr& authDataContent) const {
+    virtual Result getAuthData(AuthenticationDataPtr& authDataContent) {
         authDataContent = authData_;
         return ResultOk;
     }
@@ -107,7 +107,7 @@ class PULSAR_PUBLIC AuthTls : public Authentication {
     static AuthenticationPtr create(const std::string& authParamsString);
     static AuthenticationPtr create(const std::string& certificatePath, const std::string& privateKeyPath);
     const std::string getAuthMethodName() const;
-    Result getAuthData(AuthenticationDataPtr& authDataTls) const;
+    Result getAuthData(AuthenticationDataPtr& authDataTls);
 
    private:
     AuthenticationDataPtr authDataTls_;
@@ -144,7 +144,7 @@ class PULSAR_PUBLIC AuthToken : public Authentication {
     static AuthenticationPtr create(const TokenSupplier& tokenSupplier);
 
     const std::string getAuthMethodName() const;
-    Result getAuthData(AuthenticationDataPtr& authDataToken) const;
+    Result getAuthData(AuthenticationDataPtr& authDataToken);
 
    private:
     AuthenticationDataPtr authDataToken_;
@@ -160,12 +160,111 @@ class PULSAR_PUBLIC AuthAthenz : public Authentication {
     static AuthenticationPtr create(ParamMap& params);
     static AuthenticationPtr create(const std::string& authParamsString);
     const std::string getAuthMethodName() const;
-    Result getAuthData(AuthenticationDataPtr& authDataAthenz) const;
+    Result getAuthData(AuthenticationDataPtr& authDataAthenz);
 
    private:
     AuthenticationDataPtr authDataAthenz_;
 };
 
+// OAuth 2.0 token and associated information.
+// currently mainly works for access token
+class Oauth2TokenResult {
+   public:
+    enum
+    {
+        undefined_expiration = -1
+    };
+
+    Oauth2TokenResult();
+    ~Oauth2TokenResult();
+
+    Oauth2TokenResult& setAccessToken(const std::string& accessToken);
+    Oauth2TokenResult& setIdToken(const std::string& idToken);
+    Oauth2TokenResult& setRefreshToken(const std::string& refreshToken);
+    Oauth2TokenResult& setExpiresIn(const int64_t expiresIn);
+
+    const std::string& getAccessToken() const;
+    const std::string& getIdToken() const;
+    const std::string& getRefreshToken() const;
+    int64_t getExpiresIn() const;
+
+   private:
+    // map to json "access_token"
+    std::string accessToken_;
+    // map to json "id_token"
+    std::string idToken_;
+    // map to json "refresh_token"
+    std::string refreshToken_;
+    // map to json "expires_in"
+    int64_t expiresIn_;
+};
+
+typedef std::shared_ptr<Oauth2TokenResult> Oauth2TokenResultPtr;
+
+class Oauth2Flow {
+   public:
+    virtual ~Oauth2Flow();
+
+    /**
+     * Initializes the authorization flow.
+     */
+    virtual void initialize() = 0;
+
+    /**
+     * Acquires an access token from the OAuth 2.0 authorization server.
+     * @return a token result including an access token.
+     */
+    virtual Oauth2TokenResultPtr authenticate() = 0;
+
+    /**
+     * Closes the authorization flow.
+     */
+    virtual void close() = 0;
+
+   protected:
+    Oauth2Flow();
+};
+
+typedef std::shared_ptr<Oauth2Flow> FlowPtr;
+
+class CachedToken {
+   public:
+    ~CachedToken();
+    virtual bool isExpired() = 0;
+    virtual AuthenticationDataPtr getAuthData() = 0;
+
+   protected:
+    CachedToken();
+};
+
+typedef std::shared_ptr<CachedToken> CachedTokenPtr;
+
+/**
+ * Oauth2 based implementation of Pulsar client authentication.
+ * Passed in parameter would be like:
+ * ```
+ *   "type": "client_credentials",
+ *   "issuer_url": "https://accounts.google.com",
+ *   "client_id": "d9ZyX97q1ef8Cr81WHVC4hFQ64vSlDK3",
+ *   "client_secret": "on1uJ...k6F6R",
+ *   "audience": "https://broker.example.com"
+ *  ```
+ */
+class PULSAR_PUBLIC AuthOauth2 : public Authentication {
+   public:
+    AuthOauth2(ParamMap& params);
+    ~AuthOauth2();
+
+    static AuthenticationPtr create(ParamMap& params);
+    static AuthenticationPtr create(const std::string& authParamsString);
+    const std::string getAuthMethodName() const;
+    Result getAuthData(AuthenticationDataPtr& authDataOauth2);
+
+   private:
+    FlowPtr flowPtr_;
+    CachedTokenPtr cachedTokenPtr_;
+};
+
 }  // namespace pulsar
 
 #endif /* PULSAR_AUTHENTICATION_H_ */
diff --git a/pulsar-client-cpp/include/pulsar/ClientConfiguration.h b/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
index e65aecc..9bb63d4 100644
--- a/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
@@ -43,7 +43,7 @@ class PULSAR_PUBLIC ClientConfiguration {
     /**
      * @return the authentication data
      */
-    const Authentication& getAuth() const;
+    Authentication& getAuth() const;
 
     /**
      * Set timeout on client operations (subscribe, create producer, close, unsubscribe)
diff --git a/pulsar-client-cpp/lib/Authentication.cc b/pulsar-client-cpp/lib/Authentication.cc
index f552b53..105d1c3 100644
--- a/pulsar-client-cpp/lib/Authentication.cc
+++ b/pulsar-client-cpp/lib/Authentication.cc
@@ -22,6 +22,7 @@
 #include "auth/AuthTls.h"
 #include "auth/AuthAthenz.h"
 #include "auth/AuthToken.h"
+#include "auth/AuthOauth2.h"
 #include <lib/LogUtils.h>
 
 #include <string>
@@ -125,6 +126,9 @@ AuthenticationPtr tryCreateBuiltinAuth(const std::string& pluginName, ParamMap&
     } else if (boost::iequals(pluginName, ATHENZ_PLUGIN_NAME) ||
                boost::iequals(pluginName, ATHENZ_JAVA_PLUGIN_NAME)) {
         return AuthAthenz::create(paramMap);
+    } else if (boost::iequals(pluginName, OAUTH2_TOKEN_PLUGIN_NAME) ||
+               boost::iequals(pluginName, OAUTH2_TOKEN_JAVA_PLUGIN_NAME)) {
+        return AuthOauth2::create(paramMap);
     } else {
         return AuthenticationPtr();
     }
@@ -139,6 +143,9 @@ AuthenticationPtr tryCreateBuiltinAuth(const std::string& pluginName, const std:
     } else if (boost::iequals(pluginName, ATHENZ_PLUGIN_NAME) ||
                boost::iequals(pluginName, ATHENZ_JAVA_PLUGIN_NAME)) {
         return AuthAthenz::create(authParamsString);
+    } else if (boost::iequals(pluginName, OAUTH2_TOKEN_PLUGIN_NAME) ||
+               boost::iequals(pluginName, OAUTH2_TOKEN_JAVA_PLUGIN_NAME)) {
+        return AuthOauth2::create(authParamsString);
     } else {
         return AuthenticationPtr();
     }
diff --git a/pulsar-client-cpp/lib/ClientConfiguration.cc b/pulsar-client-cpp/lib/ClientConfiguration.cc
index 1733dde..ad210f6 100644
--- a/pulsar-client-cpp/lib/ClientConfiguration.cc
+++ b/pulsar-client-cpp/lib/ClientConfiguration.cc
@@ -36,7 +36,7 @@ ClientConfiguration& ClientConfiguration::setAuth(const AuthenticationPtr& authe
     return *this;
 }
 
-const Authentication& ClientConfiguration::getAuth() const { return *impl_->authenticationPtr; }
+Authentication& ClientConfiguration::getAuth() const { return *impl_->authenticationPtr; }
 
 const AuthenticationPtr& ClientConfiguration::getAuthPtr() const { return impl_->authenticationPtr; }
 
diff --git a/pulsar-client-cpp/lib/auth/AuthAthenz.cc b/pulsar-client-cpp/lib/auth/AuthAthenz.cc
index 1e6c8fe..3141fb3 100644
--- a/pulsar-client-cpp/lib/auth/AuthAthenz.cc
+++ b/pulsar-client-cpp/lib/auth/AuthAthenz.cc
@@ -91,7 +91,7 @@ AuthenticationPtr AuthAthenz::create(ParamMap& params) {
 
 const std::string AuthAthenz::getAuthMethodName() const { return "athenz"; }
 
-Result AuthAthenz::getAuthData(AuthenticationDataPtr& authDataContent) const {
+Result AuthAthenz::getAuthData(AuthenticationDataPtr& authDataContent) {
     authDataContent = authDataAthenz_;
     return ResultOk;
 }
diff --git a/pulsar-client-cpp/lib/auth/AuthOauth2.cc b/pulsar-client-cpp/lib/auth/AuthOauth2.cc
new file mode 100644
index 0000000..3104c4d
--- /dev/null
+++ b/pulsar-client-cpp/lib/auth/AuthOauth2.cc
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <lib/auth/AuthOauth2.h>
+
+#include <curl/curl.h>
+#include <sstream>
+#include <boost/property_tree/json_parser.hpp>
+#include <boost/property_tree/ptree.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+
+#include <lib/LogUtils.h>
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+// AuthDataOauth2
+
+AuthDataOauth2::AuthDataOauth2(const std::string& accessToken) { accessToken_ = accessToken; }
+
+AuthDataOauth2::~AuthDataOauth2() {}
+
+bool AuthDataOauth2::hasDataForHttp() { return true; }
+
+std::string AuthDataOauth2::getHttpHeaders() { return "Authorization: Bearer " + accessToken_; }
+
+bool AuthDataOauth2::hasDataFromCommand() { return true; }
+
+std::string AuthDataOauth2::getCommandData() { return accessToken_; }
+
+// Oauth2TokenResult
+
+Oauth2TokenResult::Oauth2TokenResult() { expiresIn_ = undefined_expiration; }
+
+Oauth2TokenResult::~Oauth2TokenResult() {}
+
+Oauth2TokenResult& Oauth2TokenResult::setAccessToken(const std::string& accessToken) {
+    accessToken_ = accessToken;
+    return *this;
+}
+
+Oauth2TokenResult& Oauth2TokenResult::setIdToken(const std::string& idToken) {
+    idToken_ = idToken;
+    return *this;
+}
+
+Oauth2TokenResult& Oauth2TokenResult::setRefreshToken(const std::string& refreshToken) {
+    refreshToken_ = refreshToken;
+    return *this;
+}
+
+Oauth2TokenResult& Oauth2TokenResult::setExpiresIn(const int64_t expiresIn) {
+    expiresIn_ = expiresIn;
+    return *this;
+}
+
+const std::string& Oauth2TokenResult::getAccessToken() const { return accessToken_; }
+
+const std::string& Oauth2TokenResult::getIdToken() const { return idToken_; }
+
+const std::string& Oauth2TokenResult::getRefreshToken() const { return refreshToken_; }
+
+int64_t Oauth2TokenResult::getExpiresIn() const { return expiresIn_; }
+
+// CachedToken
+
+CachedToken::CachedToken() {}
+
+CachedToken::~CachedToken() {}
+
+// Oauth2CachedToken
+
+static int64_t currentTimeMillis() {
+    using namespace boost::posix_time;
+    using boost::posix_time::milliseconds;
+    using boost::posix_time::seconds;
+    static ptime time_t_epoch(boost::gregorian::date(1970, 1, 1));
+
+    time_duration diff = microsec_clock::universal_time() - time_t_epoch;
+    return diff.total_milliseconds();
+}
+
+Oauth2CachedToken::Oauth2CachedToken(Oauth2TokenResultPtr token) {
+    latest_ = token;
+
+    int64_t expiredIn = token->getExpiresIn();
+    if (expiredIn > 0) {
+        expiresAt_ = expiredIn * 1000 + currentTimeMillis();  // expires_in is in seconds
+    } else {
+        throw std::runtime_error("ExpiresIn in Oauth2TokenResult invalid value: " + std::to_string(expiredIn));
+    }
+    authData_ = AuthenticationDataPtr(new AuthDataOauth2(token->getAccessToken()));
+}
+
+AuthenticationDataPtr Oauth2CachedToken::getAuthData() { return authData_; }
+
+Oauth2CachedToken::~Oauth2CachedToken() {}
+
+bool Oauth2CachedToken::isExpired() { return expiresAt_ < currentTimeMillis(); }
+
+// Oauth2Flow
+
+Oauth2Flow::Oauth2Flow() {}
+Oauth2Flow::~Oauth2Flow() {}
+
+// ClientCredentialFlow
+
+ClientCredentialFlow::ClientCredentialFlow(const std::string& issuerUrl, const std::string& clientId,
+                                           const std::string& clientSecret, const std::string& audience) {
+    issuerUrl_ = issuerUrl;
+    clientId_ = clientId;
+    clientSecret_ = clientSecret;
+    audience_ = audience;
+}
+
+void ClientCredentialFlow::initialize() {}
+void ClientCredentialFlow::close() {}
+
+static size_t curlWriteCallback(void* contents, size_t size, size_t nmemb, void* responseDataPtr) {
+    ((std::string*)responseDataPtr)->append((char*)contents, size * nmemb);
+    return size * nmemb;
+}
+
+Oauth2TokenResultPtr ClientCredentialFlow::authenticate() {
+    Oauth2TokenResultPtr resultPtr = Oauth2TokenResultPtr(new Oauth2TokenResult());
+
+    CURL* handle = curl_easy_init();
+    CURLcode res;
+    std::string responseData;
+
+    // set header: json, request type: post
+    struct curl_slist* list = NULL;
+    list = curl_slist_append(list, "Content-Type: application/json");
+    curl_easy_setopt(handle, CURLOPT_HTTPHEADER, list);
+    curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, "POST");
+
+    // set URL: issuerUrl
+    curl_easy_setopt(handle, CURLOPT_URL, issuerUrl_.c_str());
+
+    // Write callback
+    curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlWriteCallback);
+    curl_easy_setopt(handle, CURLOPT_WRITEDATA, &responseData);
+
+    // New connection is made for each call
+    curl_easy_setopt(handle, CURLOPT_FRESH_CONNECT, 1L);
+    curl_easy_setopt(handle, CURLOPT_FORBID_REUSE, 1L);
+
+    curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L);
+    curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 0L);
+    curl_easy_setopt(handle, CURLOPT_SSL_VERIFYHOST, 0L);
+
+    // fill in the request data
+    boost::property_tree::ptree pt;
+    pt.put("grant_type", "client_credentials");
+    pt.put("client_id", clientId_);
+    pt.put("client_secret", clientSecret_);
+    pt.put("audience", audience_);
+
+    std::stringstream ss;
+    boost::property_tree::json_parser::write_json(ss, pt);
+    std::string ssString = ss.str();
+
+    curl_easy_setopt(handle, CURLOPT_POSTFIELDS, ssString.c_str());
+
+    // Make the POST call to the server
+    res = curl_easy_perform(handle);
+
+    LOG_DEBUG("issuerUrl_ " << issuerUrl_ << " clientid: " << clientId_ << " client_secret " << clientSecret_
+                            << " audience " << audience_ << " ssstring " << ssString);
+
+    switch (res) {
+        case CURLE_OK:
+            long response_code;
+            curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &response_code);
+            LOG_DEBUG("Response received for issuerurl " << issuerUrl_ << " code " << response_code);
+            if (response_code == 200) {
+                boost::property_tree::ptree root;
+                std::stringstream stream;
+                stream << responseData;
+                try {
+                    boost::property_tree::read_json(stream, root);
+                } catch (boost::property_tree::json_parser_error& e) {
+                    LOG_ERROR("Failed to parse json of Oauth2 response: "
+                              << e.what() << "\nInput Json = " << responseData << " passedin: " << ssString);
+                    break;
+                }
+
+                resultPtr->setAccessToken(root.get<std::string>("access_token"));
+                resultPtr->setExpiresIn(root.get<uint32_t>("expires_in"));
+
+                LOG_DEBUG("access_token: " << resultPtr->getAccessToken()
+                                           << " expires_in: " << resultPtr->getExpiresIn());
+            } else {
+                LOG_ERROR("Response failed for issuerurl " << issuerUrl_ << ". response Code "
+                                                           << response_code << " passedin: " << ssString);
+            }
+            break;
+        default:
+            LOG_ERROR("Response failed for issuerurl " << issuerUrl_ << ". Error Code " << res
+                                                       << " passedin: " << ssString);
+            break;
+    }
+    // Free header list
+    curl_slist_free_all(list);
+    curl_easy_cleanup(handle);
+
+    return resultPtr;
+}
+
+// AuthOauth2
+
+AuthOauth2::AuthOauth2(ParamMap& params) {
+    flowPtr_ = FlowPtr(new ClientCredentialFlow(params["issuer_url"], params["client_id"],
+                                                params["client_secret"], params["audience"]));
+}
+
+AuthOauth2::~AuthOauth2() {}
+
+ParamMap parseJsonAuthParamsString(const std::string& authParamsString) {
+    ParamMap params;
+    if (!authParamsString.empty()) {
+        boost::property_tree::ptree root;
+        std::stringstream stream;
+        stream << authParamsString;
+        try {
+            boost::property_tree::read_json(stream, root);
+            for (const auto& item : root) {
+                params[item.first] = item.second.get_value<std::string>();
+            }
+        } catch (boost::property_tree::json_parser_error& e) {
+            LOG_ERROR("Invalid String Error: " << e.what());
+        }
+    }
+    return params;
+}
+
+AuthenticationPtr AuthOauth2::create(const std::string& authParamsString) {
+    ParamMap params = parseJsonAuthParamsString(authParamsString);
+
+    return create(params);
+}
+
+AuthenticationPtr AuthOauth2::create(ParamMap& params) { return AuthenticationPtr(new AuthOauth2(params)); }
+
+const std::string AuthOauth2::getAuthMethodName() const { return "token"; }
+
+Result AuthOauth2::getAuthData(AuthenticationDataPtr& authDataContent) {
+    if (cachedTokenPtr_ == nullptr || cachedTokenPtr_->isExpired()) {
+        cachedTokenPtr_ = CachedTokenPtr(new Oauth2CachedToken(flowPtr_->authenticate()));
+    }
+
+    authDataContent = cachedTokenPtr_->getAuthData();
+    return ResultOk;
+}
+
+}  // namespace pulsar
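
Note on the token cache above: the OAuth2 "expires_in" field is a lifetime in seconds, while currentTimeMillis() returns milliseconds, so the two need a unit conversion before they are added. The following is a minimal standalone sketch of that expiry check, not the code from this commit. ExpiringToken and its members are hypothetical names, and the current time is passed in as a parameter (an assumption made here so the logic can be tested without sleeping).

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for a cached OAuth2 token; not part of the Pulsar client.
class ExpiringToken {
   public:
    // expiresInSeconds: the "expires_in" value from the token response, in seconds.
    // nowMillis: current time in milliseconds since the Unix epoch.
    ExpiringToken(const std::string& accessToken, int64_t expiresInSeconds, int64_t nowMillis)
        : accessToken_(accessToken) {
        if (expiresInSeconds <= 0) {
            // std::to_string avoids the pointer arithmetic that `"literal" + integer` would do.
            throw std::runtime_error("invalid expires_in: " + std::to_string(expiresInSeconds));
        }
        // Convert seconds to milliseconds before adding to the millisecond clock.
        expiresAtMillis_ = nowMillis + expiresInSeconds * 1000;
    }

    // True once the given time is strictly past the expiry instant.
    bool isExpired(int64_t nowMillis) const { return expiresAtMillis_ < nowMillis; }

    const std::string& accessToken() const { return accessToken_; }

   private:
    std::string accessToken_;
    int64_t expiresAtMillis_;
};

// Milliseconds since the Unix epoch, using the standard library clock.
inline int64_t currentTimeMillis() {
    using namespace std::chrono;
    return duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
}
```

A caller would construct the token with currentTimeMillis() and request a new one whenever isExpired(currentTimeMillis()) returns true, which mirrors the check in AuthOauth2::getAuthData.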
diff --git a/pulsar-client-cpp/lib/auth/AuthOauth2.h b/pulsar-client-cpp/lib/auth/AuthOauth2.h
new file mode 100644
index 0000000..0090976
--- /dev/null
+++ b/pulsar-client-cpp/lib/auth/AuthOauth2.h
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <pulsar/Authentication.h>
+#include <string>
+#include <boost/function.hpp>
+
+namespace pulsar {
+
+const std::string OAUTH2_TOKEN_PLUGIN_NAME = "oauth2token";
+const std::string OAUTH2_TOKEN_JAVA_PLUGIN_NAME =
+    "org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2";
+
+class ClientCredentialFlow : public Oauth2Flow {
+   public:
+    ClientCredentialFlow(const std::string& issuerUrl, const std::string& clientId,
+                         const std::string& clientSecret, const std::string& audience);
+    void initialize();
+    Oauth2TokenResultPtr authenticate();
+    void close();
+
+   private:
+    std::string issuerUrl_;
+    std::string clientId_;
+    std::string clientSecret_;
+    std::string audience_;
+};
+
+class Oauth2CachedToken : public CachedToken {
+   public:
+    Oauth2CachedToken(Oauth2TokenResultPtr token);
+    ~Oauth2CachedToken();
+    bool isExpired();
+    AuthenticationDataPtr getAuthData();
+
+   private:
+    int64_t expiresAt_;
+    Oauth2TokenResultPtr latest_;
+    AuthenticationDataPtr authData_;
+};
+
+class AuthDataOauth2 : public AuthenticationDataProvider {
+   public:
+    AuthDataOauth2(const std::string& accessToken);
+    ~AuthDataOauth2();
+
+    bool hasDataForHttp();
+    std::string getHttpHeaders();
+    bool hasDataFromCommand();
+    std::string getCommandData();
+
+   private:
+    std::string accessToken_;
+};
+
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/auth/AuthTls.cc b/pulsar-client-cpp/lib/auth/AuthTls.cc
index fcf6571..fdf7f21 100644
--- a/pulsar-client-cpp/lib/auth/AuthTls.cc
+++ b/pulsar-client-cpp/lib/auth/AuthTls.cc
@@ -53,7 +53,7 @@ AuthenticationPtr AuthTls::create(const std::string& certificatePath, const std:
 
 const std::string AuthTls::getAuthMethodName() const { return "tls"; }
 
-Result AuthTls::getAuthData(AuthenticationDataPtr& authDataContent) const {
+Result AuthTls::getAuthData(AuthenticationDataPtr& authDataContent) {
     authDataContent = authDataTls_;
     return ResultOk;
 }
diff --git a/pulsar-client-cpp/lib/auth/AuthToken.cc b/pulsar-client-cpp/lib/auth/AuthToken.cc
index e377139..429f409 100644
--- a/pulsar-client-cpp/lib/auth/AuthToken.cc
+++ b/pulsar-client-cpp/lib/auth/AuthToken.cc
@@ -109,7 +109,7 @@ AuthenticationPtr AuthToken::create(const TokenSupplier &tokenSupplier) {
 
 const std::string AuthToken::getAuthMethodName() const { return "token"; }
 
-Result AuthToken::getAuthData(AuthenticationDataPtr &authDataContent) const {
+Result AuthToken::getAuthData(AuthenticationDataPtr &authDataContent) {
     authDataContent = authDataToken_;
     return ResultOk;
 }
diff --git a/pulsar-client-cpp/tests/AuthPluginTest.cc b/pulsar-client-cpp/tests/AuthPluginTest.cc
index a447a3a..183c880 100644
--- a/pulsar-client-cpp/tests/AuthPluginTest.cc
+++ b/pulsar-client-cpp/tests/AuthPluginTest.cc
@@ -336,3 +336,48 @@ TEST(AuthPluginTest, testAuthFactoryAthenz) {
         }
     }
 }
+
+TEST(AuthPluginTest, testOauth2) {
+    // Test that a token is successfully obtained from the OAuth2 server.
+    pulsar::AuthenticationDataPtr data;
+    std::string params = R"({
+        "type": "client_credentials",
+        "issuer_url": "https://dev-kt-aa9ne.us.auth0.com/oauth/token",
+        "client_id": "Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x",
+        "client_secret": "rT7ps7WY8uhdVuBTKWZkttwLdQotmdEliaM5rLfmgNibvqziZ-g07ZH52N_poGAb",
+        "audience": "https://dev-kt-aa9ne.us.auth0.com/api/v2/"})";
+
+    int expectedTokenLength = 3379;
+    LOG_INFO("PARAMS: " << params);
+    pulsar::AuthenticationPtr auth = pulsar::AuthOauth2::create(params);
+    ASSERT_EQ(auth->getAuthMethodName(), "token");
+    ASSERT_EQ(auth->getAuthData(data), pulsar::ResultOk);
+    ASSERT_EQ(data->hasDataForHttp(), true);
+    ASSERT_EQ(data->hasDataFromCommand(), true);
+    ASSERT_EQ(data->getCommandData().length(), expectedTokenLength);
+}
+
+TEST(AuthPluginTest, testOauth2WrongSecret) {
+    try {
+        pulsar::AuthenticationDataPtr data;
+
+        std::string params = R"({
+        "type": "client_credentials",
+        "issuer_url": "https://dev-kt-aa9ne.us.auth0.com/oauth/token",
+        "client_id": "Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x",
+        "client_secret": "rT7ps7WY8uhdVuBTKWZkttwLdQotmdEliaM5rLfmgNibvqziZ",
+        "audience": "https://dev-kt-aa9ne.us.auth0.com/api/v2/"})";
+
+        int expectedTokenLength = 3379;
+        LOG_INFO("PARAMS: " << params);
+        pulsar::AuthenticationPtr auth = pulsar::AuthOauth2::create(params);
+        ASSERT_EQ(auth->getAuthMethodName(), "token");
+
+        auth->getAuthData(data);
+
+        FAIL() << "Expected failure when getting a token from the server with a wrong secret";
+
+    } catch (...) {
+        // expected
+    }
+}