You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/10/25 04:50:03 UTC

[dubbo] branch 3.0 updated: Fix the problem of StatItem#isAllowable under multi-threading (#9096)

This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new a2ce5a0  Fix the problem of StatItem#isAllowable under multi-threading (#9096)
a2ce5a0 is described below

commit a2ce5a00dff0bf8f0b718d6b887fde11a010f251
Author: 灼华 <43...@users.noreply.github.com>
AuthorDate: Mon Oct 25 12:49:48 2021 +0800

    Fix the problem of StatItem#isAllowable under multi-threading (#9096)
---
 .../zookeeper/AbstractZookeeperTransporter.java    |  2 +-
 .../dubbo/remoting/zookeeper/ZookeeperClient.java  |  4 +-
 .../dubbo/remoting/transport/CodecSupportTest.java | 48 +++++++++++++++++
 .../org/apache/dubbo/rpc/filter/tps/StatItem.java  | 21 +++-----
 .../rpc/filter/tps/DefaultTPSLimiterTest.java      | 63 +++++++++++++++++++++-
 5 files changed, 120 insertions(+), 18 deletions(-)

diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/AbstractZookeeperTransporter.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/AbstractZookeeperTransporter.java
index 9dd266b..45bf79e 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/AbstractZookeeperTransporter.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/AbstractZookeeperTransporter.java
@@ -35,7 +35,7 @@ import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
 /**
  * AbstractZookeeperTransporter is abstract implements of ZookeeperTransporter.
  * <p>
- * If you want to extends this, implements createZookeeperClient.
+ * If you want to extend this, implements createZookeeperClient.
  */
 public abstract class AbstractZookeeperTransporter implements ZookeeperTransporter {
     private static final Logger logger = LoggerFactory.getLogger(ZookeeperTransporter.class);
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java
index 04db0f8..fdb6d88 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java
@@ -33,13 +33,13 @@ public interface ZookeeperClient {
     List<String> addChildListener(String path, ChildListener listener);
 
     /**
-     * @param path:    directory. All of child of path will be listened.
+     * @param path:    directory. All child of path will be listened.
      * @param listener
      */
     void addDataListener(String path, DataListener listener);
 
     /**
-     * @param path:    directory. All of child of path will be listened.
+     * @param path:    directory. All child of path will be listened.
      * @param listener
      * @param executor another thread
      */
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/transport/CodecSupportTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/transport/CodecSupportTest.java
new file mode 100644
index 0000000..0e9480e
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/transport/CodecSupportTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport;
+
+import org.apache.dubbo.common.serialize.ObjectOutput;
+import org.apache.dubbo.common.serialize.Serialization;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+
+
+public class CodecSupportTest {
+
+    @Test
+    public void testHeartbeat() throws Exception {
+        Byte proto = CodecSupport.getIDByName("hessian2");
+        Serialization serialization = CodecSupport.getSerializationById(proto);
+        byte[] nullBytes = CodecSupport.getNullBytesOf(serialization);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutput out = serialization.serialize(null, baos);
+        out.writeObject(null);
+        out.flushBuffer();
+        InputStream is = new ByteArrayInputStream(baos.toByteArray());
+        baos.close();
+        byte[] payload = CodecSupport.getPayload(is);
+
+        Assertions.assertArrayEquals(nullBytes, payload);
+        Assertions.assertTrue(CodecSupport.isHeartBeat(payload, proto));
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/StatItem.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/StatItem.java
index 5a7a0ac..f3d25a8 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/StatItem.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/StatItem.java
@@ -16,7 +16,7 @@
  */
 package org.apache.dubbo.rpc.filter.tps;
 
-import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Judge whether a particular invocation of service provider method should be allowed within a configured time interval.
@@ -30,7 +30,7 @@ class StatItem {
 
     private long interval;
 
-    private LongAdder token;
+    private AtomicInteger token;
 
     private int rate;
 
@@ -39,20 +39,19 @@ class StatItem {
         this.rate = rate;
         this.interval = interval;
         this.lastResetTime = System.currentTimeMillis();
-        this.token = buildLongAdder(rate);
+        this.token = new AtomicInteger(rate);
     }
 
     public boolean isAllowable() {
         long now = System.currentTimeMillis();
         if (now > lastResetTime + interval) {
-            token = buildLongAdder(rate);
+            token.set(rate);
             lastResetTime = now;
         }
 
-        if (token.sum() <= 0) {
+        if (token.decrementAndGet() < 0) {
             return false;
         }
-        token.decrement();
         return true;
     }
 
@@ -70,8 +69,8 @@ class StatItem {
         return lastResetTime;
     }
 
-    long getToken() {
-        return token.sum();
+    int getToken() {
+        return token.get();
     }
 
     @Override
@@ -83,10 +82,4 @@ class StatItem {
                 .toString();
     }
 
-    private LongAdder buildLongAdder(int rate) {
-        LongAdder adder = new LongAdder();
-        adder.add(rate);
-        return adder;
-    }
-
 }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/tps/DefaultTPSLimiterTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/tps/DefaultTPSLimiterTest.java
index 38beb0c..c58dbd5 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/tps/DefaultTPSLimiterTest.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/tps/DefaultTPSLimiterTest.java
@@ -23,6 +23,10 @@ import org.apache.dubbo.rpc.support.MockInvocation;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
 import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
 import static org.apache.dubbo.rpc.Constants.TPS_LIMIT_INTERVAL_KEY;
 import static org.apache.dubbo.rpc.Constants.TPS_LIMIT_RATE_KEY;
@@ -76,7 +80,64 @@ public class DefaultTPSLimiterTest {
         for (int i = 1; i <= tenTimesLimitRate; i++) {
             Assertions.assertTrue(defaultTPSLimiter.isAllowable(url, invocation));
         }
-        
+
         Assertions.assertFalse(defaultTPSLimiter.isAllowable(url, invocation));
     }
+
+    @Test
+    public void testMultiThread() throws InterruptedException {
+        Invocation invocation = new MockInvocation();
+        URL url = URL.valueOf("test://test");
+        url = url.addParameter(INTERFACE_KEY, "org.apache.dubbo.rpc.file.TpsService");
+        url = url.addParameter(TPS_LIMIT_RATE_KEY, 100);
+        url = url.addParameter(TPS_LIMIT_INTERVAL_KEY, 100000);
+
+        List<Task> taskList = new ArrayList<>();
+        int threadNum = 50;
+        CountDownLatch stopLatch = new CountDownLatch(threadNum);
+        CountDownLatch startLatch = new CountDownLatch(1);
+        for (int i = 0; i < threadNum; i++) {
+            taskList.add(new Task(defaultTPSLimiter, url, invocation, startLatch, stopLatch));
+
+        }
+        startLatch.countDown();
+        stopLatch.await();
+
+        Assertions.assertEquals(taskList.stream().map(task -> task.getCount()).reduce((a, b) -> a + b).get(), 100);
+    }
+
+    class Task implements Runnable {
+        private DefaultTPSLimiter defaultTPSLimiter;
+        private URL url;
+        private Invocation invocation;
+        private CountDownLatch startLatch;
+        private CountDownLatch stopLatch;
+        private int count;
+
+        public Task(DefaultTPSLimiter defaultTPSLimiter, URL url, Invocation invocation, CountDownLatch startLatch, CountDownLatch stopLatch) {
+            this.defaultTPSLimiter = defaultTPSLimiter;
+            this.url = url;
+            this.invocation = invocation;
+            this.startLatch = startLatch;
+            this.stopLatch = stopLatch;
+            new Thread(this).start();
+        }
+
+        @Override
+        public void run() {
+            try {
+                startLatch.await();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+            for (int j = 0; j < 10000; j++) {
+                count = defaultTPSLimiter.isAllowable(url, invocation) ? count + 1 : count;
+            }
+            stopLatch.countDown();
+        }
+
+        public int getCount() {
+            return count;
+        }
+    }
 }