You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sh...@apache.org on 2022/05/15 12:30:10 UTC

[pulsar] branch master updated: use netty PlatformDependent.estimateMaxDirectMemory (#15238)

This is an automated email from the ASF dual-hosted git repository.

shoothzj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d5d110e893 use netty PlatformDependent.estimateMaxDirectMemory (#15238)
9d5d110e893 is described below

commit 9d5d110e893cb6580f2aea36fa2137f2871fe4ae
Author: ZhangJian He <sh...@gmail.com>
AuthorDate: Sun May 15 20:29:58 2022 +0800

    use netty PlatformDependent.estimateMaxDirectMemory (#15238)
    
    ### Motivation
    - `PlatformDependent.maxDirectMemory()` can be inaccurate if `io.netty.maxDirectMemory` are setted
    - Bookkeeper's `DirectMemoryUtils` is not worked within some jdk releases.
    In netty 4.1.75, they introduced a new method `PlatformDependent.estimateMaxDirectMemory` to help users get maxDirectMemory. Since netty's this method works well on many jdk releases, use this to replace below two.
    PS: `DirectMemoryUtils` has been removed from bookkeeper in https://github.com/apache/bookkeeper/pull/2989
    ### Modifications
    - use `PlatformDependent.estimateMaxDirectMemory` instead
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  5 +--
 .../org/apache/pulsar/PulsarBrokerStarter.java     |  4 +--
 .../broker/loadbalance/impl/LoadManagerShared.java |  3 +-
 .../prometheus/PrometheusMetricsGenerator.java     |  3 +-
 .../org/apache/pulsar/common/stats/JvmMetrics.java |  3 +-
 .../pulsar/common/util/DirectMemoryUtils.java      | 37 ++++++++++++++++++++++
 .../runtime/thread/ThreadRuntimeFactory.java       |  3 +-
 .../runtime/thread/ThreadRuntimeFactoryTest.java   |  4 +--
 .../functions/worker/WorkerStatsManager.java       |  3 +-
 .../pulsar/proxy/server/ProxyServiceStarter.java   |  4 +--
 .../apache/pulsar/testclient/PerfClientUtils.java  |  3 +-
 .../apache/pulsar/websocket/stats/JvmMetrics.java  |  6 ++--
 12 files changed, 60 insertions(+), 18 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index c7e9ce3c198..857a8a05831 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.sasl.SaslConstants;
+import org.apache.pulsar.common.util.DirectMemoryUtils;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 
 /**
@@ -1166,7 +1167,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
             + " It's shared across all the topics running in the same broker.\n\n"
             + " Use -1 to disable the memory limitation. Default is 1/2 of direct memory.\n\n")
     private int maxMessagePublishBufferSizeInMB = Math.max(64,
-        (int) (io.netty.util.internal.PlatformDependent.maxDirectMemory() / 2 / (1024 * 1024)));
+        (int) (DirectMemoryUtils.jvmMaxDirectMemory() / 2 / (1024 * 1024)));
 
     @FieldContext(
         category = CATEGORY_SERVER,
@@ -1732,7 +1733,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
             + " memory is allocated from JVM direct memory and it's shared across all the topics"
             + " running in the same broker. By default, uses 1/5th of available direct memory")
     private int managedLedgerCacheSizeMB = Math.max(64,
-            (int) (io.netty.util.internal.PlatformDependent.maxDirectMemory() / 5 / (1024 * 1024)));
+            (int) (DirectMemoryUtils.jvmMaxDirectMemory() / 5 / (1024 * 1024)));
 
     @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Whether we should make a copy of the entry payloads when "
             + "inserting in cache")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
index 8a216fb6165..9debacddf7f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
@@ -28,7 +28,6 @@ import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 import com.google.common.annotations.VisibleForTesting;
-import io.netty.util.internal.PlatformDependent;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.InputStream;
@@ -57,6 +56,7 @@ import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.CmdGenerateDocs;
+import org.apache.pulsar.common.util.DirectMemoryUtils;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.service.WorkerServiceLoader;
@@ -173,7 +173,7 @@ public class PulsarBrokerStarter {
             }
 
             int maxFrameSize = brokerConfig.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING;
-            if (maxFrameSize >= PlatformDependent.estimateMaxDirectMemory()) {
+            if (maxFrameSize >= DirectMemoryUtils.jvmMaxDirectMemory()) {
                 throw new IllegalArgumentException("Max message size need smaller than jvm directMemory");
             }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index cbaddc209a4..8afb6a04007 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.broker.loadbalance.LoadData;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.util.DirectMemoryUtils;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
@@ -225,7 +226,7 @@ public class LoadManagerShared {
 
         // Collect JVM direct memory
         systemResourceUsage.setDirectMemory(new ResourceUsage((double) (getJvmDirectMemoryUsed() / MIBI),
-                (double) (io.netty.util.internal.PlatformDependent.maxDirectMemory() / MIBI)));
+                (double) (DirectMemoryUtils.jvmMaxDirectMemory() / MIBI)));
 
         return systemResourceUsage;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 5d48ae93608..a321d1fcfe1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -52,6 +52,7 @@ import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
 import org.apache.pulsar.broker.stats.metrics.ManagedLedgerCacheMetrics;
 import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics;
 import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.common.util.DirectMemoryUtils;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
 import org.eclipse.jetty.server.HttpOutput;
 
@@ -78,7 +79,7 @@ public class PrometheusMetricsGenerator {
         Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new Child() {
             @Override
             public double get() {
-                return io.netty.util.internal.PlatformDependent.maxDirectMemory();
+                return DirectMemoryUtils.jvmMaxDirectMemory();
             }
         }).register(CollectorRegistry.defaultRegistry);
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java
index d05484062d1..737c6dd197e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java
@@ -37,6 +37,7 @@ import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.util.DirectMemoryUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -103,7 +104,7 @@ public class JvmMetrics {
         m.put("jvm_total_memory", r.totalMemory());
 
         m.put("jvm_direct_memory_used", getJvmDirectMemoryUsed());
-        m.put("jvm_max_direct_memory", io.netty.util.internal.PlatformDependent.maxDirectMemory());
+        m.put("jvm_max_direct_memory", DirectMemoryUtils.jvmMaxDirectMemory());
         m.put("jvm_thread_cnt", getThreadCount());
 
         this.gcLogger.logMetrics(m);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/DirectMemoryUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/DirectMemoryUtils.java
new file mode 100644
index 00000000000..7fb54ac3661
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/DirectMemoryUtils.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+// CHECKSTYLE.OFF: IllegalImport
+import io.netty.util.internal.PlatformDependent;
+// CHECKSTYLE.ON: IllegalImport
+
+public class DirectMemoryUtils {
+
+    /**
+     * PlatformDependent.maxDirectMemory can be inaccurate if java property `io.netty.maxDirectMemory` are setted.
+     * Cache the result in this field.
+     */
+    private static final long JVM_MAX_DIRECT_MEMORY = PlatformDependent.estimateMaxDirectMemory();
+
+    public static long jvmMaxDirectMemory() {
+        return JVM_MAX_DIRECT_MEMORY;
+    }
+
+}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index e57af3279af..5e574919996 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.util.DirectMemoryUtils;
 import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.auth.FunctionAuthProvider;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
@@ -145,7 +146,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
     }
 
     private long getBytesPercentDirectMem(double percent) {
-        return (long) (io.netty.util.internal.PlatformDependent.maxDirectMemory() * (percent / 100));
+        return (long) (DirectMemoryUtils.jvmMaxDirectMemory() * (percent / 100));
     }
 
 
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
index c86e1d5f2d6..f13fd0c646e 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
@@ -19,11 +19,11 @@
 
 package org.apache.pulsar.functions.runtime.thread;
 
-import io.netty.util.internal.PlatformDependent;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SizeUnit;
+import org.apache.pulsar.common.util.DirectMemoryUtils;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
@@ -105,7 +105,7 @@ public class ThreadRuntimeFactoryTest {
 
     private ClientBuilder testMemoryLimit(Long absolute, Double percent) throws Exception {
         try (MockedStatic<PulsarClient> mockedPulsarClient = mockStatic(PulsarClient.class);) {
-            Whitebox.setInternalState(PlatformDependent.class, "DIRECT_MEMORY_LIMIT", 1024L);
+            Whitebox.setInternalState(DirectMemoryUtils.class, "JVM_MAX_DIRECT_MEMORY", 1024L);
 
             ClientBuilder clientBuilder = Mockito.mock(ClientBuilder.class);
             mockedPulsarClient.when(() -> PulsarClient.builder()).thenAnswer(i -> clientBuilder);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
index e2e0781d8d3..445cc6d2ee3 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
@@ -29,6 +29,7 @@ import java.io.StringWriter;
 import java.util.List;
 import java.util.function.Supplier;
 import lombok.Setter;
+import org.apache.pulsar.common.util.DirectMemoryUtils;
 import org.apache.pulsar.functions.instance.stats.PrometheusTextFormat;
 import org.apache.pulsar.functions.proto.Function;
 
@@ -190,7 +191,7 @@ public class WorkerStatsManager {
       Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new Gauge.Child() {
         @Override
         public double get() {
-          return io.netty.util.internal.PlatformDependent.maxDirectMemory();
+          return DirectMemoryUtils.jvmMaxDirectMemory();
         }
       }).register(CollectorRegistry.defaultRegistry);
     }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 9fda513c6e0..6031850fc02 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -28,7 +28,6 @@ import static org.slf4j.bridge.SLF4JBridgeHandler.removeHandlersForRootLogger;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.google.common.annotations.VisibleForTesting;
-import io.netty.util.internal.PlatformDependent;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Gauge;
 import io.prometheus.client.Gauge.Child;
@@ -49,6 +48,7 @@ import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.configuration.VipStatus;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.util.CmdGenerateDocs;
+import org.apache.pulsar.common.util.DirectMemoryUtils;
 import org.apache.pulsar.proxy.stats.ProxyStats;
 import org.apache.pulsar.websocket.WebSocketConsumerServlet;
 import org.apache.pulsar.websocket.WebSocketPingPongServlet;
@@ -226,7 +226,7 @@ public class ProxyServiceStarter {
         Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new Child() {
             @Override
             public double get() {
-                return PlatformDependent.maxDirectMemory();
+                return DirectMemoryUtils.jvmMaxDirectMemory();
             }
         }).register(CollectorRegistry.defaultRegistry);
 
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
index a563a7370a4..44dcad90e3e 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
@@ -23,6 +23,7 @@ import java.util.Objects;
 import java.util.function.Consumer;
 import lombok.experimental.UtilityClass;
 import org.apache.commons.io.FileUtils;
+import org.apache.pulsar.common.util.DirectMemoryUtils;
 import org.slf4j.Logger;
 
 /**
@@ -49,7 +50,7 @@ public class PerfClientUtils {
     public static void printJVMInformation(Logger log) {
         log.info("JVM args {}", ManagementFactory.getRuntimeMXBean().getInputArguments());
         log.info("Netty max memory (PlatformDependent.maxDirectMemory()) {}",
-                FileUtils.byteCountToDisplaySize(io.netty.util.internal.PlatformDependent.maxDirectMemory()));
+                FileUtils.byteCountToDisplaySize(DirectMemoryUtils.jvmMaxDirectMemory()));
         log.info("JVM max heap memory (Runtime.getRuntime().maxMemory()) {}",
                 FileUtils.byteCountToDisplaySize(Runtime.getRuntime().maxMemory()));
     }
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/JvmMetrics.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/JvmMetrics.java
index 91f3db2192f..2f542e7dd81 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/JvmMetrics.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/JvmMetrics.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.websocket.stats;
 
-// CHECKSTYLE.OFF: IllegalImport
-import static io.netty.util.internal.PlatformDependent.maxDirectMemory;
 import static org.apache.pulsar.common.stats.Metrics.create;
 import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import java.lang.management.ManagementFactory;
@@ -30,10 +28,10 @@ import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.common.util.DirectMemoryUtils;
 import org.apache.pulsar.websocket.WebSocketService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-// CHECKSTYLE.ON: IllegalImport
 
 public class JvmMetrics {
 
@@ -65,7 +63,7 @@ public class JvmMetrics {
         m.put("jvm_max_memory", r.maxMemory());
         m.put("jvm_total_memory", r.totalMemory());
 
-        m.put("jvm_max_direct_memory", maxDirectMemory());
+        m.put("jvm_max_direct_memory", DirectMemoryUtils.jvmMaxDirectMemory());
         m.put("jvm_thread_cnt", getThreadCount());
 
         m.put("jvm_gc_young_pause", currentYoungGcTime);