You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/10/25 04:50:03 UTC
[dubbo] branch 3.0 updated: Fix the problem of StatItem#isAllowable
under multi-threading (#9096)
This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new a2ce5a0 Fix the problem of StatItem#isAllowable under multi-threading (#9096)
a2ce5a0 is described below
commit a2ce5a00dff0bf8f0b718d6b887fde11a010f251
Author: 灼华 <43...@users.noreply.github.com>
AuthorDate: Mon Oct 25 12:49:48 2021 +0800
Fix the problem of StatItem#isAllowable under multi-threading (#9096)
---
.../zookeeper/AbstractZookeeperTransporter.java | 2 +-
.../dubbo/remoting/zookeeper/ZookeeperClient.java | 4 +-
.../dubbo/remoting/transport/CodecSupportTest.java | 48 +++++++++++++++++
.../org/apache/dubbo/rpc/filter/tps/StatItem.java | 21 +++-----
.../rpc/filter/tps/DefaultTPSLimiterTest.java | 63 +++++++++++++++++++++-
5 files changed, 120 insertions(+), 18 deletions(-)
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/AbstractZookeeperTransporter.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/AbstractZookeeperTransporter.java
index 9dd266b..45bf79e 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/AbstractZookeeperTransporter.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/AbstractZookeeperTransporter.java
@@ -35,7 +35,7 @@ import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
/**
* AbstractZookeeperTransporter is abstract implements of ZookeeperTransporter.
* <p>
- * If you want to extends this, implements createZookeeperClient.
+ * If you want to extend this, implements createZookeeperClient.
*/
public abstract class AbstractZookeeperTransporter implements ZookeeperTransporter {
private static final Logger logger = LoggerFactory.getLogger(ZookeeperTransporter.class);
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java
index 04db0f8..fdb6d88 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java
@@ -33,13 +33,13 @@ public interface ZookeeperClient {
List<String> addChildListener(String path, ChildListener listener);
/**
- * @param path: directory. All of child of path will be listened.
+ * @param path: directory. All child of path will be listened.
* @param listener
*/
void addDataListener(String path, DataListener listener);
/**
- * @param path: directory. All of child of path will be listened.
+ * @param path: directory. All child of path will be listened.
* @param listener
* @param executor another thread
*/
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/transport/CodecSupportTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/transport/CodecSupportTest.java
new file mode 100644
index 0000000..0e9480e
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/transport/CodecSupportTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport;
+
+import org.apache.dubbo.common.serialize.ObjectOutput;
+import org.apache.dubbo.common.serialize.Serialization;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+
+
+public class CodecSupportTest {
+
+ @Test
+ public void testHeartbeat() throws Exception {
+ Byte proto = CodecSupport.getIDByName("hessian2");
+ Serialization serialization = CodecSupport.getSerializationById(proto);
+ byte[] nullBytes = CodecSupport.getNullBytesOf(serialization);
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutput out = serialization.serialize(null, baos);
+ out.writeObject(null);
+ out.flushBuffer();
+ InputStream is = new ByteArrayInputStream(baos.toByteArray());
+ baos.close();
+ byte[] payload = CodecSupport.getPayload(is);
+
+ Assertions.assertArrayEquals(nullBytes, payload);
+ Assertions.assertTrue(CodecSupport.isHeartBeat(payload, proto));
+ }
+}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/StatItem.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/StatItem.java
index 5a7a0ac..f3d25a8 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/StatItem.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/StatItem.java
@@ -16,7 +16,7 @@
*/
package org.apache.dubbo.rpc.filter.tps;
-import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Judge whether a particular invocation of service provider method should be allowed within a configured time interval.
@@ -30,7 +30,7 @@ class StatItem {
private long interval;
- private LongAdder token;
+ private AtomicInteger token;
private int rate;
@@ -39,20 +39,19 @@ class StatItem {
this.rate = rate;
this.interval = interval;
this.lastResetTime = System.currentTimeMillis();
- this.token = buildLongAdder(rate);
+ this.token = new AtomicInteger(rate);
}
public boolean isAllowable() {
long now = System.currentTimeMillis();
if (now > lastResetTime + interval) {
- token = buildLongAdder(rate);
+ token.set(rate);
lastResetTime = now;
}
- if (token.sum() <= 0) {
+ if (token.decrementAndGet() < 0) {
return false;
}
- token.decrement();
return true;
}
@@ -70,8 +69,8 @@ class StatItem {
return lastResetTime;
}
- long getToken() {
- return token.sum();
+ int getToken() {
+ return token.get();
}
@Override
@@ -83,10 +82,4 @@ class StatItem {
.toString();
}
- private LongAdder buildLongAdder(int rate) {
- LongAdder adder = new LongAdder();
- adder.add(rate);
- return adder;
- }
-
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/tps/DefaultTPSLimiterTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/tps/DefaultTPSLimiterTest.java
index 38beb0c..c58dbd5 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/tps/DefaultTPSLimiterTest.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/tps/DefaultTPSLimiterTest.java
@@ -23,6 +23,10 @@ import org.apache.dubbo.rpc.support.MockInvocation;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.rpc.Constants.TPS_LIMIT_INTERVAL_KEY;
import static org.apache.dubbo.rpc.Constants.TPS_LIMIT_RATE_KEY;
@@ -76,7 +80,64 @@ public class DefaultTPSLimiterTest {
for (int i = 1; i <= tenTimesLimitRate; i++) {
Assertions.assertTrue(defaultTPSLimiter.isAllowable(url, invocation));
}
-
+
Assertions.assertFalse(defaultTPSLimiter.isAllowable(url, invocation));
}
+
+ @Test
+ public void testMultiThread() throws InterruptedException {
+ Invocation invocation = new MockInvocation();
+ URL url = URL.valueOf("test://test");
+ url = url.addParameter(INTERFACE_KEY, "org.apache.dubbo.rpc.file.TpsService");
+ url = url.addParameter(TPS_LIMIT_RATE_KEY, 100);
+ url = url.addParameter(TPS_LIMIT_INTERVAL_KEY, 100000);
+
+ List<Task> taskList = new ArrayList<>();
+ int threadNum = 50;
+ CountDownLatch stopLatch = new CountDownLatch(threadNum);
+ CountDownLatch startLatch = new CountDownLatch(1);
+ for (int i = 0; i < threadNum; i++) {
+ taskList.add(new Task(defaultTPSLimiter, url, invocation, startLatch, stopLatch));
+
+ }
+ startLatch.countDown();
+ stopLatch.await();
+
+ Assertions.assertEquals(taskList.stream().map(task -> task.getCount()).reduce((a, b) -> a + b).get(), 100);
+ }
+
+ class Task implements Runnable {
+ private DefaultTPSLimiter defaultTPSLimiter;
+ private URL url;
+ private Invocation invocation;
+ private CountDownLatch startLatch;
+ private CountDownLatch stopLatch;
+ private int count;
+
+ public Task(DefaultTPSLimiter defaultTPSLimiter, URL url, Invocation invocation, CountDownLatch startLatch, CountDownLatch stopLatch) {
+ this.defaultTPSLimiter = defaultTPSLimiter;
+ this.url = url;
+ this.invocation = invocation;
+ this.startLatch = startLatch;
+ this.stopLatch = stopLatch;
+ new Thread(this).start();
+ }
+
+ @Override
+ public void run() {
+ try {
+ startLatch.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ for (int j = 0; j < 10000; j++) {
+ count = defaultTPSLimiter.isAllowable(url, invocation) ? count + 1 : count;
+ }
+ stopLatch.countDown();
+ }
+
+ public int getCount() {
+ return count;
+ }
+ }
}