You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/09/08 15:29:41 UTC

[pulsar] branch master updated: Avoid to infinitely split bundle (#11937)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bef3757  Avoid to infinitely split bundle (#11937)
bef3757 is described below

commit bef3757029eb3e48a490a02871689ab3e6abdfa5
Author: Shoothzj <sh...@gmail.com>
AuthorDate: Wed Sep 8 23:28:13 2021 +0800

    Avoid to infinitely split bundle (#11937)
---
 .../loadbalance/impl/BundleSplitterTask.java       |   8 +-
 .../loadbalance/impl/ModularLoadManagerImpl.java   |   2 +-
 .../loadbalance/impl/BundleSplitterTaskTest.java   | 105 +++++++++++++++++++++
 .../pulsar/client/api/BrokerServiceLookupTest.java |  15 ++-
 4 files changed, 122 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
index bb1f990..e81fb50 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
@@ -42,10 +42,8 @@ public class BundleSplitterTask implements BundleSplitStrategy {
     /**
      * Construct a BundleSplitterTask.
      *
-     * @param pulsar
-     *            Service to construct from.
      */
-    public BundleSplitterTask(final PulsarService pulsar) {
+    public BundleSplitterTask() {
         bundleCache = new HashSet<>();
     }
 
@@ -74,6 +72,10 @@ public class BundleSplitterTask implements BundleSplitStrategy {
             for (final Map.Entry<String, NamespaceBundleStats> entry : localData.getLastStats().entrySet()) {
                 final String bundle = entry.getKey();
                 final NamespaceBundleStats stats = entry.getValue();
+                if (stats.topics == 1) {
+                    log.info("namespace bundle {} only have 1 topic", bundle);
+                    continue;
+                }
                 double totalMessageRate = 0;
                 double totalMessageThroughput = 0;
                 // Attempt to consider long-term message data, otherwise effectively ignore.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index cd175a8..dcea560 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -248,7 +248,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, Consumer<Noti
             brokerHostUsage = new GenericBrokerHostUsageImpl(pulsar);
         }
 
-        bundleSplitStrategy = new BundleSplitterTask(pulsar);
+        bundleSplitStrategy = new BundleSplitterTask();
 
         conf = pulsar.getConfiguration();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
new file mode 100644
index 0000000..7480989
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerData;
+import org.apache.pulsar.broker.BundleData;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.TimeAverageMessageData;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * @author hezhangjian
+ */
+@Slf4j
+@Test(groups = "broker")
+public class BundleSplitterTaskTest {
+
+    private LocalBookkeeperEnsemble bkEnsemble;
+
+    private PulsarService pulsar;
+
+    @BeforeMethod
+    void setup() throws Exception {
+        // Start local bookkeeper ensemble
+        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble.start();
+        // Start broker
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+        config.setClusterName("use");
+        config.setWebServicePort(Optional.of(0));
+        config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+
+        config.setAdvertisedAddress("localhost");
+        config.setBrokerShutdownTimeoutMs(0L);
+        config.setBrokerServicePort(Optional.of(0));
+        config.setBrokerServicePortTls(Optional.of(0));
+        config.setWebServicePortTls(Optional.of(0));
+        pulsar = new PulsarService(config);
+        pulsar.start();
+    }
+
+    @Test
+    public void testSplitTaskWhenTopicJustOne() {
+        final BundleSplitterTask bundleSplitterTask = new BundleSplitterTask();
+        LoadData loadData = new LoadData();
+
+        LocalBrokerData brokerData = new LocalBrokerData();
+        Map<String, NamespaceBundleStats> lastStats = new HashMap<>();
+        final NamespaceBundleStats namespaceBundleStats = new NamespaceBundleStats();
+        namespaceBundleStats.topics = 1;
+        lastStats.put("ten/ns/0x00000000_0x80000000", namespaceBundleStats);
+        brokerData.setLastStats(lastStats);
+        loadData.getBrokerData().put("broker", new BrokerData(brokerData));
+
+        BundleData bundleData = new BundleData();
+        TimeAverageMessageData averageMessageData = new TimeAverageMessageData();
+        averageMessageData.setMsgRateIn(pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxMsgRate());
+        averageMessageData.setMsgRateOut(1);
+        bundleData.setLongTermData(averageMessageData);
+        loadData.getBundleData().put("ten/ns/0x00000000_0x80000000", bundleData);
+
+        final Set<String> bundlesToSplit = bundleSplitterTask.findBundlesToSplit(loadData, pulsar);
+        Assert.assertEquals(bundlesToSplit.size(), 0);
+    }
+
+
+    @AfterMethod(alwaysRun = true)
+    void shutdown() throws Exception {
+        log.info("--- Shutting down ---");
+        pulsar.close();
+        bkEnsemble.stop();
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index cb349bf..36ff448 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -1019,6 +1019,13 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
             Consumer<byte[]> consumer1 = pulsarClient2.newConsumer().topic(topic1)
                     .subscriptionName("my-subscriber-name").subscribe();
 
+            // there should be more than one topic to trigger split
+            final String topic2 = "persistent://" + namespace + "/topic2";
+            @Cleanup
+            Consumer<byte[]> consumer2 = pulsarClient2.newConsumer().topic(topic2)
+                    .subscriptionName("my-subscriber-name")
+                    .subscribe();
+
             // (4) Broker-1 will own topic-1
             final String unsplitBundle = namespace + "/0x00000000_0xffffffff";
 
@@ -1054,7 +1061,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
             updateAllMethod.invoke(loadManager);
             conf2.setLoadBalancerAutoBundleSplitEnabled(true);
             conf2.setLoadBalancerAutoUnloadSplitBundlesEnabled(true);
-            conf2.setLoadBalancerNamespaceBundleMaxTopics(0);
+            conf2.setLoadBalancerNamespaceBundleMaxTopics(1);
             loadManager.checkNamespaceBundleSplit();
 
             // (6) Broker-2 should get the watch and update bundle cache
@@ -1063,15 +1070,15 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
             });
 
             // (7) Make lookup request again to Broker-2 which should succeed.
-            final String topic2 = "persistent://" + namespace + "/topic2";
+            final String topic3 = "persistent://" + namespace + "/topic3";
             @Cleanup
-            Consumer<byte[]> consumer2 = pulsarClient2.newConsumer().topic(topic2)
+            Consumer<byte[]> consumer3 = pulsarClient2.newConsumer().topic(topic3)
                     .subscriptionName("my-subscriber-name")
                     .subscribe();
 
             Awaitility.await().untilAsserted(() -> {
                 NamespaceBundle bundleInBroker1AfterSplit = pulsar2.getNamespaceService()
-                        .getBundle(TopicName.get(topic2));
+                        .getBundle(TopicName.get(topic3));
                 assertNotEquals(bundleInBroker1AfterSplit.toString(), unsplitBundle);
             });
         } finally {