You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sh...@apache.org on 2022/05/15 12:30:10 UTC
[pulsar] branch master updated: use netty PlatformDependent.estimateMaxDirectMemory (#15238)
This is an automated email from the ASF dual-hosted git repository.
shoothzj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9d5d110e893 use netty PlatformDependent.estimateMaxDirectMemory (#15238)
9d5d110e893 is described below
commit 9d5d110e893cb6580f2aea36fa2137f2871fe4ae
Author: ZhangJian He <sh...@gmail.com>
AuthorDate: Sun May 15 20:29:58 2022 +0800
use netty PlatformDependent.estimateMaxDirectMemory (#15238)
### Motivation
- `PlatformDependent.maxDirectMemory()` can be inaccurate if the `io.netty.maxDirectMemory` property is set
- BookKeeper's `DirectMemoryUtils` does not work on some JDK releases.
Netty 4.1.75 introduced a new method, `PlatformDependent.estimateMaxDirectMemory`, to help users get the max direct memory. Since this Netty method works well on many JDK releases, use it to replace the two approaches above.
PS: `DirectMemoryUtils` has been removed from bookkeeper in https://github.com/apache/bookkeeper/pull/2989
### Modifications
- use `PlatformDependent.estimateMaxDirectMemory` instead
---
.../apache/pulsar/broker/ServiceConfiguration.java | 5 +--
.../org/apache/pulsar/PulsarBrokerStarter.java | 4 +--
.../broker/loadbalance/impl/LoadManagerShared.java | 3 +-
.../prometheus/PrometheusMetricsGenerator.java | 3 +-
.../org/apache/pulsar/common/stats/JvmMetrics.java | 3 +-
.../pulsar/common/util/DirectMemoryUtils.java | 37 ++++++++++++++++++++++
.../runtime/thread/ThreadRuntimeFactory.java | 3 +-
.../runtime/thread/ThreadRuntimeFactoryTest.java | 4 +--
.../functions/worker/WorkerStatsManager.java | 3 +-
.../pulsar/proxy/server/ProxyServiceStarter.java | 4 +--
.../apache/pulsar/testclient/PerfClientUtils.java | 3 +-
.../apache/pulsar/websocket/stats/JvmMetrics.java | 6 ++--
12 files changed, 60 insertions(+), 18 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index c7e9ce3c198..857a8a05831 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.sasl.SaslConstants;
+import org.apache.pulsar.common.util.DirectMemoryUtils;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
/**
@@ -1166,7 +1167,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ " It's shared across all the topics running in the same broker.\n\n"
+ " Use -1 to disable the memory limitation. Default is 1/2 of direct memory.\n\n")
private int maxMessagePublishBufferSizeInMB = Math.max(64,
- (int) (io.netty.util.internal.PlatformDependent.maxDirectMemory() / 2 / (1024 * 1024)));
+ (int) (DirectMemoryUtils.jvmMaxDirectMemory() / 2 / (1024 * 1024)));
@FieldContext(
category = CATEGORY_SERVER,
@@ -1732,7 +1733,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ " memory is allocated from JVM direct memory and it's shared across all the topics"
+ " running in the same broker. By default, uses 1/5th of available direct memory")
private int managedLedgerCacheSizeMB = Math.max(64,
- (int) (io.netty.util.internal.PlatformDependent.maxDirectMemory() / 5 / (1024 * 1024)));
+ (int) (DirectMemoryUtils.jvmMaxDirectMemory() / 5 / (1024 * 1024)));
@FieldContext(category = CATEGORY_STORAGE_ML, doc = "Whether we should make a copy of the entry payloads when "
+ "inserting in cache")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
index 8a216fb6165..9debacddf7f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
@@ -28,7 +28,6 @@ import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.google.common.annotations.VisibleForTesting;
-import io.netty.util.internal.PlatformDependent;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
@@ -57,6 +56,7 @@ import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.CmdGenerateDocs;
+import org.apache.pulsar.common.util.DirectMemoryUtils;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.service.WorkerServiceLoader;
@@ -173,7 +173,7 @@ public class PulsarBrokerStarter {
}
int maxFrameSize = brokerConfig.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING;
- if (maxFrameSize >= PlatformDependent.estimateMaxDirectMemory()) {
+ if (maxFrameSize >= DirectMemoryUtils.jvmMaxDirectMemory()) {
throw new IllegalArgumentException("Max message size need smaller than jvm directMemory");
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index cbaddc209a4..8afb6a04007 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.util.DirectMemoryUtils;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
@@ -225,7 +226,7 @@ public class LoadManagerShared {
// Collect JVM direct memory
systemResourceUsage.setDirectMemory(new ResourceUsage((double) (getJvmDirectMemoryUsed() / MIBI),
- (double) (io.netty.util.internal.PlatformDependent.maxDirectMemory() / MIBI)));
+ (double) (DirectMemoryUtils.jvmMaxDirectMemory() / MIBI)));
return systemResourceUsage;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 5d48ae93608..a321d1fcfe1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -52,6 +52,7 @@ import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
import org.apache.pulsar.broker.stats.metrics.ManagedLedgerCacheMetrics;
import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics;
import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.common.util.DirectMemoryUtils;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.eclipse.jetty.server.HttpOutput;
@@ -78,7 +79,7 @@ public class PrometheusMetricsGenerator {
Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new Child() {
@Override
public double get() {
- return io.netty.util.internal.PlatformDependent.maxDirectMemory();
+ return DirectMemoryUtils.jvmMaxDirectMemory();
}
}).register(CollectorRegistry.defaultRegistry);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java
index d05484062d1..737c6dd197e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java
@@ -37,6 +37,7 @@ import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.util.DirectMemoryUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,7 +104,7 @@ public class JvmMetrics {
m.put("jvm_total_memory", r.totalMemory());
m.put("jvm_direct_memory_used", getJvmDirectMemoryUsed());
- m.put("jvm_max_direct_memory", io.netty.util.internal.PlatformDependent.maxDirectMemory());
+ m.put("jvm_max_direct_memory", DirectMemoryUtils.jvmMaxDirectMemory());
m.put("jvm_thread_cnt", getThreadCount());
this.gcLogger.logMetrics(m);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/DirectMemoryUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/DirectMemoryUtils.java
new file mode 100644
index 00000000000..7fb54ac3661
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/DirectMemoryUtils.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+// CHECKSTYLE.OFF: IllegalImport
+import io.netty.util.internal.PlatformDependent;
+// CHECKSTYLE.ON: IllegalImport
+
+public class DirectMemoryUtils {
+
+ /**
+ * Max direct memory as reported by PlatformDependent.estimateMaxDirectMemory(), computed once and cached in this field.
+ * PlatformDependent.maxDirectMemory() can be inaccurate if the java property `io.netty.maxDirectMemory` is set.
+ */
+ private static final long JVM_MAX_DIRECT_MEMORY = PlatformDependent.estimateMaxDirectMemory();
+
+ public static long jvmMaxDirectMemory() {
+ return JVM_MAX_DIRECT_MEMORY;
+ }
+
+}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index e57af3279af..5e574919996 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.util.DirectMemoryUtils;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.auth.FunctionAuthProvider;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
@@ -145,7 +146,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
}
private long getBytesPercentDirectMem(double percent) {
- return (long) (io.netty.util.internal.PlatformDependent.maxDirectMemory() * (percent / 100));
+ return (long) (DirectMemoryUtils.jvmMaxDirectMemory() * (percent / 100));
}
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
index c86e1d5f2d6..f13fd0c646e 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
@@ -19,11 +19,11 @@
package org.apache.pulsar.functions.runtime.thread;
-import io.netty.util.internal.PlatformDependent;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SizeUnit;
+import org.apache.pulsar.common.util.DirectMemoryUtils;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
@@ -105,7 +105,7 @@ public class ThreadRuntimeFactoryTest {
private ClientBuilder testMemoryLimit(Long absolute, Double percent) throws Exception {
try (MockedStatic<PulsarClient> mockedPulsarClient = mockStatic(PulsarClient.class);) {
- Whitebox.setInternalState(PlatformDependent.class, "DIRECT_MEMORY_LIMIT", 1024L);
+ Whitebox.setInternalState(DirectMemoryUtils.class, "JVM_MAX_DIRECT_MEMORY", 1024L);
ClientBuilder clientBuilder = Mockito.mock(ClientBuilder.class);
mockedPulsarClient.when(() -> PulsarClient.builder()).thenAnswer(i -> clientBuilder);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
index e2e0781d8d3..445cc6d2ee3 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
@@ -29,6 +29,7 @@ import java.io.StringWriter;
import java.util.List;
import java.util.function.Supplier;
import lombok.Setter;
+import org.apache.pulsar.common.util.DirectMemoryUtils;
import org.apache.pulsar.functions.instance.stats.PrometheusTextFormat;
import org.apache.pulsar.functions.proto.Function;
@@ -190,7 +191,7 @@ public class WorkerStatsManager {
Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new Gauge.Child() {
@Override
public double get() {
- return io.netty.util.internal.PlatformDependent.maxDirectMemory();
+ return DirectMemoryUtils.jvmMaxDirectMemory();
}
}).register(CollectorRegistry.defaultRegistry);
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 9fda513c6e0..6031850fc02 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -28,7 +28,6 @@ import static org.slf4j.bridge.SLF4JBridgeHandler.removeHandlersForRootLogger;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.google.common.annotations.VisibleForTesting;
-import io.netty.util.internal.PlatformDependent;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;
import io.prometheus.client.Gauge.Child;
@@ -49,6 +48,7 @@ import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.configuration.VipStatus;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.util.CmdGenerateDocs;
+import org.apache.pulsar.common.util.DirectMemoryUtils;
import org.apache.pulsar.proxy.stats.ProxyStats;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketPingPongServlet;
@@ -226,7 +226,7 @@ public class ProxyServiceStarter {
Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new Child() {
@Override
public double get() {
- return PlatformDependent.maxDirectMemory();
+ return DirectMemoryUtils.jvmMaxDirectMemory();
}
}).register(CollectorRegistry.defaultRegistry);
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
index a563a7370a4..44dcad90e3e 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
@@ -23,6 +23,7 @@ import java.util.Objects;
import java.util.function.Consumer;
import lombok.experimental.UtilityClass;
import org.apache.commons.io.FileUtils;
+import org.apache.pulsar.common.util.DirectMemoryUtils;
import org.slf4j.Logger;
/**
@@ -49,7 +50,7 @@ public class PerfClientUtils {
public static void printJVMInformation(Logger log) {
log.info("JVM args {}", ManagementFactory.getRuntimeMXBean().getInputArguments());
log.info("Netty max memory (PlatformDependent.maxDirectMemory()) {}",
- FileUtils.byteCountToDisplaySize(io.netty.util.internal.PlatformDependent.maxDirectMemory()));
+ FileUtils.byteCountToDisplaySize(DirectMemoryUtils.jvmMaxDirectMemory()));
log.info("JVM max heap memory (Runtime.getRuntime().maxMemory()) {}",
FileUtils.byteCountToDisplaySize(Runtime.getRuntime().maxMemory()));
}
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/JvmMetrics.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/JvmMetrics.java
index 91f3db2192f..2f542e7dd81 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/JvmMetrics.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/JvmMetrics.java
@@ -18,8 +18,6 @@
*/
package org.apache.pulsar.websocket.stats;
-// CHECKSTYLE.OFF: IllegalImport
-import static io.netty.util.internal.PlatformDependent.maxDirectMemory;
import static org.apache.pulsar.common.stats.Metrics.create;
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import java.lang.management.ManagementFactory;
@@ -30,10 +28,10 @@ import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.common.util.DirectMemoryUtils;
import org.apache.pulsar.websocket.WebSocketService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-// CHECKSTYLE.ON: IllegalImport
public class JvmMetrics {
@@ -65,7 +63,7 @@ public class JvmMetrics {
m.put("jvm_max_memory", r.maxMemory());
m.put("jvm_total_memory", r.totalMemory());
- m.put("jvm_max_direct_memory", maxDirectMemory());
+ m.put("jvm_max_direct_memory", DirectMemoryUtils.jvmMaxDirectMemory());
m.put("jvm_thread_cnt", getThreadCount());
m.put("jvm_gc_young_pause", currentYoungGcTime);