You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@dubbo.apache.org by GitBox <gi...@apache.org> on 2022/12/08 07:12:53 UTC

[GitHub] [dubbo] AlbumenJ commented on a diff in pull request #10885: Feature/modify xds subscribe

AlbumenJ commented on code in PR #10885:
URL: https://github.com/apache/dubbo/pull/10885#discussion_r1043000896


##########
dubbo-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/impl/LdsProtocol.java:
##########
@@ -32,27 +34,75 @@
 import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds;
 import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
 
-import java.util.Collections;
-import java.util.Objects;
+import java.util.HashMap;
 import java.util.Set;
-import java.util.function.Consumer;
+import java.util.Objects;
+import java.util.HashSet;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
+import java.util.function.Consumer;
 
 import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_ERROR_RESPONSE_XDS;
 
 public class LdsProtocol extends AbstractProtocol<ListenerResult, DeltaListener> {
-
     private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(LdsProtocol.class);
 
-    public LdsProtocol(XdsChannel xdsChannel, Node node, int pollingPoolSize, int pollingTimeout) {
-        super(xdsChannel, node, pollingPoolSize, pollingTimeout);
+    private StreamObserver<DiscoveryRequest> requestObserver;
+
+    private CompletableFuture<ListenerResult> future;
+    public LdsProtocol(XdsChannel xdsChannel, Node node, int pollingTimeout) {
+        super(xdsChannel, node, pollingTimeout);
     }
 
     @Override
     public String getTypeUrl() {
         return "type.googleapis.com/envoy.config.listener.v3.Listener";
     }
 
+    private HashMap<String, Object> resourcesMap = new HashMap<>();
+
+    @Override
+    public boolean isExistResource(Set<String> resourceNames) {
+        for (String resourceName : resourceNames) {
+            if (!resourcesMap.containsKey(resourceName)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public ListenerResult getCacheResource(Set<String> resourceNames) {
+        Set<String> resourceSet = new HashSet<>();
+        if (!resourceNames.isEmpty() && isExistResource(resourceNames)) {
+            for (String resourceName : resourceNames) {
+                resourceSet.add((String) resourcesMap.get(resourceName));
+            }
+        } else {
+            if (requestObserver == null) {
+                future = new CompletableFuture<>();
+                requestObserver = xdsChannel.createDeltaDiscoveryRequest(new ResponseObserver(future));
+            }
+            resourceNames.addAll(resourcesMap.keySet());
+            requestObserver.onNext(buildDiscoveryRequest(resourceNames));
+            try {
+                return future.get();
+            } catch (InterruptedException e) {
+                logger.error("InterruptedException occur when request control panel. error={}", e);
+                Thread.currentThread().interrupt();
+            }  catch (Exception e) {
+                logger.error("Error occur when request control panel. error=. ",e);
+            }
+        }
+        return new ListenerResult(resourceSet);
+    }

Review Comment:
   Move this method implementation to `AbstractProtocol`.



##########
dubbo-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/impl/EdsProtocol.java:
##########
@@ -43,15 +48,61 @@ public class EdsProtocol extends AbstractProtocol<EndpointResult, DeltaEndpoint>
 
     private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(EdsProtocol.class);
 
-    public EdsProtocol(XdsChannel xdsChannel, Node node, int pollingPoolSize, int pollingTimeout) {
-        super(xdsChannel, node, pollingPoolSize, pollingTimeout);
+    private StreamObserver<DiscoveryRequest> requestObserver;
+
+    private HashMap<String, Object> resourcesMap = new HashMap<>();
+
+
+    public EdsProtocol(XdsChannel xdsChannel, Node node, int pollingTimeout) {
+        super(xdsChannel, node, pollingTimeout);
     }
 
     @Override
     public String getTypeUrl() {
         return "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";
     }
 
+    @Override
+    public boolean isExistResource(Set<String> resourceNames) {
+        for (String resourceName : resourceNames) {
+            if (!resourcesMap.containsKey(resourceName)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public EndpointResult getCacheResource(Set<String> resourceNames) {
+        Set<Endpoint> resourceSet = new HashSet<>();
+        if (!resourceNames.isEmpty() && isExistResource(resourceNames)) {
+            for (String resourceName : resourceNames) {
+                resourceSet.addAll((Set<Endpoint>) resourcesMap.get(resourceName));
+            }
+        } else {
+            CompletableFuture<EndpointResult> future = new CompletableFuture<>();
+            if (requestObserver == null) {
+                requestObserver = xdsChannel.createDeltaDiscoveryRequest(new ResponseObserver(future));
+            }

Review Comment:
   `future` will never receive a response if `requestObserver` is already non-null when `resourceNames` is updated.



##########
dubbo-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/impl/RdsProtocol.java:
##########
@@ -43,22 +46,73 @@ public class RdsProtocol extends AbstractProtocol<RouteResult, DeltaRoute> {
 
     private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(RdsProtocol.class);
 
-    public RdsProtocol(XdsChannel xdsChannel, Node node, int pollingPoolSize, int pollingTimeout) {
-        super(xdsChannel, node, pollingPoolSize, pollingTimeout);
+    private StreamObserver<DiscoveryRequest> requestObserver;
+
+    private HashMap<String, Object> resourcesMap = new HashMap<>();

Review Comment:
   Use a concurrent map (e.g. `ConcurrentHashMap`) here.



##########
dubbo-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/AbstractProtocol.java:
##########
@@ -93,103 +65,23 @@ public AbstractProtocol(XdsChannel xdsChannel, Node node, int pollingPoolSize, i
      */
     public abstract String getTypeUrl();
 
+    public abstract boolean isExistResource(Set<String> resourceNames);
+
+    public abstract T getCacheResource(Set<String> resourceNames);
+
+    public abstract StreamObserver<DiscoveryRequest> getStreamObserver();
     @Override
     public T getResource(Set<String> resourceNames) {
-        long request = requestId.getAndIncrement();
         resourceNames = resourceNames == null ? Collections.emptySet() : resourceNames;
-
-        // Store Request Parameter, which will be used for ACK
-        requestParam.put(request, resourceNames);
-
-        // create observer
-        StreamObserver<DiscoveryRequest> requestObserver = xdsChannel.createDeltaDiscoveryRequest(new ResponseObserver(request));
-
-        // use future to get async result
-        CompletableFuture<T> future = new CompletableFuture<>();
-        requestObserverMap.put(request, requestObserver);
-        streamResult.put(request, future);
-
-        // send request to control panel
-        requestObserver.onNext(buildDiscoveryRequest(resourceNames));
-
-        try {
-            // get result
-            return future.get();
-        } catch (InterruptedException | ExecutionException e) {
-            logger.error(REGISTRY_ERROR_REQUEST_XDS, "", "", "Error occur when request control panel.");
-            return null;
-        } finally {
-            // close observer
-            //requestObserver.onCompleted();
-
-            // remove temp
-            streamResult.remove(request);
-            requestObserverMap.remove(request);
-            requestParam.remove(request);
-        }
+        return getCacheResource(resourceNames);
     }
-
     @Override
-    public long observeResource(Set<String> resourceNames, Consumer<T> consumer) {
-        long request = requestId.getAndIncrement();
+    public void observeResource(Set<String> resourceNames, Consumer<T> consumer) {
         resourceNames = resourceNames == null ? Collections.emptySet() : resourceNames;
-
-        // Store Request Parameter, which will be used for ACK
-        requestParam.put(request, resourceNames);
-
         // call once for full data
         consumer.accept(getResource(resourceNames));
-
-        // channel reused
-        StreamObserver<DiscoveryRequest> requestObserver = xdsChannel.createDeltaDiscoveryRequest(new ResponseObserver(request));
-        requestObserverMap.put(request, requestObserver);
-
-        ScheduledFuture<?> scheduledFuture = pollingExecutor.scheduleAtFixedRate(() -> {
-            try {
-                // origin request, may changed by updateObserve
-                Set<String> names = requestParam.get(request);
-
-                // use future to get async result, future complete on StreamObserver onNext
-                CompletableFuture<T> future = new CompletableFuture<>();
-                streamResult.put(request, future);
-
-                // observer reused
-                StreamObserver<DiscoveryRequest> observer = requestObserverMap.get(request);
-
-                if (observer == null) {
-                    observer = xdsChannel.createDeltaDiscoveryRequest(new ResponseObserver(request));
-                    requestObserverMap.put(request, observer);
-                }
-
-                // send request to control panel
-                observer.onNext(buildDiscoveryRequest(names));
-
-                try {
-                    // get result
-                    consumer.accept(future.get());
-                } catch (InterruptedException | ExecutionException e) {
-                    logger.error(REGISTRY_ERROR_REQUEST_XDS, "", "", "Error occur when request control panel.");
-                } finally {
-                    // close observer
-                    //requestObserver.onCompleted();
-
-                    // remove temp
-                    streamResult.remove(request);
-                }
-            } catch (Throwable t) {
-                logger.error(REGISTRY_ERROR_REQUEST_XDS, "", "", "Error when requesting observe data. Type: " + getTypeUrl(), t);
-            }
-        }, pollingTimeout, pollingTimeout, TimeUnit.SECONDS);
-
-        observeScheduledMap.put(request, scheduledFuture);
-
-        return request;
-    }
-
-    @Override
-    public void updateObserve(long request, Set<String> resourceNames) {
-        // send difference in resourceNames
-        requestParam.put(request, resourceNames);
+        this.observeResourcesName = resourceNames;
+        this.observeConsumer = consumer;

Review Comment:
   This should support multiple observers.
   E.g. consumer 1 observes resource names 1, while consumer 2 observes resource names 2.



##########
dubbo-xds/src/main/java/org/apache/dubbo/registry/xds/util/PilotExchanger.java:
##########
@@ -45,61 +46,51 @@ public class PilotExchanger {
     private ListenerResult listenerResult;
 
     private RouteResult routeResult;
-
-    private final AtomicLong observeRouteRequest = new AtomicLong(-1);
-
-    private final Map<String, Long> domainObserveRequest = new ConcurrentHashMap<>();
+    private final AtomicBoolean isRdsObserve = new AtomicBoolean(false);
+    private final HashSet<String> domainObserveRequest = new HashSet<>();
 
     private final Map<String, Set<Consumer<Set<Endpoint>>>> domainObserveConsumer = new ConcurrentHashMap<>();
 
     private PilotExchanger(URL url) {
         xdsChannel = new XdsChannel(url);
-        int pollingPoolSize = url.getParameter("pollingPoolSize", 10);
         int pollingTimeout = url.getParameter("pollingTimeout", 10);
-        LdsProtocol ldsProtocol = new LdsProtocol(xdsChannel, NodeBuilder.build(), pollingPoolSize, pollingTimeout);
-        this.rdsProtocol = new RdsProtocol(xdsChannel, NodeBuilder.build(), pollingPoolSize, pollingTimeout);
-        this.edsProtocol = new EdsProtocol(xdsChannel, NodeBuilder.build(), pollingPoolSize, pollingTimeout);
+        LdsProtocol ldsProtocol = new LdsProtocol(xdsChannel, NodeBuilder.build(), pollingTimeout);
+        this.rdsProtocol = new RdsProtocol(xdsChannel, NodeBuilder.build(), pollingTimeout);
+        this.edsProtocol = new EdsProtocol(xdsChannel, NodeBuilder.build(), pollingTimeout);
 
         this.listenerResult = ldsProtocol.getListeners();
         this.routeResult = rdsProtocol.getResource(listenerResult.getRouteConfigNames());
 
         // Observer RDS update
         if (CollectionUtils.isNotEmpty(listenerResult.getRouteConfigNames())) {
-            this.observeRouteRequest.set(createRouteObserve());
+            createRouteObserve();
+            isRdsObserve.set(true);
         }
+
         // Observe LDS updated
         ldsProtocol.observeListeners((newListener) -> {
             // update local cache
             if (!newListener.equals(listenerResult)) {
                 this.listenerResult = newListener;
                 // update RDS observation
-                synchronized (observeRouteRequest) {
-                    if (observeRouteRequest.get() == -1) {
-                        this.observeRouteRequest.set(createRouteObserve());
-                    } else {
-                        rdsProtocol.updateObserve(observeRouteRequest.get(), newListener.getRouteConfigNames());
-                    }
+                synchronized (isRdsObserve) {

Review Comment:
   Why synchronize on an `AtomicBoolean` object?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org