Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/09 08:16:33 UTC

[pulsar] branch branch-2.8 updated (91bdc48 -> 0a57bae)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 91bdc48  Fixed ZKSessionTest.testReacquireLocksAfterSessionLost (#11886)
     new 5a369be  Forget to update memory usage on producer close (#11906)
     new 9ce29f6  [testclient] deprecate option --subscriber-name and substitute --subscriptions first element for it (#11828)
     new 2cd8d47  [Transaction] add method to clear up transaction buffer snapshot (#11934)
     new 1703aff  [Issue 11936] forget to call SendCallback on producer close (#11939)
     new 44d85e8  [C++] Handle error when shutting down client after forks (#11954)
     new b1ff9bc  [Python] Expose Client.shutdown() method (#11955)
     new d02653a  Avoid to infinitely split bundle (#11937)
     new b7c2c19  [pulsar-client]Add a optional params scope for pulsar oauth2 client (#11931)
     new 0a57bae  Print position info when can't find next valid position. (#11969)

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |   2 +-
 .../loadbalance/impl/BundleSplitterTask.java       |   8 +-
 .../loadbalance/impl/ModularLoadManagerImpl.java   |   2 +-
 .../broker/service/persistent/PersistentTopic.java |   5 +-
 .../pulsar/broker/systopic/SystemTopicClient.java  |  19 ++++
 .../TransactionBufferSystemTopicClient.java        |  16 +++
 .../transaction/buffer/TransactionBuffer.java      |   7 ++
 .../buffer/impl/InMemTransactionBuffer.java        |   5 +
 .../buffer/impl/TopicTransactionBuffer.java        |   9 ++
 .../buffer/impl/TransactionBufferDisable.java      |   5 +
 .../loadbalance/impl/BundleSplitterTaskTest.java   | 105 +++++++++++++++++++
 .../TopicTransactionBufferRecoverTest.java         |  72 ++++++++++++-
 .../pulsar/client/api/BrokerServiceLookupTest.java |  15 ++-
 ...MemoryLimitTest.java => ProducerCloseTest.java} |  37 ++++---
 .../client/impl/ProducerMemoryLimitTest.java       |  20 +++-
 pulsar-client-cpp/lib/ExecutorService.cc           |  20 ++--
 pulsar-client-cpp/python/pulsar/__init__.py        |   9 ++
 pulsar-client-cpp/python/pulsar_test.py            |  13 +++
 .../apache/pulsar/client/impl/ProducerImpl.java    |  29 +++---
 .../auth/oauth2/AuthenticationFactoryOAuth2.java   |  18 ++++
 .../impl/auth/oauth2/ClientCredentialsFlow.java    |   9 +-
 .../protocol/ClientCredentialsExchangeRequest.java |   3 +
 .../impl/auth/oauth2/protocol/TokenClient.java     |  54 +++++++---
 .../impl/auth/oauth2/protocol/TokenClientTest.java | 116 +++++++++++++++++++++
 .../pulsar/testclient/PerformanceConsumer.java     |   7 +-
 site2/docs/reference-cli-tools.md                  |   2 +-
 26 files changed, 534 insertions(+), 73 deletions(-)
 create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
 copy pulsar-broker/src/test/java/org/apache/pulsar/client/impl/{ProducerMemoryLimitTest.java => ProducerCloseTest.java} (62%)
 create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java

[pulsar] 08/09: [pulsar-client]Add a optional params scope for pulsar oauth2 client (#11931)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b7c2c19859fd945725706585c5f227ff19392ca0
Author: Guangning E <gu...@apache.org>
AuthorDate: Thu Sep 9 08:55:45 2021 +0800

    [pulsar-client]Add a optional params scope for pulsar oauth2 client (#11931)
    
    ### Motivation
    
    In some scenarios (e.g. Azure cloud), the client must send an optional scope parameter when it exchanges tokens with the server. This PR adds that parameter. For compatibility, when the user does not set it, all behavior is the same as before.
    
    ### Modifications
    
    * Add an optional `scope` parameter to the token exchange request
    
    ### Verifying this change
    
    - [x] Make sure that the change passes the CI checks.
    
    (cherry picked from commit ac5114f8944784972b831438f8c7e0cbd57db4e5)
---
 .../auth/oauth2/AuthenticationFactoryOAuth2.java   |  18 ++++
 .../impl/auth/oauth2/ClientCredentialsFlow.java    |   9 +-
 .../protocol/ClientCredentialsExchangeRequest.java |   3 +
 .../impl/auth/oauth2/protocol/TokenClient.java     |  54 +++++++---
 .../impl/auth/oauth2/protocol/TokenClientTest.java | 116 +++++++++++++++++++++
 5 files changed, 182 insertions(+), 18 deletions(-)
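
The request-body handling in this commit can be illustrated with a minimal stand-alone sketch. It is not the Pulsar `TokenClient` class: the class name `ScopeBodySketch` and the method `clientCredentialsBody` are hypothetical, the blank check only approximates `StringUtils.isBlank`, and the sorted parameter order assumes a `TreeMap`, as used in the test added by this commit.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical sketch of the optional-scope logic; names are illustrative only.
final class ScopeBodySketch {

    // URL-encode one key or value as UTF-8 form data.
    static String encode(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on a conforming JVM.
            throw new IllegalStateException(e);
        }
    }

    // Join the entries as key=value pairs separated by '&'.
    static String buildBody(Map<String, String> params) {
        return params.entrySet().stream()
                .map(e -> encode(e.getKey()) + '=' + encode(e.getValue()))
                .collect(Collectors.joining("&"));
    }

    // Add "scope" only when it is non-null and not whitespace-only,
    // so callers that never set it produce the same body as before.
    static String clientCredentialsBody(String clientId, String clientSecret,
                                        String audience, String scope) {
        Map<String, String> body = new TreeMap<>();
        body.put("grant_type", "client_credentials");
        body.put("client_id", clientId);
        body.put("client_secret", clientSecret);
        body.put("audience", audience);
        if (scope != null && !scope.trim().isEmpty()) {
            body.put("scope", scope);
        }
        return buildBody(body);
    }

    public static void main(String[] args) {
        System.out.println(clientCredentialsBody("id", "secret", "aud", "read write"));
        System.out.println(clientCredentialsBody("id", "secret", "aud", null));
    }
}
```

Under these assumptions a null or blank scope yields exactly the four-parameter body, which matches the compatibility goal stated in the commit message.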

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
index 54da5287d..707fcaf 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
@@ -37,10 +37,28 @@ public final class AuthenticationFactoryOAuth2 {
      * @return an Authentication object
      */
     public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience) {
+        return clientCredentials(issuerUrl, credentialsUrl, audience, null);
+    }
+
+    /**
+     * Authenticate with client credentials.
+     *
+     * @param issuerUrl the issuer URL
+     * @param credentialsUrl the credentials URL
+     * @param audience the audience identifier
+     * @param scope An optional field. The value of the scope parameter is expressed as a list of space-delimited,
+     *              case-sensitive strings.  The strings are defined by the authorization server.
+     *              If the value contains multiple space-delimited strings, their order does not matter,
+     *              and each string adds an additional access range to the requested scope.
+     *              From here: https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2
+     * @return an Authentication object
+     */
+    public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience, String scope) {
         ClientCredentialsFlow flow = ClientCredentialsFlow.builder()
                 .issuerUrl(issuerUrl)
                 .privateKey(credentialsUrl.toExternalForm())
                 .audience(audience)
+                .scope(scope)
                 .build();
         return new AuthenticationOAuth2(flow, Clock.systemDefaultZone());
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
index 8d82cc2..b011e85 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
@@ -46,21 +46,24 @@ class ClientCredentialsFlow extends FlowBase {
     public static final String CONFIG_PARAM_ISSUER_URL = "issuerUrl";
     public static final String CONFIG_PARAM_AUDIENCE = "audience";
     public static final String CONFIG_PARAM_KEY_FILE = "privateKey";
+    public static final String CONFIG_PARAM_SCOPE = "scope";
 
     private static final long serialVersionUID = 1L;
 
     private final String audience;
     private final String privateKey;
+    private final String scope;
 
     private transient ClientCredentialsExchanger exchanger;
 
     private boolean initialized = false;
 
     @Builder
-    public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey) {
+    public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey, String scope) {
         super(issuerUrl);
         this.audience = audience;
         this.privateKey = privateKey;
+        this.scope = scope;
     }
 
     @Override
@@ -87,6 +90,7 @@ class ClientCredentialsFlow extends FlowBase {
                 .clientId(keyFile.getClientId())
                 .clientSecret(keyFile.getClientSecret())
                 .audience(this.audience)
+                .scope(this.scope)
                 .build();
         TokenResult tr;
         if (!initialized) {
@@ -116,10 +120,13 @@ class ClientCredentialsFlow extends FlowBase {
         URL issuerUrl = parseParameterUrl(params, CONFIG_PARAM_ISSUER_URL);
         String audience = parseParameterString(params, CONFIG_PARAM_AUDIENCE);
         String privateKeyUrl = parseParameterString(params, CONFIG_PARAM_KEY_FILE);
+        // This is an optional parameter
+        String scope = params.get(CONFIG_PARAM_SCOPE);
         return ClientCredentialsFlow.builder()
                 .issuerUrl(issuerUrl)
                 .audience(audience)
                 .privateKey(privateKeyUrl)
+                .scope(scope)
                 .build();
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
index 7c14296..2d37bb5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
@@ -39,4 +39,7 @@ public class ClientCredentialsExchangeRequest {
 
     @JsonProperty("audience")
     private String audience;
+
+    @JsonProperty("scope")
+    private String scope;
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
index 9151fc3..f8667e8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
@@ -27,6 +27,7 @@ import java.util.TreeMap;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.asynchttpclient.AsyncHttpClient;
@@ -47,15 +48,22 @@ public class TokenClient implements ClientCredentialsExchanger {
     private final AsyncHttpClient httpClient;
 
     public TokenClient(URL tokenUrl) {
-        this.tokenUrl = tokenUrl;
+        this(tokenUrl, null);
+    }
 
-        DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
-        confBuilder.setFollowRedirect(true);
-        confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000);
-        confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000);
-        confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()));
-        AsyncHttpClientConfig config = confBuilder.build();
-        httpClient = new DefaultAsyncHttpClient(config);
+    TokenClient(URL tokenUrl, AsyncHttpClient httpClient) {
+        if (httpClient == null) {
+            DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
+            confBuilder.setFollowRedirect(true);
+            confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000);
+            confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000);
+            confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()));
+            AsyncHttpClientConfig config = confBuilder.build();
+            this.httpClient = new DefaultAsyncHttpClient(config);
+        } else {
+            this.httpClient = httpClient;
+        }
+        this.tokenUrl = tokenUrl;
     }
 
     @Override
@@ -64,6 +72,23 @@ public class TokenClient implements ClientCredentialsExchanger {
     }
 
     /**
+     * Constructing http request parameters.
+     * @param bodyMap List of parameters to be requested.
+     * @return Generate the final request body from a map.
+     */
+    String buildClientCredentialsBody(Map<String, String> bodyMap) {
+        return bodyMap.entrySet().stream()
+                .map(e -> {
+                    try {
+                        return URLEncoder.encode(e.getKey(), "UTF-8") + '=' + URLEncoder.encode(e.getValue(), "UTF-8");
+                    } catch (UnsupportedEncodingException e1) {
+                        throw new RuntimeException(e1);
+                    }
+                })
+                .collect(Collectors.joining("&"));
+    }
+
+    /**
      * Performs a token exchange using client credentials.
      * @param req the client credentials request details.
      * @return a token result
@@ -76,15 +101,10 @@ public class TokenClient implements ClientCredentialsExchanger {
         bodyMap.put("client_id", req.getClientId());
         bodyMap.put("client_secret", req.getClientSecret());
         bodyMap.put("audience", req.getAudience());
-        String body = bodyMap.entrySet().stream()
-                .map(e -> {
-                    try {
-                        return URLEncoder.encode(e.getKey(), "UTF-8") + '=' + URLEncoder.encode(e.getValue(), "UTF-8");
-                    } catch (UnsupportedEncodingException e1) {
-                        throw new RuntimeException(e1);
-                    }
-                })
-                .collect(Collectors.joining("&"));
+        if (!StringUtils.isBlank(req.getScope())) {
+            bodyMap.put("scope", req.getScope());
+        }
+        String body = buildClientCredentialsBody(bodyMap);
 
         try {
 
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java
new file mode 100644
index 0000000..1617359
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import com.google.gson.Gson;
+import org.asynchttpclient.BoundRequestBuilder;
+import org.asynchttpclient.DefaultAsyncHttpClient;
+import org.asynchttpclient.ListenableFuture;
+import org.asynchttpclient.Response;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Token client exchange token mock test.
+ */
+public class TokenClientTest {
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void exchangeClientCredentialsSuccessByScopeTest() throws
+            IOException, TokenExchangeException, ExecutionException, InterruptedException {
+        DefaultAsyncHttpClient defaultAsyncHttpClient = mock(DefaultAsyncHttpClient.class);
+        URL url = new URL("http://localhost");
+        TokenClient tokenClient = new TokenClient(url, defaultAsyncHttpClient);
+        Map<String, String> bodyMap = new TreeMap<>();
+        ClientCredentialsExchangeRequest request = ClientCredentialsExchangeRequest.builder()
+                .audience("test-audience")
+                .clientId("test-client-id")
+                .clientSecret("test-client-secret")
+                .scope("test-scope")
+                .build();
+        bodyMap.put("grant_type", "client_credentials");
+        bodyMap.put("client_id", request.getClientId());
+        bodyMap.put("client_secret", request.getClientSecret());
+        bodyMap.put("audience", request.getAudience());
+        bodyMap.put("scope", request.getScope());
+        String body = tokenClient.buildClientCredentialsBody(bodyMap);
+        BoundRequestBuilder boundRequestBuilder = mock(BoundRequestBuilder.class);
+        Response response = mock(Response.class);
+        ListenableFuture<Response> listenableFuture = mock(ListenableFuture.class);
+        when(defaultAsyncHttpClient.preparePost(url.toString())).thenReturn(boundRequestBuilder);
+        when(boundRequestBuilder.setHeader("Accept", "application/json")).thenReturn(boundRequestBuilder);
+        when(boundRequestBuilder.setHeader("Content-Type", "application/x-www-form-urlencoded")).thenReturn(boundRequestBuilder);
+        when(boundRequestBuilder.setBody(body)).thenReturn(boundRequestBuilder);
+        when(boundRequestBuilder.execute()).thenReturn(listenableFuture);
+        when(listenableFuture.get()).thenReturn(response);
+        when(response.getStatusCode()).thenReturn(200);
+        TokenResult tokenResult = new TokenResult();
+        tokenResult.setAccessToken("test-access-token");
+        tokenResult.setIdToken("test-id");
+        when(response.getResponseBodyAsBytes()).thenReturn(new Gson().toJson(tokenResult).getBytes());
+        TokenResult tr = tokenClient.exchangeClientCredentials(request);
+        Assert.assertNotNull(tr);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void exchangeClientCredentialsSuccessByNoScopeTest() throws
+            IOException, TokenExchangeException, ExecutionException, InterruptedException {
+        DefaultAsyncHttpClient defaultAsyncHttpClient = mock(DefaultAsyncHttpClient.class);
+        URL url = new URL("http://localhost");
+        TokenClient tokenClient = new TokenClient(url, defaultAsyncHttpClient);
+        Map<String, String> bodyMap = new TreeMap<>();
+        ClientCredentialsExchangeRequest request = ClientCredentialsExchangeRequest.builder()
+                .audience("test-audience")
+                .clientId("test-client-id")
+                .clientSecret("test-client-secret")
+                .build();
+        bodyMap.put("grant_type", "client_credentials");
+        bodyMap.put("client_id", request.getClientId());
+        bodyMap.put("client_secret", request.getClientSecret());
+        bodyMap.put("audience", request.getAudience());
+        String body = tokenClient.buildClientCredentialsBody(bodyMap);
+        BoundRequestBuilder boundRequestBuilder = mock(BoundRequestBuilder.class);
+        Response response = mock(Response.class);
+        ListenableFuture<Response> listenableFuture = mock(ListenableFuture.class);
+        when(defaultAsyncHttpClient.preparePost(url.toString())).thenReturn(boundRequestBuilder);
+        when(boundRequestBuilder.setHeader("Accept", "application/json")).thenReturn(boundRequestBuilder);
+        when(boundRequestBuilder.setHeader("Content-Type", "application/x-www-form-urlencoded")).thenReturn(boundRequestBuilder);
+        when(boundRequestBuilder.setBody(body)).thenReturn(boundRequestBuilder);
+        when(boundRequestBuilder.execute()).thenReturn(listenableFuture);
+        when(listenableFuture.get()).thenReturn(response);
+        when(response.getStatusCode()).thenReturn(200);
+        TokenResult tokenResult = new TokenResult();
+        tokenResult.setAccessToken("test-access-token");
+        tokenResult.setIdToken("test-id");
+        when(response.getResponseBodyAsBytes()).thenReturn(new Gson().toJson(tokenResult).getBytes());
+        TokenResult tr = tokenClient.exchangeClientCredentials(request);
+        Assert.assertNotNull(tr);
+    }
+}

[pulsar] 02/09: [testclient] deprecate option --subscriber-name and substitute --subscriptions first element for it (#11828)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9ce29f69c0e2367764d6179571d96659acfca375
Author: Ruguo Yu <ji...@163.com>
AuthorDate: Tue Sep 7 15:55:17 2021 +0800

    [testclient] deprecate option --subscriber-name and substitute --subscriptions first element for it (#11828)
    
    
    (cherry picked from commit 981cb626e59267e38c3b29faa02c14f0848a530f)
---
 .../java/org/apache/pulsar/testclient/PerformanceConsumer.java     | 7 +++++--
 site2/docs/reference-cli-tools.md                                  | 2 +-
 2 files changed, 6 insertions(+), 3 deletions(-)
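
The fallback introduced here can be sketched in isolation as follows. This is a simplified illustration, not the `PerformanceConsumer` code: the class `SubscriptionNameSketch` and the method `defaultSubscriptions` are hypothetical, and the sketch covers only the inner branch where a single subscription is expanded into numbered names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the deprecated --subscriber-name fallback.
final class SubscriptionNameSketch {

    // If the deprecated prefix was not given, fall back to the first
    // element of the subscriptions list, then derive "<prefix>-<i>" names.
    static List<String> defaultSubscriptions(String subscriberName,
                                             List<String> subscriptions,
                                             int numSubscriptions) {
        String prefix = subscriberName != null ? subscriberName : subscriptions.get(0);
        List<String> names = new ArrayList<>();
        for (int i = 0; i < numSubscriptions; i++) {
            names.add(String.format("%s-%d", prefix, i));
        }
        return names;
    }

    public static void main(String[] args) {
        // No explicit prefix: the single subscription name is used instead.
        System.out.println(defaultSubscriptions(null, Collections.singletonList("sub"), 2));
        // An explicit (deprecated) prefix still takes precedence.
        System.out.println(defaultSubscriptions("legacy", Collections.singletonList("sub"), 2));
    }
}
```

With the default `--subscriptions` value of `sub`, the generated names stay `sub-0`, `sub-1`, and so on, which is what the previous default prefix produced.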

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index c755cc0..4045a27 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -91,8 +91,8 @@ public class PerformanceConsumer {
         @Parameter(names = { "-ns", "--num-subscriptions" }, description = "Number of subscriptions (per topic)")
         public int numSubscriptions = 1;
 
-        @Parameter(names = { "-s", "--subscriber-name" }, description = "Subscriber name prefix")
-        public String subscriberName = "sub";
+        @Parameter(names = { "-s", "--subscriber-name" }, description = "Subscriber name prefix", hidden = true)
+        public String subscriberName;
 
         @Parameter(names = { "-ss", "--subscriptions" }, description = "A list of subscriptions to consume on (e.g. sub1,sub2)")
         public List<String> subscriptions = Collections.singletonList("sub");
@@ -228,6 +228,9 @@ public class PerformanceConsumer {
                 arguments.subscriptions.size() != arguments.numConsumers) {
             // keep compatibility with the previous version
             if (arguments.subscriptions.size() == 1) {
+                if (arguments.subscriberName == null) {
+                    arguments.subscriberName = arguments.subscriptions.get(0);
+                }
                 List<String> defaultSubscriptions = Lists.newArrayList();
                 for (int i = 0; i < arguments.numSubscriptions; i++) {
                     defaultSubscriptions.add(String.format("%s-%d", arguments.subscriberName, i));
diff --git a/site2/docs/reference-cli-tools.md b/site2/docs/reference-cli-tools.md
index cb35fe7..04b323a 100644
--- a/site2/docs/reference-cli-tools.md
+++ b/site2/docs/reference-cli-tools.md
@@ -443,7 +443,7 @@ Options
 |`-q`, `--receiver-queue-size`|Size of the receiver queue|1000|
 |`-u`, `--service-url`|Pulsar service URL||
 |`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled|0|
-|`-s`, `--subscriber-name`|Subscriber name prefix|sub|
+|`-s`, `--subscriber-name`|Subscriber name prefix||
 |`-ss`, `--subscriptions`|A list of subscriptions to consume on (e.g. sub1,sub2)|sub|
 |`-st`, `--subscription-type`|Subscriber type. Possible values are Exclusive, Shared, Failover, Key_Shared.|Exclusive|
 |`-sp`, `--subscription-position`|Subscriber position. Possible values are Latest, Earliest.|Latest|

[pulsar] 07/09: Avoid to infinitely split bundle (#11937)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d02653abd5541a44887c806abf586715f56cb33d
Author: Shoothzj <sh...@gmail.com>
AuthorDate: Wed Sep 8 23:28:13 2021 +0800

    Avoid to infinitely split bundle (#11937)
    
    
    (cherry picked from commit bef3757029eb3e48a490a02871689ab3e6abdfa5)
---
 .../loadbalance/impl/BundleSplitterTask.java       |   8 +-
 .../loadbalance/impl/ModularLoadManagerImpl.java   |   2 +-
 .../loadbalance/impl/BundleSplitterTaskTest.java   | 105 +++++++++++++++++++++
 .../pulsar/client/api/BrokerServiceLookupTest.java |  15 ++-
 4 files changed, 122 insertions(+), 8 deletions(-)
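
The guard added to `BundleSplitterTask` can be illustrated with a reduced model. Everything below is a hypothetical sketch: `BundleSplitSketch`, its `Stats` type, and the two thresholds stand in for the real load data and configuration, which carry more fields. The presumed reason for the guard is that a bundle holding a single topic cannot be relieved by splitting, because the topic lands whole in one child bundle and that child would qualify for a split again.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the split decision.
final class BundleSplitSketch {

    // Stand-in for per-bundle statistics (assumed fields only).
    static final class Stats {
        final int topics;
        final double msgRate;

        Stats(int topics, double msgRate) {
            this.topics = topics;
            this.msgRate = msgRate;
        }
    }

    // Return the bundles that exceed a threshold, skipping single-topic bundles.
    static Set<String> findBundlesToSplit(Map<String, Stats> lastStats,
                                          int maxTopics, double maxMsgRate) {
        Set<String> toSplit = new LinkedHashSet<>();
        for (Map.Entry<String, Stats> entry : lastStats.entrySet()) {
            Stats stats = entry.getValue();
            if (stats.topics == 1) {
                // Splitting cannot separate one topic, so never select it.
                continue;
            }
            if (stats.topics > maxTopics || stats.msgRate > maxMsgRate) {
                toSplit.add(entry.getKey());
            }
        }
        return toSplit;
    }

    public static void main(String[] args) {
        Map<String, Stats> stats = new LinkedHashMap<>();
        stats.put("ten/ns/0x00000000_0x80000000", new Stats(1, 50000.0));
        stats.put("ten/ns/0x80000000_0xffffffff", new Stats(3, 50000.0));
        // Only the three-topic bundle is selected, although both exceed the rate.
        System.out.println(findBundlesToSplit(stats, 1000, 30000.0));
    }
}
```

This mirrors the shape of the new test, in which a one-topic bundle at the maximum message rate is expected to produce an empty split set.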

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
index bb1f990..e81fb50 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
@@ -42,10 +42,8 @@ public class BundleSplitterTask implements BundleSplitStrategy {
     /**
      * Construct a BundleSplitterTask.
      *
-     * @param pulsar
-     *            Service to construct from.
      */
-    public BundleSplitterTask(final PulsarService pulsar) {
+    public BundleSplitterTask() {
         bundleCache = new HashSet<>();
     }
 
@@ -74,6 +72,10 @@ public class BundleSplitterTask implements BundleSplitStrategy {
             for (final Map.Entry<String, NamespaceBundleStats> entry : localData.getLastStats().entrySet()) {
                 final String bundle = entry.getKey();
                 final NamespaceBundleStats stats = entry.getValue();
+                if (stats.topics == 1) {
+                    log.info("namespace bundle {} only have 1 topic", bundle);
+                    continue;
+                }
                 double totalMessageRate = 0;
                 double totalMessageThroughput = 0;
                 // Attempt to consider long-term message data, otherwise effectively ignore.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 5b7867e..5035b00 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -248,7 +248,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, Consumer<Noti
             brokerHostUsage = new GenericBrokerHostUsageImpl(pulsar);
         }
 
-        bundleSplitStrategy = new BundleSplitterTask(pulsar);
+        bundleSplitStrategy = new BundleSplitterTask();
 
         conf = pulsar.getConfiguration();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
new file mode 100644
index 0000000..7480989
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerData;
+import org.apache.pulsar.broker.BundleData;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.TimeAverageMessageData;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * @author hezhangjian
+ */
+@Slf4j
+@Test(groups = "broker")
+public class BundleSplitterTaskTest {
+
+    private LocalBookkeeperEnsemble bkEnsemble;
+
+    private PulsarService pulsar;
+
+    @BeforeMethod
+    void setup() throws Exception {
+        // Start local bookkeeper ensemble
+        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble.start();
+        // Start broker
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+        config.setClusterName("use");
+        config.setWebServicePort(Optional.of(0));
+        config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+
+        config.setAdvertisedAddress("localhost");
+        config.setBrokerShutdownTimeoutMs(0L);
+        config.setBrokerServicePort(Optional.of(0));
+        config.setBrokerServicePortTls(Optional.of(0));
+        config.setWebServicePortTls(Optional.of(0));
+        pulsar = new PulsarService(config);
+        pulsar.start();
+    }
+
+    @Test
+    public void testSplitTaskWhenTopicJustOne() {
+        final BundleSplitterTask bundleSplitterTask = new BundleSplitterTask();
+        LoadData loadData = new LoadData();
+
+        LocalBrokerData brokerData = new LocalBrokerData();
+        Map<String, NamespaceBundleStats> lastStats = new HashMap<>();
+        final NamespaceBundleStats namespaceBundleStats = new NamespaceBundleStats();
+        namespaceBundleStats.topics = 1;
+        lastStats.put("ten/ns/0x00000000_0x80000000", namespaceBundleStats);
+        brokerData.setLastStats(lastStats);
+        loadData.getBrokerData().put("broker", new BrokerData(brokerData));
+
+        BundleData bundleData = new BundleData();
+        TimeAverageMessageData averageMessageData = new TimeAverageMessageData();
+        averageMessageData.setMsgRateIn(pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxMsgRate());
+        averageMessageData.setMsgRateOut(1);
+        bundleData.setLongTermData(averageMessageData);
+        loadData.getBundleData().put("ten/ns/0x00000000_0x80000000", bundleData);
+
+        final Set<String> bundlesToSplit = bundleSplitterTask.findBundlesToSplit(loadData, pulsar);
+        Assert.assertEquals(bundlesToSplit.size(), 0);
+    }
+
+
+    @AfterMethod(alwaysRun = true)
+    void shutdown() throws Exception {
+        log.info("--- Shutting down ---");
+        pulsar.close();
+        bkEnsemble.stop();
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index cb349bf..36ff448 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -1019,6 +1019,13 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
             Consumer<byte[]> consumer1 = pulsarClient2.newConsumer().topic(topic1)
                     .subscriptionName("my-subscriber-name").subscribe();
 
+            // there should be more than one topic to trigger split
+            final String topic2 = "persistent://" + namespace + "/topic2";
+            @Cleanup
+            Consumer<byte[]> consumer2 = pulsarClient2.newConsumer().topic(topic2)
+                    .subscriptionName("my-subscriber-name")
+                    .subscribe();
+
             // (4) Broker-1 will own topic-1
             final String unsplitBundle = namespace + "/0x00000000_0xffffffff";
 
@@ -1054,7 +1061,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
             updateAllMethod.invoke(loadManager);
             conf2.setLoadBalancerAutoBundleSplitEnabled(true);
             conf2.setLoadBalancerAutoUnloadSplitBundlesEnabled(true);
-            conf2.setLoadBalancerNamespaceBundleMaxTopics(0);
+            conf2.setLoadBalancerNamespaceBundleMaxTopics(1);
             loadManager.checkNamespaceBundleSplit();
 
             // (6) Broker-2 should get the watch and update bundle cache
@@ -1063,15 +1070,15 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
             });
 
             // (7) Make lookup request again to Broker-2 which should succeed.
-            final String topic2 = "persistent://" + namespace + "/topic2";
+            final String topic3 = "persistent://" + namespace + "/topic3";
             @Cleanup
-            Consumer<byte[]> consumer2 = pulsarClient2.newConsumer().topic(topic2)
+            Consumer<byte[]> consumer3 = pulsarClient2.newConsumer().topic(topic3)
                     .subscriptionName("my-subscriber-name")
                     .subscribe();
 
             Awaitility.await().untilAsserted(() -> {
                 NamespaceBundle bundleInBroker1AfterSplit = pulsar2.getNamespaceService()
-                        .getBundle(TopicName.get(topic2));
+                        .getBundle(TopicName.get(topic3));
                 assertNotEquals(bundleInBroker1AfterSplit.toString(), unsplitBundle);
             });
         } finally {
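
The test changes above suggest the rule this commit enforces: a bundle that holds a single topic is not selected for splitting, even when its load exceeds a threshold, because splitting it cannot spread the load any further. Below is a minimal sketch of that guard. `SplitGuardSketch`, `shouldSplit`, and its parameters are hypothetical and simplified; this is not the actual `BundleSplitterTask` code, which may consult further thresholds.

```java
public class SplitGuardSketch {

    /**
     * Hypothetical, simplified split decision.
     * A bundle with at most one topic is never split; otherwise it is split
     * when it exceeds either the topic-count limit or the message-rate limit.
     */
    static boolean shouldSplit(int topics, double msgRate, int maxTopics, double maxMsgRate) {
        if (topics <= 1) {
            // Splitting a single-topic bundle only yields another overloaded
            // bundle, so the same bundle would be chosen again and again.
            return false;
        }
        return topics > maxTopics || msgRate > maxMsgRate;
    }

    public static void main(String[] args) {
        // One topic above the rate limit: no split.
        System.out.println(shouldSplit(1, 1001.0, 1000, 1000.0));
        // Two topics with a topic limit of 1: split.
        System.out.println(shouldSplit(2, 1.0, 1, 1000.0));
    }
}
```

This mirrors the updated tests: the lookup test now creates a second topic and sets the topic limit to 1 so that a split is still triggered.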

[pulsar] 01/09: Forget to update memory usage on producer close (#11906)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5a369be528b296daf3afc6362d32c2a5dbaa4675
Author: Shoothzj <sh...@gmail.com>
AuthorDate: Tue Sep 7 00:14:38 2021 +0800

    Forget to update memory usage on producer close (#11906)
    
    
    (cherry picked from commit ad9efae1abf9675f830052ab1d4697330b23a750)
---
 .../pulsar/client/impl/ProducerMemoryLimitTest.java  | 20 +++++++++++++++++++-
 .../org/apache/pulsar/client/impl/ProducerImpl.java  |  2 ++
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
index b6ec6a5..264ec30 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
@@ -59,7 +59,7 @@ public class ProducerMemoryLimitTest extends ProducerConsumerBase {
                 .create();
         this.stopBroker();
         try {
-            producer.send("memroy-test".getBytes(StandardCharsets.UTF_8));
+            producer.send("memory-test".getBytes(StandardCharsets.UTF_8));
             throw new IllegalStateException("can not reach here");
         } catch (PulsarClientException.TimeoutException ex) {
             PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
@@ -69,6 +69,24 @@ public class ProducerMemoryLimitTest extends ProducerConsumerBase {
 
     }
 
+    @Test(timeOut = 10_000)
+    public void testProducerCloseMemoryRelease() throws Exception {
+        initClientWithMemoryLimit();
+        @Cleanup
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic("testProducerMemoryLimit")
+                .sendTimeout(5, TimeUnit.SECONDS)
+                .maxPendingMessages(0)
+                .enableBatching(false)
+                .create();
+        this.stopBroker();
+        producer.sendAsync("memory-test".getBytes(StandardCharsets.UTF_8));
+        producer.close();
+        PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
+        final MemoryLimitController memoryLimitController = clientImpl.getMemoryLimitController();
+        Assert.assertEquals(memoryLimitController.currentUsage(), 0);
+    }
+
     private void initClientWithMemoryLimit() throws PulsarClientException {
         pulsarClient = PulsarClient.builder().
                 serviceUrl(lookupUrl.toString())
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 564682c..e531345 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -875,6 +875,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                     format("The producer %s of the topic %s was already closed when closing the producers",
                         producerName, topic));
                 pendingMessages.forEach(msg -> {
+                    client.getMemoryLimitController().releaseMemory(msg.uncompressedSize);
                     msg.sendComplete(ex);
                     msg.cmd.release();
                     msg.recycle();
@@ -898,6 +899,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                     log.info("[{}] [{}] Closed Producer", topic, producerName);
                     setState(State.Closed);
                     pendingMessages.forEach(msg -> {
+                        client.getMemoryLimitController().releaseMemory(msg.uncompressedSize);
                         msg.cmd.release();
                         msg.recycle();
                     });
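
The fix above releases the memory reserved for each pending message when the producer closes. The following is a minimal sketch of that accounting using only the JDK. `MemoryLimiter`, `Producer`, and `usageAfterClose` are hypothetical stand-ins, not the Pulsar `MemoryLimitController` or `ProducerImpl` classes.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

public class MemoryReleaseSketch {

    /** Hypothetical stand-in for a client-wide memory accountant. */
    static final class MemoryLimiter {
        private final AtomicLong usage = new AtomicLong();

        void reserve(long bytes) { usage.addAndGet(bytes); }
        void release(long bytes) { usage.addAndGet(-bytes); }
        long currentUsage() { return usage.get(); }
    }

    /** Hypothetical producer that queues payloads until they are acknowledged. */
    static final class Producer {
        private final MemoryLimiter limiter;
        private final Queue<byte[]> pending = new ArrayDeque<>();

        Producer(MemoryLimiter limiter) { this.limiter = limiter; }

        void sendAsync(byte[] payload) {
            limiter.reserve(payload.length);
            pending.add(payload);
        }

        void close() {
            // Release what each unacknowledged message reserved; without this
            // step the usage counter never returns to zero.
            for (byte[] payload : pending) {
                limiter.release(payload.length);
            }
            pending.clear();
        }
    }

    /** Sends one payload per given size, closes the producer, and reports remaining usage. */
    static long usageAfterClose(int... sizes) {
        MemoryLimiter limiter = new MemoryLimiter();
        Producer producer = new Producer(limiter);
        for (int size : sizes) {
            producer.sendAsync(new byte[size]);
        }
        producer.close();
        return limiter.currentUsage();
    }

    public static void main(String[] args) {
        System.out.println(usageAfterClose(11, 20));
    }
}
```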

[pulsar] 04/09: [Issue 11936] forget to call SendCallback on producer close (#11939)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1703affb54cc2443dcb1bc185c5b2af7859710af
Author: Shoothzj <sh...@gmail.com>
AuthorDate: Tue Sep 7 23:41:06 2021 +0800

    [Issue 11936] forget to call SendCallback on producer close (#11939)
    
    * forget to call SendCallback on producer close
    
    * add unit tests
    
    * add unit tests
    
    (cherry picked from commit d494c43cc5cb249a7d139a0ee1a600103805bb85)
---
 .../pulsar/client/impl/ProducerCloseTest.java      | 82 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    | 31 ++++----
 2 files changed, 97 insertions(+), 16 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
new file mode 100644
index 0000000..0c4df15
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.common.api.proto.CommandSuccess;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+@Test(groups = "broker-impl")
+public class ProducerCloseTest extends ProducerConsumerBase {
+
+    @Override
+    @BeforeMethod
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @Override
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test(timeOut = 10_000)
+    public void testProducerCloseCallback() throws Exception {
+        initClient();
+        @Cleanup
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic("testProducerClose")
+                .sendTimeout(5, TimeUnit.SECONDS)
+                .maxPendingMessages(0)
+                .enableBatching(false)
+                .create();
+        final TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
+        final TypedMessageBuilder<byte[]> value = messageBuilder.value("test-msg".getBytes(StandardCharsets.UTF_8));
+        producer.getClientCnx().channel().config().setAutoRead(false);
+        final CompletableFuture<MessageId> completableFuture = value.sendAsync();
+        producer.closeAsync();
+        final CommandSuccess commandSuccess = new CommandSuccess();
+        PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
+        commandSuccess.setRequestId(clientImpl.newRequestId() - 1);
+        producer.getClientCnx().handleSuccess(commandSuccess);
+        Thread.sleep(3000);
+        Assert.assertEquals(completableFuture.isDone(), true);
+    }
+
+    private void initClient() throws PulsarClientException {
+        pulsarClient = PulsarClient.builder().
+                serviceUrl(lookupUrl.toString())
+                .build();
+    }
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index e531345..84062fb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -871,16 +871,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             synchronized (this) {
                 setState(State.Closed);
                 client.cleanupProducer(this);
-                PulsarClientException ex = new PulsarClientException.AlreadyClosedException(
-                    format("The producer %s of the topic %s was already closed when closing the producers",
-                        producerName, topic));
-                pendingMessages.forEach(msg -> {
-                    client.getMemoryLimitController().releaseMemory(msg.uncompressedSize);
-                    msg.sendComplete(ex);
-                    msg.cmd.release();
-                    msg.recycle();
-                });
-                pendingMessages.clear();
+                clearPendingMessagesWhenClose();
             }
 
             return CompletableFuture.completedFuture(null);
@@ -898,12 +889,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 synchronized (ProducerImpl.this) {
                     log.info("[{}] [{}] Closed Producer", topic, producerName);
                     setState(State.Closed);
-                    pendingMessages.forEach(msg -> {
-                        client.getMemoryLimitController().releaseMemory(msg.uncompressedSize);
-                        msg.cmd.release();
-                        msg.recycle();
-                    });
-                    pendingMessages.clear();
+                    clearPendingMessagesWhenClose();
                 }
 
                 closeFuture.complete(null);
@@ -918,6 +904,19 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         return closeFuture;
     }
 
+    private void clearPendingMessagesWhenClose() {
+        PulsarClientException ex = new PulsarClientException.AlreadyClosedException(
+                format("The producer %s of the topic %s was already closed when closing the producers",
+                        producerName, topic));
+        pendingMessages.forEach(msg -> {
+            client.getMemoryLimitController().releaseMemory(msg.uncompressedSize);
+            msg.sendComplete(ex);
+            msg.cmd.release();
+            msg.recycle();
+        });
+        pendingMessages.clear();
+    }
+
     @Override
     public boolean isConnected() {
         return connectionHandler.cnx() != null && (getState() == State.Ready);
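
The refactoring above routes both close paths through one helper, so every pending send is completed with an exception instead of being dropped silently. A minimal JDK-only sketch of that behavior follows. `CloseCallbackSketch` and its nested `Producer` are hypothetical; they use `CompletableFuture` in place of Pulsar's `SendCallback` and `IllegalStateException` in place of `AlreadyClosedException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CloseCallbackSketch {

    /** Hypothetical producer whose sends stay pending until close. */
    static final class Producer {
        private final List<CompletableFuture<Long>> pending = new ArrayList<>();
        private boolean closed;

        synchronized CompletableFuture<Long> sendAsync(byte[] payload) {
            CompletableFuture<Long> future = new CompletableFuture<>();
            if (closed) {
                future.completeExceptionally(new IllegalStateException("producer already closed"));
                return future;
            }
            pending.add(future);
            return future;
        }

        synchronized void close() {
            closed = true;
            clearPendingWhenClose();
        }

        // Single helper shared by every close path, so no path can forget
        // to notify the callers waiting on a pending send.
        private void clearPendingWhenClose() {
            IllegalStateException ex =
                    new IllegalStateException("producer closed with pending messages");
            pending.forEach(future -> future.completeExceptionally(ex));
            pending.clear();
        }
    }

    /** Returns true when a send that was pending at close time completes exceptionally. */
    static boolean pendingFailsOnClose() {
        Producer producer = new Producer();
        CompletableFuture<Long> future = producer.sendAsync(new byte[] {1});
        producer.close();
        return future.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(pendingFailsOnClose());
    }
}
```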

[pulsar] 09/09: Print position info when can't find next valid position. (#11969)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0a57bae44fa0f94d9f67ded1878de6defb7a2b89
Author: GuoJiwei <te...@apache.org>
AuthorDate: Thu Sep 9 09:01:07 2021 +0800

    Print position info when can't find next valid position. (#11969)
    
    ## Motivation
    When the user resets the cursor and the next valid position cannot be found, the original log doesn't print position info.
    ```
    12:13:36.676 [pulsar-web-67-14] INFO  org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://public/default/test][sub] Successfully disconnected consumers from subscription, proceeding with cursor reset
    12:13:36.676 [bookkeeper-ml-workers-OrderedExecutor-2-0] ERROR org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/test] Can't find next valid position, fail back to the next position of the last position.
    java.lang.NullPointerException: null
    	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.getNextValidPositionInternal(ManagedLedgerImpl.java:3031) ~[io.streamnative-managed-ledger-2.7.2.10.jar:2.7.2.10]
    	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.getNextValidPosition(ManagedLedgerImpl.java:3018) ~[io.streamnative-managed-ledger-2.7.2.10.jar:2.7.2.10]
    	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.lambda$asyncResetCursor$6(ManagedCursorImpl.java:1086) ~[io.streamnative-managed-ledger-2.7.2.10.jar:2.7.2.10]
    	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [io.streamnative-managed-ledger-2.7.2.10.jar:2.7.2.10]
    	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.12.0.jar:4.12.0]
    	at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.12.0.jar:4.12.0]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_281]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_281]
    	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.60.Final.jar:4.1.60.Final]
    	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_281]
    ```
    
    So it's better to add position info to the log.
    
    (cherry picked from commit 3cd5b9ed3b59dd153b04ce4f6e61da377900f9be)
---
 .../main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 7404d08..5892194 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3281,7 +3281,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         } catch (NullPointerException e) {
             next = lastConfirmedEntry.getNext();
             if (log.isDebugEnabled()) {
-                log.debug("[{}] Can't find next valid position, fall back to the next position of the last position.", name, e);
+                log.debug("[{}] Can't find next valid position : {}, fall back to the next position of the last position : {}.", name, position, next, e);
             }
         }
         return next;
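
Parameterized log messages of this kind fill their `{}` placeholders strictly in argument order, so the value meant for the `[{}]` prefix has to be passed first. The sketch below illustrates that ordering with a small JDK-only formatter. `PlaceholderOrderSketch.format` is a hypothetical stand-in, not the SLF4J implementation, and it does not model SLF4J's handling of a trailing `Throwable`; the sample values are made up.

```java
public class PlaceholderOrderSketch {

    /** Replaces each "{}" in the pattern with the next argument, left to right. */
    static String format(String pattern, Object... args) {
        StringBuilder sb = new StringBuilder();
        int from = 0;
        int argIdx = 0;
        while (true) {
            int at = pattern.indexOf("{}", from);
            if (at < 0 || argIdx >= args.length) {
                // No placeholder or no argument left: copy the rest verbatim.
                sb.append(pattern.substring(from));
                break;
            }
            sb.append(pattern, from, at).append(args[argIdx++]);
            from = at + 2;
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String pattern = "[{}] Can't find next valid position : {}";
        // Ledger name first, then the position (illustrative values).
        System.out.println(format(pattern, "public/default/persistent/test", "12:5"));
        // Swapped arguments put the position into the name prefix.
        System.out.println(format(pattern, "12:5", "public/default/persistent/test"));
    }
}
```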

[pulsar] 03/09: [Transaction] add method to clear up transaction buffer snapshot (#11934)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2cd8d47b60645339f643d7ba53741ef9cd324645
Author: ran <ga...@126.com>
AuthorDate: Tue Sep 7 14:11:05 2021 +0800

    [Transaction] add method to clear up transaction buffer snapshot (#11934)
    
    
    (cherry picked from commit d86db3f4ec4fb6bd04216a123cde2fee5c43f9d9)
---
 .../broker/service/persistent/PersistentTopic.java |  5 +-
 .../pulsar/broker/systopic/SystemTopicClient.java  | 19 ++++++
 .../TransactionBufferSystemTopicClient.java        | 16 +++++
 .../transaction/buffer/TransactionBuffer.java      |  7 +++
 .../buffer/impl/InMemTransactionBuffer.java        |  5 ++
 .../buffer/impl/TopicTransactionBuffer.java        |  9 +++
 .../buffer/impl/TransactionBufferDisable.java      |  5 ++
 .../TopicTransactionBufferRecoverTest.java         | 72 +++++++++++++++++++++-
 8 files changed, 136 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a4db0bc..e165916 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
+import lombok.Getter;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
@@ -218,6 +219,7 @@ public class PersistentTopic extends AbstractTopic
 
     // this future is for publish txn message in order.
     private volatile CompletableFuture<Void> transactionCompletableFuture;
+    @Getter
     protected final TransactionBuffer transactionBuffer;
 
     private final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder();
@@ -1108,7 +1110,8 @@ public class PersistentTopic extends AbstractTopic
                     CompletableFuture<SchemaVersion> deleteSchemaFuture =
                             deleteSchema ? deleteSchema() : CompletableFuture.completedFuture(null);
 
-                    deleteSchemaFuture.thenAccept(__ -> deleteTopicPolicies()).whenComplete((v, ex) -> {
+                    deleteSchemaFuture.thenAccept(__ -> deleteTopicPolicies())
+                            .thenCompose(__ -> transactionBuffer.clearSnapshot()).whenComplete((v, ex) -> {
                         if (ex != null) {
                             log.error("[{}] Error deleting topic", topic, ex);
                             unfenceTopicToResume();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
index 3f5a0a9..ceb1df6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
@@ -104,6 +104,25 @@ public interface SystemTopicClient<T> {
         CompletableFuture<MessageId> writeAsync(T t);
 
         /**
+         * Delete event in the system topic.
+         * @param t pulsar event
+         * @return message id
+         * @throws PulsarClientException if deleting the event fails
+         */
+        default MessageId delete(T t) throws PulsarClientException {
+            throw new UnsupportedOperationException("Unsupported operation");
+        }
+
+        /**
+         * Async delete event in the system topic.
+         * @param t pulsar event
+         * @return message id future
+         */
+        default CompletableFuture<MessageId> deleteAsync(T t) {
+            throw new UnsupportedOperationException("Unsupported operation");
+        }
+
+        /**
          * Close the system topic writer.
          */
         void close() throws IOException;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSystemTopicClient.java
index 81b7096..807bb9d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSystemTopicClient.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSystemTopicClient.java
@@ -105,6 +105,22 @@ public class TransactionBufferSystemTopicClient extends SystemTopicClientBase<Tr
         }
 
         @Override
+        public MessageId delete(TransactionBufferSnapshot transactionBufferSnapshot) throws PulsarClientException {
+            return producer.newMessage()
+                    .key(transactionBufferSnapshot.getTopicName())
+                    .value(null)
+                    .send();
+        }
+
+        @Override
+        public CompletableFuture<MessageId> deleteAsync(TransactionBufferSnapshot transactionBufferSnapshot) {
+            return producer.newMessage()
+                    .key(transactionBufferSnapshot.getTopicName())
+                    .value(null)
+                    .sendAsync();
+        }
+
+        @Override
         public void close() throws IOException {
             this.producer.close();
             transactionBufferSystemTopicClient.removeWriter(this);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
index c2f6006..6ffc218 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
@@ -125,6 +125,13 @@ public interface TransactionBuffer {
     CompletableFuture<Void> purgeTxns(List<Long> dataLedgers);
 
     /**
+     * Clear up the snapshot of the TransactionBuffer.
+     *
+     * @return Clear up operation result.
+     */
+    CompletableFuture<Void> clearSnapshot();
+
+    /**
      * Close the buffer asynchronously.
      *
      * @return
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index 43ed06f..213c7d0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -346,6 +346,11 @@ class InMemTransactionBuffer implements TransactionBuffer {
     }
 
     @Override
+    public CompletableFuture<Void> clearSnapshot() {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
     public CompletableFuture<Void> closeAsync() {
         buffers.values().forEach(TxnBuffer::close);
         return CompletableFuture.completedFuture(null);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index c33c404..220b432 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -372,6 +372,15 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
     }
 
     @Override
+    public CompletableFuture<Void> clearSnapshot() {
+        return this.takeSnapshotWriter.thenCompose(writer -> {
+            TransactionBufferSnapshot snapshot = new TransactionBufferSnapshot();
+            snapshot.setTopicName(topic.getName());
+            return writer.deleteAsync(snapshot);
+        }).thenCompose(__ -> CompletableFuture.completedFuture(null));
+    }
+
+    @Override
     public CompletableFuture<Void> closeAsync() {
         changeToCloseState();
         return this.takeSnapshotWriter.thenCompose(SystemTopicClient.Writer::closeAsync);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index 4b50e55..ff18924 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -69,6 +69,11 @@ public class TransactionBufferDisable implements TransactionBuffer {
     }
 
     @Override
+    public CompletableFuture<Void> clearSnapshot() {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
     public CompletableFuture<Void> closeAsync() {
         return CompletableFuture.completedFuture(null);
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index d79d3c8..956b86e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Sets;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.util.NavigableMap;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -33,9 +34,11 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
 import org.apache.pulsar.client.api.Consumer;
@@ -50,11 +53,11 @@ import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.awaitility.Awaitility;
@@ -403,4 +406,71 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
         }
         assertTrue(exist);
     }
+
+    @Test
+    public void clearTransactionBufferSnapshotTest() throws Exception {
+        String topic = NAMESPACE1 + "/tb-snapshot-delete-" + RandomUtils.nextInt();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient
+                .newProducer()
+                .topic(topic)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+        Transaction txn = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build().get();
+        producer.newMessage(txn).value("test".getBytes()).sendAsync();
+        producer.newMessage(txn).value("test".getBytes()).sendAsync();
+        txn.commit().get();
+
+        // take snapshot
+        PersistentTopic originalTopic = (PersistentTopic) getPulsarServiceList().get(0)
+                .getBrokerService().getTopic(TopicName.get(topic).toString(), false).get().get();
+        TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) originalTopic.getTransactionBuffer();
+        Method takeSnapshotMethod = TopicTransactionBuffer.class.getDeclaredMethod("takeSnapshot");
+        takeSnapshotMethod.setAccessible(true);
+        takeSnapshotMethod.invoke(topicTransactionBuffer);
+
+        TopicName transactionBufferTopicName =
+                NamespaceEventsSystemTopicFactory.getSystemTopicName(
+                        TopicName.get(topic).getNamespaceObject(), EventType.TRANSACTION_BUFFER_SNAPSHOT);
+        PersistentTopic snapshotTopic = (PersistentTopic) getPulsarServiceList().get(0)
+                .getBrokerService().getTopic(transactionBufferTopicName.toString(), false).get().get();
+        Field field = PersistentTopic.class.getDeclaredField("currentCompaction");
+        field.setAccessible(true);
+
+        // Trigger compaction and make sure it is finished.
+        checkSnapshotCount(transactionBufferTopicName, true, snapshotTopic, field);
+        admin.topics().delete(topic, true);
+        checkSnapshotCount(transactionBufferTopicName, false, snapshotTopic, field);
+    }
+
+    private void checkSnapshotCount(TopicName topicName, boolean hasSnapshot,
+                                    PersistentTopic persistentTopic, Field field) throws Exception {
+        persistentTopic.triggerCompaction();
+        CompletableFuture<Long> compactionFuture = (CompletableFuture<Long>) field.get(persistentTopic);
+        Awaitility.await().untilAsserted(() -> assertTrue(compactionFuture.isDone()));
+
+        Reader<TransactionBufferSnapshot> reader = pulsarClient.newReader(Schema.AVRO(TransactionBufferSnapshot.class))
+                .readCompacted(true)
+                .startMessageId(MessageId.earliest)
+                .startMessageIdInclusive()
+                .topic(topicName.toString())
+                .create();
+
+        int count = 0;
+        while (true) {
+            Message<TransactionBufferSnapshot> snapshotMsg = reader.readNext(2, TimeUnit.SECONDS);
+            if (snapshotMsg != null) {
+                count++;
+            } else {
+                break;
+            }
+        }
+        assertTrue(hasSnapshot ? count > 0 : count == 0);
+        reader.close();
+    }
+
 }

[pulsar] 05/09: [C++] Handle error when shutting down client after forks (#11954)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 44d85e8bbdf48713ac650039ee4ea838898f780a
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Sep 7 18:37:36 2021 -0700

    [C++] Handle error when shutting down client after forks (#11954)
    
    * [C++] Handle error when shutting down client after forks
    
    * Fixed formatting
    
    (cherry picked from commit 2858ed02a76788d1973a5145fca03997672c4779)
---
 pulsar-client-cpp/lib/ExecutorService.cc | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)

diff --git a/pulsar-client-cpp/lib/ExecutorService.cc b/pulsar-client-cpp/lib/ExecutorService.cc
index 7cd4a49..f7cb010 100644
--- a/pulsar-client-cpp/lib/ExecutorService.cc
+++ b/pulsar-client-cpp/lib/ExecutorService.cc
@@ -22,6 +22,9 @@
 #include <functional>
 #include <memory>
 
+#include "LogUtils.h"
+DECLARE_LOG_OBJECT()
+
 namespace pulsar {
 
 ExecutorService::ExecutorService()
@@ -29,14 +32,7 @@ ExecutorService::ExecutorService()
       work_(new BackgroundWork(*io_service_)),
       worker_(std::bind(&ExecutorService::startWorker, this, io_service_)) {}
 
-ExecutorService::~ExecutorService() {
-    close();
-    // If the worker_ is still not joinable at this point just detach
-    // the thread so its destructor does not terminate the app
-    if (worker_.joinable()) {
-        worker_.detach();
-    }
-}
+ExecutorService::~ExecutorService() { close(); }
 
 void ExecutorService::startWorker(std::shared_ptr<boost::asio::io_service> io_service) { io_service_->run(); }
 
@@ -70,7 +66,13 @@ void ExecutorService::close() {
     work_.reset();
     // Detach the worker thread instead of join to avoid potential deadlock
     if (worker_.joinable()) {
-        worker_.detach();
+        try {
+            worker_.detach();
+        } catch (const std::system_error &e) {
+            // This condition will happen if we're forking the process, therefore the thread was not ported to
+            // the child side of the fork and the detach would be failing.
+            LOG_DEBUG("Failed to detach thread: " << e.what());
+        }
     }
 }
 
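The comment added in the hunk above attributes the failed detach to fork semantics: only the thread that calls fork() is carried over to the child, so the worker thread no longer exists there. The sketch below illustrates that underlying behaviour only; it is written in Python rather than C++ so it can be run directly, it is not part of the patch, and the function name is made up for this example. It assumes a POSIX system where `os.fork` is available.

```python
import os
import threading


def worker_threads_in_forked_child() -> int:
    """Fork while a background thread is running and return how many
    threads other than the main thread are visible in the child."""
    stop = threading.Event()
    # Background worker that simply blocks until the parent sets the event.
    worker = threading.Thread(target=stop.wait, daemon=True)
    worker.start()

    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child process: only the thread that called fork() was duplicated.
        os.close(read_fd)
        others = [t for t in threading.enumerate()
                  if t is not threading.main_thread()]
        os.write(write_fd, str(len(others)).encode())
        # Exit immediately, skipping interpreter cleanup in the child.
        os._exit(0)

    # Parent process: read the child's report, then clean up.
    os.close(write_fd)
    data = os.read(read_fd, 16)
    os.close(read_fd)
    os.waitpid(pid, 0)
    stop.set()
    worker.join()
    return int(data)


if __name__ == "__main__":
    print("worker threads seen in child:", worker_threads_in_forked_child())
```

In the parent the worker is still alive and is joined normally. In the child there is no worker left to detach or join, which appears to be the situation the new catch block is written for.
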

[pulsar] 06/09: [Python] Expose Client.shutdown() method (#11955)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b1ff9bc7a3cf84a3bf9f2d981c5051bd0b1f6f64
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Sep 7 18:36:11 2021 -0700

    [Python] Expose Client.shutdown() method (#11955)
    
    ### Motivation
    
    Similar to what we expose in Java and C++ client, we should expose the quick `shutdown()` method on the Python client.
    
    (cherry picked from commit c11ac895a1a90a378e4407597ec9540ff8bdcdd4)
---
 pulsar-client-cpp/python/pulsar/__init__.py |  9 +++++++++
 pulsar-client-cpp/python/pulsar_test.py     | 13 +++++++++++++
 2 files changed, 22 insertions(+)
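
As a rough sketch of the behaviour the commit message describes, the snippet below walks through the same sequence as the unit test in this commit: send, shut down, then expect the next send to fail. It does not talk to a broker. `StubClient`, `StubProducer`, and `ClientShutDown` are hypothetical stand-ins that are not part of the pulsar package; they only mimic the one property being shown, namely that a producer stops working once its client has been shut down. With the real library one would presumably pass a `pulsar.Client` instance instead and catch the exception type the library raises.

```python
class ClientShutDown(Exception):
    """Hypothetical stand-in for the error raised by the real client."""


class StubProducer:
    """Hypothetical producer that refuses to send after client shutdown."""

    def __init__(self, client):
        self._client = client
        self.sent = []

    def send(self, payload):
        if self._client.is_shut_down:
            raise ClientShutDown("client already shut down")
        self.sent.append(payload)


class StubClient:
    """Hypothetical client exposing only the two calls used below."""

    def __init__(self):
        self.is_shut_down = False

    def create_producer(self, topic):
        return StubProducer(self)

    def shutdown(self):
        # Immediate shutdown: no waiting for pending operations.
        self.is_shut_down = True


def send_fails_after_shutdown(client, topic):
    """Return True if sending after client.shutdown() raises."""
    producer = client.create_producer(topic)
    producer.send(b'hello')      # works while the client is open
    client.shutdown()
    try:
        producer.send(b'hello')  # expected to fail now
    except ClientShutDown:
        return True
    return False


if __name__ == "__main__":
    print(send_fails_after_shutdown(StubClient(), 'my-topic'))
```

The helper takes the client as a parameter so the same flow can be exercised against either the stub or a real client object.
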

diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py
index 429aa10..9570cbe 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -844,6 +844,15 @@ class Client:
         _check_type(str, topic, 'topic')
         return self._client.get_topic_partitions(topic)
 
+    def shutdown(self):
+        """
+        Perform immediate shutdown of Pulsar client.
+
+        Release all resources and close all producer, consumer, and readers without waiting
+        for ongoing operations to complete.
+        """
+        self._client.shutdown()
+
     def close(self):
         """
         Close the client and all the associated producers and consumers
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index 7c85f77..b7d265f 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -1134,6 +1134,19 @@ class PulsarTest(TestCase):
         self.assertTrue(msg.topic_name() in partitions)
         client.close()
 
+    def test_shutdown_client(self):
+        client = Client(self.serviceUrl)
+        producer = client.create_producer('persistent://public/default/partitioned_topic_name_test')
+        producer.send(b'hello')
+        client.shutdown()
+
+        try:
+            producer.send(b'hello')
+            self.assertTrue(False)
+        except pulsar.PulsarException:
+            # Expected
+            pass
+
     def test_negative_acks(self):
         client = Client(self.serviceUrl)
         consumer = client.subscribe('test_negative_acks',