You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2018/12/06 03:27:35 UTC

[incubator-skywalking] branch refactor-dispatcher-manager created (now 1ed574e)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a change to branch refactor-dispatcher-manager
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git.


      at 1ed574e  Remove the hard codes about DispatcherManager.

This branch includes the following new commits:

     new 1ed574e  Remove the hard codes about DispatcherManager.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-skywalking] 01/01: Remove the hard codes about DispatcherManager.

Posted by wu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch refactor-dispatcher-manager
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git

commit 1ed574e5a949f01db40d65ab9aa8669d18fb28cd
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Thu Dec 6 11:27:23 2018 +0800

    Remove the hard codes about DispatcherManager.
---
 .../oap/server/core/CoreModuleProvider.java        |  8 +-
 .../server/core/analysis/DispatcherManager.java    | 92 ++++++++++++++--------
 .../oap/server/core/source/SourceReceiverImpl.java |  5 ++
 3 files changed, 69 insertions(+), 36 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 20ea0bc..b41a19a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -59,6 +59,7 @@ public class CoreModuleProvider extends ModuleProvider {
     private final StorageAnnotationListener storageAnnotationListener;
     private final StreamAnnotationListener streamAnnotationListener;
     private final StreamDataAnnotationContainer streamDataAnnotationContainer;
+    private final SourceReceiverImpl receiver;
 
     public CoreModuleProvider() {
         super();
@@ -67,6 +68,7 @@ public class CoreModuleProvider extends ModuleProvider {
         this.storageAnnotationListener = new StorageAnnotationListener();
         this.streamAnnotationListener = new StreamAnnotationListener();
         this.streamDataAnnotationContainer = new StreamDataAnnotationContainer();
+        receiver = new SourceReceiverImpl();
     }
 
     @Override public String name() {
@@ -101,7 +103,7 @@ public class CoreModuleProvider extends ModuleProvider {
 
         this.registerServiceImplementation(IComponentLibraryCatalogService.class, new ComponentLibraryCatalogService());
 
-        this.registerServiceImplementation(SourceReceiver.class, new SourceReceiverImpl());
+        this.registerServiceImplementation(SourceReceiver.class, receiver);
 
         this.registerServiceImplementation(StreamDataClassGetter.class, streamDataAnnotationContainer);
 
@@ -143,10 +145,12 @@ public class CoreModuleProvider extends ModuleProvider {
         remoteClientManager.start();
 
         try {
+            receiver.scan();
+
             annotationScan.scan(() -> {
                 streamDataAnnotationContainer.generate(streamAnnotationListener.getStreamClasses());
             });
-        } catch (IOException e) {
+        } catch (IOException | IllegalAccessException | InstantiationException e) {
             throw new ModuleStartException(e.getMessage(), e);
         }
     }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
index 5e55a75..42b3f94 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
@@ -18,23 +18,20 @@
 
 package org.apache.skywalking.oap.server.core.analysis;
 
-import java.util.*;
-import org.apache.skywalking.oap.server.core.analysis.generated.all.AllDispatcher;
-import org.apache.skywalking.oap.server.core.analysis.generated.endpoint.EndpointDispatcher;
-import org.apache.skywalking.oap.server.core.analysis.generated.endpointrelation.EndpointRelationDispatcher;
-import org.apache.skywalking.oap.server.core.analysis.generated.service.ServiceDispatcher;
-import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstance.ServiceInstanceDispatcher;
-import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancejvmcpu.ServiceInstanceJVMCPUDispatcher;
-import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancejvmgc.ServiceInstanceJVMGCDispatcher;
-import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancejvmmemory.ServiceInstanceJVMMemoryDispatcher;
-import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancejvmmemorypool.ServiceInstanceJVMMemoryPoolDispatcher;
-import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancerelation.ServiceInstanceRelationDispatcher;
-import org.apache.skywalking.oap.server.core.analysis.generated.servicerelation.ServiceRelationDispatcher;
-import org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation.EndpointCallRelationDispatcher;
-import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentDispatcher;
-import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.ServiceCallRelationDispatcher;
-import org.apache.skywalking.oap.server.core.source.*;
-import org.slf4j.*;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.reflect.ClassPath;
+import java.io.IOException;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.source.Scope;
+import org.apache.skywalking.oap.server.core.source.Source;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author peng-yongsheng
@@ -43,32 +40,59 @@ public class DispatcherManager {
 
     private static final Logger logger = LoggerFactory.getLogger(DispatcherManager.class);
 
-    private Map<Scope, SourceDispatcher[]> dispatcherMap;
+    private Map<Scope, List<SourceDispatcher>> dispatcherMap;
 
     public DispatcherManager() {
         this.dispatcherMap = new HashMap<>();
+    }
 
-        this.dispatcherMap.put(Scope.All, new SourceDispatcher[] {new AllDispatcher()});
+    public void forward(Source source) {
+        for (SourceDispatcher dispatcher : dispatcherMap.get(source.scope())) {
+            dispatcher.dispatch(source);
+        }
+    }
 
-        this.dispatcherMap.put(Scope.Segment, new SourceDispatcher[] {new SegmentDispatcher()});
+    public void scan() throws IOException, IllegalAccessException, InstantiationException {
+        ClassPath classpath = ClassPath.from(this.getClass().getClassLoader());
+        ImmutableSet<ClassPath.ClassInfo> classes = classpath.getTopLevelClassesRecursive("org.apache.skywalking");
+        for (ClassPath.ClassInfo classInfo : classes) {
+            Class<?> aClass = classInfo.load();
 
-        this.dispatcherMap.put(Scope.Service, new SourceDispatcher[] {new ServiceDispatcher()});
-        this.dispatcherMap.put(Scope.ServiceInstance, new SourceDispatcher[] {new ServiceInstanceDispatcher()});
-        this.dispatcherMap.put(Scope.Endpoint, new SourceDispatcher[] {new EndpointDispatcher()});
+            if (!aClass.isInterface() && SourceDispatcher.class.isAssignableFrom(aClass)) {
+                Type[] genericInterfaces = aClass.getGenericInterfaces();
+                for (Type genericInterface : genericInterfaces) {
+                    ParameterizedType anInterface = (ParameterizedType)genericInterface;
+                    if (anInterface.getRawType().getTypeName().equals(SourceDispatcher.class.getName())) {
+                        Type[] arguments = anInterface.getActualTypeArguments();
 
-        this.dispatcherMap.put(Scope.ServiceRelation, new SourceDispatcher[] {new ServiceRelationDispatcher(), new ServiceCallRelationDispatcher()});
-        this.dispatcherMap.put(Scope.ServiceInstanceRelation, new SourceDispatcher[] {new ServiceInstanceRelationDispatcher()});
-        this.dispatcherMap.put(Scope.EndpointRelation, new SourceDispatcher[] {new EndpointRelationDispatcher(), new EndpointCallRelationDispatcher()});
+                        if (arguments.length != 1) {
+                            throw new UnexpectedException("unexpected type argument number, class " + aClass.getName());
+                        }
+                        Type argument = arguments[0];
 
-        this.dispatcherMap.put(Scope.ServiceInstanceJVMCPU, new SourceDispatcher[] {new ServiceInstanceJVMCPUDispatcher()});
-        this.dispatcherMap.put(Scope.ServiceInstanceJVMGC, new SourceDispatcher[] {new ServiceInstanceJVMGCDispatcher()});
-        this.dispatcherMap.put(Scope.ServiceInstanceJVMMemory, new SourceDispatcher[] {new ServiceInstanceJVMMemoryDispatcher()});
-        this.dispatcherMap.put(Scope.ServiceInstanceJVMMemoryPool, new SourceDispatcher[] {new ServiceInstanceJVMMemoryPoolDispatcher()});
-    }
+                        Object source = ((Class)argument).newInstance();
 
-    public void forward(Source source) {
-        for (SourceDispatcher dispatcher : dispatcherMap.get(source.scope())) {
-            dispatcher.dispatch(source);
+                        if (!Source.class.isAssignableFrom(source.getClass())) {
+                            throw new UnexpectedException("unexpected type argument of class " + aClass.getName() + ", should be `org.apache.skywalking.oap.server.core.source.Source`. ");
+                        }
+
+                        Source dispatcherSource = (Source)source;
+                        SourceDispatcher dispatcher = (SourceDispatcher)aClass.newInstance();
+
+                        Scope scope = dispatcherSource.scope();
+
+                        List<SourceDispatcher> dispatchers = this.dispatcherMap.get(scope);
+                        if (dispatchers == null) {
+                            dispatchers = new ArrayList<>();
+                            this.dispatcherMap.put(scope, dispatchers);
+                        }
+
+                        dispatchers.add(dispatcher);
+
+                        logger.info("Dispatcher {} is added into Scope {}.", dispatcher.getClass().getName(), scope);
+                    }
+                }
+            }
         }
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java
index 853c99b..4c48efd 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.oap.server.core.source;
 
+import java.io.IOException;
 import org.apache.skywalking.oap.server.core.analysis.DispatcherManager;
 
 /**
@@ -34,4 +35,8 @@ public class SourceReceiverImpl implements SourceReceiver {
     @Override public void receive(Source source) {
         dispatcherManager.forward(source);
     }
+
+    public void scan() throws IOException, InstantiationException, IllegalAccessException {
+        dispatcherManager.scan();
+    }
 }