You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/12/14 11:39:15 UTC

[pulsar] branch branch-2.11 updated (c3bf25b1fb4 -> e1ea99c6419)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a change to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from c3bf25b1fb4 [improve][misc] Upgrade Netty to 4.1.86.Final and Netty Tcnative to 2.0.54.Final (#18599)
     new a8c10978efe [fix][broker] Fix incorrect Nic usage collected by pulsar (#18882)
     new e1ea99c6419 [fix][broker] Fix the breaking change of standalone metadata initialization (#18909)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/pulsar/PulsarStandalone.java   |  58 +++++----
 .../pulsar/broker/loadbalance/LinuxInfoUtils.java  |  57 ++++++++-
 .../loadbalance/impl/LinuxBrokerHostUsageImpl.java |   4 +-
 .../pulsar/MockTokenAuthenticationProvider.java    | 104 ++++++++++++++++
 .../org/apache/pulsar/PulsarStandaloneTest.java    | 133 ++++++++++-----------
 .../loadbalance/LoadReportNetworkLimitTest.java    |   8 +-
 .../configurations/standalone_no_client_auth.conf  |  20 ++--
 .../tests/integration/standalone/SmokeTest.java    |  15 ---
 .../topologies/PulsarStandaloneTestBase.java       |   3 -
 9 files changed, 265 insertions(+), 137 deletions(-)
 create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/MockTokenAuthenticationProvider.java
 copy .github/actions/copy-test-reports/action.yml => pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf (67%)


[pulsar] 02/02: [fix][broker] Fix the breaking change of standalone metadata initialization (#18909)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e1ea99c641951e3c76afcbbf08ffd399023bffd4
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Dec 14 19:35:24 2022 +0800

    [fix][broker] Fix the breaking change of standalone metadata initialization (#18909)
---
 .../java/org/apache/pulsar/PulsarStandalone.java   |  58 +++++----
 .../pulsar/MockTokenAuthenticationProvider.java    | 104 ++++++++++++++++
 .../org/apache/pulsar/PulsarStandaloneTest.java    | 133 ++++++++++-----------
 .../configurations/standalone_no_client_auth.conf  |  31 +++++
 .../tests/integration/standalone/SmokeTest.java    |  15 ---
 .../topologies/PulsarStandaloneTestBase.java       |   3 -
 6 files changed, 225 insertions(+), 119 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
index b5cc3462691..6317d02eb58 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
@@ -25,6 +25,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 import java.io.File;
 import java.nio.file.Paths;
+import java.util.List;
 import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -32,9 +33,9 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.resources.ClusterResources;
 import org.apache.pulsar.broker.resources.NamespaceResources;
-import org.apache.pulsar.broker.resources.TenantResources;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -367,36 +368,31 @@ public class PulsarStandalone implements AutoCloseable {
         log.debug("--- setup completed ---");
     }
 
-    @VisibleForTesting
-    void createNameSpace(String cluster, String publicTenant, NamespaceName ns) throws Exception {
-        ClusterResources cr = broker.getPulsarResources().getClusterResources();
-        TenantResources tr = broker.getPulsarResources().getTenantResources();
-        NamespaceResources nsr = broker.getPulsarResources().getNamespaceResources();
-
-        if (!cr.clusterExists(cluster)) {
-            cr.createCluster(cluster,
-                    ClusterData.builder()
-                            .serviceUrl(broker.getWebServiceAddress())
-                            .serviceUrlTls(broker.getWebServiceAddressTls())
-                            .brokerServiceUrl(broker.getBrokerServiceUrl())
-                            .brokerServiceUrlTls(broker.getBrokerServiceUrlTls())
-                            .build());
-        }
-
-        if (!tr.tenantExists(publicTenant)) {
-            tr.createTenant(publicTenant,
-                    TenantInfo.builder()
-                            .adminRoles(Sets.newHashSet(config.getSuperUserRoles()))
-                            .allowedClusters(Sets.newHashSet(cluster))
-                            .build());
-        }
-
-        if (!nsr.namespaceExists(ns)) {
-            try {
-                broker.getAdminClient().namespaces().createNamespace(ns.toString());
-            } catch (Exception e) {
-                log.warn("Failed to create the default namespace {}: {}", ns, e.getMessage());
+    private void createNameSpace(String cluster, String publicTenant, NamespaceName ns) throws Exception {
+        PulsarAdmin admin = broker.getAdminClient();
+        try {
+            final List<String> clusters = admin.clusters().getClusters();
+            if (!clusters.contains(cluster)) {
+                admin.clusters().createCluster(cluster, ClusterData.builder()
+                        .serviceUrl(broker.getWebServiceAddress())
+                        .serviceUrlTls(broker.getWebServiceAddressTls())
+                        .brokerServiceUrl(broker.getBrokerServiceUrl())
+                        .brokerServiceUrlTls(broker.getBrokerServiceUrlTls())
+                        .build());
+            }
+            final List<String> tenants = admin.tenants().getTenants();
+            if (!tenants.contains(publicTenant)) {
+                admin.tenants().createTenant(publicTenant, TenantInfo.builder()
+                        .adminRoles(Sets.newHashSet(config.getSuperUserRoles()))
+                        .allowedClusters(Sets.newHashSet(cluster))
+                        .build());
+            }
+            final List<String> namespaces = admin.namespaces().getNamespaces(publicTenant);
+            if (!namespaces.contains(ns.toString())) {
+                admin.namespaces().createNamespace(ns.toString(), config.getDefaultNumberOfNamespaceBundles());
             }
+        } catch (PulsarAdminException e) {
+            log.error("Failed to create namespace {} on cluster {} and tenant {}", ns, cluster, publicTenant, e);
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/MockTokenAuthenticationProvider.java b/pulsar-broker/src/test/java/org/apache/pulsar/MockTokenAuthenticationProvider.java
new file mode 100644
index 00000000000..e2593c9fe40
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/MockTokenAuthenticationProvider.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import javax.naming.AuthenticationException;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+public class MockTokenAuthenticationProvider implements AuthenticationProvider {
+
+    public static final String KEY = "role";
+
+    @Override
+    public void initialize(ServiceConfiguration config) throws IOException {
+        // No ops
+    }
+
+    @Override
+    public String getAuthMethodName() {
+        return "mock";
+    }
+
+    @Override
+    public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
+        if (!authData.hasDataFromHttp()) {
+            throw new AuthenticationException("No HTTP data in " + authData);
+        }
+        String value = authData.getHttpHeader(KEY);
+        if (value == null) {
+            throw new AuthenticationException("No HTTP header for " + KEY);
+        }
+        return value;
+    }
+
+    @Override
+    public void close() throws IOException {
+        // No ops
+    }
+
+    public static class MockAuthentication implements Authentication {
+
+        @Override
+        public AuthenticationDataProvider getAuthData(String brokerHostName) throws PulsarClientException {
+            return new MockAuthenticationData();
+        }
+
+        @Override
+        public String getAuthMethodName() {
+            return "mock";
+        }
+
+        @Override
+        public void configure(Map<String, String> authParams) {
+            // No ops
+        }
+
+        @Override
+        public void start() throws PulsarClientException {
+            // No ops
+        }
+
+        @Override
+        public void close() throws IOException {
+            // No ops
+        }
+    }
+
+    private static class MockAuthenticationData implements AuthenticationDataProvider {
+
+        @Override
+        public boolean hasDataForHttp() {
+            return true;
+        }
+
+        @Override
+        public Set<Map.Entry<String, String>> getHttpHeaders() throws Exception {
+            return Collections.singletonMap(KEY, "admin").entrySet();
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java
index f62ce1c5e46..a3bca0a8f75 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java
@@ -19,89 +19,30 @@
 package org.apache.pulsar;
 
 import static org.apache.commons.io.FileUtils.cleanDirectory;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 import java.io.File;
+import java.util.Collections;
 import java.util.List;
+import lombok.Cleanup;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.util.IOUtils;
-import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.resources.ClusterResources;
-import org.apache.pulsar.broker.resources.NamespaceResources;
-import org.apache.pulsar.broker.resources.PulsarResources;
-import org.apache.pulsar.broker.resources.TenantResources;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.testng.Assert;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
 public class PulsarStandaloneTest {
 
-    @Test
-    public void testCreateNameSpace() throws Exception {
-        final String cluster = "cluster1";
-        final String tenant = "tenant1";
-        final NamespaceName ns = NamespaceName.get(tenant, "ns1");
-
-        ClusterResources cr = mock(ClusterResources.class);
-        when(cr.clusterExists(cluster)).thenReturn(false).thenReturn(true);
-        doNothing().when(cr).createCluster(eq(cluster), any());
-
-        TenantResources tr = mock(TenantResources.class);
-        when(tr.tenantExists(tenant)).thenReturn(false).thenReturn(true);
-        doNothing().when(tr).createTenant(eq(tenant), any());
-
-        NamespaceResources nsr = mock(NamespaceResources.class);
-        when(nsr.namespaceExists(ns)).thenReturn(false).thenReturn(true).thenReturn(false);
-        doNothing().when(nsr).createPolicies(eq(ns), any());
-
-        PulsarResources resources = mock(PulsarResources.class);
-        when(resources.getClusterResources()).thenReturn(cr);
-        when(resources.getTenantResources()).thenReturn(tr);
-        when(resources.getNamespaceResources()).thenReturn(nsr);
-
-        Namespaces namespaces = mock(Namespaces.class);
-        doNothing().when(namespaces).createNamespace(any());
-        PulsarAdmin admin = mock(PulsarAdmin.class);
-        when(admin.namespaces()).thenReturn(namespaces);
-
-        PulsarService broker = mock(PulsarService.class);
-        when(broker.getPulsarResources()).thenReturn(resources);
-        when(broker.getWebServiceAddress()).thenReturn("pulsar://localhost:8080");
-        when(broker.getWebServiceAddressTls()).thenReturn(null);
-        when(broker.getBrokerServiceUrl()).thenReturn("pulsar://localhost:6650");
-        when(broker.getBrokerServiceUrlTls()).thenReturn(null);
-        when(broker.getAdminClient()).thenReturn(admin);
-
-        ServiceConfiguration config = new ServiceConfiguration();
-        config.setClusterName(cluster);
-
-        PulsarStandalone standalone = new PulsarStandalone();
-        standalone.setBroker(broker);
-        standalone.setConfig(config);
-
-        standalone.createNameSpace(cluster, tenant, ns);
-        standalone.createNameSpace(cluster, tenant, ns);
-        verify(cr, times(1)).createCluster(eq(cluster), any());
-        verify(tr, times(1)).createTenant(eq(tenant), any());
-        verify(admin, times(1)).namespaces();
-        verify(admin.namespaces(), times(1)).createNamespace(eq(ns.toString()));
-
-        doThrow(new PulsarAdminException("No permission")).when(namespaces).createNamespace(any());
-        standalone.createNameSpace(cluster, tenant, ns);
+    @DataProvider
+    public Object[][] enableBrokerClientAuth() {
+        return new Object[][] { { true }, { false } };
     }
 
-    @Test(groups = "broker")
+    @Test
     public void testStandaloneWithRocksDB() throws Exception {
         String[] args = new String[]{"--config",
                 "./src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf"};
@@ -131,4 +72,56 @@ public class PulsarStandaloneTest {
         cleanDirectory(tempDir);
     }
 
+    @Test(dataProvider = "enableBrokerClientAuth")
+    public void testMetadataInitialization(boolean enableBrokerClientAuth) throws Exception {
+        final File metadataDir = IOUtils.createTempDir("standalone", "metadata");
+        @Cleanup final PulsarStandaloneStarter standalone = new PulsarStandaloneStarter(new String[]{
+                "--config",
+                "./src/test/resources/configurations/standalone_no_client_auth.conf",
+                "-nss",
+                "-nfw",
+                "--metadata-url",
+                "rocksdb://" + metadataDir.getAbsolutePath()
+        });
+        if (enableBrokerClientAuth) {
+            standalone.getConfig().setBrokerClientAuthenticationPlugin(
+                    MockTokenAuthenticationProvider.MockAuthentication.class.getName());
+        }
+        final File bkDir = IOUtils.createTempDir("standalone", "bk");
+        standalone.setNumOfBk(1);
+        standalone.setBkDir(bkDir.getAbsolutePath());
+        standalone.start();
+
+        @Cleanup PulsarAdmin admin = PulsarAdmin.builder()
+                .serviceHttpUrl("http://localhost:8080")
+                .authentication(new MockTokenAuthenticationProvider.MockAuthentication())
+                .build();
+        if (enableBrokerClientAuth) {
+            assertTrue(admin.clusters().getClusters().contains("test_cluster"));
+            assertTrue(admin.tenants().getTenants().contains("public"));
+            assertTrue(admin.namespaces().getNamespaces("public").contains("public/default"));
+        } else {
+            assertTrue(admin.clusters().getClusters().isEmpty());
+            admin.clusters().createCluster("test_cluster", ClusterData.builder()
+                    .serviceUrl("http://localhost:8080/")
+                    .brokerServiceUrl("pulsar://localhost:6650/")
+                    .build());
+            assertTrue(admin.tenants().getTenants().isEmpty());
+            admin.tenants().createTenant("public", TenantInfo.builder()
+                    .adminRoles(Collections.singleton("admin"))
+                    .allowedClusters(Collections.singleton("test_cluster"))
+                    .build());
+
+            assertTrue(admin.namespaces().getNamespaces("public").isEmpty());
+            admin.namespaces().createNamespace("public/default");
+        }
+
+        String topic = "test-get-topic-bundle-range";
+        admin.topics().createNonPartitionedTopic(topic);
+        assertEquals(admin.lookups().getBundleRange(topic), "0xc0000000_0xffffffff");
+
+        standalone.close();
+        cleanDirectory(bkDir);
+        cleanDirectory(metadataDir);
+    }
 }
diff --git a/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf b/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf
new file mode 100644
index 00000000000..573d6cdba99
--- /dev/null
+++ b/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+brokerServicePort=6650
+webServicePort=8080
+allowLoopback=true
+clusterName=test_cluster
+superUserRoles=admin
+managedLedgerDefaultEnsembleSize=1
+managedLedgerDefaultWriteQuorum=1
+managedLedgerDefaultAckQuorum=1
+authenticationEnabled=true
+authenticationProviders=org.apache.pulsar.MockTokenAuthenticationProvider
+brokerClientAuthenticationPlugin=
+brokerClientAuthenticationParameters=
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/standalone/SmokeTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/standalone/SmokeTest.java
index 9fb90f172c5..438d0e42353 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/standalone/SmokeTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/standalone/SmokeTest.java
@@ -18,12 +18,7 @@
  */
 package org.apache.pulsar.tests.integration.standalone;
 
-import static org.testng.Assert.assertEquals;
 import java.util.function.Supplier;
-import lombok.Cleanup;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
 import org.testng.annotations.Test;
 
@@ -33,14 +28,4 @@ public class SmokeTest extends PulsarStandaloneTestSuite {
     public void testPublishAndConsume(Supplier<String> serviceUrl, boolean isPersistent) throws Exception {
         super.testPublishAndConsume(serviceUrl.get(), isPersistent);
     }
-
-    @Test
-    public void testGetBundleRange() throws PulsarClientException, PulsarAdminException {
-        @Cleanup
-        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(getHttpServiceUrl()).build();
-
-        String topic = "test-get-topic-bundle-range";
-        admin.topics().createNonPartitionedTopic(topic);
-        assertEquals(admin.lookups().getBundleRange(topic), "0xc0000000_0xffffffff");
-    }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java
index 1261e76c0ba..85d7b46c711 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java
@@ -121,7 +121,4 @@ public abstract class PulsarStandaloneTestBase extends PulsarTestBase {
         }
     }
 
-    protected String getHttpServiceUrl() {
-        return container.getHttpServiceUrl();
-    }
 }


[pulsar] 01/02: [fix][broker] Fix incorrect Nic usage collected by pulsar (#18882)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a8c10978efee5ec1a157fbee29afbcc727689f3e
Author: thetumbled <52...@users.noreply.github.com>
AuthorDate: Wed Dec 14 11:33:18 2022 +0800

    [fix][broker] Fix incorrect Nic usage collected by pulsar (#18882)
---
 .../pulsar/broker/loadbalance/LinuxInfoUtils.java  | 57 ++++++++++++++++++++--
 .../loadbalance/impl/LinuxBrokerHostUsageImpl.java |  4 +-
 .../loadbalance/LoadReportNetworkLimitTest.java    |  8 +--
 3 files changed, 59 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java
index cec0de503d6..cf5845e8409 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java
@@ -26,6 +26,7 @@ import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -163,6 +164,30 @@ public class LinuxInfoUtils {
         }
     }
 
+    /**
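+     * Determine whether a NIC is usable, i.e. its operstate is UP, UNKNOWN or DORMANT.
+     * @param nicPath path of the NIC directory that contains the "operstate" file
+     * @return whether the NIC is usable; false if its operstate cannot be read or parsed.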
+     */
+    private static boolean isUsable(Path nicPath) {
+        try {
+            String operstate = readTrimStringFromFile(nicPath.resolve("operstate"));
+            Operstate operState = Operstate.valueOf(operstate.toUpperCase(Locale.ROOT));
+            switch (operState) {
+                case UP:
+                case UNKNOWN:
+                case DORMANT:
+                    return true;
+                default:
+                    return false;
+            }
+        } catch (Exception e) {
+            log.warn("[LinuxInfo] Failed to read {} NIC operstate, the detail is: {}", nicPath, e.getMessage());
+            // Read operstate got error.
+            return false;
+        }
+    }
+
     /**
      * Get all physical nic limit.
      * @param nics All nic path
@@ -199,12 +224,13 @@ public class LinuxInfoUtils {
     }
 
     /**
-     * Get all physical nic path.
-     * @return All physical nic path
+     * Get the names of all usable physical NICs.
+     * @return names of all usable physical NICs.
      */
-    public static List<String> getPhysicalNICs() {
+    public static List<String> getUsablePhysicalNICs() {
         try (Stream<Path> stream = Files.list(Paths.get(NIC_PATH))) {
             return stream.filter(LinuxInfoUtils::isPhysicalNic)
+                    .filter(LinuxInfoUtils::isUsable)
                     .map(path -> path.getFileName().toString())
                     .collect(Collectors.toList());
         } catch (IOException e) {
@@ -218,7 +244,7 @@ public class LinuxInfoUtils {
      * @return Whether the VM has nic speed
      */
     public static boolean checkHasNicSpeeds() {
-        List<String> physicalNICs = getPhysicalNICs();
+        List<String> physicalNICs = getUsablePhysicalNICs();
         if (CollectionUtils.isEmpty(physicalNICs)) {
             return false;
         }
@@ -242,6 +268,29 @@ public class LinuxInfoUtils {
         return Double.parseDouble(readTrimStringFromFile(path));
     }
 
+    /**
+     * RFC2863 operational state of a network interface (TLV IFLA_OPERSTATE carries
+     * it in numeric representation); these names are matched against the operstate file.
+     * See <a href="https://www.kernel.org/doc/Documentation/networking/operstates.txt">operstates.txt</a>
+     */
+    enum Operstate {
+        // Interface is in unknown state, neither driver nor userspace has set
+        // operational state. Interface must be considered for user data as
+        // setting operational state has not been implemented in every driver.
+        UNKNOWN,
+        // Interface is unable to transfer data on L1, f.e. ethernet is not
+        // plugged or interface is ADMIN down.
+        DOWN,
+        // Interfaces stacked on an interface that is IF_OPER_DOWN show this
+        // state (f.e. VLAN).
+        LOWERLAYERDOWN,
+        // Interface is L1 up, but waiting for an external event, f.e. for a
+        // protocol to establish. (802.1X)
+        DORMANT,
+        // Interface is operational up and can be used.
+        UP
+    }
+
     @AllArgsConstructor
     public enum NICUsageType {
         // transport
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
index 27035cc2b50..0c146419f29 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
@@ -21,10 +21,10 @@ package org.apache.pulsar.broker.loadbalance.impl;
 import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.NICUsageType;
 import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getCpuUsageForCGroup;
 import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getCpuUsageForEntireHost;
-import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getPhysicalNICs;
 import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getTotalCpuLimit;
 import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getTotalNicLimit;
 import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getTotalNicUsage;
+import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getUsablePhysicalNICs;
 import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.isCGroupEnabled;
 import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.annotations.VisibleForTesting;
@@ -88,7 +88,7 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage {
 
     @Override
     public void calculateBrokerHostUsage() {
-        List<String> nics = getPhysicalNICs();
+        List<String> nics = getUsablePhysicalNICs();
         double totalNicLimit = getTotalNicLimitWithConfiguration(nics);
         double totalNicUsageTx = getTotalNicUsage(nics, NICUsageType.TX, BitRateUnit.Kilobit);
         double totalNicUsageRx = getTotalNicUsage(nics, NICUsageType.RX, BitRateUnit.Kilobit);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadReportNetworkLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadReportNetworkLimitTest.java
index 57059a1b098..0e90ef15b68 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadReportNetworkLimitTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadReportNetworkLimitTest.java
@@ -29,7 +29,7 @@ import org.testng.annotations.Test;
 
 @Test(groups = "broker")
 public class LoadReportNetworkLimitTest extends MockedPulsarServiceBaseTest {
-    int nicCount;
+    int usableNicCount;
 
     @Override
     protected void doInitConf() throws Exception {
@@ -43,7 +43,7 @@ public class LoadReportNetworkLimitTest extends MockedPulsarServiceBaseTest {
     public void setup() throws Exception {
         super.internalSetup();
         if (SystemUtils.IS_OS_LINUX) {
-            nicCount = LinuxInfoUtils.getPhysicalNICs().size();
+            usableNicCount = LinuxInfoUtils.getUsablePhysicalNICs().size();
         }
     }
 
@@ -60,8 +60,8 @@ public class LoadReportNetworkLimitTest extends MockedPulsarServiceBaseTest {
         LoadManagerReport report = admin.brokerStats().getLoadReport();
 
         if (SystemUtils.IS_OS_LINUX) {
-            assertEquals(report.getBandwidthIn().limit, nicCount * 5.4 * 1000 * 1000);
-            assertEquals(report.getBandwidthOut().limit, nicCount * 5.4 * 1000 * 1000);
+            assertEquals(report.getBandwidthIn().limit, usableNicCount * 5.4 * 1000 * 1000);
+            assertEquals(report.getBandwidthOut().limit, usableNicCount * 5.4 * 1000 * 1000);
         } else {
             // On non-Linux system we don't report the network usage
             assertEquals(report.getBandwidthIn().limit, -1.0);