You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by wu...@apache.org on 2018/09/13 06:53:25 UTC

[incubator-skywalking] branch new-indicator-core updated: Add service call as manual indicator and refactor the core DispatcherManager

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch new-indicator-core
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/new-indicator-core by this push:
     new 9859635  Add service call as manual indicator and refactor the core DispatcherManager
9859635 is described below

commit 9859635a26c77061f4649b0512046b9d88c57fca
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Thu Sep 13 14:53:19 2018 +0800

    Add service call as manual indicator and refactor the core DispatcherManager
---
 .../server/core/analysis/DispatcherManager.java    |  36 ++---
 .../ServiceCallRelationDispatcher.java             |  41 ++++++
 .../ServiceCallRelationIndicator.java              | 155 +++++++++++++++++++++
 .../oap/server/core/source/SourceReceiverImpl.java |   2 +-
 4 files changed, 217 insertions(+), 17 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
index cd6c508..7a44a31 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
@@ -31,7 +31,9 @@ import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancej
 import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancerelation.ServiceInstanceRelationDispatcher;
 import org.apache.skywalking.oap.server.core.analysis.generated.servicerelation.ServiceRelationDispatcher;
 import org.apache.skywalking.oap.server.core.analysis.manual.service.*;
+import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.ServiceCallRelationDispatcher;
 import org.apache.skywalking.oap.server.core.source.Scope;
+import org.apache.skywalking.oap.server.core.source.Source;
 import org.slf4j.*;
 
 /**
@@ -41,31 +43,33 @@ public class DispatcherManager {
 
     private static final Logger logger = LoggerFactory.getLogger(DispatcherManager.class);
 
-    private Map<Scope, SourceDispatcher> dispatcherMap;
+    private Map<Scope, SourceDispatcher[]> dispatcherMap;
 
     public DispatcherManager() {
         this.dispatcherMap = new HashMap<>();
 
-        this.dispatcherMap.put(Scope.All, new AllDispatcher());
+        this.dispatcherMap.put(Scope.All, new SourceDispatcher[] {new AllDispatcher()});
 
-        this.dispatcherMap.put(Scope.Service, new ServiceDispatcher());
-        this.dispatcherMap.put(Scope.ServiceInstance, new ServiceInstanceDispatcher());
-        this.dispatcherMap.put(Scope.Endpoint, new EndpointDispatcher());
+        this.dispatcherMap.put(Scope.Service, new SourceDispatcher[] {new ServiceDispatcher()});
+        this.dispatcherMap.put(Scope.ServiceInstance, new SourceDispatcher[] {new ServiceInstanceDispatcher()});
+        this.dispatcherMap.put(Scope.Endpoint, new SourceDispatcher[] {new EndpointDispatcher()});
 
-        this.dispatcherMap.put(Scope.ServiceComponent, new ServiceComponentDispatcher());
-        this.dispatcherMap.put(Scope.ServiceMapping, new ServiceMappingDispatcher());
+        this.dispatcherMap.put(Scope.ServiceComponent, new SourceDispatcher[] {new ServiceComponentDispatcher()});
+        this.dispatcherMap.put(Scope.ServiceMapping, new SourceDispatcher[] {new ServiceMappingDispatcher()});
 
-        this.dispatcherMap.put(Scope.ServiceRelation, new ServiceRelationDispatcher());
-        this.dispatcherMap.put(Scope.ServiceInstanceRelation, new ServiceInstanceRelationDispatcher());
-        this.dispatcherMap.put(Scope.EndpointRelation, new EndpointRelationDispatcher());
+        this.dispatcherMap.put(Scope.ServiceRelation, new SourceDispatcher[] {new ServiceRelationDispatcher(), new ServiceCallRelationDispatcher()});
+        this.dispatcherMap.put(Scope.ServiceInstanceRelation, new SourceDispatcher[] {new ServiceInstanceRelationDispatcher()});
+        this.dispatcherMap.put(Scope.EndpointRelation, new SourceDispatcher[] {new EndpointRelationDispatcher()});
 
-        this.dispatcherMap.put(Scope.ServiceInstanceJVMCPU, new ServiceInstanceJVMCPUDispatcher());
-        this.dispatcherMap.put(Scope.ServiceInstanceJVMGC, new ServiceInstanceJVMGCDispatcher());
-        this.dispatcherMap.put(Scope.ServiceInstanceJVMMemory, new ServiceInstanceJVMMemoryDispatcher());
-        this.dispatcherMap.put(Scope.ServiceInstanceJVMMemoryPool, new ServiceInstanceJVMMemoryPoolDispatcher());
+        this.dispatcherMap.put(Scope.ServiceInstanceJVMCPU, new SourceDispatcher[] {new ServiceInstanceJVMCPUDispatcher()});
+        this.dispatcherMap.put(Scope.ServiceInstanceJVMGC, new SourceDispatcher[] {new ServiceInstanceJVMGCDispatcher()});
+        this.dispatcherMap.put(Scope.ServiceInstanceJVMMemory, new SourceDispatcher[] {new ServiceInstanceJVMMemoryDispatcher()});
+        this.dispatcherMap.put(Scope.ServiceInstanceJVMMemoryPool, new SourceDispatcher[] {new ServiceInstanceJVMMemoryPoolDispatcher()});
     }
 
-    public SourceDispatcher getDispatcher(Scope scope) {
-        return dispatcherMap.get(scope);
+    public void forward(Source source) { // a scope with no registered dispatchers is skipped, not an NPE
+        for (SourceDispatcher dispatcher : dispatcherMap.getOrDefault(source.scope(), new SourceDispatcher[0])) {
+            dispatcher.dispatch(source);
+        }
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationDispatcher.java
new file mode 100644
index 0000000..aad015f
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationDispatcher.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.servicerelation;
+
+import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
+import org.apache.skywalking.oap.server.core.analysis.worker.IndicatorProcess;
+import org.apache.skywalking.oap.server.core.source.ServiceRelation;
+
+/**
+ * @author wusheng
+ */
+public class ServiceCallRelationDispatcher implements SourceDispatcher<ServiceRelation> {
+    @Override
+    public void dispatch(ServiceRelation source) {
+        doDispatch(source);
+    }
+
+    public void doDispatch(ServiceRelation source) {
+        ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+        indicator.setTimeBucket(source.getTimeBucket());
+        indicator.setSourceServiceId(source.getSourceServiceId());
+        indicator.setDestServiceId(source.getDestServiceId());
+        IndicatorProcess.INSTANCE.in(indicator);
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationIndicator.java
new file mode 100644
index 0000000..0663607
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationIndicator.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.servicerelation;
+
+import java.util.HashMap;
+import java.util.Map;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
+import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.core.storage.annotation.IDColumn;
+import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntity;
+
+@IndicatorType
+@StreamData
+@StorageEntity(name = ServiceCallRelationIndicator.INDEX_NAME, builder = ServiceCallRelationIndicator.Builder.class)
+public class ServiceCallRelationIndicator extends Indicator {
+
+    public static final String INDEX_NAME = "service_call_relation";
+    public static final String SOURCE_SERVICE_ID = "source_service_id";
+    public static final String DEST_SERVICE_ID = "dest_service_id";
+
+    @Setter @Getter @Column(columnName = SOURCE_SERVICE_ID) @IDColumn private int sourceServiceId;
+    @Setter @Getter @Column(columnName = DEST_SERVICE_ID) @IDColumn private int destServiceId;
+
+    @Override public String id() {
+        String splitJointId = String.valueOf(getTimeBucket());
+        splitJointId += Const.ID_SPLIT + String.valueOf(sourceServiceId);
+        splitJointId += Const.ID_SPLIT + String.valueOf(destServiceId);
+        return splitJointId;
+    }
+
+    @Override public void combine(Indicator indicator) {
+
+    }
+
+    @Override public void calculate() {
+
+    }
+
+    @Override public Indicator toHour() {
+        ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+        indicator.setTimeBucket(toTimeBucketInHour());
+        indicator.setSourceServiceId(getSourceServiceId());
+        indicator.setDestServiceId(getDestServiceId());
+        return indicator;
+    }
+
+    @Override public Indicator toDay() {
+        ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+        indicator.setTimeBucket(toTimeBucketInDay());
+        indicator.setSourceServiceId(getSourceServiceId());
+        indicator.setDestServiceId(getDestServiceId());
+        return indicator;
+    }
+
+    @Override public Indicator toMonth() {
+        ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+        indicator.setTimeBucket(toTimeBucketInMonth());
+        indicator.setSourceServiceId(getSourceServiceId());
+        indicator.setDestServiceId(getDestServiceId());
+        return indicator;
+    }
+
+    @Override public int remoteHashCode() {
+        int result = 17;
+        result = 31 * result + sourceServiceId;
+        result = 31 * result + destServiceId;
+        return result;
+    }
+
+    @Override public void deserialize(RemoteData remoteData) {
+        setSourceServiceId(remoteData.getDataIntegers(0));
+        setDestServiceId(remoteData.getDataIntegers(1));
+        setTimeBucket(remoteData.getDataLongs(0));
+    }
+
+    @Override public RemoteData.Builder serialize() {
+        RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
+        // Append in the order deserialize() reads; set*(index, ..) on an empty repeated field throws.
+        remoteBuilder.addDataIntegers(getSourceServiceId());
+        remoteBuilder.addDataIntegers(getDestServiceId());
+        remoteBuilder.addDataLongs(getTimeBucket());
+
+        return remoteBuilder;
+    }
+
+    @Override public int hashCode() {
+        int result = 17;
+        result = 31 * result + sourceServiceId;
+        result = 31 * result + destServiceId;
+        result = 31 * result + (int)getTimeBucket();
+        return result;
+    }
+
+    @Override public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+
+        ServiceCallRelationIndicator indicator = (ServiceCallRelationIndicator)obj;
+        if (sourceServiceId != indicator.sourceServiceId)
+            return false;
+        if (destServiceId != indicator.destServiceId)
+            return false;
+
+        if (getTimeBucket() != indicator.getTimeBucket())
+            return false;
+
+        return true;
+    }
+
+    public static class Builder implements StorageBuilder<ServiceCallRelationIndicator> {
+
+        @Override public ServiceCallRelationIndicator map2Data(Map<String, Object> dbMap) {
+            ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+            indicator.setSourceServiceId(((Number)dbMap.get(SOURCE_SERVICE_ID)).intValue());
+            indicator.setDestServiceId(((Number)dbMap.get(DEST_SERVICE_ID)).intValue());
+            indicator.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
+            return indicator;
+        }
+
+        @Override public Map<String, Object> data2Map(ServiceCallRelationIndicator storageData) {
+            Map<String, Object> map = new HashMap<>();
+            map.put(SOURCE_SERVICE_ID, storageData.getSourceServiceId());
+            map.put(DEST_SERVICE_ID, storageData.getDestServiceId());
+            map.put(TIME_BUCKET, storageData.getTimeBucket());
+            return map;
+        }
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java
index 3b30285..853c99b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java
@@ -32,6 +32,6 @@ public class SourceReceiverImpl implements SourceReceiver {
     }
 
     @Override public void receive(Source source) {
-        dispatcherManager.getDispatcher(source.scope()).dispatch(source);
+        dispatcherManager.forward(source);
     }
 }