You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/21 13:08:19 UTC

[pulsar] branch branch-2.7 updated (70fec14 -> 21b9a00)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 70fec14  consumer support update stats with specified stats (#8951)
     new d9b3b79  fix npe (#8969)
     new 21b9a00  Fix the metadata setup compatibility issue (#8959)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/pulsar/PulsarClusterMetadataSetup.java  | 20 ++++++++++++---
 ...istentStickyKeyDispatcherMultipleConsumers.java |  2 +-
 ...ntStickyKeyDispatcherMultipleConsumersTest.java | 29 ++++++++++++++++++++++
 .../broker/zookeeper/ClusterMetadataSetupTest.java | 16 ++++++++++++
 4 files changed, 63 insertions(+), 4 deletions(-)


[pulsar] 02/02: Fix the metadata setup compatibility issue (#8959)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 21b9a00d413f06cdb7d450d935a456b73a5c0fea
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Thu Dec 17 03:24:41 2020 +0800

    Fix the metadata setup compatibility issue (#8959)
    
    ---
    
    *Motivation*
    
    metadata command is not compatible with old versions.
    For keeping the compatibility of the setup metadata command,
    we should avoid deleting the existing arguments. If we need to
    change it, it's better to keep the old arguments and mark it
    deprecates and hide them.
    
    (cherry picked from commit d7da7af13cb86cd5684434c2fe4242d132cbd107)
---
 .../apache/pulsar/PulsarClusterMetadataSetup.java    | 20 +++++++++++++++++---
 .../broker/zookeeper/ClusterMetadataSetupTest.java   | 16 ++++++++++++++++
 2 files changed, 33 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
index dd14ce5..3dba3ee 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
@@ -111,6 +111,15 @@ public class PulsarClusterMetadataSetup {
             "--existing-bk-metadata-service-uri" }, description = "The metadata service URI of the existing BookKeeper cluster that you want to use")
         private String existingBkMetadataServiceUri;
 
+        // Hide and marked as deprecated this flag because we use the new name '--existing-bk-metadata-service-uri' to
+        // pass the service url. For compatibility of the command, we should keep both to avoid the exceptions.
+        @Deprecated
+        @Parameter(names = {
+            "--bookkeeper-metadata-service-uri"},
+            description = "The metadata service URI of the existing BookKeeper cluster that you want to use",
+            hidden = true)
+        private String bookieMetadataServiceUri;
+
         @Parameter(names = { "-h", "--help" }, description = "Show this help message")
         private boolean help = false;
     }
@@ -173,18 +182,23 @@ public class PulsarClusterMetadataSetup {
 
         // Format BookKeeper ledger storage metadata
         ServerConfiguration bkConf = new ServerConfiguration();
-        if (arguments.existingBkMetadataServiceUri == null) {
+        if (arguments.existingBkMetadataServiceUri == null && arguments.bookieMetadataServiceUri == null) {
             bkConf.setZkServers(arguments.zookeeper);
             bkConf.setZkTimeout(arguments.zkSessionTimeoutMillis);
             if (localZk.exists("/ledgers", false) == null // only format if /ledgers doesn't exist
-                    && !BookKeeperAdmin.format(bkConf, false /* interactive */, false /* force */)) {
+                && !BookKeeperAdmin.format(bkConf, false /* interactive */, false /* force */)) {
                 throw new IOException("Failed to initialize BookKeeper metadata");
             }
         }
 
         // Format BookKeeper stream storage metadata
         if (arguments.numStreamStorageContainers > 0) {
-            String uriStr = arguments.existingBkMetadataServiceUri == null ? bkConf.getMetadataServiceUri() : arguments.existingBkMetadataServiceUri;
+            String uriStr = bkConf.getMetadataServiceUri();
+            if (arguments.existingBkMetadataServiceUri != null) {
+                uriStr = arguments.existingBkMetadataServiceUri;
+            } else if (arguments.bookieMetadataServiceUri != null) {
+                uriStr = arguments.bookieMetadataServiceUri;
+            }
             ServiceURI bkMetadataServiceUri = ServiceURI.create(uriStr);
             ClusterInitializer initializer = new ZkClusterInitializer(arguments.zookeeper);
             initializer.initializeCluster(bkMetadataServiceUri.getUri(), arguments.numStreamStorageContainers);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java
index 8c2c007..163dc6b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java
@@ -68,6 +68,22 @@ public class ClusterMetadataSetupTest {
         // expected not exist
         assertNull(localZk.exists("/ledgers", false));
 
+        String[] bookkeeperMetadataServiceUriArgs = {
+            "--cluster", "testReSetupClusterMetadata-cluster",
+            "--zookeeper", zkConnection,
+            "--configuration-store", zkConnection,
+            "--bookkeeper-metadata-service-uri", "zk+null://" + zkConnection + "/chroot/ledgers",
+            "--web-service-url", "http://127.0.0.1:8080",
+            "--web-service-url-tls", "https://127.0.0.1:8443",
+            "--broker-service-url", "pulsar://127.0.0.1:6650",
+            "--broker-service-url-tls","pulsar+ssl://127.0.0.1:6651"
+        };
+
+        PulsarClusterMetadataSetup.main(bookkeeperMetadataServiceUriArgs);
+        ZooKeeper bookkeeperMetadataServiceUriZk = PulsarClusterMetadataSetup.initZk(zkConnection, 30000);
+        // expected not exist
+        assertNull(bookkeeperMetadataServiceUriZk.exists("/ledgers", false));
+
         String[] args1 = {
                 "--cluster", "testReSetupClusterMetadata-cluster",
                 "--zookeeper", zkConnection,


[pulsar] 01/02: fix npe (#8969)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d9b3b799da4ee506123d7c2c8fbaef94acf472a7
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Thu Dec 17 01:29:26 2020 +0800

    fix npe (#8969)
    
    
    (cherry picked from commit 19242a63e8890c790690646be7578f8d1cb25ed2)
---
 ...istentStickyKeyDispatcherMultipleConsumers.java |  2 +-
 ...ntStickyKeyDispatcherMultipleConsumersTest.java | 29 ++++++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index c8592ba..d7d08d1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -176,7 +176,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             Consumer consumer = current.getKey();
             List<Entry> entriesWithSameKey = current.getValue();
             int entriesWithSameKeyCount = entriesWithSameKey.size();
-            final int availablePermits = Math.max(consumer.getAvailablePermits(), 0);
+            final int availablePermits = consumer == null ? 0 : Math.max(consumer.getAvailablePermits(), 0);
             int maxMessagesForC = Math.min(entriesWithSameKeyCount, availablePermits);
             int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, maxMessagesForC, readType);
             if (log.isDebugEnabled()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index c281400..a902ac2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -166,6 +166,35 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
         Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 5);
     }
 
+    @Test(timeOut = 10000)
+    public void testSendMessage() {
+        KeySharedMeta keySharedMeta = KeySharedMeta.newBuilder().setKeySharedMode(PulsarApi.KeySharedMode.STICKY).build();
+        PersistentStickyKeyDispatcherMultipleConsumers persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
+                topicMock, cursorMock, subscriptionMock, configMock, keySharedMeta);
+        try {
+            PulsarApi.IntRange.newBuilder().setStart(0).setEnd(9).build();
+            keySharedMeta = PulsarApi.KeySharedMeta.newBuilder().setKeySharedMode(PulsarApi.KeySharedMode.STICKY)
+                    .addHashRanges(PulsarApi.IntRange.newBuilder().setStart(0).setEnd(9).build()).build();
+            Consumer consumerMock = mock(Consumer.class);
+            doReturn(keySharedMeta).when(consumerMock).getKeySharedMeta();
+            persistentDispatcher.addConsumer(consumerMock);
+            persistentDispatcher.consumerFlow(consumerMock, 1000);
+        } catch (Exception e) {
+            fail("Failed to add mock consumer", e);
+        }
+
+        List<Entry> entries = new ArrayList<>();
+        entries.add(EntryImpl.create(1, 1, createMessage("message1", 1)));
+        entries.add(EntryImpl.create(1, 2, createMessage("message2", 2)));
+
+        try {
+            //Should success,see issue #8960
+            persistentDispatcher.readEntriesComplete(entries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
+        } catch (Exception e) {
+            fail("Failed to readEntriesComplete.", e);
+        }
+    }
+
     @Test
     public void testSkipRedeliverTemporally() {
         final Consumer slowConsumerMock = mock(Consumer.class);