You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@dubbo.apache.org by GitBox <gi...@apache.org> on 2021/09/30 06:51:15 UTC

[GitHub] [dubbo] guohao commented on a change in pull request #8967: Dubbo3.0 :adaptive service

guohao commented on a change in pull request #8967:
URL: https://github.com/apache/dubbo/pull/8967#discussion_r719102751



##########
File path: dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/adaptive/Node.java
##########
@@ -0,0 +1,39 @@
+package org.apache.dubbo.rpc.cluster.adaptive;
+
+import org.apache.dubbo.rpc.Invoker;
+
+public class Node {
+    private volatile int remain;
+    private Invoker invoker;
+    private String address;
+
+    public Node(Invoker invoker){
+        this();
+        this.invoker = invoker;
+        this.address = invoker.getUrl().getBackupAddress();

Review comment:
       why use backup address?

##########
File path: dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/adaptive/AdaptiveFilter.java
##########
@@ -0,0 +1,37 @@
+package org.apache.dubbo.rpc.cluster.adaptive;
+
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.rpc.*;
+import org.apache.dubbo.rpc.cluster.loadbalance.P2CLoadBalance;
+
+import java.util.function.Supplier;
+
+@Activate(group = CommonConstants.CONSUMER)
+public class AdaptiveFilter implements Filter, BaseFilter.Listener {
+    private static final Supplier<Long> clock = System::nanoTime;
+
+    @Override
+    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
+        try {
+            Result result = invoker.invoke(invocation);
+            return result;
+        } catch (Exception e) {
+            throw e;
+        }
+
+    }
+
+    @Override
+    public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
+        String remain = appResponse.getAttachment("remain");
+        String limit = appResponse.getAttachment("limit");
+        long pickTime = (Long)invocation.get("pickTime");
+        P2CLoadBalance.updateNodes(Integer.parseInt(remain),invocation.getMethodName(),invoker.getUrl().getBackupAddress(),clock.get() - pickTime);

Review comment:
       Use SPI or pass a reference here to reduce static code.

##########
File path: dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/adaptive/AdaptiveFilter.java
##########
@@ -0,0 +1,37 @@
+package org.apache.dubbo.rpc.cluster.adaptive;
+
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.rpc.*;
+import org.apache.dubbo.rpc.cluster.loadbalance.P2CLoadBalance;
+
+import java.util.function.Supplier;
+
+@Activate(group = CommonConstants.CONSUMER)
+public class AdaptiveFilter implements Filter, BaseFilter.Listener {
+    private static final Supplier<Long> clock = System::nanoTime;
+
+    @Override
+    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
+        try {

Review comment:
       Unnecessary try/catch: the catch block only rethrows the exception.

##########
File path: dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/P2CLoadBalance.java
##########
@@ -0,0 +1,101 @@
+package org.apache.dubbo.rpc.cluster.loadbalance;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.cluster.LoadBalance;
+import org.apache.dubbo.rpc.cluster.adaptive.Node;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+
+public class P2CLoadBalance implements LoadBalance {
+    private static final ConcurrentHashMap<String, List<Node>> method2Nodes = new ConcurrentHashMap<>(32);
+    private static final Supplier<Long> clock = System::nanoTime;
+    private final ReentrantLock lock = new ReentrantLock();
+
+    @Override
+    public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
+        //update method2Nodes
+        updatemethod2Nodes(invokers,invocation);
+        //prepick
+        Node[] prepick = prepick(invocation);
+        //pick
+        Invoker pick = pick(prepick);
+        //set pickTime
+        invocation.put("pickTime",clock.get());
+        return pick;
+    }
+
+    private <T> void updatemethod2Nodes(List<Invoker<T>> invokers,Invocation invocation){
+        //todo 注册中心的健康检查机制
+        String methodName = invocation.getMethodName();
+        if (method2Nodes.containsKey(methodName))return;
+        try{
+            lock.lock();
+            method2Nodes.computeIfAbsent(methodName,k-> new ArrayList<>());

Review comment:
       Would it be better to use a `Set` for lookup?

##########
File path: dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/adaptive/AdaptiveFilter.java
##########
@@ -0,0 +1,37 @@
+package org.apache.dubbo.rpc.cluster.adaptive;
+
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.rpc.*;
+import org.apache.dubbo.rpc.cluster.loadbalance.P2CLoadBalance;
+
+import java.util.function.Supplier;
+
+@Activate(group = CommonConstants.CONSUMER)
+public class AdaptiveFilter implements Filter, BaseFilter.Listener {
+    private static final Supplier<Long> clock = System::nanoTime;
+
+    @Override
+    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
+        try {
+            Result result = invoker.invoke(invocation);
+            return result;
+        } catch (Exception e) {
+            throw e;
+        }
+
+    }
+
+    @Override
+    public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
+        String remain = appResponse.getAttachment("remain");

Review comment:
       Try to use a constant instead of a literal.

##########
File path: dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/P2CLoadBalance.java
##########
@@ -0,0 +1,101 @@
+package org.apache.dubbo.rpc.cluster.loadbalance;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.cluster.LoadBalance;
+import org.apache.dubbo.rpc.cluster.adaptive.Node;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+
+public class P2CLoadBalance implements LoadBalance {
+    private static final ConcurrentHashMap<String, List<Node>> method2Nodes = new ConcurrentHashMap<>(32);
+    private static final Supplier<Long> clock = System::nanoTime;
+    private final ReentrantLock lock = new ReentrantLock();
+
+    @Override
+    public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
+        //update method2Nodes
+        updatemethod2Nodes(invokers,invocation);
+        //prepick
+        Node[] prepick = prepick(invocation);
+        //pick
+        Invoker pick = pick(prepick);
+        //set pickTime
+        invocation.put("pickTime",clock.get());
+        return pick;
+    }
+
+    private <T> void updatemethod2Nodes(List<Invoker<T>> invokers,Invocation invocation){
+        //todo 注册中心的健康检查机制
+        String methodName = invocation.getMethodName();
+        if (method2Nodes.containsKey(methodName))return;

Review comment:
       Missing braces around the `if` body.

##########
File path: dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/AdaptiveFilter.java
##########
@@ -0,0 +1,70 @@
+package org.apache.dubbo.rpc.filter;
+
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.rpc.*;
+import org.apache.dubbo.rpc.filter.limiter.AbstractLimiter;
+import org.apache.dubbo.rpc.filter.limiter.Limiter;
+import org.apache.dubbo.rpc.filter.limiter.SimpleLimiter;
+
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+
+/**
+ * 服务端过滤器
+ * 可选接口
+ * 此类可以修改实现,不可以移动类或者修改包名
+ * 用户可以在服务端拦截请求和响应,捕获 rpc 调用时产生、服务端返回的已知异常。
+ */
+@Activate(group = CommonConstants.PROVIDER)
+public class AdaptiveFilter implements Filter, BaseFilter.Listener {
+    private static final ConcurrentHashMap<String, Limiter> name2limiter = new ConcurrentHashMap<>();

Review comment:
       Is it necessary to use static?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org