You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/06/18 12:00:35 UTC

[skywalking] 01/01: OAL supports generating metrics from events

This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch feature/event/oal
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit f40fa013de83e1eef833004245dc4e393205e2d1
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Fri Jun 18 11:23:57 2021 +0800

    OAL supports generating metrics from events
---
 CHANGES.md                                         |  2 +-
 docs/en/concepts-and-designs/scope-definitions.md  | 16 ++++++++++-
 .../event/EventAnalyzerModuleProvider.java         |  9 ++++++-
 .../oap/server/analyzer/event/EventOALDefine.java} | 31 +++++++++-------------
 .../listener/EventRecordAnalyzerListener.java      | 12 +++++++--
 .../apache/skywalking/oal/rt/grammar/OALLexer.g4   |  1 +
 .../apache/skywalking/oal/rt/grammar/OALParser.g4  |  3 ++-
 .../code-templates/dispatcher/dispatch.ftl         |  4 +--
 .../code-templates/dispatcher/doMetrics.ftl        |  2 +-
 .../src/main/resources/oal/event.oal}              | 24 ++++-------------
 .../server/core/analysis/DispatcherManager.java    |  8 +++---
 .../oap/server/core/analysis/SourceDispatcher.java |  4 +--
 .../oap/server/core/{event => source}/Event.java   | 21 ++++++++++-----
 .../core/source/{Source.java => ISource.java}      | 18 +++++--------
 .../skywalking/oap/server/core/source/Source.java  | 12 +--------
 .../oap/server/core/source/SourceReceiver.java     |  2 +-
 .../oap/server/core/source/SourceReceiverImpl.java |  2 +-
 .../handler/ServiceManagementHandlerTest.java      | 10 +++----
 .../kafka/provider/handler/SourceReceiverRule.java |  9 ++++---
 .../provider/parser/listener/MockReceiver.java     |  6 ++---
 .../listener/MultiScopesAnalysisListenerTest.java  | 16 +++++------
 .../elasticsearch/query/ESEventQueryDAO.java       |  2 +-
 .../elasticsearch7/query/ES7EventQueryDAO.java     |  2 +-
 .../plugin/influxdb/query/EventQueryDAO.java       |  2 +-
 .../plugin/jdbc/h2/dao/H2EventQueryDAO.java        |  2 +-
 .../tool/profile/core/mock/MockSourceReceiver.java |  4 +--
 26 files changed, 115 insertions(+), 109 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 0273b96..bca04eb 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -34,7 +34,7 @@ Release Notes.
 * Upgrade zookeeper caused by CVE-2019-0201. 
 * Upgrade snake yaml caused by CVE-2017-18640.
 * Upgrade embed tomcat caused by CVE-2020-13935.
-
+* OAL supports generating metrics from events.
 
 #### UI
 
diff --git a/docs/en/concepts-and-designs/scope-definitions.md b/docs/en/concepts-and-designs/scope-definitions.md
index 0bdb725..9be43de 100644
--- a/docs/en/concepts-and-designs/scope-definitions.md
+++ b/docs/en/concepts-and-designs/scope-definitions.md
@@ -181,7 +181,7 @@ This calculates the metrics data from each request of the page in the browser ap
 
 ### SCOPE `BrowserAppPagePerf`
 
-This calculates the metrics data form each request of the page in the browser application (browser only).
+This calculates the metrics data from each request of the page in the browser application (browser only).
 
 | Name | Remarks | Group Key | Type | 
 |---|---|---|---|
@@ -201,3 +201,17 @@ This calculates the metrics data form each request of the page in the browser ap
 | ttlTime | Time to interact. | | int(in ms) |
 | firstPackTime | First pack time. | | int(in ms) |
 | fmpTime | First Meaningful Paint. | | int(in ms) |
+
+### SCOPE `Event`
+
+This calculates the metrics data from [events](event.md).
+
+| Name | Remarks | Group Key | Type | 
+|---|---|---|---|
+| name | The name of the event. |  | string |
+| service | The service name to which the event belongs to. | | string |
+| serviceInstance | The service instance to which the event belongs to, if any. | | string|
+| endpoint | The service endpoint to which the event belongs to, if any. | | string|
+| type | The type of the event, `Normal` or `Error`. | | string|
+| message | The message of the event. | | string |
+| parameters | The parameters in the `message`, see [parameters](event.md#parameters). | | string |
diff --git a/oap-server/analyzer/event-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/event/EventAnalyzerModuleProvider.java b/oap-server/analyzer/event-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/event/EventAnalyzerModuleProvider.java
index d29e4cb..16b7a27 100644
--- a/oap-server/analyzer/event-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/event/EventAnalyzerModuleProvider.java
+++ b/oap-server/analyzer/event-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/event/EventAnalyzerModuleProvider.java
@@ -20,9 +20,11 @@ package org.apache.skywalking.oap.server.analyzer.event;
 
 import org.apache.skywalking.oap.server.analyzer.event.listener.EventRecordAnalyzerListener;
 import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoaderService;
 import org.apache.skywalking.oap.server.library.module.ModuleConfig;
 import org.apache.skywalking.oap.server.library.module.ModuleDefine;
 import org.apache.skywalking.oap.server.library.module.ModuleProvider;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
 import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
 
 public class EventAnalyzerModuleProvider extends ModuleProvider {
@@ -51,7 +53,12 @@ public class EventAnalyzerModuleProvider extends ModuleProvider {
     }
 
     @Override
-    public void start() {
+    public void start() throws ModuleStartException {
+        getManager().find(CoreModule.NAME)
+                    .provider()
+                    .getService(OALEngineLoaderService.class)
+                    .load(EventOALDefine.INSTANCE);
+
         analysisService.add(new EventRecordAnalyzerListener.Factory(getManager()));
     }
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Source.java b/oap-server/analyzer/event-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/event/EventOALDefine.java
similarity index 60%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Source.java
copy to oap-server/analyzer/event-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/event/EventOALDefine.java
index 25d01cd..8aa2ef7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Source.java
+++ b/oap-server/analyzer/event-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/event/EventOALDefine.java
@@ -6,34 +6,29 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
-package org.apache.skywalking.oap.server.core.source;
-
-import lombok.Getter;
-import lombok.Setter;
-
-public abstract class Source {
-    public abstract int scope();
+package org.apache.skywalking.oap.server.analyzer.event;
 
-    @Getter
-    @Setter
-    private long timeBucket;
+import org.apache.skywalking.oap.server.core.oal.rt.OALDefine;
 
-    public abstract String getEntityId();
-
-    /**
-     * Internal data field preparation before {@link org.apache.skywalking.oap.server.core.analysis.SourceDispatcher#dispatch(Source)}
-     */
-    public void prepare() {
+/**
+ * OAL rules to calculate Event-specific metrics.
+ */
+public class EventOALDefine extends OALDefine {
+    public static final EventOALDefine INSTANCE = new EventOALDefine();
 
+    private EventOALDefine() {
+        super(
+            "oal/event.oal",
+            "org.apache.skywalking.oap.server.core.source"
+        );
     }
 }
diff --git a/oap-server/analyzer/event-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/event/listener/EventRecordAnalyzerListener.java b/oap-server/analyzer/event-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/event/listener/EventRecordAnalyzerListener.java
index 73eba62..b8c0d18 100644
--- a/oap-server/analyzer/event-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/event/listener/EventRecordAnalyzerListener.java
+++ b/oap-server/analyzer/event-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/event/listener/EventRecordAnalyzerListener.java
@@ -25,7 +25,8 @@ import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
 import org.apache.skywalking.oap.server.core.config.NamingControl;
-import org.apache.skywalking.oap.server.core.event.Event;
+import org.apache.skywalking.oap.server.core.source.Event;
+import org.apache.skywalking.oap.server.core.source.SourceReceiver;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 
 /**
@@ -37,11 +38,14 @@ public class EventRecordAnalyzerListener implements EventAnalyzerListener {
 
     private final NamingControl namingControl;
 
+    private final SourceReceiver sourceReceiver;
+
     private final Event event = new Event();
 
     @Override
     public void build() {
         MetricsStreamProcessor.getInstance().in(event);
+        sourceReceiver.receive(event);
     }
 
     @Override
@@ -72,16 +76,20 @@ public class EventRecordAnalyzerListener implements EventAnalyzerListener {
 
     public static class Factory implements EventAnalyzerListener.Factory {
         private final NamingControl namingControl;
+        private final SourceReceiver sourceReceiver;
 
         public Factory(final ModuleManager moduleManager) {
             this.namingControl = moduleManager.find(CoreModule.NAME)
                                               .provider()
                                               .getService(NamingControl.class);
+            this.sourceReceiver = moduleManager.find(CoreModule.NAME)
+                                               .provider()
+                                               .getService(SourceReceiver.class);
         }
 
         @Override
         public EventAnalyzerListener create(final ModuleManager moduleManager) {
-            return new EventRecordAnalyzerListener(namingControl);
+            return new EventRecordAnalyzerListener(namingControl, sourceReceiver);
         }
     }
 }
diff --git a/oap-server/oal-grammar/src/main/antlr4/org/apache/skywalking/oal/rt/grammar/OALLexer.g4 b/oap-server/oal-grammar/src/main/antlr4/org/apache/skywalking/oal/rt/grammar/OALLexer.g4
index 86830f2..5286760 100644
--- a/oap-server/oal-grammar/src/main/antlr4/org/apache/skywalking/oal/rt/grammar/OALLexer.g4
+++ b/oap-server/oal-grammar/src/main/antlr4/org/apache/skywalking/oal/rt/grammar/OALLexer.g4
@@ -39,6 +39,7 @@ SRC_SERVICE_INSTANCE_CLR_CPU: 'ServiceInstanceCLRCPU';
 SRC_SERVICE_INSTANCE_CLR_GC: 'ServiceInstanceCLRGC';
 SRC_SERVICE_INSTANCE_CLR_THREAD: 'ServiceInstanceCLRThread';
 SRC_ENVOY_INSTANCE_METRIC: 'EnvoyInstanceMetric';
+SRC_EVENT: 'Event';
 
 // Browser keywords
 SRC_BROWSER_APP_PERF: 'BrowserAppPerf';
diff --git a/oap-server/oal-grammar/src/main/antlr4/org/apache/skywalking/oal/rt/grammar/OALParser.g4 b/oap-server/oal-grammar/src/main/antlr4/org/apache/skywalking/oal/rt/grammar/OALParser.g4
index 08f4008..f8eef42 100644
--- a/oap-server/oal-grammar/src/main/antlr4/org/apache/skywalking/oal/rt/grammar/OALParser.g4
+++ b/oap-server/oal-grammar/src/main/antlr4/org/apache/skywalking/oal/rt/grammar/OALParser.g4
@@ -55,7 +55,8 @@ source
       SRC_SERVICE_INSTANCE_CLR_CPU | SRC_SERVICE_INSTANCE_CLR_GC | SRC_SERVICE_INSTANCE_CLR_THREAD |
       SRC_ENVOY_INSTANCE_METRIC |
       SRC_BROWSER_APP_PERF | SRC_BROWSER_APP_PAGE_PERF | SRC_BROWSER_APP_SINGLE_VERSION_PERF |
-      SRC_BROWSER_APP_TRAFFIC | SRC_BROWSER_APP_PAGE_TRAFFIC | SRC_BROWSER_APP_SINGLE_VERSION_TRAFFIC
+      SRC_BROWSER_APP_TRAFFIC | SRC_BROWSER_APP_PAGE_TRAFFIC | SRC_BROWSER_APP_SINGLE_VERSION_TRAFFIC |
+      SRC_EVENT
     ;
 
 disableSource
diff --git a/oap-server/oal-rt/src/main/resources/code-templates/dispatcher/dispatch.ftl b/oap-server/oal-rt/src/main/resources/code-templates/dispatcher/dispatch.ftl
index 4503eda..e9d16cb 100644
--- a/oap-server/oal-rt/src/main/resources/code-templates/dispatcher/dispatch.ftl
+++ b/oap-server/oal-rt/src/main/resources/code-templates/dispatcher/dispatch.ftl
@@ -1,6 +1,6 @@
-public void dispatch(org.apache.skywalking.oap.server.core.source.Source source) {
+public void dispatch(org.apache.skywalking.oap.server.core.source.ISource source) {
 ${sourcePackage}${source} _source = (${sourcePackage}${source})source;
 <#list metrics as metrics>
     do${metrics.metricsName}(_source);
 </#list>
-}
\ No newline at end of file
+}
diff --git a/oap-server/oal-rt/src/main/resources/code-templates/dispatcher/doMetrics.ftl b/oap-server/oal-rt/src/main/resources/code-templates/dispatcher/doMetrics.ftl
index 45ee4c4..6312960 100644
--- a/oap-server/oal-rt/src/main/resources/code-templates/dispatcher/doMetrics.ftl
+++ b/oap-server/oal-rt/src/main/resources/code-templates/dispatcher/doMetrics.ftl
@@ -1,5 +1,4 @@
 private void do${metricsName}(${sourcePackage}${sourceName} source) {
-${metricsClassPackage}${metricsName}Metrics metrics = new ${metricsClassPackage}${metricsName}Metrics();
 
 <#if filterExpressions??>
     <#list filterExpressions as filterExpression>
@@ -9,6 +8,7 @@ ${metricsClassPackage}${metricsName}Metrics metrics = new ${metricsClassPackage}
     </#list>
 </#if>
 
+${metricsClassPackage}${metricsName}Metrics metrics = new ${metricsClassPackage}${metricsName}Metrics();
 metrics.setTimeBucket(source.getTimeBucket());
 <#list fieldsFromSource as field>
     metrics.${field.fieldSetter}(source.${field.fieldGetter}());
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Source.java b/oap-server/server-bootstrap/src/main/resources/oal/event.oal
old mode 100644
new mode 100755
similarity index 64%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Source.java
copy to oap-server/server-bootstrap/src/main/resources/oal/event.oal
index 25d01cd..35459b5
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Source.java
+++ b/oap-server/server-bootstrap/src/main/resources/oal/event.oal
@@ -16,24 +16,10 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.source;
+event_total = from(Event.*).count();
 
-import lombok.Getter;
-import lombok.Setter;
+event_error_count = from(Event.*).filter(type == "Error").count();
+event_normal_count = from(Event.*).filter(type == "Normal").count();
 
-public abstract class Source {
-    public abstract int scope();
-
-    @Getter
-    @Setter
-    private long timeBucket;
-
-    public abstract String getEntityId();
-
-    /**
-     * Internal data field preparation before {@link org.apache.skywalking.oap.server.core.analysis.SourceDispatcher#dispatch(Source)}
-     */
-    public void prepare() {
-
-    }
-}
+event_start_count = from(Event.*).filter(name == "Start").count();
+event_shutdown_count = from(Event.*).filter(name == "Shutdown").count();
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
index 9f3f7ee..890d1a4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
@@ -29,7 +29,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
-import org.apache.skywalking.oap.server.core.source.Source;
+import org.apache.skywalking.oap.server.core.source.ISource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +43,7 @@ public class DispatcherManager implements DispatcherDetectorListener {
         this.dispatcherMap = new HashMap<>();
     }
 
-    public void forward(Source source) {
+    public void forward(ISource source) {
         if (source == null) {
             return;
         }
@@ -96,12 +96,12 @@ public class DispatcherManager implements DispatcherDetectorListener {
 
                     Object source = ((Class) argument).newInstance();
 
-                    if (!Source.class.isAssignableFrom(source.getClass())) {
+                    if (!ISource.class.isAssignableFrom(source.getClass())) {
                         throw new UnexpectedException(
                             "unexpected type argument of class " + aClass.getName() + ", should be `org.apache.skywalking.oap.server.core.source.Source`. ");
                     }
 
-                    Source dispatcherSource = (Source) source;
+                    ISource dispatcherSource = (ISource) source;
                     SourceDispatcher dispatcher = (SourceDispatcher) aClass.newInstance();
 
                     int scopeId = dispatcherSource.scope();
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/SourceDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/SourceDispatcher.java
index 8f52e85..cba0455 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/SourceDispatcher.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/SourceDispatcher.java
@@ -18,7 +18,7 @@
 
 package org.apache.skywalking.oap.server.core.analysis;
 
-import org.apache.skywalking.oap.server.core.source.Source;
+import org.apache.skywalking.oap.server.core.source.ISource;
 
 /**
  * SourceDispatcher implementation processes different types of the source. There are two kinds of the source
@@ -29,6 +29,6 @@ import org.apache.skywalking.oap.server.core.source.Source;
  *
  * @param <SOURCE> the data type of this dispatcher processes.
  */
-public interface SourceDispatcher<SOURCE extends Source> {
+public interface SourceDispatcher<SOURCE extends ISource> {
     void dispatch(SOURCE source);
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/event/Event.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Event.java
similarity index 95%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/event/Event.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Event.java
index 37269be..9a5b241 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/event/Event.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Event.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.event;
+package org.apache.skywalking.oap.server.core.source;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -34,8 +34,6 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo;
 import org.apache.skywalking.oap.server.core.analysis.metrics.WithMetadata;
 import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
-import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
-import org.apache.skywalking.oap.server.core.source.ScopeDeclaration;
 import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
 import org.apache.skywalking.oap.server.core.storage.annotation.Column;
 import org.elasticsearch.common.Strings;
@@ -51,7 +49,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.EV
     of = "uuid"
 )
 @MetricsExtension(supportDownSampling = false, supportUpdate = true)
-public class Event extends Metrics implements WithMetadata, LongValueHolder {
+public class Event extends Metrics implements ISource, WithMetadata, LongValueHolder {
 
     public static final String INDEX_NAME = "events";
 
@@ -206,16 +204,25 @@ public class Event extends Metrics implements WithMetadata, LongValueHolder {
     @Override
     public MetricsMetaInfo getMeta() {
         int scope = DefaultScopeDefine.SERVICE;
+        String id = getEntityId();
+        return new MetricsMetaInfo(getName(), scope, id);
+    }
+
+    @Override
+    public int scope() {
+        return EVENT;
+    }
+
+    @Override
+    public String getEntityId() {
         final String serviceId = IDManager.ServiceID.buildId(getService(), true);
         String id = serviceId;
         if (!Strings.isNullOrEmpty(getServiceInstance())) {
-            scope = DefaultScopeDefine.SERVICE_INSTANCE;
             id = IDManager.ServiceInstanceID.buildId(serviceId, getServiceInstance());
         } else if (!Strings.isNullOrEmpty(getEndpoint())) {
-            scope = DefaultScopeDefine.ENDPOINT;
             id = IDManager.EndpointID.buildId(serviceId, getEndpoint());
         }
-        return new MetricsMetaInfo(getName(), scope, id);
+        return id;
     }
 
     public static class Builder implements StorageHashMapBuilder<Event> {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Source.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ISource.java
similarity index 75%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Source.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ISource.java
index 25d01cd..479212d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Source.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ISource.java
@@ -18,22 +18,18 @@
 
 package org.apache.skywalking.oap.server.core.source;
 
-import lombok.Getter;
-import lombok.Setter;
+public interface ISource {
+    int scope();
 
-public abstract class Source {
-    public abstract int scope();
+    long getTimeBucket();
 
-    @Getter
-    @Setter
-    private long timeBucket;
+    void setTimeBucket(long timeBucket);
 
-    public abstract String getEntityId();
+    String getEntityId();
 
     /**
-     * Internal data field preparation before {@link org.apache.skywalking.oap.server.core.analysis.SourceDispatcher#dispatch(Source)}
+     * Internal data field preparation before {@link org.apache.skywalking.oap.server.core.analysis.SourceDispatcher#dispatch(ISource)}
      */
-    public void prepare() {
-
+    default void prepare() {
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Source.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Source.java
index 25d01cd..5c99722 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Source.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Source.java
@@ -21,19 +21,9 @@ package org.apache.skywalking.oap.server.core.source;
 import lombok.Getter;
 import lombok.Setter;
 
-public abstract class Source {
-    public abstract int scope();
+public abstract class Source implements ISource {
 
     @Getter
     @Setter
     private long timeBucket;
-
-    public abstract String getEntityId();
-
-    /**
-     * Internal data field preparation before {@link org.apache.skywalking.oap.server.core.analysis.SourceDispatcher#dispatch(Source)}
-     */
-    public void prepare() {
-
-    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiver.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiver.java
index c7a05a1..66cef4f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiver.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiver.java
@@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.library.module.Service;
  * in order to forward source to the suitable real {@link org.apache.skywalking.oap.server.core.analysis.SourceDispatcher}.
  */
 public interface SourceReceiver extends Service {
-    void receive(Source source);
+    void receive(ISource source);
 
     DispatcherDetectorListener getDispatcherDetectorListener();
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java
index ae4564d..1e01da8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java
@@ -32,7 +32,7 @@ public class SourceReceiverImpl implements SourceReceiver {
     }
 
     @Override
-    public void receive(Source source) {
+    public void receive(ISource source) {
         dispatcherManager.forward(source);
     }
 
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandlerTest.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandlerTest.java
index c23b004..ade713f 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandlerTest.java
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandlerTest.java
@@ -23,16 +23,16 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.skywalking.apm.network.management.v3.InstancePingPkg;
 import org.apache.skywalking.apm.network.management.v3.InstanceProperties;
+import org.apache.skywalking.oap.server.analyzer.agent.kafka.mock.MockModuleManager;
+import org.apache.skywalking.oap.server.analyzer.agent.kafka.mock.MockModuleProvider;
+import org.apache.skywalking.oap.server.analyzer.agent.kafka.module.KafkaFetcherConfig;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.config.NamingControl;
 import org.apache.skywalking.oap.server.core.config.group.EndpointNameGrouping;
+import org.apache.skywalking.oap.server.core.source.ISource;
 import org.apache.skywalking.oap.server.core.source.ServiceInstanceUpdate;
-import org.apache.skywalking.oap.server.core.source.Source;
 import org.apache.skywalking.oap.server.core.source.SourceReceiver;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
-import org.apache.skywalking.oap.server.analyzer.agent.kafka.mock.MockModuleManager;
-import org.apache.skywalking.oap.server.analyzer.agent.kafka.mock.MockModuleProvider;
-import org.apache.skywalking.oap.server.analyzer.agent.kafka.module.KafkaFetcherConfig;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -52,7 +52,7 @@ public class ServiceManagementHandlerTest {
     public static SourceReceiverRule SOURCE_RECEIVER = new SourceReceiverRule() {
 
         @Override
-        protected void verify(final List<Source> sourceList) throws Throwable {
+        protected void verify(final List<ISource> sourceList) throws Throwable {
             ServiceInstanceUpdate instanceUpdate = (ServiceInstanceUpdate) sourceList.get(0);
             Assert.assertEquals(instanceUpdate.getName(), SERVICE_INSTANCE);
 
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/SourceReceiverRule.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/SourceReceiverRule.java
index aa80b09..9be40b9 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/SourceReceiverRule.java
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/SourceReceiverRule.java
@@ -21,15 +21,15 @@ package org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler;
 import com.google.common.collect.Lists;
 import java.util.List;
 import org.apache.skywalking.oap.server.core.analysis.DispatcherDetectorListener;
-import org.apache.skywalking.oap.server.core.source.Source;
+import org.apache.skywalking.oap.server.core.source.ISource;
 import org.apache.skywalking.oap.server.core.source.SourceReceiver;
 import org.junit.rules.Verifier;
 
 public abstract class SourceReceiverRule extends Verifier implements SourceReceiver {
-    private final List<Source> sourceList = Lists.newArrayList();
+    private final List<ISource> sourceList = Lists.newArrayList();
 
     @Override
-    public void receive(final Source source) {
+    public void receive(final ISource source) {
         sourceList.add(source);
     }
 
@@ -38,10 +38,11 @@ public abstract class SourceReceiverRule extends Verifier implements SourceRecei
         return null;
     }
 
+    @Override
     protected void verify() throws Throwable {
         verify(sourceList);
     }
 
-    protected abstract void verify(List<Source> sourceList) throws Throwable;
+    protected abstract void verify(List<ISource> sourceList) throws Throwable;
 
 }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/MockReceiver.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/MockReceiver.java
index 9443744..3ba6a36 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/MockReceiver.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/MockReceiver.java
@@ -22,7 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 import lombok.Getter;
 import org.apache.skywalking.oap.server.core.analysis.DispatcherDetectorListener;
-import org.apache.skywalking.oap.server.core.source.Source;
+import org.apache.skywalking.oap.server.core.source.ISource;
 import org.apache.skywalking.oap.server.core.source.SourceReceiver;
 
 /**
@@ -30,10 +30,10 @@ import org.apache.skywalking.oap.server.core.source.SourceReceiver;
  */
 public class MockReceiver implements SourceReceiver {
     @Getter
-    private List<Source> receivedSources = new ArrayList<>();
+    private List<ISource> receivedSources = new ArrayList<>();
 
     @Override
-    public void receive(final Source source) {
+    public void receive(final ISource source) {
         receivedSources.add(source);
     }
 
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/MultiScopesAnalysisListenerTest.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/MultiScopesAnalysisListenerTest.java
index 0b443eb..5d37e70 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/MultiScopesAnalysisListenerTest.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/MultiScopesAnalysisListenerTest.java
@@ -43,12 +43,12 @@ import org.apache.skywalking.oap.server.core.source.All;
 import org.apache.skywalking.oap.server.core.source.DatabaseAccess;
 import org.apache.skywalking.oap.server.core.source.Endpoint;
 import org.apache.skywalking.oap.server.core.source.EndpointRelation;
+import org.apache.skywalking.oap.server.core.source.ISource;
 import org.apache.skywalking.oap.server.core.source.Service;
 import org.apache.skywalking.oap.server.core.source.ServiceInstance;
 import org.apache.skywalking.oap.server.core.source.ServiceInstanceRelation;
 import org.apache.skywalking.oap.server.core.source.ServiceMeta;
 import org.apache.skywalking.oap.server.core.source.ServiceRelation;
-import org.apache.skywalking.oap.server.core.source.Source;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -147,7 +147,7 @@ public class MultiScopesAnalysisListenerTest {
         listener.parseEntry(spanObject, segment);
         listener.build();
 
-        final List<Source> receivedSources = mockReceiver.getReceivedSources();
+        final List<ISource> receivedSources = mockReceiver.getReceivedSources();
         Assert.assertEquals(7, receivedSources.size());
         final All all = (All) receivedSources.get(0);
         final Service service = (Service) receivedSources.get(1);
@@ -212,7 +212,7 @@ public class MultiScopesAnalysisListenerTest {
         listener.parseEntry(spanObject, segment);
         listener.build();
 
-        final List<Source> receivedSources = mockReceiver.getReceivedSources();
+        final List<ISource> receivedSources = mockReceiver.getReceivedSources();
         Assert.assertEquals(7, receivedSources.size());
         final All all = (All) receivedSources.get(0);
         final Service service = (Service) receivedSources.get(1);
@@ -276,7 +276,7 @@ public class MultiScopesAnalysisListenerTest {
         listener.parseEntry(spanObject, segment);
         listener.build();
 
-        final List<Source> receivedSources = mockReceiver.getReceivedSources();
+        final List<ISource> receivedSources = mockReceiver.getReceivedSources();
         Assert.assertEquals(7, receivedSources.size());
         final All all = (All) receivedSources.get(0);
         final Service service = (Service) receivedSources.get(1);
@@ -332,7 +332,7 @@ public class MultiScopesAnalysisListenerTest {
         listener.parseLocal(spanObject, segment);
         listener.build();
 
-        final List<Source> receivedSources = mockReceiver.getReceivedSources();
+        final List<ISource> receivedSources = mockReceiver.getReceivedSources();
         Assert.assertEquals(1, receivedSources.size());
         final Endpoint source = (Endpoint) receivedSources.get(0);
         Assert.assertEquals("/logic-call", source.getName());
@@ -377,7 +377,7 @@ public class MultiScopesAnalysisListenerTest {
         listener.parseLocal(spanObject, segment);
         listener.build();
 
-        final List<Source> receivedSources = mockReceiver.getReceivedSources();
+        final List<ISource> receivedSources = mockReceiver.getReceivedSources();
         Assert.assertEquals(1, receivedSources.size());
         final Endpoint source = (Endpoint) receivedSources.get(0);
         Assert.assertEquals("/GraphQL-service", source.getName());
@@ -416,7 +416,7 @@ public class MultiScopesAnalysisListenerTest {
         listener.parseExit(spanObject, segment);
         listener.build();
 
-        final List<Source> receivedSources = mockReceiver.getReceivedSources();
+        final List<ISource> receivedSources = mockReceiver.getReceivedSources();
         Assert.assertEquals(4, receivedSources.size());
         final ServiceRelation serviceRelation = (ServiceRelation) receivedSources.get(0);
         final ServiceInstanceRelation serviceInstanceRelation = (ServiceInstanceRelation) receivedSources.get(1);
@@ -462,7 +462,7 @@ public class MultiScopesAnalysisListenerTest {
         listener.parseExit(spanObject, segment);
         listener.build();
 
-        final List<Source> receivedSources = mockReceiver.getReceivedSources();
+        final List<ISource> receivedSources = mockReceiver.getReceivedSources();
         Assert.assertEquals(2, receivedSources.size());
         final ServiceRelation serviceRelation = (ServiceRelation) receivedSources.get(0);
         final ServiceInstanceRelation serviceInstanceRelation = (ServiceInstanceRelation) receivedSources.get(1);
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ESEventQueryDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ESEventQueryDAO.java
index 749037b..7aee3e7 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ESEventQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ESEventQueryDAO.java
@@ -23,7 +23,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-import org.apache.skywalking.oap.server.core.event.Event;
+import org.apache.skywalking.oap.server.core.source.Event;
 import org.apache.skywalking.oap.server.core.query.enumeration.Order;
 import org.apache.skywalking.oap.server.core.query.input.Duration;
 import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/ES7EventQueryDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/ES7EventQueryDAO.java
index 2ddfd7a..c0df97b 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/ES7EventQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/ES7EventQueryDAO.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-import org.apache.skywalking.oap.server.core.event.Event;
+import org.apache.skywalking.oap.server.core.source.Event;
 import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
 import org.apache.skywalking.oap.server.core.query.type.event.Events;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/EventQueryDAO.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/EventQueryDAO.java
index ab36ca2..1d36892 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/EventQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/EventQueryDAO.java
@@ -26,7 +26,7 @@ import java.util.Map;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.core.event.Event;
+import org.apache.skywalking.oap.server.core.source.Event;
 import org.apache.skywalking.oap.server.core.query.enumeration.Order;
 import org.apache.skywalking.oap.server.core.query.input.Duration;
 import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2EventQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2EventQueryDAO.java
index 43d8f95..c29dfc9 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2EventQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2EventQueryDAO.java
@@ -28,7 +28,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.core.event.Event;
+import org.apache.skywalking.oap.server.core.source.Event;
 import org.apache.skywalking.oap.server.core.query.input.Duration;
 import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
 import org.apache.skywalking.oap.server.core.query.type.event.EventType;
diff --git a/oap-server/server-tools/profile-exporter/tool-profile-snapshot-server-mock/src/main/java/org/apache/skywalking/oap/server/tool/profile/core/mock/MockSourceReceiver.java b/oap-server/server-tools/profile-exporter/tool-profile-snapshot-server-mock/src/main/java/org/apache/skywalking/oap/server/tool/profile/core/mock/MockSourceReceiver.java
index e0f653d..3da59d7 100644
--- a/oap-server/server-tools/profile-exporter/tool-profile-snapshot-server-mock/src/main/java/org/apache/skywalking/oap/server/tool/profile/core/mock/MockSourceReceiver.java
+++ b/oap-server/server-tools/profile-exporter/tool-profile-snapshot-server-mock/src/main/java/org/apache/skywalking/oap/server/tool/profile/core/mock/MockSourceReceiver.java
@@ -19,7 +19,7 @@
 package org.apache.skywalking.oap.server.tool.profile.core.mock;
 
 import org.apache.skywalking.oap.server.core.analysis.DispatcherDetectorListener;
-import org.apache.skywalking.oap.server.core.source.Source;
+import org.apache.skywalking.oap.server.core.source.ISource;
 import org.apache.skywalking.oap.server.core.source.SourceReceiver;
 
 /**
@@ -27,7 +27,7 @@ import org.apache.skywalking.oap.server.core.source.SourceReceiver;
  */
 public class MockSourceReceiver implements SourceReceiver {
     @Override
-    public void receive(Source source) {
+    public void receive(ISource source) {
     }
 
     @Override