You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yo...@apache.org on 2021/12/23 14:03:45 UTC

[pulsar] branch branch-2.8 updated (d8e9d47 -> ed3db4a)

This is an automated email from the ASF dual-hosted git repository.

yong pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from d8e9d47  [branch 2.8] [owasp] port suppressions files from master and make the check pass (#13455)
     new 5a123c4  Fix ProxyServiceStarterTest flaky tests (#12344)
     new 42bce8a  [pulsar-perf] Write histogram files for consume command (#12569)
     new 75ce022  Fix Issue #12885, Unordered consuming case in Key_Shared subscription (#12890)
     new 129ec34  Use sendAsync instead of send when producing messages to retry topic. (#12946)
     new b613f52  feat(cli): support autorecovery service in pulsar cli (#12985)
     new 3574bbd  Fix consume message order issue when use listener. (#13023)
     new 4dd61e6  fix(functions): missing runtime set in GoInstanceConfig (#13031)
     new 8770cfc  Don't attempt to delete pending ack store unless transactions are enabled (#13041)
     new ef4c7f7  pulsar admin exposes secret for source and sink (#13059)
     new 85c4ba0  Use current resourceUsage value as historyUsage when leader change in ThresholdShedder (#13136)
     new 33bc0cc  Fix flaky test BrokerServiceLookupTest.testModularLoadManagerSplitBundle (#13159)
     new ba155d6  Update cursor last active timestamp when resetting cursor (#13166)
     new cb7f6ae  Fix in macOS cmake might find error boost-python libs path (#13193)
     new ed3db4a  Fix when deleting topic with NotFoundException, do not return to client (#13203)

The 14 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 bin/pulsar                                         |  4 ++
 bin/pulsar-daemon                                  |  4 ++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  2 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 42 +++++++++++++++++++++
 .../apache/pulsar/broker/admin/AdminResource.java  |  8 ++++
 .../broker/admin/impl/PersistentTopicsBase.java    |  5 +--
 .../broker/loadbalance/impl/ThresholdShedder.java  | 22 +++++------
 .../PersistentDispatcherMultipleConsumers.java     |  3 ++
 ...istentStickyKeyDispatcherMultipleConsumers.java | 31 ++++++++++++++++
 .../broker/service/persistent/PersistentTopic.java | 43 ++++++++++++----------
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 32 ++++++++++++++++
 .../pulsar/client/api/BrokerServiceLookupTest.java | 12 +++---
 pulsar-client-cpp/python/CMakeLists.txt            | 17 ++++++---
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 19 +++++++++-
 .../org/apache/pulsar/admin/cli/CmdSources.java    | 13 +++++++
 .../apache/pulsar/client/impl/ConsumerBase.java    | 42 +++++++++++----------
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 13 ++++---
 .../pulsar/functions/runtime/RuntimeUtils.java     |  3 ++
 .../pulsar/functions/runtime/RuntimeUtilsTest.java |  3 +-
 .../runtime/kubernetes/KubernetesRuntimeTest.java  |  2 +-
 .../pulsar/proxy/server/ProxyServiceStarter.java   |  5 +++
 .../proxy/server/ProxyServiceStarterTest.java      | 16 +++++---
 .../pulsar/testclient/PerformanceConsumer.java     | 24 +++++++++++-
 .../pulsar/testclient/PerformanceProducer.java     | 25 +++++++++----
 24 files changed, 301 insertions(+), 89 deletions(-)

[pulsar] 13/14: Fix in macOS cmake might find error boost-python libs path (#13193)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit cb7f6ae7d01fa9c68e3d6d3104bdac3777f6a7b1
Author: Kai Wang <kw...@streamnative.io>
AuthorDate: Fri Dec 10 01:25:05 2021 +0800

    Fix in macOS cmake might find error boost-python libs path (#13193)
    
    (cherry picked from commit fdfea8af64a1a9e5c259238ca9ec681ec41fb108)
---
 pulsar-client-cpp/python/CMakeLists.txt | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/pulsar-client-cpp/python/CMakeLists.txt b/pulsar-client-cpp/python/CMakeLists.txt
index f7d4069..30631cd 100644
--- a/pulsar-client-cpp/python/CMakeLists.txt
+++ b/pulsar-client-cpp/python/CMakeLists.txt
@@ -72,11 +72,18 @@ set(PYTHON_WRAPPER_LIBS ${Boost_PYTHON_LIBRARY}
                         ${Boost_PYTHON39_LIBRARY})
 
 if (APPLE)
-    set(PYTHON_WRAPPER_LIBS ${PYTHON_WRAPPER_LIBS}
-                            ${Boost_PYTHON27-MT_LIBRARY_RELEASE}
-                            ${Boost_PYTHON37-MT_LIBRARY_RELEASE}
-                            ${Boost_PYTHON38-MT_LIBRARY_RELEASE}
-                            ${Boost_PYTHON39-MT_LIBRARY_RELEASE})
+    if (Boost_PYTHON27-MT_LIBRARY_RELEASE)
+        set(PYTHON_WRAPPER_LIBS ${PYTHON_WRAPPER_LIBS} ${Boost_PYTHON27-MT_LIBRARY_RELEASE})
+    endif ()
+    if (Boost_PYTHON37-MT_LIBRARY_RELEASE)
+        set(PYTHON_WRAPPER_LIBS ${PYTHON_WRAPPER_LIBS} ${Boost_PYTHON37-MT_LIBRARY_RELEASE})
+    endif ()
+    if (Boost_PYTHON38-MT_LIBRARY_RELEASE)
+        set(PYTHON_WRAPPER_LIBS ${PYTHON_WRAPPER_LIBS} ${Boost_PYTHON38-MT_LIBRARY_RELEASE})
+    endif ()
+    if (Boost_PYTHON39-MT_LIBRARY_RELEASE)
+        set(PYTHON_WRAPPER_LIBS ${PYTHON_WRAPPER_LIBS} ${Boost_PYTHON39-MT_LIBRARY_RELEASE})
+    endif ()
 endif()
 
 message(STATUS "Using Boost Python libs: ${PYTHON_WRAPPER_LIBS}")

[pulsar] 07/14: fix(functions): missing runtime set in GoInstanceConfig (#13031)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4dd61e61eaf16cc93126f83dce8a7029d4bec682
Author: Eric Shen <er...@outlook.com>
AuthorDate: Tue Nov 30 23:29:30 2021 -0600

    fix(functions): missing runtime set in GoInstanceConfig (#13031)
    
    * fix(functions): missing runtime set in GoInstanceConfig
    
    Signed-off-by: Eric Shen <er...@outlook.com>
    
    * fix ci ut
    
    Signed-off-by: Eric Shen <er...@outlook.com>
    
    * fix test ci
    
    Signed-off-by: Eric Shen <er...@outlook.com>
    
    * rollback some change in function-go
    
    Signed-off-by: Eric Shen <er...@outlook.com>
    (cherry picked from commit aa992e843581b65c854a0f97353f68ab0170b576)
---
 .../main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java    | 3 +++
 .../java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java     | 3 ++-
 .../pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java     | 2 +-
 3 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 107d5cf..4ebb7d9 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -173,6 +173,9 @@ public class RuntimeUtils {
         if (instanceConfig.getFunctionDetails().getProcessingGuarantees() != null) {
             goInstanceConfig.setProcessingGuarantees(instanceConfig.getFunctionDetails().getProcessingGuaranteesValue());
         }
+        if (instanceConfig.getFunctionDetails().getRuntime() != null) {
+            goInstanceConfig.setRuntime(instanceConfig.getFunctionDetails().getRuntimeValue());
+        }
         if (instanceConfig.getFunctionDetails().getSecretsMap() != null) {
             goInstanceConfig.setSecretsMap(instanceConfig.getFunctionDetails().getSecretsMap());
         }
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
index f8bbbc4..bc00776 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
@@ -99,6 +99,7 @@ public class RuntimeUtilsTest {
                 .setName("go-func")
                 .setLogTopic("go-func-log")
                 .setProcessingGuarantees(Function.ProcessingGuarantees.ATLEAST_ONCE)
+                .setRuntime(Function.FunctionDetails.Runtime.GO)
                 .setSecretsMap(secretsMap.toJSONString())
                 .setParallelism(1)
                 .setSource(sources)
@@ -137,7 +138,7 @@ public class RuntimeUtilsTest {
         Assert.assertEquals(goInstanceConfig.get("autoAck"), true);
         Assert.assertEquals(goInstanceConfig.get("regexPatternSubscription"), false);
         Assert.assertEquals(goInstanceConfig.get("pulsarServiceURL"), "pulsar://localhost:6650");
-        Assert.assertEquals(goInstanceConfig.get("runtime"), 0);
+        Assert.assertEquals(goInstanceConfig.get("runtime"), 3);
         Assert.assertEquals(goInstanceConfig.get("cpu"), 2.0);
         Assert.assertEquals(goInstanceConfig.get("funcID"), "func-7734");
         Assert.assertEquals(goInstanceConfig.get("funcVersion"), "1.0.0");
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index 8e24222..032936d 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -870,7 +870,7 @@ public class KubernetesRuntimeTest {
         assertEquals(goInstanceConfig.get("autoAck"), false);
         assertEquals(goInstanceConfig.get("regexPatternSubscription"), false);
         assertEquals(goInstanceConfig.get("pulsarServiceURL"), pulsarServiceUrl);
-        assertEquals(goInstanceConfig.get("runtime"), 0);
+        assertEquals(goInstanceConfig.get("runtime"), 3);
         assertEquals(goInstanceConfig.get("cpu"), 1.0);
         assertEquals(goInstanceConfig.get("funcVersion"), "1.0");
         assertEquals(goInstanceConfig.get("disk"), 10000);

[pulsar] 11/14: Fix flaky test BrokerServiceLookupTest.testModularLoadManagerSplitBundle (#13159)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 33bc0cc0cf6a7c3b7af0b245ab8206f1a3f8c342
Author: Zhanpeng Wu <zh...@qq.com>
AuthorDate: Tue Dec 7 21:31:10 2021 +0800

    Fix flaky test BrokerServiceLookupTest.testModularLoadManagerSplitBundle (#13159)
    
    Co-authored-by: wuzhanpeng <wu...@bigo.sg>
    (cherry picked from commit 4b319f38256d586bf179ac8df9f401709b128b15)
---
 .../apache/pulsar/client/api/BrokerServiceLookupTest.java    | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 36ff448..3b12dcc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -981,15 +981,18 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
             conf2.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
             conf2.setZookeeperServers("localhost:2181");
             conf2.setConfigurationStoreServers("localhost:3181");
-
-            @Cleanup
-            PulsarService pulsar2 = startBroker(conf2);
+            conf2.setLoadBalancerAutoBundleSplitEnabled(true);
+            conf2.setLoadBalancerAutoUnloadSplitBundlesEnabled(true);
+            conf2.setLoadBalancerNamespaceBundleMaxTopics(1);
 
             // configure broker-1 with ModularLoadManager
             stopBroker();
             conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
             startBroker();
 
+            @Cleanup
+            PulsarService pulsar2 = startBroker(conf2);
+
             pulsar.getLoadManager().get().writeLoadReportOnZookeeper();
             pulsar2.getLoadManager().get().writeLoadReportOnZookeeper();
 
@@ -1059,9 +1062,6 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
                     .getLoadManager().get()).getLoadManager();
 
             updateAllMethod.invoke(loadManager);
-            conf2.setLoadBalancerAutoBundleSplitEnabled(true);
-            conf2.setLoadBalancerAutoUnloadSplitBundlesEnabled(true);
-            conf2.setLoadBalancerNamespaceBundleMaxTopics(1);
             loadManager.checkNamespaceBundleSplit();
 
             // (6) Broker-2 should get the watch and update bundle cache

[pulsar] 14/14: Fix when deleting topic with NotFoundException, do not return to client (#13203)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ed3db4ac620919b30602246220f42d16f0e171a0
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Sat Dec 11 15:20:49 2021 +0800

    Fix when deleting topic with NotFoundException, do not return to client (#13203)
    
    (cherry picked from commit bd68b6f05f9749328701c59bdaf3cddda2254d39)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  8 ++++++
 .../broker/admin/impl/PersistentTopicsBase.java    |  5 ++--
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 32 ++++++++++++++++++++++
 3 files changed, 42 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 7a3c267..ba61953 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -36,6 +36,7 @@ import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
@@ -62,6 +63,7 @@ import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
@@ -804,6 +806,12 @@ public abstract class AdminResource extends PulsarWebResource {
         }
     }
 
+    protected boolean isManagedLedgerNotFoundException(Exception e) {
+        Throwable cause = e.getCause();
+        return cause instanceof ManagedLedgerException.MetadataNotFoundException
+                || cause instanceof MetadataStoreException.NotFoundException;
+    }
+
     protected void checkArgument(boolean b, String errorMessage) {
         if (!b) {
             throw new RestException(Status.BAD_REQUEST, errorMessage);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index fdd2397..23f5742 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -56,7 +56,6 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.bookkeeper.mledger.ManagedLedgerException.MetadataNotFoundException;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
@@ -312,7 +311,7 @@ public class PersistentTopicsBase extends AdminResource {
         try {
             pulsar().getBrokerService().deleteTopic(topicName.toString(), true, deleteSchema).get();
         } catch (Exception e) {
-            if (e.getCause() instanceof MetadataNotFoundException) {
+            if (isManagedLedgerNotFoundException(e)) {
                 log.info("[{}] Topic was already not existing {}", clientAppId(), topicName, e);
             } else {
                 log.error("[{}] Failed to delete topic forcefully {}", clientAppId(), topicName, e);
@@ -1000,7 +999,7 @@ public class PersistentTopicsBase extends AdminResource {
             log.error("[{}] Failed to delete topic {}", clientAppId(), topicName, t);
             if (t instanceof TopicBusyException) {
                 throw new RestException(Status.PRECONDITION_FAILED, "Topic has active producers/subscriptions");
-            } else if (t instanceof MetadataNotFoundException) {
+            } else if (isManagedLedgerNotFoundException(e)) {
                 throw new RestException(Status.NOT_FOUND, "Topic not found");
             } else {
                 throw new RestException(t);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 2377b9c..5bc55a3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.broker.admin.v2.PersistentTopics;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
+import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -73,6 +74,7 @@ import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.zookeeper.KeeperException;
 import org.mockito.ArgumentCaptor;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -899,4 +901,34 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
             Assert.assertNull(message4);
         }
     }
+
+    @Test
+    public void testDeleteTopic() throws Exception {
+        final String topicName = "topic-1";
+        BrokerService brokerService = spy(pulsar.getBrokerService());
+        doReturn(brokerService).when(pulsar).getBrokerService();
+        persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, false);
+        CompletableFuture<Void> deleteTopicFuture = new CompletableFuture<>();
+        deleteTopicFuture.completeExceptionally(new MetadataStoreException.NotFoundException());
+        doReturn(deleteTopicFuture).when(brokerService).deleteTopic(anyString(), anyBoolean(), anyBoolean());
+        persistentTopics.deleteTopic(testTenant, testNamespace, topicName, true, true, true);
+        //
+        CompletableFuture<Void> deleteTopicFuture2 = new CompletableFuture<>();
+        deleteTopicFuture2.completeExceptionally(new MetadataStoreException("test exception"));
+        doReturn(deleteTopicFuture2).when(brokerService).deleteTopic(anyString(), anyBoolean(), anyBoolean());
+        try {
+            persistentTopics.deleteTopic(testTenant, testNamespace, topicName, true, true, true);
+        } catch (Exception e) {
+            Assert.assertTrue(e instanceof RestException);
+        }
+        //
+        CompletableFuture<Void> deleteTopicFuture3 = new CompletableFuture<>();
+        deleteTopicFuture3.completeExceptionally(new MetadataStoreException.NotFoundException());
+        doReturn(deleteTopicFuture3).when(brokerService).deleteTopic(anyString(), anyBoolean(), anyBoolean());
+        try {
+            persistentTopics.deleteTopic(testTenant, testNamespace, topicName, false, true, true);
+        } catch (RestException e) {
+            Assert.assertEquals(e.getResponse().getStatus(), 404);
+        }
+    }
 }

[pulsar] 02/14: [pulsar-perf] Write histogram files for consume command (#12569)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 42bce8a6e4f8eb3bcebdf720306932e2646401e3
Author: Matt Fleming <ma...@codeblueprint.co.uk>
AuthorDate: Fri Nov 5 09:36:44 2021 +0000

    [pulsar-perf] Write histogram files for consume command (#12569)
    
    * [pulsar-perf] Write histogram files for consume command
    
    * [pulsar-perf] Disable writing to histogram files by default
    
    Most users don't use the histogram files and instead opt for sending
    metrics to prometheus, etc, so there's no need to have this enabled by
    default.
    
    Instead, add a new --histogram-file parameter to pulsar-perf
    produce/consume which, when specified, dumps the contents of the
    internal histogram to the given filename.
    
    Previous behaviour can be achieved with the following options:
    
      $ pulsar-perf produce --histogram-file perf-producer-$(date +%s).hgrm
    
    * [pulsar-perf] Update docs with --histogram-file param
    
    (cherry picked from commit 48de2e251ee3ea38449d696402d0cc57c88ee3c7)
---
 .../pulsar/testclient/PerformanceConsumer.java     | 24 +++++++++++++++++++--
 .../pulsar/testclient/PerformanceProducer.java     | 25 +++++++++++++++-------
 2 files changed, 39 insertions(+), 10 deletions(-)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index e0fa99b..5b1b863 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -26,8 +26,8 @@ import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
 import java.io.FileInputStream;
-import java.lang.management.BufferPoolMXBean;
-import java.lang.management.ManagementFactory;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
 import java.nio.ByteBuffer;
 import java.text.DecimalFormat;
 import java.util.Collections;
@@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAdder;
 
 import org.HdrHistogram.Histogram;
+import org.HdrHistogram.HistogramLogWriter;
 import org.HdrHistogram.Recorder;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
@@ -185,6 +186,9 @@ public class PerformanceConsumer {
 
         @Parameter(names = {"-bw", "--busy-wait"}, description = "Enable Busy-Wait on the Pulsar client")
         public boolean enableBusyWait = false;
+
+        @Parameter(names = { "--histogram-file" }, description = "HdrHistogram output file")
+        public String histogramFile = null;
     }
 
     public static void main(String[] args) throws Exception {
@@ -400,7 +404,19 @@ public class PerformanceConsumer {
         long oldTime = System.nanoTime();
 
         Histogram reportHistogram = null;
+        HistogramLogWriter histogramLogWriter = null;
+
+        if (arguments.histogramFile != null) {
+            String statsFileName = arguments.histogramFile;
+            log.info("Dumping latency stats to {}", statsFileName);
+
+            PrintStream histogramLog = new PrintStream(new FileOutputStream(statsFileName), false);
+            histogramLogWriter = new HistogramLogWriter(histogramLog);
 
+            // Some log header bits
+            histogramLogWriter.outputLogFormatVersion();
+            histogramLogWriter.outputLegend();
+        }
 
         while (true) {
             try {
@@ -425,6 +441,10 @@ public class PerformanceConsumer {
                     reportHistogram.getValueAtPercentile(99), reportHistogram.getValueAtPercentile(99.9),
                     reportHistogram.getValueAtPercentile(99.99), reportHistogram.getMaxValue());
 
+            if (histogramLogWriter != null) {
+                histogramLogWriter.outputIntervalHistogram(reportHistogram);
+            }
+
             reportHistogram.reset();
             oldTime = now;
         }
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index f198a2d..c2d13c3 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -250,6 +250,9 @@ public class PerformanceProducer {
 
         @Parameter(names = {"-fc", "--format-class"}, description="Custom Formatter class name")
         public String formatterClass = "org.apache.pulsar.testclient.DefaultMessageFormatter";
+
+        @Parameter(names = { "--histogram-file" }, description = "HdrHistogram output file")
+        public String histogramFile = null;
     }
 
     public static void main(String[] args) throws Exception {
@@ -429,16 +432,19 @@ public class PerformanceProducer {
         long oldTime = System.nanoTime();
 
         Histogram reportHistogram = null;
+        HistogramLogWriter histogramLogWriter = null;
 
-        String statsFileName = "perf-producer-" + System.currentTimeMillis() + ".hgrm";
-        log.info("Dumping latency stats to {}", statsFileName);
+        if (arguments.histogramFile != null) {
+            String statsFileName = arguments.histogramFile;
+            log.info("Dumping latency stats to {}", statsFileName);
 
-        PrintStream histogramLog = new PrintStream(new FileOutputStream(statsFileName), false);
-        HistogramLogWriter histogramLogWriter = new HistogramLogWriter(histogramLog);
+            PrintStream histogramLog = new PrintStream(new FileOutputStream(statsFileName), false);
+            histogramLogWriter = new HistogramLogWriter(histogramLog);
 
-        // Some log header bits
-        histogramLogWriter.outputLogFormatVersion();
-        histogramLogWriter.outputLegend();
+            // Some log header bits
+            histogramLogWriter.outputLogFormatVersion();
+            histogramLogWriter.outputLegend();
+        }
 
         while (true) {
             try {
@@ -473,7 +479,10 @@ public class PerformanceProducer {
                     dec.format(reportHistogram.getValueAtPercentile(99.99) / 1000.0),
                     dec.format(reportHistogram.getMaxValue() / 1000.0));
 
-            histogramLogWriter.outputIntervalHistogram(reportHistogram);
+            if (histogramLogWriter != null) {
+                histogramLogWriter.outputIntervalHistogram(reportHistogram);
+            }
+
             reportHistogram.reset();
 
             oldTime = now;

[pulsar] 01/14: Fix ProxyServiceStarterTest flaky tests (#12344)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5a123c414172035276f8181eebde6f81741c75e6
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Thu Oct 21 18:06:49 2021 +0200

    Fix ProxyServiceStarterTest flaky tests (#12344)
    
    * Fix ProxyServiceStarterTest flaky tests
    
    * revert formatting
    
    * use ephemeral port
    
    (cherry picked from commit 7ad46c8c18bb8365c9a2d1233a6cd58ecd6f541f)
---
 .../apache/pulsar/proxy/server/ProxyServiceStarter.java  |  5 +++++
 .../pulsar/proxy/server/ProxyServiceStarterTest.java     | 16 +++++++++++-----
 2 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 6c151dd..86ce236 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -299,6 +299,11 @@ public class ProxyServiceStarter {
         return config;
     }
 
+    @VisibleForTesting
+    public WebServer getServer() {
+        return server;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ProxyServiceStarter.class);
 
 }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
index 3377ec2..bdba8d3 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
@@ -59,7 +59,8 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
         serviceStarter = new ProxyServiceStarter(ARGS);
         serviceStarter.getConfig().setBrokerServiceURL(pulsar.getBrokerServiceUrl());
         serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress());
-        serviceStarter.getConfig().setServicePort(Optional.of(11000));
+        serviceStarter.getConfig().setWebServicePort(Optional.of(0));
+        serviceStarter.getConfig().setServicePort(Optional.of(0));
         serviceStarter.getConfig().setWebSocketServiceEnabled(true);
         serviceStarter.start();
     }
@@ -71,14 +72,19 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
         serviceStarter.close();
     }
 
+    private String computeWsBasePath() {
+        return String.format("ws://localhost:%d/ws", serviceStarter.getServer().getListenPortHTTP().get());
+    }
+
     @Test
     public void testEnableWebSocketServer() throws Exception {
         HttpClient httpClient = new HttpClient();
         WebSocketClient webSocketClient = new WebSocketClient(httpClient);
         webSocketClient.start();
         MyWebSocket myWebSocket = new MyWebSocket();
-        String webSocketUri = "ws://localhost:8080/ws/pingpong";
+        String webSocketUri = computeWsBasePath() + "/pingpong";
         Future<Session> sessionFuture = webSocketClient.connect(myWebSocket, URI.create(webSocketUri));
+        System.out.println("uri" + webSocketUri);
         sessionFuture.get().getRemote().sendPing(ByteBuffer.wrap("ping".getBytes()));
         assertTrue(myWebSocket.getResponse().contains("ping"));
     }
@@ -86,7 +92,7 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
     @Test
     public void testProducer() throws Exception {
         @Cleanup
-        PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:11000")
+        PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + this.pulsar.getBrokerService().getListenPort().get())
                 .build();
 
         @Cleanup
@@ -105,7 +111,7 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
         WebSocketClient producerWebSocketClient = new WebSocketClient(producerClient);
         producerWebSocketClient.start();
         MyWebSocket producerSocket = new MyWebSocket();
-        String produceUri = "ws://localhost:8080/ws/producer/persistent/sample/test/local/websocket-topic";
+        String produceUri = computeWsBasePath() + "/producer/persistent/sample/test/local/websocket-topic";
         Future<Session> producerSession = producerWebSocketClient.connect(producerSocket, URI.create(produceUri));
 
         ProducerMessage produceRequest = new ProducerMessage();
@@ -116,7 +122,7 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
         WebSocketClient consumerWebSocketClient = new WebSocketClient(consumerClient);
         consumerWebSocketClient.start();
         MyWebSocket consumerSocket = new MyWebSocket();
-        String consumeUri = "ws://localhost:8080/ws/consumer/persistent/sample/test/local/websocket-topic/my-sub";
+        String consumeUri = computeWsBasePath() + "/consumer/persistent/sample/test/local/websocket-topic/my-sub";
         Future<Session> consumerSession = consumerWebSocketClient.connect(consumerSocket, URI.create(consumeUri));
         consumerSession.get().getRemote().sendPing(ByteBuffer.wrap("ping".getBytes()));
         producerSession.get().getRemote().sendString(ObjectMapperFactory.getThreadLocal().writeValueAsString(produceRequest));

[pulsar] 06/14: Fix consume message order issue when use listener. (#13023)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3574bbd4fb440c343f135f8bb15e6f7d3db59460
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon Dec 13 16:59:50 2021 +0800

    Fix consume message order issue when use listener. (#13023)
    
    (cherry picked from commit e134e372b3cc007bb507f04076011407cc28b7c0)
---
 .../apache/pulsar/client/impl/ConsumerBase.java    | 42 +++++++++++-----------
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  3 +-
 2 files changed, 23 insertions(+), 22 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 87e4ee9..a78dad8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -910,30 +910,32 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     protected void triggerListener() {
         // Trigger the notification on the message listener in a separate thread to avoid blocking the networking
         // thread while the message processing happens
-        try {
-            // Control executor to call MessageListener one by one.
-            if (executorQueueSize.get() < 1) {
-                final Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS);
-                if (msg != null) {
-                    executorQueueSize.incrementAndGet();
-                    if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) {
-                        executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
-                                callMessageListener(msg));
-                    } else {
-                        getExternalExecutor(msg).execute(() -> {
-                            callMessageListener(msg);
-                        });
+        internalPinnedExecutor.execute(() -> {
+            try {
+                // Control executor to call MessageListener one by one.
+                if (executorQueueSize.get() < 1) {
+                    final Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS);
+                    if (msg != null) {
+                        executorQueueSize.incrementAndGet();
+                        if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) {
+                            executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
+                                    callMessageListener(msg));
+                        } else {
+                            getExternalExecutor(msg).execute(() -> {
+                                callMessageListener(msg);
+                            });
+                        }
                     }
                 }
+            } catch (PulsarClientException e) {
+                log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e);
+                return;
             }
-        } catch (PulsarClientException e) {
-            log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e);
-            return;
-        }
 
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription);
-        }
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription);
+            }
+        });
     }
 
     protected void callMessageListener(Message<T> msg) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index a872db0..f3e6b99 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1086,8 +1086,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
             uncompressedPayload.release();
         }
-        internalPinnedExecutor.execute(()
-                -> tryTriggerListener());
+        tryTriggerListener();
 
     }
 

[pulsar] 04/14: Use sendAsync instead of send when produce message to retry topic. (#12946)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 129ec346d17e745f4e26e08dd440704cbf9f6d74
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Tue Nov 30 01:46:53 2021 +0800

    Use sendAsync instead of send when produce message to retry topic. (#12946)
    
    * Use sendAsync instead of send when produce message to retry letter topic.
    
    * add exception handler.
    
    (cherry picked from commit 09cc1d6aa91422c71601664dac8d94ba574beb7b)
---
 .../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java  | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index ce11cd6..a872db0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -582,9 +582,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, String.valueOf(reconsumetimes));
                 propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME, String.valueOf(unit.toMillis(delayTime)));
 
+                MessageId finalMessageId = messageId;
                 if (reconsumetimes > this.deadLetterPolicy.getMaxRedeliverCount() && StringUtils.isNotBlank(deadLetterPolicy.getDeadLetterTopic())) {
                     initDeadLetterProducerIfNeeded();
-                    MessageId finalMessageId = messageId;
                     deadLetterProducer.thenAccept(dlqProducer -> {
                         TypedMessageBuilder<byte[]> typedMessageBuilderNew =
                                 dlqProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(retryMessage.getReaderSchema().get()))
@@ -616,8 +616,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                     if (message.hasKey()) {
                         typedMessageBuilderNew.key(message.getKey());
                     }
-                    typedMessageBuilderNew.send();
-                    return doAcknowledge(messageId, ackType, properties, null);
+                    typedMessageBuilderNew.sendAsync()
+                            .thenAccept(__ -> doAcknowledge(finalMessageId, ackType, properties, null).thenAccept(v -> result.complete(null)))
+                            .exceptionally(ex -> {
+                                result.completeExceptionally(ex);
+                                return null;
+                            });
                 }
             } catch (Exception e) {
                 log.error("Send to retry letter topic exception with topic: {}, messageId: {}", retryLetterProducer.getTopic(), messageId, e);

[pulsar] 05/14: feat(cli): support autorecovery service in pulsar cli (#12985)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b613f520efdd334edfc157d0d51314f80842c29f
Author: Eric Shen <er...@outlook.com>
AuthorDate: Fri Nov 26 20:08:51 2021 -0600

    feat(cli): support autorecovery service in pulsar cli (#12985)
    
    ### Motivation
    The autorecovery service will be shut down if the zk session expires, which in turn causes the bk service to shut down with it. So, in a production environment, it is recommended to deploy the autorecovery service separately, but currently pulsar doesn't support it.
    
    ### Modifications
    Added the autorecovery service in pulsar cli
    Added the autorecovery service in pulsar-daemon cli
    
    (cherry picked from commit f192209af58e86a2c6da4130956b172b2d1ddc13)
---
 bin/pulsar        | 4 ++++
 bin/pulsar-daemon | 4 ++++
 2 files changed, 8 insertions(+)

diff --git a/bin/pulsar b/bin/pulsar
index 29bda52..bea358a 100755
--- a/bin/pulsar
+++ b/bin/pulsar
@@ -142,6 +142,7 @@ where command is one of:
     sql-worker          Run a sql worker server
     sql                 Run sql CLI
     standalone          Run a broker server with local bookies and local zookeeper
+    autorecovery        Run an autorecovery service
 
     initialize-cluster-metadata     One-time metadata initialization
     delete-cluster-metadata         Delete a cluster's metadata
@@ -353,6 +354,9 @@ elif [ $COMMAND == "functions-worker" ]; then
 elif [ $COMMAND == "standalone" ]; then
     PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-standalone.log"}
     exec $JAVA $LOG4J2_SHUTDOWN_HOOK_DISABLED $OPTS ${ZK_OPTS} -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.PulsarStandaloneStarter --config $PULSAR_STANDALONE_CONF $@
+elif [ ${COMMAND} == "autorecovery" ]; then
+    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-autorecovery.log"}
+    exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.bookkeeper.replication.AutoRecoveryMain --conf $PULSAR_BOOKKEEPER_CONF $@
 elif [ $COMMAND == "initialize-cluster-metadata" ]; then
     exec $JAVA $OPTS org.apache.pulsar.PulsarClusterMetadataSetup $@
 elif [ $COMMAND == "delete-cluster-metadata" ]; then
diff --git a/bin/pulsar-daemon b/bin/pulsar-daemon
index 196aaa7..dce11fd 100755
--- a/bin/pulsar-daemon
+++ b/bin/pulsar-daemon
@@ -31,6 +31,7 @@ where command is one of:
     functions-worker    Run a functions worker server
     standalone          Run a standalone Pulsar service
     proxy               Run a Proxy Pulsar service
+    autorecovery        Run an autorecovery service
 
 where argument is one of:
     -force (accepted only with stop command): Decides whether to stop the server forcefully if not stopped by normal shutdown
@@ -106,6 +107,9 @@ case $command in
     (proxy)
         echo "doing $startStop $command ..."
         ;;
+    (autorecovery)
+        echo "doing $startStop $command ..."
+        ;;
     (*)
         echo "Error: unknown service name $command"
         usage

[pulsar] 10/14: Use current resourceUsage value as historyUsage when leader change in ThresholdShedder (#13136)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 85c4ba066481e0c303bc2928b493e578af79a146
Author: Hang Chen <ch...@apache.org>
AuthorDate: Sat Dec 11 12:31:50 2021 +0800

    Use current resourceUsage value as historyUsage when leader change in ThresholdShedder (#13136)
    
    ### Motivation
    Fix #13119
    
    ### Modification
    1. Use the current resourceUsage value as the historyUsage value when the leader changes in ThresholdShedder, to speed up getting the actual historyUsage value.
    
    (cherry picked from commit 6d9d24d50db5418ddbb845d2c7a2be2b9ac72893)
---
 .../broker/loadbalance/impl/ThresholdShedder.java  | 22 ++++++++++------------
 1 file changed, 10 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
index 3996592..727be9b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
@@ -38,13 +38,9 @@ import org.slf4j.LoggerFactory;
 
 public class ThresholdShedder implements LoadSheddingStrategy {
     private static final Logger log = LoggerFactory.getLogger(ThresholdShedder.class);
-
     private final Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create();
-
     private static final double ADDITIONAL_THRESHOLD_PERCENT_MARGIN = 0.05;
-
     private static final double MB = 1024 * 1024;
-
     private final Map<String, Double> brokerAvgResourceUsage = new HashMap<>();
 
     @Override
@@ -139,25 +135,27 @@ public class ThresholdShedder implements LoadSheddingStrategy {
         for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
             LocalBrokerData localBrokerData = entry.getValue().getLocalData();
             String broker = entry.getKey();
-            updateAvgResourceUsage(broker, localBrokerData, historyPercentage, conf);
-            totalUsage += brokerAvgResourceUsage.getOrDefault(broker, 0.0);
+            totalUsage += updateAvgResourceUsage(broker, localBrokerData, historyPercentage, conf);
             totalBrokers++;
         }
 
         return totalBrokers > 0 ? totalUsage / totalBrokers : 0;
     }
 
-    private void updateAvgResourceUsage(String broker, LocalBrokerData localBrokerData, final double historyPercentage,
-                                        final ServiceConfiguration conf) {
-        double historyUsage =
-                brokerAvgResourceUsage.getOrDefault(broker, 0.0);
-        historyUsage = historyUsage * historyPercentage
-                + (1 - historyPercentage) * localBrokerData.getMaxResourceUsageWithWeight(
+    private double updateAvgResourceUsage(String broker, LocalBrokerData localBrokerData,
+                                          final double historyPercentage, final ServiceConfiguration conf) {
+        Double historyUsage =
+                brokerAvgResourceUsage.get(broker);
+        double resourceUsage = localBrokerData.getMaxResourceUsageWithWeight(
                 conf.getLoadBalancerCPUResourceWeight(),
                 conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
                 conf.getLoadBalancerBandwithInResourceWeight(),
                 conf.getLoadBalancerBandwithOutResourceWeight());
+        historyUsage = historyUsage == null
+                ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
+
         brokerAvgResourceUsage.put(broker, historyUsage);
+        return historyUsage;
     }
 
 }

[pulsar] 08/14: Don't attempt to delete pending ack store unless transactions are enabled (#13041)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8770cfc13ff859f9852b819efc4a30d560d938d2
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue Nov 30 19:28:58 2021 +0200

    Don't attempt to delete pending ack store unless transactions are enabled (#13041)
    
    (cherry picked from commit 46720247d9a06daae9f8eae7740887c92406b2c3)
---
 .../broker/service/persistent/PersistentTopic.java | 43 ++++++++++++----------
 1 file changed, 24 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e1c2983..6ad10fb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -978,27 +978,32 @@ public class PersistentTopic extends AbstractTopic
     @Override
     public CompletableFuture<Void> unsubscribe(String subscriptionName) {
         CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
-        getBrokerService().getManagedLedgerFactory().asyncDelete(TopicName.get(MLPendingAckStore
-                .getTransactionPendingAckStoreSuffix(topic,
-                        Codec.encode(subscriptionName))).getPersistenceNamingEncoding(),
-                new AsyncCallbacks.DeleteLedgerCallback() {
-            @Override
-            public void deleteLedgerComplete(Object ctx) {
-                asyncDeleteCursor(subscriptionName, unsubscribeFuture);
-            }
 
-            @Override
-            public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
-                if (exception instanceof MetadataNotFoundException) {
-                    asyncDeleteCursor(subscriptionName, unsubscribeFuture);
-                    return;
-                }
+        if (brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) {
+            getBrokerService().getManagedLedgerFactory().asyncDelete(TopicName.get(MLPendingAckStore
+                            .getTransactionPendingAckStoreSuffix(topic,
+                                    Codec.encode(subscriptionName))).getPersistenceNamingEncoding(),
+                    new AsyncCallbacks.DeleteLedgerCallback() {
+                        @Override
+                        public void deleteLedgerComplete(Object ctx) {
+                            asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+                        }
 
-                unsubscribeFuture.completeExceptionally(exception);
-                log.error("[{}][{}] Error deleting subscription pending ack store",
-                        topic, subscriptionName, exception);
-            }
-        }, null);
+                        @Override
+                        public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
+                            if (exception instanceof MetadataNotFoundException) {
+                                asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+                                return;
+                            }
+
+                            unsubscribeFuture.completeExceptionally(exception);
+                            log.error("[{}][{}] Error deleting subscription pending ack store",
+                                    topic, subscriptionName, exception);
+                        }
+                    }, null);
+        } else {
+            asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+        }
 
         return unsubscribeFuture;
     }

[pulsar] 03/14: Fix Issue #12885, Unordered consuming case in Key_Shared subscription (#12890)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 75ce0220450715a83f50cfa90acb539db71d8b0a
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Tue Nov 30 20:34:45 2021 +0800

    Fix Issue #12885, Unordered consuming case in Key_Shared subscription (#12890)
    
    (cherry picked from commit 73ef1621ab0bbecfcb2325453a4d93a406fcba3c)
---
 .../PersistentDispatcherMultipleConsumers.java     |  3 +++
 ...istentStickyKeyDispatcherMultipleConsumers.java | 31 ++++++++++++++++++++++
 2 files changed, 34 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 907d180..7ec6f4d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -82,6 +82,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
 
     protected volatile boolean havePendingRead = false;
     protected volatile boolean havePendingReplayRead = false;
+    protected volatile PositionImpl minReplayedPosition = null;
     protected boolean shouldRewindBeforeReadingOrReplaying = false;
     protected final String name;
 
@@ -243,6 +244,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 }
 
                 havePendingReplayRead = true;
+                minReplayedPosition = messagesToReplayNow.stream().min(PositionImpl::compareTo).orElse(null);
                 Set<? extends Position> deletedMessages = topic.isDelayedDeliveryEnabled()
                         ? asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow);
                 // clear already acked positions from replay bucket
@@ -266,6 +268,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                             consumerList.size());
                 }
                 havePendingRead = true;
+                minReplayedPosition = getMessagesToReplayNow(1).stream().findFirst().orElse(null);
                 cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this,
                         ReadType.Normal, topic.getMaxReadPosition());
             } else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index f3bcbf2..420795c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -169,6 +169,37 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             return;
         }
 
+        // A corner case that we have to retry a readMoreEntries in order to preserver order delivery.
+        // This may happen when consumer closed. See issue #12885 for details.
+        if (!allowOutOfOrderDelivery) {
+            Set<PositionImpl> messagesToReplayNow = this.getMessagesToReplayNow(1);
+            if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty() && this.minReplayedPosition != null) {
+                PositionImpl relayPosition = messagesToReplayNow.stream().findFirst().get();
+                // If relayPosition is a new entry wither smaller position is inserted for redelivery during this async
+                // read, it is possible that this relayPosition should dispatch to consumer first. So in order to
+                // preserver order delivery, we need to discard this read result, and try to trigger a replay read,
+                // that containing "relayPosition", by calling readMoreEntries.
+                if (relayPosition.compareTo(minReplayedPosition) < 0) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Position {} (<{}) is inserted for relay during current {} read, discard this "
+                                + "read and retry with readMoreEntries.",
+                                name, relayPosition, minReplayedPosition, readType);
+                    }
+                    if (readType == ReadType.Normal) {
+                        entries.forEach(entry -> {
+                            long stickyKeyHash = getStickyKeyHash(entry);
+                            addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
+                            entry.release();
+                        });
+                    } else if (readType == ReadType.Replay) {
+                        entries.forEach(Entry::release);
+                    }
+                    readMoreEntries();
+                    return;
+                }
+            }
+        }
+
         nextStuckConsumers.clear();
 
         final Map<Consumer, List<Entry>> groupedEntries = localGroupedEntries.get();

[pulsar] 09/14: pulsar admin exposes secret for source and sink (#13059)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ef4c7f7b753bb349aa58a47fdb393a9375c8c15c
Author: Neng Lu <nl...@streamnative.io>
AuthorDate: Wed Dec 1 18:33:56 2021 -0800

    pulsar admin exposes secret for source and sink (#13059)
    
    Follow-up fix of #12950 for #12834
    
    It turns out that Source and Sink don't inherit from the Function cmd, so we need to add the API separately.
    
    add the `--secrets` argument into `pulsar-admin [source|sink] create/update/localrun` command
    
    (cherry picked from commit e888c2980f61428650779a8d23fe707bb61a31a1)
---
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java    | 19 ++++++++++++++++++-
 .../java/org/apache/pulsar/admin/cli/CmdSources.java  | 13 +++++++++++++
 2 files changed, 31 insertions(+), 1 deletion(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index b44affb..e82ed728 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -38,7 +38,13 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Type;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
@@ -340,6 +346,8 @@ public class CmdSinks extends CmdBase {
         protected Long negativeAckRedeliveryDelayMs;
         @Parameter(names = "--custom-runtime-options", description = "A string that encodes options to customize the runtime, see docs for configured runtime for details")
         protected String customRuntimeOptions;
+        @Parameter(names = "--secrets", description = "The map of secretName to an object that encapsulates how the secret is fetched by the underlying secrets provider")
+        protected String secretsString;
 
         protected SinkConfig sinkConfig;
 
@@ -489,6 +497,15 @@ public class CmdSinks extends CmdBase {
                 sinkConfig.setCustomRuntimeOptions(customRuntimeOptions);
             }
 
+            if (secretsString != null) {
+                Type type = new TypeToken<Map<String, Object>>() {}.getType();
+                Map<String, Object> secretsMap = new Gson().fromJson(secretsString, type);
+                if (secretsMap == null) {
+                    secretsMap = Collections.emptyMap();
+                }
+                sinkConfig.setSecrets(secretsMap);
+            }
+
             // check if configs are valid
             validateSinkConfigs(sinkConfig);
         }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 337847e..7b1e70f 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -38,6 +38,7 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Type;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -325,6 +326,8 @@ public class CmdSources extends CmdBase {
         protected String batchSourceConfigString;
         @Parameter(names = "--custom-runtime-options", description = "A string that encodes options to customize the runtime, see docs for configured runtime for details")
         protected String customRuntimeOptions;
+        @Parameter(names = "--secrets", description = "The map of secretName to an object that encapsulates how the secret is fetched by the underlying secrets provider")
+        protected String secretsString;
 
         protected SourceConfig sourceConfig;
 
@@ -437,6 +440,16 @@ public class CmdSources extends CmdBase {
             if (customRuntimeOptions != null) {
                 sourceConfig.setCustomRuntimeOptions(customRuntimeOptions);
             }
+
+            if (secretsString != null) {
+                Type type = new TypeToken<Map<String, Object>>() {}.getType();
+                Map<String, Object> secretsMap = new Gson().fromJson(secretsString, type);
+                if (secretsMap == null) {
+                    secretsMap = Collections.emptyMap();
+                }
+                sourceConfig.setSecrets(secretsMap);
+            }
+            
             // check if source configs are valid
             validateSourceConfigs(sourceConfig);
         }

[pulsar] 12/14: Update cursor last active timestamp when reseting cursor (#13166)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ba155d60bd8472177a1c9de8012dc3419fb5d99e
Author: Zhanpeng Wu <zh...@qq.com>
AuthorDate: Fri Dec 10 17:45:45 2021 +0800

    Update cursor last active timestamp when reseting cursor  (#13166)
    
    Resolves #13165
    
    ### Modifications
    1. trigger last active time update after resetting cursor
    2. add related test case
    
    (cherry picked from commit 26342996f8336dd4f63634d8962b6fef11087485)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  2 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 42 ++++++++++++++++++++++
 2 files changed, 43 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index b622e55..f22b327 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1103,7 +1103,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     }
                 }
                 callback.resetComplete(newPosition);
-
+                updateLastActive();
             }
 
             @Override
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 2a648ea..1c8960b 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -770,6 +771,47 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
     }
 
     @Test(timeOut = 20000)
+    void testLastActiveAfterResetCursor() throws Exception { // checks that asyncResetCursor changes the value returned by cursor.getLastActive()
+        ManagedLedger ledger = factory.open("test_cursor_ledger");
+        ManagedCursor cursor = ledger.openCursor("tla");
+
+        PositionImpl lastPosition = null;
+        for (int i = 0; i < 3; i++) {
+            lastPosition = (PositionImpl) ledger.addEntry("dummy-entry".getBytes(Encoding)); // keep the position of the most recently added entry
+        }
+
+        final AtomicBoolean moveStatus = new AtomicBoolean(false); // set to true only by resetComplete
+        CountDownLatch countDownLatch = new CountDownLatch(1); // released by whichever callback fires
+
+        long lastActive = cursor.getLastActive(); // baseline value read before the reset is requested
+
+        cursor.asyncResetCursor(lastPosition, new AsyncCallbacks.ResetCursorCallback() { // reset target is the last added entry
+            @Override
+            public void resetComplete(Object ctx) {
+                moveStatus.set(true);
+                countDownLatch.countDown();
+            }
+
+            @Override
+            public void resetFailed(ManagedLedgerException exception, Object ctx) {
+                moveStatus.set(false);
+                countDownLatch.countDown();
+            }
+        });
+
+        countDownLatch.await(); // block until one of the two callbacks has run
+        assertTrue(moveStatus.get()); // fails if resetFailed was the callback that ran
+
+        assertNotNull(lastPosition); // NOTE(review): lastPosition was already passed to asyncResetCursor above, so this check comes late
+        assertEquals(lastPosition, cursor.getReadPosition()); // the read position should now be the reset target
+
+        assertNotEquals(lastActive, cursor.getLastActive()); // NOTE(review): the ManagedCursorImpl hunk in this commit calls updateLastActive() after callback.resetComplete(...), so this read may race with the update - confirm
+
+        cursor.close();
+        ledger.close();
+    }
+
+    @Test(timeOut = 20000)
     void seekPosition() throws Exception {
         ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(10));
         ManagedCursor cursor = ledger.openCursor("c1");