You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/06/18 12:00:35 UTC
[skywalking] 01/01: OAL supports generating metrics from events
This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch feature/event/oal
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit f40fa013de83e1eef833004245dc4e393205e2d1
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Fri Jun 18 11:23:57 2021 +0800
OAL supports generating metrics from events
---
CHANGES.md | 2 +-
docs/en/concepts-and-designs/scope-definitions.md | 16 ++++++++++-
.../event/EventAnalyzerModuleProvider.java | 9 ++++++-
.../oap/server/analyzer/event/EventOALDefine.java} | 31 +++++++++-------------
.../listener/EventRecordAnalyzerListener.java | 12 +++++++--
.../apache/skywalking/oal/rt/grammar/OALLexer.g4 | 1 +
.../apache/skywalking/oal/rt/grammar/OALParser.g4 | 3 ++-
.../code-templates/dispatcher/dispatch.ftl | 4 +--
.../code-templates/dispatcher/doMetrics.ftl | 2 +-
.../src/main/resources/oal/event.oal} | 24 ++++-------------
.../server/core/analysis/DispatcherManager.java | 8 +++---
.../oap/server/core/analysis/SourceDispatcher.java | 4 +--
.../oap/server/core/{event => source}/Event.java | 21 ++++++++++-----
.../core/source/{Source.java => ISource.java} | 18 +++++--------
.../skywalking/oap/server/core/source/Source.java | 12 +--------
.../oap/server/core/source/SourceReceiver.java | 2 +-
.../oap/server/core/source/SourceReceiverImpl.java | 2 +-
.../handler/ServiceManagementHandlerTest.java | 10 +++----
.../kafka/provider/handler/SourceReceiverRule.java | 9 ++++---
.../provider/parser/listener/MockReceiver.java | 6 ++---
.../listener/MultiScopesAnalysisListenerTest.java | 16 +++++------
.../elasticsearch/query/ESEventQueryDAO.java | 2 +-
.../elasticsearch7/query/ES7EventQueryDAO.java | 2 +-
.../plugin/influxdb/query/EventQueryDAO.java | 2 +-
.../plugin/jdbc/h2/dao/H2EventQueryDAO.java | 2 +-
.../tool/profile/core/mock/MockSourceReceiver.java | 4 +--
26 files changed, 115 insertions(+), 109 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 0273b96..bca04eb 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -34,7 +34,7 @@ Release Notes.
* Upgrade zookeeper caused by CVE-2019-0201.
* Upgrade snake yaml caused by CVE-2017-18640.
* Upgrade embed tomcat caused by CVE-2020-13935.
-
+* OAL supports generating metrics from events.
#### UI
diff --git a/docs/en/concepts-and-designs/scope-definitions.md b/docs/en/concepts-and-designs/scope-definitions.md
index 0bdb725..9be43de 100644
--- a/docs/en/concepts-and-designs/scope-definitions.md
+++ b/docs/en/concepts-and-designs/scope-definitions.md
@@ -181,7 +181,7 @@ This calculates the metrics data from each request of the page in the browser ap
### SCOPE `BrowserAppPagePerf`
-This calculates the metrics data form each request of the page in the browser application (browser only).
+This calculates the metrics data from each request of the page in the browser application (browser only).
| Name | Remarks | Group Key | Type |
|---|---|---|---|
@@ -201,3 +201,17 @@ This calculates the metrics data form each request of the page in the browser ap
| ttlTime | Time to interact. | | int(in ms) |
| firstPackTime | First pack time. | | int(in ms) |
| fmpTime | First Meaningful Paint. | | int(in ms) |
+
+### SCOPE `Event`
+
+This calculates the metrics data from [events](event.md).
+
+| Name | Remarks | Group Key | Type |
+|---|---|---|---|
+| name | The name of the event. | | string |
+| service | The name of the service to which the event belongs. | | string |
+| serviceInstance | The service instance to which the event belongs, if any. | | string|
+| endpoint | The service endpoint to which the event belongs, if any. | | string|
+| type | The type of the event, `Normal` or `Error`. | | string|
+| message | The message of the event. | | string |
+| parameters | The parameters in the `message`, see [parameters](event.md#parameters). | | string |
diff --git a/oap-server/analyzer/event-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/event/EventAnalyzerModuleProvider.java b/oap-server/analyzer/event-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/event/EventAnalyzerModuleProvider.java
index d29e4cb..16b7a27 100644
--- a/oap-server/analyzer/event-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/event/EventAnalyzerModuleProvider.java
+++ b/oap-server/analyzer/event-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/event/EventAnalyzerModuleProvider.java
@@ -20,9 +20,11 @@ package org.apache.skywalking.oap.server.analyzer.event;
import org.apache.skywalking.oap.server.analyzer.event.listener.EventRecordAnalyzerListener;
import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoaderService;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
public class EventAnalyzerModuleProvider extends ModuleProvider {
@@ -51,7 +53,12 @@ public class EventAnalyzerModuleProvider extends ModuleProvider {
}
@Override
- public void start() {
+ public void start() throws ModuleStartException {
+ getManager().find(CoreModule.NAME)
+ .provider()
+ .getService(OALEngineLoaderService.class)
+ .load(EventOALDefine.INSTANCE);
+
analysisService.add(new EventRecordAnalyzerListener.Factory(getManager()));
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Source.java b/oap-server/analyzer/event-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/event/EventOALDefine.java
similarity index 60%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Source.java
copy to oap-server/analyzer/event-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/event/EventOALDefine.java
index 25d01cd..8aa2ef7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Source.java
+++ b/oap-server/analyzer/event-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/event/EventOALDefine.java
@@ -6,34 +6,29 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
-package org.apache.skywalking.oap.server.core.source;
-
-import lombok.Getter;
-import lombok.Setter;
-
-public abstract class Source {
- public abstract int scope();
+package org.apache.skywalking.oap.server.analyzer.event;
- @Getter
- @Setter
- private long timeBucket;
+import org.apache.skywalking.oap.server.core.oal.rt.OALDefine;
- public abstract String getEntityId();
-
- /**
- * Internal data field preparation before {@link org.apache.skywalking.oap.server.core.analysis.SourceDispatcher#dispatch(Source)}
- */
- public void prepare() {
+/**
+ * OAL rules to calculate Event-specific metrics.
+ */
+public class EventOALDefine extends OALDefine {
+ public static final EventOALDefine INSTANCE = new EventOALDefine();
+ private EventOALDefine() {
+ super(
+ "oal/event.oal",
+ "org.apache.skywalking.oap.server.core.source"
+ );
}
}
diff --git a/oap-server/analyzer/event-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/event/listener/EventRecordAnalyzerListener.java b/oap-server/analyzer/event-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/event/listener/EventRecordAnalyzerListener.java
index 73eba62..b8c0d18 100644
--- a/oap-server/analyzer/event-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/event/listener/EventRecordAnalyzerListener.java
+++ b/oap-server/analyzer/event-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/event/listener/EventRecordAnalyzerListener.java
@@ -25,7 +25,8 @@ import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.config.NamingControl;
-import org.apache.skywalking.oap.server.core.event.Event;
+import org.apache.skywalking.oap.server.core.source.Event;
+import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
@@ -37,11 +38,14 @@ public class EventRecordAnalyzerListener implements EventAnalyzerListener {
private final NamingControl namingControl;
+ private final SourceReceiver sourceReceiver;
+
private final Event event = new Event();
@Override
public void build() {
MetricsStreamProcessor.getInstance().in(event);
+ sourceReceiver.receive(event);
}
@Override
@@ -72,16 +76,20 @@ public class EventRecordAnalyzerListener implements EventAnalyzerListener {
public static class Factory implements EventAnalyzerListener.Factory {
private final NamingControl namingControl;
+ private final SourceReceiver sourceReceiver;
public Factory(final ModuleManager moduleManager) {
this.namingControl = moduleManager.find(CoreModule.NAME)
.provider()
.getService(NamingControl.class);
+ this.sourceReceiver = moduleManager.find(CoreModule.NAME)
+ .provider()
+ .getService(SourceReceiver.class);
}
@Override
public EventAnalyzerListener create(final ModuleManager moduleManager) {
- return new EventRecordAnalyzerListener(namingControl);
+ return new EventRecordAnalyzerListener(namingControl, sourceReceiver);
}
}
}
diff --git a/oap-server/oal-grammar/src/main/antlr4/org/apache/skywalking/oal/rt/grammar/OALLexer.g4 b/oap-server/oal-grammar/src/main/antlr4/org/apache/skywalking/oal/rt/grammar/OALLexer.g4
index 86830f2..5286760 100644
--- a/oap-server/oal-grammar/src/main/antlr4/org/apache/skywalking/oal/rt/grammar/OALLexer.g4
+++ b/oap-server/oal-grammar/src/main/antlr4/org/apache/skywalking/oal/rt/grammar/OALLexer.g4
@@ -39,6 +39,7 @@ SRC_SERVICE_INSTANCE_CLR_CPU: 'ServiceInstanceCLRCPU';
SRC_SERVICE_INSTANCE_CLR_GC: 'ServiceInstanceCLRGC';
SRC_SERVICE_INSTANCE_CLR_THREAD: 'ServiceInstanceCLRThread';
SRC_ENVOY_INSTANCE_METRIC: 'EnvoyInstanceMetric';
+SRC_EVENT: 'Event';
// Browser keywords
SRC_BROWSER_APP_PERF: 'BrowserAppPerf';
diff --git a/oap-server/oal-grammar/src/main/antlr4/org/apache/skywalking/oal/rt/grammar/OALParser.g4 b/oap-server/oal-grammar/src/main/antlr4/org/apache/skywalking/oal/rt/grammar/OALParser.g4
index 08f4008..f8eef42 100644
--- a/oap-server/oal-grammar/src/main/antlr4/org/apache/skywalking/oal/rt/grammar/OALParser.g4
+++ b/oap-server/oal-grammar/src/main/antlr4/org/apache/skywalking/oal/rt/grammar/OALParser.g4
@@ -55,7 +55,8 @@ source
SRC_SERVICE_INSTANCE_CLR_CPU | SRC_SERVICE_INSTANCE_CLR_GC | SRC_SERVICE_INSTANCE_CLR_THREAD |
SRC_ENVOY_INSTANCE_METRIC |
SRC_BROWSER_APP_PERF | SRC_BROWSER_APP_PAGE_PERF | SRC_BROWSER_APP_SINGLE_VERSION_PERF |
- SRC_BROWSER_APP_TRAFFIC | SRC_BROWSER_APP_PAGE_TRAFFIC | SRC_BROWSER_APP_SINGLE_VERSION_TRAFFIC
+ SRC_BROWSER_APP_TRAFFIC | SRC_BROWSER_APP_PAGE_TRAFFIC | SRC_BROWSER_APP_SINGLE_VERSION_TRAFFIC |
+ SRC_EVENT
;
disableSource
diff --git a/oap-server/oal-rt/src/main/resources/code-templates/dispatcher/dispatch.ftl b/oap-server/oal-rt/src/main/resources/code-templates/dispatcher/dispatch.ftl
index 4503eda..e9d16cb 100644
--- a/oap-server/oal-rt/src/main/resources/code-templates/dispatcher/dispatch.ftl
+++ b/oap-server/oal-rt/src/main/resources/code-templates/dispatcher/dispatch.ftl
@@ -1,6 +1,6 @@
-public void dispatch(org.apache.skywalking.oap.server.core.source.Source source) {
+public void dispatch(org.apache.skywalking.oap.server.core.source.ISource source) {
${sourcePackage}${source} _source = (${sourcePackage}${source})source;
<#list metrics as metrics>
do${metrics.metricsName}(_source);
</#list>
-}
\ No newline at end of file
+}
diff --git a/oap-server/oal-rt/src/main/resources/code-templates/dispatcher/doMetrics.ftl b/oap-server/oal-rt/src/main/resources/code-templates/dispatcher/doMetrics.ftl
index 45ee4c4..6312960 100644
--- a/oap-server/oal-rt/src/main/resources/code-templates/dispatcher/doMetrics.ftl
+++ b/oap-server/oal-rt/src/main/resources/code-templates/dispatcher/doMetrics.ftl
@@ -1,5 +1,4 @@
private void do${metricsName}(${sourcePackage}${sourceName} source) {
-${metricsClassPackage}${metricsName}Metrics metrics = new ${metricsClassPackage}${metricsName}Metrics();
<#if filterExpressions??>
<#list filterExpressions as filterExpression>
@@ -9,6 +8,7 @@ ${metricsClassPackage}${metricsName}Metrics metrics = new ${metricsClassPackage}
</#list>
</#if>
+${metricsClassPackage}${metricsName}Metrics metrics = new ${metricsClassPackage}${metricsName}Metrics();
metrics.setTimeBucket(source.getTimeBucket());
<#list fieldsFromSource as field>
metrics.${field.fieldSetter}(source.${field.fieldGetter}());
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Source.java b/oap-server/server-bootstrap/src/main/resources/oal/event.oal
old mode 100644
new mode 100755
similarity index 64%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Source.java
copy to oap-server/server-bootstrap/src/main/resources/oal/event.oal
index 25d01cd..35459b5
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Source.java
+++ b/oap-server/server-bootstrap/src/main/resources/oal/event.oal
@@ -16,24 +16,10 @@
*
*/
-package org.apache.skywalking.oap.server.core.source;
+event_total = from(Event.*).count();
-import lombok.Getter;
-import lombok.Setter;
+event_error_count = from(Event.*).filter(type == "Error").count();
+event_normal_count = from(Event.*).filter(type == "Normal").count();
-public abstract class Source {
- public abstract int scope();
-
- @Getter
- @Setter
- private long timeBucket;
-
- public abstract String getEntityId();
-
- /**
- * Internal data field preparation before {@link org.apache.skywalking.oap.server.core.analysis.SourceDispatcher#dispatch(Source)}
- */
- public void prepare() {
-
- }
-}
+event_start_count = from(Event.*).filter(name == "Start").count();
+event_shutdown_count = from(Event.*).filter(name == "Shutdown").count();
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
index 9f3f7ee..890d1a4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
@@ -29,7 +29,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.skywalking.oap.server.core.UnexpectedException;
-import org.apache.skywalking.oap.server.core.source.Source;
+import org.apache.skywalking.oap.server.core.source.ISource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +43,7 @@ public class DispatcherManager implements DispatcherDetectorListener {
this.dispatcherMap = new HashMap<>();
}
- public void forward(Source source) {
+ public void forward(ISource source) {
if (source == null) {
return;
}
@@ -96,12 +96,12 @@ public class DispatcherManager implements DispatcherDetectorListener {
Object source = ((Class) argument).newInstance();
- if (!Source.class.isAssignableFrom(source.getClass())) {
+ if (!ISource.class.isAssignableFrom(source.getClass())) {
throw new UnexpectedException(
"unexpected type argument of class " + aClass.getName() + ", should be `org.apache.skywalking.oap.server.core.source.Source`. ");
}
- Source dispatcherSource = (Source) source;
+ ISource dispatcherSource = (ISource) source;
SourceDispatcher dispatcher = (SourceDispatcher) aClass.newInstance();
int scopeId = dispatcherSource.scope();
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/SourceDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/SourceDispatcher.java
index 8f52e85..cba0455 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/SourceDispatcher.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/SourceDispatcher.java
@@ -18,7 +18,7 @@
package org.apache.skywalking.oap.server.core.analysis;
-import org.apache.skywalking.oap.server.core.source.Source;
+import org.apache.skywalking.oap.server.core.source.ISource;
/**
* SourceDispatcher implementation processes different types of the source. There are two kinds of the source
@@ -29,6 +29,6 @@ import org.apache.skywalking.oap.server.core.source.Source;
*
* @param <SOURCE> the data type of this dispatcher processes.
*/
-public interface SourceDispatcher<SOURCE extends Source> {
+public interface SourceDispatcher<SOURCE extends ISource> {
void dispatch(SOURCE source);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/event/Event.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Event.java
similarity index 95%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/event/Event.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Event.java
index 37269be..9a5b241 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/event/Event.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Event.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.oap.server.core.event;
+package org.apache.skywalking.oap.server.core.source;
import java.util.HashMap;
import java.util.Map;
@@ -34,8 +34,6 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo;
import org.apache.skywalking.oap.server.core.analysis.metrics.WithMetadata;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
-import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
-import org.apache.skywalking.oap.server.core.source.ScopeDeclaration;
import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.elasticsearch.common.Strings;
@@ -51,7 +49,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.EV
of = "uuid"
)
@MetricsExtension(supportDownSampling = false, supportUpdate = true)
-public class Event extends Metrics implements WithMetadata, LongValueHolder {
+public class Event extends Metrics implements ISource, WithMetadata, LongValueHolder {
public static final String INDEX_NAME = "events";
@@ -206,16 +204,25 @@ public class Event extends Metrics implements WithMetadata, LongValueHolder {
@Override
public MetricsMetaInfo getMeta() {
int scope = DefaultScopeDefine.SERVICE;
+ String id = getEntityId();
+ return new MetricsMetaInfo(getName(), scope, id);
+ }
+
+ @Override
+ public int scope() {
+ return EVENT;
+ }
+
+ @Override
+ public String getEntityId() {
final String serviceId = IDManager.ServiceID.buildId(getService(), true);
String id = serviceId;
if (!Strings.isNullOrEmpty(getServiceInstance())) {
- scope = DefaultScopeDefine.SERVICE_INSTANCE;
id = IDManager.ServiceInstanceID.buildId(serviceId, getServiceInstance());
} else if (!Strings.isNullOrEmpty(getEndpoint())) {
- scope = DefaultScopeDefine.ENDPOINT;
id = IDManager.EndpointID.buildId(serviceId, getEndpoint());
}
- return new MetricsMetaInfo(getName(), scope, id);
+ return id;
}
public static class Builder implements StorageHashMapBuilder<Event> {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Source.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ISource.java
similarity index 75%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Source.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ISource.java
index 25d01cd..479212d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Source.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ISource.java
@@ -18,22 +18,18 @@
package org.apache.skywalking.oap.server.core.source;
-import lombok.Getter;
-import lombok.Setter;
+public interface ISource {
+ int scope();
-public abstract class Source {
- public abstract int scope();
+ long getTimeBucket();
- @Getter
- @Setter
- private long timeBucket;
+ void setTimeBucket(long timeBucket);
- public abstract String getEntityId();
+ String getEntityId();
/**
- * Internal data field preparation before {@link org.apache.skywalking.oap.server.core.analysis.SourceDispatcher#dispatch(Source)}
+ * Internal data field preparation before {@link org.apache.skywalking.oap.server.core.analysis.SourceDispatcher#dispatch(ISource)}
*/
- public void prepare() {
-
+ default void prepare() {
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Source.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Source.java
index 25d01cd..5c99722 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Source.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Source.java
@@ -21,19 +21,9 @@ package org.apache.skywalking.oap.server.core.source;
import lombok.Getter;
import lombok.Setter;
-public abstract class Source {
- public abstract int scope();
+public abstract class Source implements ISource {
@Getter
@Setter
private long timeBucket;
-
- public abstract String getEntityId();
-
- /**
- * Internal data field preparation before {@link org.apache.skywalking.oap.server.core.analysis.SourceDispatcher#dispatch(Source)}
- */
- public void prepare() {
-
- }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiver.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiver.java
index c7a05a1..66cef4f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiver.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiver.java
@@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.library.module.Service;
* in order to forward source to the suitable real {@link org.apache.skywalking.oap.server.core.analysis.SourceDispatcher}.
*/
public interface SourceReceiver extends Service {
- void receive(Source source);
+ void receive(ISource source);
DispatcherDetectorListener getDispatcherDetectorListener();
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java
index ae4564d..1e01da8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java
@@ -32,7 +32,7 @@ public class SourceReceiverImpl implements SourceReceiver {
}
@Override
- public void receive(Source source) {
+ public void receive(ISource source) {
dispatcherManager.forward(source);
}
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandlerTest.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandlerTest.java
index c23b004..ade713f 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandlerTest.java
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandlerTest.java
@@ -23,16 +23,16 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.apache.skywalking.apm.network.management.v3.InstancePingPkg;
import org.apache.skywalking.apm.network.management.v3.InstanceProperties;
+import org.apache.skywalking.oap.server.analyzer.agent.kafka.mock.MockModuleManager;
+import org.apache.skywalking.oap.server.analyzer.agent.kafka.mock.MockModuleProvider;
+import org.apache.skywalking.oap.server.analyzer.agent.kafka.module.KafkaFetcherConfig;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.config.group.EndpointNameGrouping;
+import org.apache.skywalking.oap.server.core.source.ISource;
import org.apache.skywalking.oap.server.core.source.ServiceInstanceUpdate;
-import org.apache.skywalking.oap.server.core.source.Source;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
-import org.apache.skywalking.oap.server.analyzer.agent.kafka.mock.MockModuleManager;
-import org.apache.skywalking.oap.server.analyzer.agent.kafka.mock.MockModuleProvider;
-import org.apache.skywalking.oap.server.analyzer.agent.kafka.module.KafkaFetcherConfig;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
@@ -52,7 +52,7 @@ public class ServiceManagementHandlerTest {
public static SourceReceiverRule SOURCE_RECEIVER = new SourceReceiverRule() {
@Override
- protected void verify(final List<Source> sourceList) throws Throwable {
+ protected void verify(final List<ISource> sourceList) throws Throwable {
ServiceInstanceUpdate instanceUpdate = (ServiceInstanceUpdate) sourceList.get(0);
Assert.assertEquals(instanceUpdate.getName(), SERVICE_INSTANCE);
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/SourceReceiverRule.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/SourceReceiverRule.java
index aa80b09..9be40b9 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/SourceReceiverRule.java
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/SourceReceiverRule.java
@@ -21,15 +21,15 @@ package org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler;
import com.google.common.collect.Lists;
import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.DispatcherDetectorListener;
-import org.apache.skywalking.oap.server.core.source.Source;
+import org.apache.skywalking.oap.server.core.source.ISource;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.junit.rules.Verifier;
public abstract class SourceReceiverRule extends Verifier implements SourceReceiver {
- private final List<Source> sourceList = Lists.newArrayList();
+ private final List<ISource> sourceList = Lists.newArrayList();
@Override
- public void receive(final Source source) {
+ public void receive(final ISource source) {
sourceList.add(source);
}
@@ -38,10 +38,11 @@ public abstract class SourceReceiverRule extends Verifier implements SourceRecei
return null;
}
+ @Override
protected void verify() throws Throwable {
verify(sourceList);
}
- protected abstract void verify(List<Source> sourceList) throws Throwable;
+ protected abstract void verify(List<ISource> sourceList) throws Throwable;
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/MockReceiver.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/MockReceiver.java
index 9443744..3ba6a36 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/MockReceiver.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/MockReceiver.java
@@ -22,7 +22,7 @@ import java.util.ArrayList;
import java.util.List;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.analysis.DispatcherDetectorListener;
-import org.apache.skywalking.oap.server.core.source.Source;
+import org.apache.skywalking.oap.server.core.source.ISource;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
/**
@@ -30,10 +30,10 @@ import org.apache.skywalking.oap.server.core.source.SourceReceiver;
*/
public class MockReceiver implements SourceReceiver {
@Getter
- private List<Source> receivedSources = new ArrayList<>();
+ private List<ISource> receivedSources = new ArrayList<>();
@Override
- public void receive(final Source source) {
+ public void receive(final ISource source) {
receivedSources.add(source);
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/MultiScopesAnalysisListenerTest.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/MultiScopesAnalysisListenerTest.java
index 0b443eb..5d37e70 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/MultiScopesAnalysisListenerTest.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/MultiScopesAnalysisListenerTest.java
@@ -43,12 +43,12 @@ import org.apache.skywalking.oap.server.core.source.All;
import org.apache.skywalking.oap.server.core.source.DatabaseAccess;
import org.apache.skywalking.oap.server.core.source.Endpoint;
import org.apache.skywalking.oap.server.core.source.EndpointRelation;
+import org.apache.skywalking.oap.server.core.source.ISource;
import org.apache.skywalking.oap.server.core.source.Service;
import org.apache.skywalking.oap.server.core.source.ServiceInstance;
import org.apache.skywalking.oap.server.core.source.ServiceInstanceRelation;
import org.apache.skywalking.oap.server.core.source.ServiceMeta;
import org.apache.skywalking.oap.server.core.source.ServiceRelation;
-import org.apache.skywalking.oap.server.core.source.Source;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -147,7 +147,7 @@ public class MultiScopesAnalysisListenerTest {
listener.parseEntry(spanObject, segment);
listener.build();
- final List<Source> receivedSources = mockReceiver.getReceivedSources();
+ final List<ISource> receivedSources = mockReceiver.getReceivedSources();
Assert.assertEquals(7, receivedSources.size());
final All all = (All) receivedSources.get(0);
final Service service = (Service) receivedSources.get(1);
@@ -212,7 +212,7 @@ public class MultiScopesAnalysisListenerTest {
listener.parseEntry(spanObject, segment);
listener.build();
- final List<Source> receivedSources = mockReceiver.getReceivedSources();
+ final List<ISource> receivedSources = mockReceiver.getReceivedSources();
Assert.assertEquals(7, receivedSources.size());
final All all = (All) receivedSources.get(0);
final Service service = (Service) receivedSources.get(1);
@@ -276,7 +276,7 @@ public class MultiScopesAnalysisListenerTest {
listener.parseEntry(spanObject, segment);
listener.build();
- final List<Source> receivedSources = mockReceiver.getReceivedSources();
+ final List<ISource> receivedSources = mockReceiver.getReceivedSources();
Assert.assertEquals(7, receivedSources.size());
final All all = (All) receivedSources.get(0);
final Service service = (Service) receivedSources.get(1);
@@ -332,7 +332,7 @@ public class MultiScopesAnalysisListenerTest {
listener.parseLocal(spanObject, segment);
listener.build();
- final List<Source> receivedSources = mockReceiver.getReceivedSources();
+ final List<ISource> receivedSources = mockReceiver.getReceivedSources();
Assert.assertEquals(1, receivedSources.size());
final Endpoint source = (Endpoint) receivedSources.get(0);
Assert.assertEquals("/logic-call", source.getName());
@@ -377,7 +377,7 @@ public class MultiScopesAnalysisListenerTest {
listener.parseLocal(spanObject, segment);
listener.build();
- final List<Source> receivedSources = mockReceiver.getReceivedSources();
+ final List<ISource> receivedSources = mockReceiver.getReceivedSources();
Assert.assertEquals(1, receivedSources.size());
final Endpoint source = (Endpoint) receivedSources.get(0);
Assert.assertEquals("/GraphQL-service", source.getName());
@@ -416,7 +416,7 @@ public class MultiScopesAnalysisListenerTest {
listener.parseExit(spanObject, segment);
listener.build();
- final List<Source> receivedSources = mockReceiver.getReceivedSources();
+ final List<ISource> receivedSources = mockReceiver.getReceivedSources();
Assert.assertEquals(4, receivedSources.size());
final ServiceRelation serviceRelation = (ServiceRelation) receivedSources.get(0);
final ServiceInstanceRelation serviceInstanceRelation = (ServiceInstanceRelation) receivedSources.get(1);
@@ -462,7 +462,7 @@ public class MultiScopesAnalysisListenerTest {
listener.parseExit(spanObject, segment);
listener.build();
- final List<Source> receivedSources = mockReceiver.getReceivedSources();
+ final List<ISource> receivedSources = mockReceiver.getReceivedSources();
Assert.assertEquals(2, receivedSources.size());
final ServiceRelation serviceRelation = (ServiceRelation) receivedSources.get(0);
final ServiceInstanceRelation serviceInstanceRelation = (ServiceInstanceRelation) receivedSources.get(1);
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ESEventQueryDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ESEventQueryDAO.java
index 749037b..7aee3e7 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ESEventQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ESEventQueryDAO.java
@@ -23,7 +23,7 @@ import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import org.apache.skywalking.oap.server.core.event.Event;
+import org.apache.skywalking.oap.server.core.source.Event;
import org.apache.skywalking.oap.server.core.query.enumeration.Order;
import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/ES7EventQueryDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/ES7EventQueryDAO.java
index 2ddfd7a..c0df97b 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/ES7EventQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/ES7EventQueryDAO.java
@@ -22,7 +22,7 @@ import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import org.apache.skywalking.oap.server.core.event.Event;
+import org.apache.skywalking.oap.server.core.source.Event;
import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
import org.apache.skywalking.oap.server.core.query.type.event.Events;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/EventQueryDAO.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/EventQueryDAO.java
index ab36ca2..1d36892 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/EventQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/EventQueryDAO.java
@@ -26,7 +26,7 @@ import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.core.event.Event;
+import org.apache.skywalking.oap.server.core.source.Event;
import org.apache.skywalking.oap.server.core.query.enumeration.Order;
import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2EventQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2EventQueryDAO.java
index 43d8f95..c29dfc9 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2EventQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2EventQueryDAO.java
@@ -28,7 +28,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.core.event.Event;
+import org.apache.skywalking.oap.server.core.source.Event;
import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
import org.apache.skywalking.oap.server.core.query.type.event.EventType;
diff --git a/oap-server/server-tools/profile-exporter/tool-profile-snapshot-server-mock/src/main/java/org/apache/skywalking/oap/server/tool/profile/core/mock/MockSourceReceiver.java b/oap-server/server-tools/profile-exporter/tool-profile-snapshot-server-mock/src/main/java/org/apache/skywalking/oap/server/tool/profile/core/mock/MockSourceReceiver.java
index e0f653d..3da59d7 100644
--- a/oap-server/server-tools/profile-exporter/tool-profile-snapshot-server-mock/src/main/java/org/apache/skywalking/oap/server/tool/profile/core/mock/MockSourceReceiver.java
+++ b/oap-server/server-tools/profile-exporter/tool-profile-snapshot-server-mock/src/main/java/org/apache/skywalking/oap/server/tool/profile/core/mock/MockSourceReceiver.java
@@ -19,7 +19,7 @@
package org.apache.skywalking.oap.server.tool.profile.core.mock;
import org.apache.skywalking.oap.server.core.analysis.DispatcherDetectorListener;
-import org.apache.skywalking.oap.server.core.source.Source;
+import org.apache.skywalking.oap.server.core.source.ISource;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
/**
@@ -27,7 +27,7 @@ import org.apache.skywalking.oap.server.core.source.SourceReceiver;
*/
public class MockSourceReceiver implements SourceReceiver {
@Override
- public void receive(Source source) {
+ public void receive(ISource source) {
}
@Override