You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/02/01 02:18:11 UTC

[GitHub] merlimat closed pull request #1157: Allow to override the auto-detected NIC speed limit

merlimat closed pull request #1157: Allow to override the auto-detected NIC speed limit
URL: https://github.com/apache/incubator-pulsar/pull/1157
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/conf/broker.conf b/conf/broker.conf
index 7f9dd3396..979f4e2d9 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -355,6 +355,14 @@ loadBalancerNamespaceBundleMaxBandwidthMbytes=100
 # maximum number of bundles in a namespace
 loadBalancerNamespaceMaximumBundles=128
 
+# Override the auto-detected max speed of the network interfaces. The value is
+# the speed of each NIC, in Gbps; leave it empty to keep the auto-detected speed.
+# This option is useful in some environments (e.g. EC2 VMs) where the max speed
+# reported by Linux does not reflect the real bandwidth available to the broker.
+# Since the load manager uses the network usage to decide when a broker is
+# overloaded, make sure the detected speed is correct or override it here.
+loadBalancerOverrideBrokerNicSpeedGbps=
+
 # Name of load manager to use
 loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 88117e077..0cc40011b 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -18,10 +18,9 @@
  */
 package org.apache.pulsar.broker;
 
-import java.lang.reflect.Field;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 
@@ -349,6 +348,9 @@
     @FieldContext(dynamic = true)
     private String loadManagerClassName = "org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl";
 
+    // Override, in Gbps, for the auto-detected max speed of each network interface (null = auto-detect)
+    private Integer loadBalancerOverrideBrokerNicSpeedGbps;
+
     /**** --- Replication --- ****/
     // Enable replication metrics
     private boolean replicationMetricsEnabled = false;
@@ -1218,6 +1220,14 @@ public int getLoadBalancerNamespaceMaximumBundles() {
         return this.loadBalancerNamespaceMaximumBundles;
     }
 
+    public Optional<Integer> getLoadBalancerOverrideBrokerNicSpeedGbps() {
+        return Optional.ofNullable(loadBalancerOverrideBrokerNicSpeedGbps);
+    }
+
+    public void setLoadBalancerOverrideBrokerNicSpeedGbps(int loadBalancerOverrideBrokerNicSpeedGbps) {
+        this.loadBalancerOverrideBrokerNicSpeedGbps = loadBalancerOverrideBrokerNicSpeedGbps;
+    }
+
     public boolean isReplicationMetricsEnabled() {
         return replicationMetricsEnabled;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
index 4b62f84a7..7cdf84c30 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
@@ -18,15 +18,6 @@
  */
 package org.apache.pulsar.broker.loadbalance.impl;
 
-import com.sun.management.OperatingSystemMXBean;
-
-import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
-import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
-import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.nio.file.Files;
@@ -35,10 +26,20 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.sun.management.OperatingSystemMXBean;
+
 /**
  * Class that will return the broker host usage.
  *
@@ -54,6 +55,8 @@
     private OperatingSystemMXBean systemBean;
     private SystemResourceUsage usage;
 
+    private final Optional<Integer> overrideBrokerNicSpeedGbps;
+
     private static final Logger LOG = LoggerFactory.getLogger(LinuxBrokerHostUsageImpl.class);
 
     public LinuxBrokerHostUsageImpl(PulsarService pulsar) {
@@ -61,6 +64,7 @@ public LinuxBrokerHostUsageImpl(PulsarService pulsar) {
         this.systemBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
         this.lastCollection = 0L;
         this.usage = new SystemResourceUsage();
+        this.overrideBrokerNicSpeedGbps = pulsar.getConfiguration().getLoadBalancerOverrideBrokerNicSpeedGbps();
         pulsar.getLoadManagerExecutor().scheduleAtFixedRate(this::calculateBrokerHostUsage, 0,
                 hostUsageCheckIntervalMin, TimeUnit.MINUTES);
     }
@@ -117,12 +121,12 @@ private double getTotalCpuLimit() {
 
     /**
      * Reads first line of /proc/stat to get total cpu usage.
-     * 
+     *
      * <pre>
      *     cpu  user   nice system idle    iowait irq softirq steal guest guest_nice
      *     cpu  317808 128  58637  2503692 7634   0   13472   0     0     0
      * </pre>
-     * 
+     *
      * Line is split in "words", filtering the first. The sum of all numbers give the amount of cpu cycles used this
      * far. Real CPU usage should equal the sum substracting the idle cycles, this would include iowait, irq and steal.
      */
@@ -175,6 +179,12 @@ private Path getNicSpeedPath(String nic) {
     }
 
     private double getTotalNicLimitKbps(List<String> nics) {
+        if (overrideBrokerNicSpeedGbps.isPresent()) {
+            // Use the override value as configured. Return the total max speed across all available NICs, converted
+            // from Gbps into Kbps
+            return ((double) overrideBrokerNicSpeedGbps.get()) * nics.size() * 1024 * 1024;
+        }
+
         // Nic speed is in Mbits/s, return kbits/s
         return nics.stream().mapToDouble(s -> {
             try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadReportNetworkLimit.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadReportNetworkLimit.java
new file mode 100644
index 000000000..7f8324731
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadReportNetworkLimit.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance;
+
+import org.apache.commons.lang3.SystemUtils;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import static org.testng.Assert.*;
+
+public class LoadReportNetworkLimit extends MockedPulsarServiceBaseTest {
+
+    @BeforeClass
+    @Override
+    public void setup() throws Exception {
+        conf.setLoadBalancerEnabled(true);
+        conf.setLoadBalancerOverrideBrokerNicSpeedGbps(5);
+        super.internalSetup();
+    }
+
+    @AfterClass
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void checkLoadReportNicSpeed() throws Exception {
+        // Since the NIC speed is overridden in the configuration, the load report must expose that limit
+
+        LoadManagerReport report = admin.brokerStats().getLoadReport();
+
+        if (SystemUtils.IS_OS_LINUX) {
+            assertEquals(report.getBandwidthIn().limit, 5.0 * 1024 * 1024);
+            assertEquals(report.getBandwidthOut().limit, 5.0 * 1024 * 1024);
+        } else {
+            // On non-Linux system we don't report the network usage
+            assertEquals(report.getBandwidthIn().limit, -1.0);
+            assertEquals(report.getBandwidthOut().limit, -1.0);
+        }
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
index f19803cbf..bad061c81 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
@@ -29,6 +29,7 @@
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.nio.charset.StandardCharsets;
+import java.util.Optional;
 import java.util.Properties;
 
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -36,7 +37,7 @@
 import org.testng.annotations.Test;
 
 /**
- * 
+ *
  *
  */
 public class ServiceConfigurationTest {
@@ -45,7 +46,7 @@
 
     /**
      * test {@link ServiceConfiguration} initialization
-     * 
+     *
      * @throws Exception
      */
     @Test
@@ -59,9 +60,25 @@ public void testInit() throws Exception {
         assertEquals(config.getBootstrapNamespaces().get(1), "ns2");
     }
 
+    @Test
+    public void testOptionalSettingEmpty() throws Exception {
+        String confFile = "loadBalancerOverrideBrokerNicSpeedGbps=\n";
+        InputStream stream = new ByteArrayInputStream(confFile.getBytes());
+        final ServiceConfiguration config = PulsarConfigurationLoader.create(stream, ServiceConfiguration.class);
+        assertEquals(config.getLoadBalancerOverrideBrokerNicSpeedGbps(), Optional.empty());
+    }
+
+    @Test
+    public void testOptionalSettingPresent() throws Exception {
+        String confFile = "loadBalancerOverrideBrokerNicSpeedGbps=5\n";
+        InputStream stream = new ByteArrayInputStream(confFile.getBytes());
+        final ServiceConfiguration config = PulsarConfigurationLoader.create(stream, ServiceConfiguration.class);
+        assertEquals(config.getLoadBalancerOverrideBrokerNicSpeedGbps(), Optional.of(5));
+    }
+
     /**
      * test {@link ServiceConfiguration} with incorrect values.
-     * 
+     *
      * @throws Exception
      */
     @Test(expectedExceptions = IllegalArgumentException.class)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
index c7bab5f3a..ebac11186 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.common.util;
 
-import org.apache.pulsar.common.policies.data.AuthAction;
-
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.lang.String.format;
 
@@ -33,6 +31,8 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import io.netty.util.internal.StringUtil;
+
 /**
  *
  * Generic value converter.
@@ -192,7 +192,12 @@ private static void initWrappers() {
      * @return The converted Integer value.
      */
     public static Integer stringToInteger(String val) {
-        return Integer.valueOf(trim(val));
+        String v = trim(val);
+        if (StringUtil.isNullOrEmpty(v)) {
+            return null;
+        } else {
+            return Integer.valueOf(v);
+        }
     }
 
     /**


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services