You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2022/06/20 02:01:18 UTC

[dubbo] branch 3.0 updated: [ISSUE #10020] add MemorySafeLinkedBlockingQueue (#10021)

This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 6de61dfaf1 [ISSUE #10020] add MemorySafeLinkedBlockingQueue (#10021)
6de61dfaf1 is described below

commit 6de61dfaf1452a6547d8d56a246bcb95a24ebf19
Author: dragon-zhang <ha...@webuy.ai>
AuthorDate: Mon Jun 20 10:01:06 2022 +0800

    [ISSUE #10020] add MemorySafeLinkedBlockingQueue (#10021)
    
    * [ISSUE #10020] add MemorySafeLinkedBlockingQueue
    
    * fix bug and add test case
    
    * fix bug
---
 .../common/threadpool/MemoryLimitCalculator.java   | 81 ++++++++++++++++++
 .../dubbo/common/threadpool/MemoryLimiter.java     |  9 +-
 .../threadpool/MemorySafeLinkedBlockingQueue.java  | 97 ++++++++++++++++++++++
 .../support/cached/CachedThreadPool.java           |  3 +-
 .../threadpool/support/fixed/FixedThreadPool.java  |  3 +-
 .../support/limited/LimitedThreadPool.java         |  3 +-
 .../MemorySafeLinkedBlockingQueueTest.java         | 46 ++++++++++
 7 files changed, 235 insertions(+), 7 deletions(-)

diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/MemoryLimitCalculator.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/MemoryLimitCalculator.java
new file mode 100644
index 0000000000..31db9e0508
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/MemoryLimitCalculator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.common.threadpool;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link java.lang.Runtime#freeMemory()} technology is used to calculate the
+ * memory limit by using the percentage of the current maximum available memory,
+ * which can be used with {@link MemoryLimiter}.
+ *
+ * @see MemoryLimiter
+ * @see <a href="https://github.com/apache/incubator-shenyu/blob/master/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/MemoryLimitCalculator.java">MemoryLimitCalculator</a>
+ */
+public class MemoryLimitCalculator {
+
+    private static volatile long maxAvailable;
+
+    private static final ScheduledExecutorService SCHEDULER = Executors.newSingleThreadScheduledExecutor();
+
+    static {
+        // immediately refresh when this class is loaded to prevent maxAvailable from being 0
+        refresh();
+        // check every 50 ms to improve performance
+        SCHEDULER.scheduleWithFixedDelay(MemoryLimitCalculator::refresh, 50, 50, TimeUnit.MILLISECONDS);
+        Runtime.getRuntime().addShutdownHook(new Thread(SCHEDULER::shutdown));
+    }
+
+    private static void refresh() {
+        maxAvailable = Runtime.getRuntime().freeMemory();
+    }
+
+    /**
+     * Get the maximum available memory of the current JVM.
+     *
+     * @return maximum available memory
+     */
+    public static long maxAvailable() {
+        return maxAvailable;
+    }
+
+    /**
+     * Take the current JVM's maximum available memory
+     * as a percentage of the result as the limit.
+     *
+     * @param percentage percentage
+     * @return available memory
+     */
+    public static long calculate(final float percentage) {
+        if (percentage <= 0 || percentage > 1) {
+            throw new IllegalArgumentException();
+        }
+        return (long) (maxAvailable() * percentage);
+    }
+
+    /**
+     * By default, it takes 80% of the maximum available memory of the current JVM.
+     *
+     * @return available memory
+     */
+    public static long defaultLimit() {
+        return (long) (maxAvailable() * 0.8);
+    }
+}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/MemoryLimiter.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/MemoryLimiter.java
index d1db0d16eb..b26e37eb27 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/MemoryLimiter.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/MemoryLimiter.java
@@ -122,7 +122,8 @@ public class MemoryLimiter {
                 return false;
             }
             memory.add(objectSize);
-            if (sum < memoryLimit) {
+            // see https://github.com/apache/incubator-shenyu/pull/3356
+            if (memory.sum() < memoryLimit) {
                 notLimited.signal();
             }
         } finally {
@@ -140,13 +141,13 @@ public class MemoryLimiter {
         }
         acquireLock.lockInterruptibly();
         try {
-            final long sum = memory.sum();
             final long objectSize = inst.getObjectSize(e);
-            while (sum + objectSize >= memoryLimit) {
+            // see https://github.com/apache/incubator-shenyu/pull/3335
+            while (memory.sum() + objectSize >= memoryLimit) {
                 notLimited.await();
             }
             memory.add(objectSize);
-            if (sum < memoryLimit) {
+            if (memory.sum() < memoryLimit) {
                 notLimited.signal();
             }
         } finally {
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/MemorySafeLinkedBlockingQueue.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/MemorySafeLinkedBlockingQueue.java
new file mode 100644
index 0000000000..aae67b618c
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/MemorySafeLinkedBlockingQueue.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.common.threadpool;
+
+import java.util.Collection;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Can completely solve the OOM problem caused by {@link java.util.concurrent.LinkedBlockingQueue},
+ * does not depend on {@link java.lang.instrument.Instrumentation} and is easier to use than
+ * {@link MemoryLimitedLinkedBlockingQueue}.
+ *
+ * @see <a href="https://github.com/apache/incubator-shenyu/blob/master/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/MemorySafeLinkedBlockingQueue.java">MemorySafeLinkedBlockingQueue</a>
+ */
+public class MemorySafeLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
+
+    private static final long serialVersionUID = 8032578371739960142L;
+
+    public static int THE_256_MB = 256 * 1024 * 1024;
+
+    private int maxFreeMemory;
+
+    public MemorySafeLinkedBlockingQueue() {
+        this(THE_256_MB);
+    }
+
+    public MemorySafeLinkedBlockingQueue(final int maxFreeMemory) {
+        super(Integer.MAX_VALUE);
+        this.maxFreeMemory = maxFreeMemory;
+    }
+
+    public MemorySafeLinkedBlockingQueue(final Collection<? extends E> c,
+                                         final int maxFreeMemory) {
+        super(c);
+        this.maxFreeMemory = maxFreeMemory;
+    }
+
+    /**
+     * set the max free memory.
+     *
+     * @param maxFreeMemory the max free memory
+     */
+    public void setMaxFreeMemory(final int maxFreeMemory) {
+        this.maxFreeMemory = maxFreeMemory;
+    }
+
+    /**
+     * get the max free memory.
+     *
+     * @return the max free memory limit
+     */
+    public int getMaxFreeMemory() {
+        return maxFreeMemory;
+    }
+
+    /**
+     * determine if there is any remaining free memory.
+     *
+     * @return true if has free memory
+     */
+    public boolean hasRemainedMemory() {
+        return MemoryLimitCalculator.maxAvailable() > maxFreeMemory;
+    }
+
+    @Override
+    public void put(final E e) throws InterruptedException {
+        if (hasRemainedMemory()) {
+            super.put(e);
+        }
+    }
+
+    @Override
+    public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
+        return hasRemainedMemory() && super.offer(e, timeout, unit);
+    }
+
+    @Override
+    public boolean offer(final E e) {
+        return hasRemainedMemory() && super.offer(e);
+    }
+}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/cached/CachedThreadPool.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/cached/CachedThreadPool.java
index eb14fba089..b053bd9337 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/cached/CachedThreadPool.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/cached/CachedThreadPool.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.common.threadpool.support.cached;
 
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
+import org.apache.dubbo.common.threadpool.MemorySafeLinkedBlockingQueue;
 import org.apache.dubbo.common.threadpool.ThreadPool;
 import org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport;
 
@@ -54,7 +55,7 @@ public class CachedThreadPool implements ThreadPool {
         int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
         return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                 queues == 0 ? new SynchronousQueue<Runnable>() :
-                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
+                        (queues < 0 ? new MemorySafeLinkedBlockingQueue<Runnable>()
                                 : new LinkedBlockingQueue<Runnable>(queues)),
                 new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
     }
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/fixed/FixedThreadPool.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/fixed/FixedThreadPool.java
index 71ee07449a..de1fb7ee68 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/fixed/FixedThreadPool.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/fixed/FixedThreadPool.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.common.threadpool.support.fixed;
 
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
+import org.apache.dubbo.common.threadpool.MemorySafeLinkedBlockingQueue;
 import org.apache.dubbo.common.threadpool.ThreadPool;
 import org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport;
 
@@ -48,7 +49,7 @@ public class FixedThreadPool implements ThreadPool {
         int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
         return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                 queues == 0 ? new SynchronousQueue<Runnable>() :
-                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
+                        (queues < 0 ? new MemorySafeLinkedBlockingQueue<Runnable>()
                                 : new LinkedBlockingQueue<Runnable>(queues)),
                 new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
     }
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/limited/LimitedThreadPool.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/limited/LimitedThreadPool.java
index 25e3003900..f0203e6676 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/limited/LimitedThreadPool.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/limited/LimitedThreadPool.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.common.threadpool.support.limited;
 
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
+import org.apache.dubbo.common.threadpool.MemorySafeLinkedBlockingQueue;
 import org.apache.dubbo.common.threadpool.ThreadPool;
 import org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport;
 
@@ -51,7 +52,7 @@ public class LimitedThreadPool implements ThreadPool {
         int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
         return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                 queues == 0 ? new SynchronousQueue<Runnable>() :
-                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
+                        (queues < 0 ? new MemorySafeLinkedBlockingQueue<Runnable>()
                                 : new LinkedBlockingQueue<Runnable>(queues)),
                 new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
     }
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/MemorySafeLinkedBlockingQueueTest.java b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/MemorySafeLinkedBlockingQueueTest.java
new file mode 100644
index 0000000000..dd7f095b88
--- /dev/null
+++ b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/MemorySafeLinkedBlockingQueueTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.common.threadpool;
+
+import net.bytebuddy.agent.ByteBuddyAgent;
+import org.junit.jupiter.api.Test;
+
+import java.lang.instrument.Instrumentation;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+public class MemorySafeLinkedBlockingQueueTest {
+    @Test
+    public void test() throws Exception {
+        ByteBuddyAgent.install();
+        final Instrumentation instrumentation = ByteBuddyAgent.getInstrumentation();
+        final long objectSize = instrumentation.getObjectSize((Runnable) () -> {
+        });
+        int maxFreeMemory = (int) MemoryLimitCalculator.maxAvailable();
+        MemorySafeLinkedBlockingQueue<Runnable> queue = new MemorySafeLinkedBlockingQueue<>(maxFreeMemory);
+        // all memory is reserved for JVM, so it will fail here
+        assertThat(queue.offer(() -> {
+        }), is(false));
+
+        // maxFreeMemory-objectSize Byte memory is reserved for the JVM, so this will succeed
+        queue.setMaxFreeMemory((int) (MemoryLimitCalculator.maxAvailable() - objectSize));
+        assertThat(queue.offer(() -> {
+        }), is(true));
+    }
+}