You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by wu...@apache.org on 2018/09/13 06:53:25 UTC
[incubator-skywalking] branch new-indicator-core updated: Add
service call as manual indicator and refactor the core DIspatcherManager
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch new-indicator-core
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/new-indicator-core by this push:
new 9859635 Add service call as manual indicator and refactor the core DIspatcherManager
9859635 is described below
commit 9859635a26c77061f4649b0512046b9d88c57fca
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Thu Sep 13 14:53:19 2018 +0800
Add service call as manual indicator and refactor the core DIspatcherManager
---
.../server/core/analysis/DispatcherManager.java | 36 ++---
.../ServiceCallRelationDispatcher.java | 41 ++++++
.../ServiceCallRelationIndicator.java | 155 +++++++++++++++++++++
.../oap/server/core/source/SourceReceiverImpl.java | 2 +-
4 files changed, 217 insertions(+), 17 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
index cd6c508..7a44a31 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
@@ -31,7 +31,9 @@ import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancej
import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancerelation.ServiceInstanceRelationDispatcher;
import org.apache.skywalking.oap.server.core.analysis.generated.servicerelation.ServiceRelationDispatcher;
import org.apache.skywalking.oap.server.core.analysis.manual.service.*;
+import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.ServiceCallRelationDispatcher;
import org.apache.skywalking.oap.server.core.source.Scope;
+import org.apache.skywalking.oap.server.core.source.Source;
import org.slf4j.*;
/**
@@ -41,31 +43,33 @@ public class DispatcherManager {
private static final Logger logger = LoggerFactory.getLogger(DispatcherManager.class);
- private Map<Scope, SourceDispatcher> dispatcherMap;
+ private Map<Scope, SourceDispatcher[]> dispatcherMap;
public DispatcherManager() {
this.dispatcherMap = new HashMap<>();
- this.dispatcherMap.put(Scope.All, new AllDispatcher());
+ this.dispatcherMap.put(Scope.All, new SourceDispatcher[] {new AllDispatcher()});
- this.dispatcherMap.put(Scope.Service, new ServiceDispatcher());
- this.dispatcherMap.put(Scope.ServiceInstance, new ServiceInstanceDispatcher());
- this.dispatcherMap.put(Scope.Endpoint, new EndpointDispatcher());
+ this.dispatcherMap.put(Scope.Service, new SourceDispatcher[] {new ServiceDispatcher()});
+ this.dispatcherMap.put(Scope.ServiceInstance, new SourceDispatcher[] {new ServiceInstanceDispatcher()});
+ this.dispatcherMap.put(Scope.Endpoint, new SourceDispatcher[] {new EndpointDispatcher()});
- this.dispatcherMap.put(Scope.ServiceComponent, new ServiceComponentDispatcher());
- this.dispatcherMap.put(Scope.ServiceMapping, new ServiceMappingDispatcher());
+ this.dispatcherMap.put(Scope.ServiceComponent, new SourceDispatcher[] {new ServiceComponentDispatcher()});
+ this.dispatcherMap.put(Scope.ServiceMapping, new SourceDispatcher[] {new ServiceMappingDispatcher()});
- this.dispatcherMap.put(Scope.ServiceRelation, new ServiceRelationDispatcher());
- this.dispatcherMap.put(Scope.ServiceInstanceRelation, new ServiceInstanceRelationDispatcher());
- this.dispatcherMap.put(Scope.EndpointRelation, new EndpointRelationDispatcher());
+ this.dispatcherMap.put(Scope.ServiceRelation, new SourceDispatcher[] {new ServiceRelationDispatcher(), new ServiceCallRelationDispatcher()});
+ this.dispatcherMap.put(Scope.ServiceInstanceRelation, new SourceDispatcher[] {new ServiceInstanceRelationDispatcher()});
+ this.dispatcherMap.put(Scope.EndpointRelation, new SourceDispatcher[] {new EndpointRelationDispatcher()});
- this.dispatcherMap.put(Scope.ServiceInstanceJVMCPU, new ServiceInstanceJVMCPUDispatcher());
- this.dispatcherMap.put(Scope.ServiceInstanceJVMGC, new ServiceInstanceJVMGCDispatcher());
- this.dispatcherMap.put(Scope.ServiceInstanceJVMMemory, new ServiceInstanceJVMMemoryDispatcher());
- this.dispatcherMap.put(Scope.ServiceInstanceJVMMemoryPool, new ServiceInstanceJVMMemoryPoolDispatcher());
+ this.dispatcherMap.put(Scope.ServiceInstanceJVMCPU, new SourceDispatcher[] {new ServiceInstanceJVMCPUDispatcher()});
+ this.dispatcherMap.put(Scope.ServiceInstanceJVMGC, new SourceDispatcher[] {new ServiceInstanceJVMGCDispatcher()});
+ this.dispatcherMap.put(Scope.ServiceInstanceJVMMemory, new SourceDispatcher[] {new ServiceInstanceJVMMemoryDispatcher()});
+ this.dispatcherMap.put(Scope.ServiceInstanceJVMMemoryPool, new SourceDispatcher[] {new ServiceInstanceJVMMemoryPoolDispatcher()});
}
- public SourceDispatcher getDispatcher(Scope scope) {
- return dispatcherMap.get(scope);
+ public void forward(Source source) {
+ for (SourceDispatcher dispatcher : dispatcherMap.get(source.scope())) {
+ dispatcher.dispatch(source);
+ }
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationDispatcher.java
new file mode 100644
index 0000000..aad015f
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationDispatcher.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.servicerelation;
+
+import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
+import org.apache.skywalking.oap.server.core.analysis.worker.IndicatorProcess;
+import org.apache.skywalking.oap.server.core.source.ServiceRelation;
+
+/**
+ * @author wusheng
+ */
+public class ServiceCallRelationDispatcher implements SourceDispatcher<ServiceRelation> {
+ @Override
+ public void dispatch(ServiceRelation source) {
+ doDispatch(source);
+ }
+
+ public void doDispatch(ServiceRelation source) {
+ ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+ indicator.setTimeBucket(source.getTimeBucket());
+ indicator.setSourceServiceId(source.getSourceServiceId());
+ indicator.setDestServiceId(source.getDestServiceId());
+ IndicatorProcess.INSTANCE.in(indicator);
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationIndicator.java
new file mode 100644
index 0000000..0663607
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationIndicator.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.servicerelation;
+
+import java.util.HashMap;
+import java.util.Map;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
+import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.core.storage.annotation.IDColumn;
+import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntity;
+
+@IndicatorType
+@StreamData
+@StorageEntity(name = ServiceCallRelationIndicator.INDEX_NAME, builder = ServiceCallRelationIndicator.Builder.class)
+public class ServiceCallRelationIndicator extends Indicator {
+
+ public static final String INDEX_NAME = "service_call_relation";
+ public static final String SOURCE_SERVICE_ID = "source_service_id";
+ public static final String DEST_SERVICE_ID = "dest_service_id";
+
+ @Setter @Getter @Column(columnName = SOURCE_SERVICE_ID) @IDColumn private int sourceServiceId;
+ @Setter @Getter @Column(columnName = DEST_SERVICE_ID) @IDColumn private int destServiceId;
+
+ @Override public String id() {
+ String splitJointId = String.valueOf(getTimeBucket());
+ splitJointId += Const.ID_SPLIT + String.valueOf(sourceServiceId);
+ splitJointId += Const.ID_SPLIT + String.valueOf(destServiceId);
+ return splitJointId;
+ }
+
+ @Override public void combine(Indicator indicator) {
+
+ }
+
+ @Override public void calculate() {
+
+ }
+
+ @Override public Indicator toHour() {
+ ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+ indicator.setTimeBucket(toTimeBucketInHour());
+ indicator.setSourceServiceId(getSourceServiceId());
+ indicator.setDestServiceId(getDestServiceId());
+ return indicator;
+ }
+
+ @Override public Indicator toDay() {
+ ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+ indicator.setTimeBucket(toTimeBucketInDay());
+ indicator.setSourceServiceId(getSourceServiceId());
+ indicator.setDestServiceId(getDestServiceId());
+ return indicator;
+ }
+
+ @Override public Indicator toMonth() {
+ ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+ indicator.setTimeBucket(toTimeBucketInMonth());
+ indicator.setSourceServiceId(getSourceServiceId());
+ indicator.setDestServiceId(getDestServiceId());
+ return indicator;
+ }
+
+ @Override public int remoteHashCode() {
+ int result = 17;
+ result = 31 * result + sourceServiceId;
+ result = 31 * result + destServiceId;
+ return result;
+ }
+
+ @Override public void deserialize(RemoteData remoteData) {
+ setSourceServiceId(remoteData.getDataIntegers(0));
+ setDestServiceId(remoteData.getDataIntegers(1));
+ setTimeBucket(remoteData.getDataLongs(0));
+ }
+
+ @Override public RemoteData.Builder serialize() {
+ RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
+
+ remoteBuilder.setDataIntegers(0, getSourceServiceId());
+ remoteBuilder.setDataIntegers(1, getDestServiceId());
+ remoteBuilder.setDataLongs(0, getTimeBucket());
+
+ return remoteBuilder;
+ }
+
+ @Override public int hashCode() {
+ int result = 17;
+ result = 31 * result + sourceServiceId;
+ result = 31 * result + destServiceId;
+ result = 31 * result + (int)getTimeBucket();
+ return result;
+ }
+
+ @Override public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+
+ ServiceCallRelationIndicator indicator = (ServiceCallRelationIndicator)obj;
+ if (sourceServiceId != indicator.sourceServiceId)
+ return false;
+ if (destServiceId != indicator.destServiceId)
+ return false;
+
+ if (getTimeBucket() != indicator.getTimeBucket())
+ return false;
+
+ return true;
+ }
+
+ public static class Builder implements StorageBuilder<ServiceCallRelationIndicator> {
+
+ @Override public ServiceCallRelationIndicator map2Data(Map<String, Object> dbMap) {
+ ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+ indicator.setSourceServiceId(((Number)dbMap.get(SOURCE_SERVICE_ID)).intValue());
+ indicator.setDestServiceId(((Number)dbMap.get(DEST_SERVICE_ID)).intValue());
+ indicator.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
+ return indicator;
+ }
+
+ @Override public Map<String, Object> data2Map(ServiceCallRelationIndicator storageData) {
+ Map<String, Object> map = new HashMap<>();
+ map.put(SOURCE_SERVICE_ID, storageData.getSourceServiceId());
+ map.put(DEST_SERVICE_ID, storageData.getDestServiceId());
+ map.put(TIME_BUCKET, storageData.getTimeBucket());
+ return map;
+ }
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java
index 3b30285..853c99b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java
@@ -32,6 +32,6 @@ public class SourceReceiverImpl implements SourceReceiver {
}
@Override public void receive(Source source) {
- dispatcherManager.getDispatcher(source.scope()).dispatch(source);
+ dispatcherManager.forward(source);
}
}