You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/02/23 12:07:15 UTC
[skywalking] branch master updated: Introduce log analysis language
(LAL) (#6388)
This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 96f1c12 Introduce log analysis language (LAL) (#6388)
96f1c12 is described below
commit 96f1c12159297f67cd6cd724b552b182bf44c573
Author: Zhenxu Ke <ke...@apache.org>
AuthorDate: Tue Feb 23 20:06:48 2021 +0800
Introduce log analysis language (LAL) (#6388)
---
CHANGES.md | 1 +
apm-dist-es7/src/main/assembly/binary-es7.xml | 10 +-
apm-dist/src/main/assembly/binary.xml | 10 +-
.../alarm-settings.yml} | 0
dist-material/config-examples/lal.yaml | 50 +++
dist-material/config-examples/log-mal.yaml | 37 +++
docs/en/concepts-and-designs/lal.md | 342 +++++++++++++++++++++
oap-server/analyzer/log-analyzer/pom.xml | 11 +-
.../skywalking/oap/log/analyzer/dsl/Binding.java | 117 +++++++
.../skywalking/oap/log/analyzer/dsl/DSL.java | 58 ++++
.../oap/log/analyzer/dsl/spec/AbstractSpec.java | 52 ++++
.../analyzer/dsl/spec/extractor/ExtractorSpec.java | 197 ++++++++++++
.../log/analyzer/dsl/spec/filter/FilterSpec.java | 160 ++++++++++
.../spec/parser/JsonParserSpec.java} | 26 +-
.../analyzer/dsl/spec/parser/TextParserSpec.java | 57 ++++
.../spec/parser/YamlParserSpec.java} | 29 +-
.../log/analyzer/dsl/spec/sink/SamplerSpec.java | 84 +++++
.../oap/log/analyzer/dsl/spec/sink/SinkSpec.java | 61 ++++
.../dsl/spec/sink/sampler/RateLimitingSampler.java | 107 +++++++
.../spec/sink/sampler/Sampler.java} | 27 +-
...LogAnalyzerModuleConfig.java => LALConfig.java} | 11 +-
.../oap/log/analyzer/provider/LALConfigs.java | 77 +++++
.../analyzer/provider/LogAnalyzerModuleConfig.java | 45 +++
.../provider/LogAnalyzerModuleProvider.java | 12 +-
.../oap/log/analyzer/provider/log/LogAnalyzer.java | 2 +-
.../provider/log/listener/LogAnalysisListener.java | 3 +-
.../log/listener/LogAnalysisListenerFactory.java | 5 +-
.../provider/log/listener/LogFilterListener.java | 76 +++++
.../log/listener/RecordAnalysisListener.java | 10 +-
.../log/listener/TrafficAnalysisListener.java | 10 +-
oap-server/server-bootstrap/pom.xml | 2 +
.../src/main/resources/application.yml | 2 +
.../src/main/resources/lal/default.yaml | 43 +--
.../main/resources/log-mal-rules/placeholder.yaml | 36 +--
.../e2e/e2e-test/docker/log/docker-compose.es6.yml | 9 +-
.../e2e/e2e-test/docker/log/docker-compose.es7.yml | 7 +-
test/e2e/e2e-test/docker/log/docker-compose.h2.yml | 9 +-
.../docker/log/docker-compose.influxdb.yml | 7 +-
.../e2e-test/docker/log/docker-compose.mysql.yml | 7 +-
.../log/{docker-compose.influxdb.yml => lal.yaml} | 53 ++--
test/e2e/e2e-test/docker/log/log-mal.yaml | 36 +++
.../java/org/apache/skywalking/e2e/log/LogE2E.java | 32 +-
42 files changed, 1762 insertions(+), 168 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 40f7c03..1e596f0 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -27,6 +27,7 @@ Release Notes.
* Storage plugin supports postgresql.
* Fix kubernetes.client.opeanapi.ApiException.
* Remove filename suffix in the meter active file config.
+* Introduce log analysis language (LAL).
#### UI
* Update selector scroller to show in all pages.
diff --git a/apm-dist-es7/src/main/assembly/binary-es7.xml b/apm-dist-es7/src/main/assembly/binary-es7.xml
index d5b84cf..060bbd2 100644
--- a/apm-dist-es7/src/main/assembly/binary-es7.xml
+++ b/apm-dist-es7/src/main/assembly/binary-es7.xml
@@ -40,7 +40,13 @@
<includes>
<include>log4j2.xml</include>
<include>alarm-settings.yml</include>
- <include>alarm-settings-sample.yml</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${project.basedir}/../dist-material</directory>
+ <outputDirectory>/</outputDirectory>
+ <includes>
+ <include>config-examples/*</include>
</includes>
</fileSet>
<fileSet>
@@ -59,6 +65,8 @@
<include>zabbix-rules/*.yaml</include>
<include>otel-oc-rules/*</include>
<include>ui-initialized-templates/*</include>
+ <include>lal/*</include>
+ <include>log-mal-rules/*</include>
</includes>
<outputDirectory>/config</outputDirectory>
</fileSet>
diff --git a/apm-dist/src/main/assembly/binary.xml b/apm-dist/src/main/assembly/binary.xml
index 56c0f7a..a81c44d 100644
--- a/apm-dist/src/main/assembly/binary.xml
+++ b/apm-dist/src/main/assembly/binary.xml
@@ -40,7 +40,13 @@
<includes>
<include>log4j2.xml</include>
<include>alarm-settings.yml</include>
- <include>alarm-settings-sample.yml</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${project.basedir}/../dist-material</directory>
+ <outputDirectory>/</outputDirectory>
+ <includes>
+ <include>config-examples/*</include>
</includes>
</fileSet>
<fileSet>
@@ -59,6 +65,8 @@
<include>zabbix-rules/*.yaml</include>
<include>otel-oc-rules/*</include>
<include>ui-initialized-templates/*</include>
+ <include>lal/*</include>
+ <include>log-mal-rules/*</include>
</includes>
<outputDirectory>/config</outputDirectory>
</fileSet>
diff --git a/dist-material/alarm-settings-sample.yml b/dist-material/config-examples/alarm-settings.yml
similarity index 100%
rename from dist-material/alarm-settings-sample.yml
rename to dist-material/config-examples/alarm-settings.yml
diff --git a/dist-material/config-examples/lal.yaml b/dist-material/config-examples/lal.yaml
new file mode 100644
index 0000000..f20e359
--- /dev/null
+++ b/dist-material/config-examples/lal.yaml
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Example config file of path: config/lal/config.yaml
+rules:
+ - name: example
+ dsl: |
+ filter {
+ if (log.service == "TestService") {
+ abort {}
+ }
+ text {
+ if (!regexp($/(?s)(?<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}.\d{3}) \[TID:(?<tid>.+?)] \[(?<thread>.+?)] (?<level>\w{4,}) (?<logger>.{1,36}) (?<msg>.+)/$)) {
+ abort {}
+ }
+ }
+ extractor {
+ metrics {
+ timestamp log.timestamp
+ labels level: parsed.level, service: log.service, instance: log.serviceInstance
+ name "log_count"
+ value 1
+ }
+ }
+ sink {
+ sampler {
+ if (log.service == "ImportantApp") {
+ rateLimit("ImportantAppSampler") {
+ qps 300
+ }
+ } else {
+ rateLimit("OtherSampler") {
+ qps 30
+ }
+ }
+ }
+ }
+ }
diff --git a/dist-material/config-examples/log-mal.yaml b/dist-material/config-examples/log-mal.yaml
new file mode 100644
index 0000000..78a98cc
--- /dev/null
+++ b/dist-material/config-examples/log-mal.yaml
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This will parse a textual representation of a duration. The formats
+# accepted are based on the ISO-8601 duration format {@code PnDTnHnMn.nS}
+# with days considered to be exactly 24 hours.
+# <p>
+# Examples:
+# <pre>
+# "PT20.345S" -- parses as "20.345 seconds"
+# "PT15M" -- parses as "15 minutes" (where a minute is 60 seconds)
+# "PT10H" -- parses as "10 hours" (where an hour is 3600 seconds)
+# "P2D" -- parses as "2 days" (where a day is 24 hours or 86400 seconds)
+# "P2DT3H4M" -- parses as "2 days, 3 hours and 4 minutes"
+# "P-6H3M" -- parses as "-6 hours and +3 minutes"
+# "-P6H3M" -- parses as "-6 hours and -3 minutes"
+# "-P-6H+3M" -- parses as "+6 hours and -3 minutes"
+# </pre>
+
+# Example config file of path: config/log-mal-rules/config.yaml
+expSuffix: instance(['service'], ['instance'])
+metricPrefix: log
+metricsRules:
+ - name: count_info
+ exp: log_count.tagEqual('level', 'INFO').sum(['service', 'instance'])
diff --git a/docs/en/concepts-and-designs/lal.md b/docs/en/concepts-and-designs/lal.md
new file mode 100644
index 0000000..90cad9a
--- /dev/null
+++ b/docs/en/concepts-and-designs/lal.md
@@ -0,0 +1,342 @@
+# Log Analysis Language
+
+Log Analysis Language (LAL) in SkyWalking is essentially a Domain-Specific Language (DSL) to analyze logs. You can use
+LAL to parse, extract, and save the logs, as well as collaborate the logs with traces (by extracting the trace id,
+segment id and span id) and metrics (by generating metrics from the logs and send them to the meter system).
+
+The LAL config files are in YAML format, and are located under directory `lal`, you can
+set `log-analyzer/default/lalFiles` in the `application.yml` file or set environment variable `SW_LOG_LAL_FILES` to
+activate specific LAL config files.
+
+## Filter
+
+A filter is a group of [parser](#parser), [extractor](#extractor) and [sink](#sink). Users can use one or more filters
+to organize their processing logics. Every piece of log will be sent to all filters in an LAL rule. The piece of log
+sent into the filter is available as property `log` in the LAL, therefore you can access the log service name
+via `log.service`, for all available fields of `log`, please refer to [the protocol definition](https://github.com/apache/skywalking-data-collect-protocol/blob/master/logging/Logging.proto#L41).
+
+All components are executed sequentially in the orders they are declared.
+
+### Global Functions
+
+There are functions globally available that you can use them in all components (i.e. parsers, extractors, and sinks)
+when needed.
+
+- `abort`
+
+By default, all components declared are executed no matter what flags (`dropped`, `saved`, etc.) have been set. There
+are cases where you may want the filter chain to stop earlier when specified conditions are met. `abort` function aborts
+the remaining filter chain from where it's declared, all the remaining components won't be executed at all.
+`abort` function serves as a fast-fail mechanism in LAL.
+
+```groovy
+filter {
+ if (log.service == "TestingService") { // Don't waste resources on TestingServices
+ abort {} // all remaining components won't be executed at all
+ }
+ text {
+ if (!regexp("(?<timestamp>\\d{8}) (?<thread>\\w+) (?<level>\\w+) (?<traceId>\\w+) (?<msg>.+)")) {
+ // if the logs don't match this regexp, skip it
+ abort {}
+ }
+ }
+ // ... extractors, sinks
+}
+```
+
+Note that when you put `regexp` in an `if` statement, you need to surround the expression with `()`
+like `regexp(<the expression>)`, instead of `regexp <the expression>`.
+
+### Parser
+
+Parsers are responsible for parsing the raw logs into structured data in SkyWalking for further processing. There are 3
+types of parsers at the moment, namely `json`, `yaml`, and `text`.
+
+When a piece of log is parsed, there is a corresponding property available, called `parsed`, injected by LAL.
+Property `parsed` is typically a map, containing all the fields parsed from the raw logs, for example, if the parser
+is `json` / `yaml`, `parsed` is a map containing all the key-values in the `json` / `yaml`, if the parser is `text`
+, `parsed` is a map containing all the captured groups and their values (for `regexp` and `grok`). See examples below.
+
+#### `json`
+
+<!-- TODO: is structured in the reported (gRPC) `LogData`, not much to do -->
+
+#### `yaml`
+
+<!-- TODO: is structured in the reported (gRPC) `LogData`, not much to do -->
+
+#### `text`
+
+For unstructured logs, there are some `text` parsers for use.
+
+- `regexp`
+
+`regexp` parser uses a regular expression (`regexp`) to parse the logs. It leverages the captured groups of the regexp,
+all the captured groups can be used later in the extractors or sinks.
+`regexp` returns a `boolean` indicating whether the log matches the pattern or not.
+
+```groovy
+filter {
+ text {
+ regexp "(?<timestamp>\\d{8}) (?<thread>\\w+) (?<level>\\w+) (?<traceId>\\w+) (?<msg>.+)"
+ // this is just a demo pattern
+ }
+ extractor {
+ tag level: parsed.level
+ // we add a tag called `level` and its value is parsed.level, captured from the regexp above
+ traceId parsed.traceId
+ // we also extract the trace id from the parsed result, which will be used to associate the log with the trace
+ }
+ // ...
+}
+```
+
+- `grok`
+
+<!-- TODO: grok Java library has poor performance, need to benchmark it, the idea is basically the same with `regexp` above -->
+
+### Extractor
+
+Extractors aim to extract metadata from the logs. The metadata can be a service name, a service instance name, an
+endpoint name, or even a trace ID, all of which can be associated with the existing traces and metrics.
+
+- `service`
+
+`service` extracts the service name from the `parsed` result, and set it into the `LogData`, which will be persisted (if
+not dropped) and is used to associate with traces / metrics.
+
+- `instance`
+
+`instance` extracts the service instance name from the `parsed` result, and set it into the `LogData`, which will be
+persisted (if not dropped) and is used to associate with traces / metrics.
+
+- `endpoint`
+
+`endpoint` extracts the service instance name from the `parsed` result, and set it into the `LogData`, which will be
+persisted (if not dropped) and is used to associate with traces / metrics.
+
+- `traceId`
+
+`traceId` extracts the trace ID from the `parsed` result, and set it into the `LogData`, which will be persisted (if not
+dropped) and is used to associate with traces / metrics.
+
+- `segmentId`
+
+`segmentId` extracts the segment ID from the `parsed` result, and set it into the `LogData`, which will be persisted (if
+not dropped) and is used to associate with traces / metrics.
+
+- `spanId`
+
+`spanId` extracts the span ID from the `parsed` result, and set it into the `LogData`, which will be persisted (if not
+dropped) and is used to associate with traces / metrics.
+
+- `timestamp`
+
+`timestamp` extracts the timestamp from the `parsed` result, and set it into the `LogData`, which will be persisted (if
+not dropped) and is used to associate with traces / metrics.
+
+The unit of `timestamp` is millisecond.
+
+- `tag`
+
+`tag` extracts the tags from the `parsed` result, and set them into the `LogData`. The form of this extractor is
+something like `tag key1: value, key2: value2`, you can use the properties of `parsed` as both keys and values.
+
+```groovy
+filter {
+ // ... parser
+
+ extractor {
+ tag level: parsed.level, (parsed.statusCode): parsed.statusMsg
+ tag anotherKey: "anotherConstantValue"
+ }
+}
+```
+
+- `metrics`
+
+`metrics` extracts / generates metrics from the logs, and sends the generated metrics to the meter system, you can
+configure [MAL](mal.md) for further analysis of these metrics. The dedicated MAL config files are under
+directory `log-mal-rules`, you can set `log-analyzer/default/malFiles` to enable configured files.
+
+```yaml
+# application.yml
+# ...
+log-analyzer:
+ selector: ${SW_LOG_ANALYZER:default}
+ default:
+ lalFiles: ${SW_LOG_LAL_FILES:my-lal-config} # files are under "lal" directory
+ malFiles: ${SW_LOG_MAL_FILES:my-lal-mal-config,another-lal-mal-config} # files are under "log-mal-rules" directory
+```
+
+Examples are as follows:
+
+```groovy
+filter {
+ // ...
+ extractor {
+ service parsed.serviceName
+ metrics {
+ name "log_count"
+ timestamp parsed.timestamp
+ labels level: parsed.level, service: parsed.service, instance: parsed.instance
+ value 1
+ }
+ metrics {
+ name "http_response_time"
+ timestamp parsed.timestamp
+ labels status_code: parsed.statusCode, service: parsed.service, instance: parsed.instance
+ value parsed.duration
+ }
+ }
+ // ...
+}
+```
+
+The extractor above generates a metrics named `log_count`, with tag key `level` and value `1`, after this, you can
+configure MAL rules to calculate the log count grouping by logging level like this:
+
+```yaml
+# ... other configurations of MAL
+
+metrics:
+ - name: log_count_debug
+ exp: log_count.tagEqual('level', 'DEBUG').sum(['service', 'instance']).increase('PT1M')
+ - name: log_count_error
+ exp: log_count.tagEqual('level', 'ERROR').sum(['service', 'instance']).increase('PT1M')
+
+```
+
+The other metrics generated is `http_response_time`, so that you can configure MAL rules to generate more useful metrics
+like percentiles.
+
+```yaml
+# ... other configurations of MAL
+
+metrics:
+ - name: response_time_percentile
+ exp: http_response_time.sum(['le', 'service', 'instance']).increase('PT5M').histogram().histogram_percentile([50,70,90,99])
+```
+
+### Sink
+
+Sinks are the persistent layer of the LAL. By default, all the logs of each filter are persisted into the storage.
+However, there are some mechanisms that allow you to selectively save some logs, or even drop all the logs after you've
+extracted useful information, such as metrics.
+
+#### Sampler
+
+Sampler allows you to save the logs in a sampling manner. Currently, sampling strategy `rateLimit` is supported, welcome
+to contribute more sampling strategies. If multiple samplers are specified, the last one determines the final sampling
+result, see examples in [Enforcer](#enforcer).
+
+`rateLimit` samples `n` logs at most in 1 second. `rateLimit("SamplerID")` requires an ID for the sampler, sampler
+declarations with the same ID share the same sampler instance, and thus share the same `qps`, resetting logics.
+
+Examples:
+
+```groovy
+filter {
+ // ... parser
+
+ sink {
+ sampler {
+ if (parsed.service == "ImportantApp") {
+ rateLimit("ImportantAppSampler") {
+ qps 30 // samples 30 pieces of logs every second for service "ImportantApp"
+ }
+ } else {
+ rateLimit("OtherSampler") {
+ qps 3 // samples 3 pieces of logs every second for other services than "ImportantApp"
+ }
+ }
+ }
+ }
+}
+```
+
+#### Dropper
+
+Dropper is a special sink, meaning that all the logs are dropped without any exception. This is useful when you want to
+drop debugging logs,
+
+```groovy
+filter {
+ // ... parser
+
+ sink {
+ if (parsed.level == "DEBUG") {
+ dropper {}
+ } else {
+ sampler {
+ // ... configs
+ }
+ }
+ }
+}
+```
+
+or you have multiple filters, some of which are for extracting metrics, only one of them needs to be persisted.
+
+```groovy
+filter { // filter A: this is for persistence
+ // ... parser
+
+ sink {
+ sampler {
+ // .. sampler configs
+ }
+ }
+}
+filter { // filter B:
+ // ... extractors to generate many metrics
+ extractors {
+ metrics {
+ // ... metrics
+ }
+ }
+ sink {
+ dropper {} // drop all logs because they have been saved in "filter A" above.
+ }
+}
+```
+
+#### Enforcer
+
+Enforcer is another special sink that forcibly samples the log, a typical use case of enforcer is when you have
+configured a sampler and want to save some logs forcibly, for example, to save error logs even if the sampling mechanism
+is configured.
+
+```groovy
+filter {
+ // ... parser
+
+ sink {
+ sampler {
+ // ... sampler configs
+ }
+ if (parserd.level == "ERROR" || parsed.userId == "TestingUserId") { // sample error logs or testing users' logs (userId == "TestingUserId") even if the sampling strategy is configured
+ enforcer {
+ }
+ }
+ }
+}
+```
+
+You can use `enforcer` and `dropper` to simulate a probabilistic sampler like this.
+
+```groovy
+filter {
+ // ... parser
+
+ sink {
+ sampler { // simulate a probabilistic sampler with sampler rate 30% (not accurate though)
+ if (Math.abs(Math.random()) > 0.3) {
+ enforcer {}
+ } else {
+ dropper {}
+ }
+ }
+ }
+}
+```
diff --git a/oap-server/analyzer/log-analyzer/pom.xml b/oap-server/analyzer/log-analyzer/pom.xml
index e16e2eb..6e2fbce 100644
--- a/oap-server/analyzer/log-analyzer/pom.xml
+++ b/oap-server/analyzer/log-analyzer/pom.xml
@@ -33,6 +33,15 @@
<artifactId>server-core</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>meter-analyzer</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.groovy</groupId>
+ <artifactId>groovy</artifactId>
+ </dependency>
</dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/Binding.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/Binding.java
new file mode 100644
index 0000000..7502865
--- /dev/null
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/Binding.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.log.analyzer.dsl;
+
+import java.util.Map;
+import java.util.regex.Matcher;
+import lombok.Getter;
+import org.apache.skywalking.apm.network.logging.v3.LogData;
+
+/**
+ * The binding bridge between OAP and the DSL, which provides some convenient methods to ease the use of the raw {@link groovy.lang.Binding#setProperty(java.lang.String, java.lang.Object)} and {@link
+ * groovy.lang.Binding#getProperty(java.lang.String)}.
+ */
+public class Binding extends groovy.lang.Binding {
+ public static final String KEY_LOG = "log";
+
+ public static final String KEY_PARSED = "parsed";
+
+ public static final String KEY_SAVE = "save";
+
+ public static final String KEY_ABORT = "abort";
+
+ public Binding() {
+ setProperty(KEY_PARSED, new Parsed());
+ }
+
+ public Binding log(final LogData.Builder log) {
+ setProperty(KEY_LOG, log);
+ setProperty(KEY_SAVE, true);
+ setProperty(KEY_ABORT, false);
+ return this;
+ }
+
+ public Binding log(final LogData log) {
+ return log(log.toBuilder());
+ }
+
+ public LogData.Builder log() {
+ return (LogData.Builder) getProperty(KEY_LOG);
+ }
+
+ public Binding parsed(final Matcher parsed) {
+ parsed().matcher = parsed;
+ return this;
+ }
+
+ public Binding parsed(final Map<String, Object> parsed) {
+ parsed().map = parsed;
+ return this;
+ }
+
+ public Parsed parsed() {
+ return (Parsed) getProperty(KEY_PARSED);
+ }
+
+ public Binding save() {
+ setProperty(KEY_SAVE, true);
+ return this;
+ }
+
+ public Binding drop() {
+ setProperty(KEY_SAVE, false);
+ return this;
+ }
+
+ public boolean shouldSave() {
+ return (boolean) getProperty(KEY_SAVE);
+ }
+
+ public Binding abort() {
+ setProperty(KEY_ABORT, true);
+ return this;
+ }
+
+ public boolean shouldAbort() {
+ return (boolean) getProperty(KEY_ABORT);
+ }
+
+ public static class Parsed {
+ @Getter
+ private Matcher matcher;
+
+ @Getter
+ private Map<String, Object> map;
+
+ public Object getAt(final String key) {
+ if (matcher != null) {
+ return matcher.group(key);
+ }
+ if (map != null) {
+ return map.get(key);
+ }
+ return null;
+ }
+
+ @SuppressWarnings("unused")
+ public Object propertyMissing(final String name) {
+ return getAt(name);
+ }
+ }
+}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/DSL.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/DSL.java
new file mode 100644
index 0000000..c5fad4e
--- /dev/null
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/DSL.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.log.analyzer.dsl;
+
+import groovy.lang.GroovyShell;
+import groovy.util.DelegatingScript;
+import lombok.AccessLevel;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.filter.FilterSpec;
+import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.codehaus.groovy.control.CompilerConfiguration;
+
+@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+public class DSL {
+ private final DelegatingScript script;
+
+ private final FilterSpec filterSpec;
+
+ public static DSL of(final ModuleManager moduleManager,
+ final LogAnalyzerModuleConfig config,
+ final String dsl) throws ModuleStartException {
+ final CompilerConfiguration cc = new CompilerConfiguration();
+ cc.setScriptBaseClass(DelegatingScript.class.getName());
+
+ final GroovyShell sh = new GroovyShell(cc);
+ final DelegatingScript script = (DelegatingScript) sh.parse(dsl);
+ final FilterSpec filterSpec = new FilterSpec(moduleManager, config);
+ script.setDelegate(filterSpec);
+
+ return new DSL(script, filterSpec);
+ }
+
+ public void bind(final Binding binding) {
+ this.filterSpec.bind(binding);
+ }
+
+ public void evaluate() {
+ script.run();
+ }
+}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/AbstractSpec.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/AbstractSpec.java
new file mode 100644
index 0000000..260d61f
--- /dev/null
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/AbstractSpec.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.log.analyzer.dsl.spec;
+
+import groovy.lang.Closure;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.experimental.Accessors;
+import org.apache.skywalking.oap.log.analyzer.dsl.Binding;
+import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+
+@Getter
+@RequiredArgsConstructor
+@Accessors(fluent = true)
+public abstract class AbstractSpec {
+ private final ModuleManager moduleManager;
+
+ private final LogAnalyzerModuleConfig moduleConfig;
+
+ protected static final ThreadLocal<Binding> BINDING = ThreadLocal.withInitial(Binding::new);
+
+ public void bind(final Binding b) {
+ BINDING.set(b);
+ }
+
+ @SuppressWarnings("unused")
+ public void abort(final Closure<Void> cl) {
+ BINDING.get().abort();
+ }
+
+ @SuppressWarnings("unused")
+ public Object propertyMissing(final String name) {
+ return BINDING.get().getVariable(name);
+ }
+}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/extractor/ExtractorSpec.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/extractor/ExtractorSpec.java
new file mode 100644
index 0000000..0d68fe2
--- /dev/null
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/extractor/ExtractorSpec.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.log.analyzer.dsl.spec.extractor;
+
+import com.google.common.collect.ImmutableMap;
+import groovy.lang.Closure;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import lombok.experimental.Delegate;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
+import org.apache.skywalking.apm.network.logging.v3.LogData;
+import org.apache.skywalking.apm.network.logging.v3.TraceContext;
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.AbstractSpec;
+import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
+import org.apache.skywalking.oap.meter.analyzer.MetricConvert;
+import org.apache.skywalking.oap.meter.analyzer.dsl.Sample;
+import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily;
+import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamilyBuilder;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+
+import static java.util.Objects.nonNull;
+import static org.apache.skywalking.apm.util.StringUtil.isNotBlank;
+
+public class ExtractorSpec extends AbstractSpec {
+
+ private final List<MetricConvert> metricConverts;
+
+ public ExtractorSpec(final ModuleManager moduleManager,
+ final LogAnalyzerModuleConfig moduleConfig) throws ModuleStartException {
+ super(moduleManager, moduleConfig);
+
+ final MeterSystem meterSystem = moduleManager.find(CoreModule.NAME).provider().getService(MeterSystem.class);
+
+ metricConverts = moduleConfig.malConfigs()
+ .stream()
+ .map(it -> new MetricConvert(it, meterSystem))
+ .collect(Collectors.toList());
+ }
+
+ @SuppressWarnings("unused")
+ public void service(final String service) {
+ if (BINDING.get().shouldAbort()) {
+ return;
+ }
+ if (nonNull(service)) {
+ BINDING.get().log().setService(service);
+ }
+ }
+
+ @SuppressWarnings("unused")
+ public void instance(final String instance) {
+ if (BINDING.get().shouldAbort()) {
+ return;
+ }
+ if (nonNull(instance)) {
+ BINDING.get().log().setServiceInstance(instance);
+ }
+ }
+
+ @SuppressWarnings("unused")
+ public void endpoint(final String endpoint) {
+ if (BINDING.get().shouldAbort()) {
+ return;
+ }
+ if (nonNull(endpoint)) {
+ BINDING.get().log().setEndpoint(endpoint);
+ }
+ }
+
+ @SuppressWarnings("unused")
+ public void tag(final Map<String, Object> kv) {
+ if (BINDING.get().shouldAbort()) {
+ return;
+ }
+ if (CollectionUtils.isEmpty(kv)) {
+ return;
+ }
+ final LogData.Builder logData = BINDING.get().log();
+ logData.setTags(
+ logData.getTags()
+ .toBuilder()
+ .addAllData(
+ kv.entrySet()
+ .stream()
+ .filter(it -> isNotBlank(it.getKey()))
+ .filter(it -> nonNull(it.getValue()) && isNotBlank(Objects.toString(it.getValue())))
+ .map(it -> KeyStringValuePair.newBuilder().setKey(it.getKey()).setValue(Objects.toString(it.getValue())).build())
+ .collect(Collectors.toList())
+ )
+ );
+ BINDING.get().log(logData);
+ }
+
+ @SuppressWarnings("unused")
+ public void traceId(final String traceId) {
+ if (BINDING.get().shouldAbort()) {
+ return;
+ }
+ if (nonNull(traceId)) {
+ final LogData.Builder logData = BINDING.get().log();
+ final TraceContext.Builder traceContext = logData.getTraceContext().toBuilder();
+ traceContext.setTraceId(traceId);
+ logData.setTraceContext(traceContext);
+ }
+ }
+
+ @SuppressWarnings("unused")
+ public void segmentId(final String segmentId) {
+ if (BINDING.get().shouldAbort()) {
+ return;
+ }
+ if (nonNull(segmentId)) {
+ final LogData.Builder logData = BINDING.get().log();
+ final TraceContext.Builder traceContext = logData.getTraceContext().toBuilder();
+ traceContext.setTraceSegmentId(segmentId);
+ logData.setTraceContext(traceContext);
+ }
+ }
+
+ @SuppressWarnings("unused")
+ public void spanId(final String spanId) {
+ if (BINDING.get().shouldAbort()) {
+ return;
+ }
+ if (nonNull(spanId)) {
+ final LogData.Builder logData = BINDING.get().log();
+ final TraceContext.Builder traceContext = logData.getTraceContext().toBuilder();
+ traceContext.setSpanId(Integer.parseInt(spanId));
+ logData.setTraceContext(traceContext);
+ }
+ }
+
+ @SuppressWarnings("unused")
+ public void timestamp(final String timestamp) {
+ if (BINDING.get().shouldAbort()) {
+ return;
+ }
+ if (nonNull(timestamp) && StringUtils.isNumeric(timestamp)) {
+ BINDING.get().log().setTimestamp(Long.parseLong(timestamp));
+ }
+ }
+
+ @SuppressWarnings("unused")
+ public void metrics(final Closure<Void> cl) {
+ if (BINDING.get().shouldAbort()) {
+ return;
+ }
+ final SampleBuilder builder = new SampleBuilder();
+ cl.setDelegate(builder);
+ cl.call();
+
+ final Sample sample = builder.build();
+
+ metricConverts.forEach(it -> it.toMeter(
+ ImmutableMap.<String, SampleFamily>builder()
+ .put(sample.getName(), SampleFamilyBuilder.newBuilder(sample).build())
+ .build()
+ ));
+ }
+
+ public static class SampleBuilder {
+ @Delegate
+ private final Sample.SampleBuilder sampleBuilder = Sample.builder();
+
+ @SuppressWarnings("unused")
+ public Sample.SampleBuilder labels(final Map<String, String> labels) {
+ final Map<String, String> filtered = labels.entrySet()
+ .stream()
+ .filter(it -> isNotBlank(it.getKey()) && isNotBlank(it.getValue()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ return sampleBuilder.labels(ImmutableMap.copyOf(filtered));
+ }
+ }
+}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/filter/FilterSpec.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/filter/FilterSpec.java
new file mode 100644
index 0000000..9cfe593
--- /dev/null
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/filter/FilterSpec.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.log.analyzer.dsl.spec.filter;
+
+import com.google.gson.reflect.TypeToken;
+import com.google.protobuf.TextFormat;
+import groovy.lang.Closure;
+import java.lang.reflect.Type;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.skywalking.apm.network.logging.v3.LogData;
+import org.apache.skywalking.oap.log.analyzer.dsl.Binding;
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.AbstractSpec;
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.extractor.ExtractorSpec;
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.parser.JsonParserSpec;
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.parser.TextParserSpec;
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.parser.YamlParserSpec;
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.sink.SinkSpec;
+import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
+import org.apache.skywalking.oap.log.analyzer.provider.log.listener.LogAnalysisListenerFactory;
+import org.apache.skywalking.oap.log.analyzer.provider.log.listener.RecordAnalysisListener;
+import org.apache.skywalking.oap.log.analyzer.provider.log.listener.TrafficAnalysisListener;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FilterSpec extends AbstractSpec {
+ private static final Logger LOGGER = LoggerFactory.getLogger(FilterSpec.class);
+
+ private final List<LogAnalysisListenerFactory> factories;
+
+ private final TextParserSpec textParser;
+
+ private final JsonParserSpec jsonParser;
+
+ private final YamlParserSpec yamlParser;
+
+ private final ExtractorSpec extractor;
+
+ private final SinkSpec sink;
+
+ private final Type parsedType;
+
+ public FilterSpec(final ModuleManager moduleManager,
+ final LogAnalyzerModuleConfig moduleConfig) throws ModuleStartException {
+ super(moduleManager, moduleConfig);
+
+ parsedType = new TypeToken<Map<String, Object>>() {
+ }.getType();
+
+ factories = Arrays.asList(
+ new RecordAnalysisListener.Factory(moduleManager(), moduleConfig()),
+ new TrafficAnalysisListener.Factory(moduleManager(), moduleConfig())
+ );
+
+ textParser = new TextParserSpec(moduleManager(), moduleConfig());
+ jsonParser = new JsonParserSpec(moduleManager(), moduleConfig());
+ yamlParser = new YamlParserSpec(moduleManager(), moduleConfig());
+
+ extractor = new ExtractorSpec(moduleManager(), moduleConfig());
+
+ sink = new SinkSpec(moduleManager(), moduleConfig());
+ }
+
+ @SuppressWarnings("unused")
+ public void text(final Closure<Void> cl) {
+ if (BINDING.get().shouldAbort()) {
+ return;
+ }
+ cl.setDelegate(textParser);
+ cl.call();
+ }
+
+ @SuppressWarnings("unused")
+ public void json(final Closure<Void> cl) {
+ if (BINDING.get().shouldAbort()) {
+ return;
+ }
+ cl.setDelegate(jsonParser);
+ cl.call();
+
+ final LogData.Builder logData = BINDING.get().log();
+ final Map<String, Object> parsed = jsonParser.create().fromJson(
+ logData.getBody().getJson().getJson(), parsedType
+ );
+
+ BINDING.get().parsed(parsed);
+ }
+
+ @SuppressWarnings({"unused", "unchecked"})
+ public void yaml(final Closure<Void> cl) {
+ if (BINDING.get().shouldAbort()) {
+ return;
+ }
+ cl.setDelegate(yamlParser);
+ cl.call();
+
+ final LogData.Builder logData = BINDING.get().log();
+ final Map<String, Object> parsed = (Map<String, Object>) yamlParser.create().load(
+ logData.getBody().getYaml().getYaml()
+ );
+
+ BINDING.get().parsed(parsed);
+ }
+
+ @SuppressWarnings("unused")
+ public void extractor(final Closure<Void> cl) {
+ if (BINDING.get().shouldAbort()) {
+ return;
+ }
+ cl.setDelegate(extractor);
+ cl.call();
+ }
+
+ @SuppressWarnings("unused")
+ public void sink(final Closure<Void> cl) {
+ if (BINDING.get().shouldAbort()) {
+ return;
+ }
+ cl.setDelegate(sink);
+ cl.call();
+
+ final Binding b = BINDING.get();
+ final LogData.Builder logData = b.log();
+
+ if (!b.shouldSave()) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Log is dropped: {}", TextFormat.shortDebugString(logData));
+ }
+ return;
+ }
+
+ factories.stream()
+ .map(LogAnalysisListenerFactory::create)
+ .forEach(it -> it.parse(logData).build());
+ }
+
+ @SuppressWarnings("unused")
+ public void filter(final Closure<Void> cl) {
+ cl.call();
+ }
+}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListenerFactory.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/parser/JsonParserSpec.java
similarity index 58%
copy from oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListenerFactory.java
copy to oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/parser/JsonParserSpec.java
index 3d770df..6bb716d 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListenerFactory.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/parser/JsonParserSpec.java
@@ -6,25 +6,35 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
*/
-package org.apache.skywalking.oap.log.analyzer.provider.log.listener;
+package org.apache.skywalking.oap.log.analyzer.dsl.spec.parser;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.AbstractSpec;
import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
-/**
- * LogAnalysisListenerFactory implementation creates the listener instance when required.
- * Every LogAnalysisListener could have its own creation factory.
- */
-public interface LogAnalysisListenerFactory {
+public class JsonParserSpec extends AbstractSpec {
+ private final GsonBuilder gsonBuilder;
+
+ public JsonParserSpec(final ModuleManager moduleManager,
+ final LogAnalyzerModuleConfig moduleConfig) {
+ super(moduleManager, moduleConfig);
+
+ gsonBuilder = new GsonBuilder();
+ }
- LogAnalysisListener create(ModuleManager moduleManager, LogAnalyzerModuleConfig moduleConfig);
+ public Gson create() {
+ return gsonBuilder.create();
+ }
}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/parser/TextParserSpec.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/parser/TextParserSpec.java
new file mode 100644
index 0000000..0ffccab
--- /dev/null
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/parser/TextParserSpec.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.log.analyzer.dsl.spec.parser;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.skywalking.apm.network.logging.v3.LogData;
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.AbstractSpec;
+import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+
+public class TextParserSpec extends AbstractSpec {
+ public TextParserSpec(final ModuleManager moduleManager,
+ final LogAnalyzerModuleConfig moduleConfig) {
+ super(moduleManager, moduleConfig);
+ }
+
+ @SuppressWarnings("unused")
+ public boolean regexp(final String regexp) {
+ return regexp(Pattern.compile(regexp));
+ }
+
+ public boolean regexp(final Pattern pattern) {
+ if (BINDING.get().shouldAbort()) {
+ return false;
+ }
+ final LogData.Builder log = BINDING.get().log();
+ final Matcher matcher = pattern.matcher(log.getBody().getText().getText());
+ final boolean matched = matcher.find();
+ if (matched) {
+ BINDING.get().parsed(matcher);
+ }
+ return matched;
+ }
+
+ public boolean grok(final String grok) {
+ // TODO
+ return false;
+ }
+
+}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListenerFactory.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/parser/YamlParserSpec.java
similarity index 51%
copy from oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListenerFactory.java
copy to oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/parser/YamlParserSpec.java
index 3d770df..2bb22b9 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListenerFactory.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/parser/YamlParserSpec.java
@@ -6,25 +6,38 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
*/
-package org.apache.skywalking.oap.log.analyzer.provider.log.listener;
+package org.apache.skywalking.oap.log.analyzer.dsl.spec.parser;
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.AbstractSpec;
import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.yaml.snakeyaml.DumperOptions;
+import org.yaml.snakeyaml.LoaderOptions;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.SafeConstructor;
+import org.yaml.snakeyaml.representer.Representer;
-/**
- * LogAnalysisListenerFactory implementation creates the listener instance when required.
- * Every LogAnalysisListener could have its own creation factory.
- */
-public interface LogAnalysisListenerFactory {
+public class YamlParserSpec extends AbstractSpec {
+ private final LoaderOptions loaderOptions;
+
+ public YamlParserSpec(final ModuleManager moduleManager,
+ final LogAnalyzerModuleConfig moduleConfig) {
+ super(moduleManager, moduleConfig);
+
+ loaderOptions = new LoaderOptions();
+ }
- LogAnalysisListener create(ModuleManager moduleManager, LogAnalyzerModuleConfig moduleConfig);
+ public Yaml create() {
+ return new Yaml(new SafeConstructor(), new Representer(), new DumperOptions(), loaderOptions);
+ }
}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/sink/SamplerSpec.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/sink/SamplerSpec.java
new file mode 100644
index 0000000..178fa06
--- /dev/null
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/sink/SamplerSpec.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.log.analyzer.dsl.spec.sink;
+
+import groovy.lang.Closure;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.AbstractSpec;
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.sink.sampler.RateLimitingSampler;
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.sink.sampler.Sampler;
+import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SamplerSpec extends AbstractSpec {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SamplerSpec.class);
+
+ private final Map<String, Sampler> samplers;
+ private final RateLimitingSampler.ResetHandler rlsResetHandler;
+
+ public SamplerSpec(final ModuleManager moduleManager,
+ final LogAnalyzerModuleConfig moduleConfig) {
+ super(moduleManager, moduleConfig);
+
+ samplers = new ConcurrentHashMap<>();
+ rlsResetHandler = new RateLimitingSampler.ResetHandler();
+ }
+
+ @SuppressWarnings("unused")
+ public void rateLimit(final String id, final Closure<Void> cl) {
+ if (BINDING.get().shouldAbort()) {
+ return;
+ }
+ final RateLimitingSampler newSampler = new RateLimitingSampler(rlsResetHandler);
+ cl.setDelegate(newSampler);
+ cl.call();
+
+ final Sampler sampler = samplers.computeIfAbsent(id, $ -> Sampler.NOOP);
+ if (Objects.equals(sampler, newSampler)) { // Unchanged
+ sampleWith(sampler);
+ return;
+ }
+
+ try {
+ sampler.close();
+ } catch (final Exception e) {
+ LOGGER.error("Failed to cancel old sampler: {}", sampler, e);
+ }
+
+ samplers.put(id, newSampler.start());
+
+ sampleWith(newSampler);
+ }
+
+ private void sampleWith(final Sampler sampler) {
+ if (BINDING.get().shouldAbort()) {
+ return;
+ }
+ if (sampler.sample()) {
+ BINDING.get().save();
+ } else {
+ BINDING.get().drop();
+ }
+ }
+
+}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/sink/SinkSpec.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/sink/SinkSpec.java
new file mode 100644
index 0000000..ee78be9
--- /dev/null
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/sink/SinkSpec.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.log.analyzer.dsl.spec.sink;
+
+import groovy.lang.Closure;
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.AbstractSpec;
+import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+
+public class SinkSpec extends AbstractSpec {
+
+ private final SamplerSpec sampler;
+
+ public SinkSpec(final ModuleManager moduleManager,
+ final LogAnalyzerModuleConfig moduleConfig) {
+ super(moduleManager, moduleConfig);
+
+ sampler = new SamplerSpec(moduleManager(), moduleConfig());
+ }
+
+ @SuppressWarnings("unused")
+ public void sampler(final Closure<Void> cl) {
+ if (BINDING.get().shouldAbort()) {
+ return;
+ }
+ cl.setDelegate(sampler);
+ cl.call();
+ }
+
+ @SuppressWarnings("unused")
+ public void enforcer(final Closure<Void> cl) {
+ if (BINDING.get().shouldAbort()) {
+ return;
+ }
+ BINDING.get().save();
+ }
+
+ @SuppressWarnings("unused")
+ public void dropper(final Closure<Void> cl) {
+ if (BINDING.get().shouldAbort()) {
+ return;
+ }
+ BINDING.get().drop();
+ }
+}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/sink/sampler/RateLimitingSampler.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/sink/sampler/RateLimitingSampler.java
new file mode 100644
index 0000000..287fe24
--- /dev/null
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/sink/sampler/RateLimitingSampler.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.log.analyzer.dsl.spec.sink.sampler;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+import lombok.extern.slf4j.Slf4j;
+
+@Accessors(fluent = true)
+@EqualsAndHashCode(of = {"qps"})
+public class RateLimitingSampler implements Sampler {
+ @Getter
+ @Setter
+ private volatile int qps;
+
+ private final AtomicInteger factor = new AtomicInteger();
+
+ private final ResetHandler resetHandler;
+
+ public RateLimitingSampler(final ResetHandler resetHandler) {
+ this.resetHandler = resetHandler;
+ }
+
+ @Override
+ public RateLimitingSampler start() {
+ resetHandler.start(this);
+ return this;
+ }
+
+ @Override
+ public void close() {
+ resetHandler.close(this);
+ }
+
+ @Override
+ public boolean sample() {
+ return factor.getAndIncrement() < qps;
+ }
+
+ @Override
+ public RateLimitingSampler reset() {
+ factor.set(0);
+ return this;
+ }
+
+ @Slf4j
+ public static class ResetHandler {
+ private final Set<Sampler> samplers = new HashSet<>();
+
+ private volatile ScheduledFuture<?> future;
+
+ private volatile boolean started = false;
+
+ private synchronized void start(final Sampler sampler) {
+ samplers.add(sampler);
+
+ if (!started) {
+ future = Executors.newSingleThreadScheduledExecutor()
+ .scheduleAtFixedRate(this::reset, 1, 1, TimeUnit.SECONDS);
+ started = true;
+ }
+ }
+
+ private synchronized void close(final Sampler sampler) {
+ samplers.remove(sampler);
+
+ if (samplers.isEmpty() && future != null) {
+ future.cancel(true);
+ started = false;
+ }
+ }
+
+ private synchronized void reset() {
+ samplers.forEach(sampler -> {
+ try {
+ sampler.reset();
+ } catch (final Exception e) {
+ log.error("Failed to reset sampler {}.", sampler, e);
+ }
+ });
+ }
+ }
+}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleConfig.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/sink/sampler/Sampler.java
similarity index 60%
copy from oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleConfig.java
copy to oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/sink/sampler/Sampler.java
index a39d813..61e315c 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleConfig.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/sink/sampler/Sampler.java
@@ -6,18 +6,37 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
*/
-package org.apache.skywalking.oap.log.analyzer.provider;
+package org.apache.skywalking.oap.log.analyzer.dsl.spec.sink.sampler;
+
+public interface Sampler extends AutoCloseable {
+ Sampler NOOP = new Sampler() {
+ @Override
+ public boolean sample() {
+ return false;
+ }
+
+ @Override
+ public void close() {
+ }
+ };
+
+ boolean sample();
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+ default Sampler start() {
+ return this;
+ }
-public class LogAnalyzerModuleConfig extends ModuleConfig {
+ default Sampler reset() {
+ return this;
+ }
}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleConfig.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LALConfig.java
similarity index 81%
copy from oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleConfig.java
copy to oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LALConfig.java
index a39d813..89c331f 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleConfig.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LALConfig.java
@@ -6,18 +6,23 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
*/
package org.apache.skywalking.oap.log.analyzer.provider;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import lombok.Data;
+
+@Data
+public class LALConfig {
+ private String name;
-public class LogAnalyzerModuleConfig extends ModuleConfig {
+ private String dsl;
}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LALConfigs.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LALConfigs.java
new file mode 100644
index 0000000..3ed21d8
--- /dev/null
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LALConfigs.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.log.analyzer.provider;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Reader;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.library.util.ResourceUtils;
+import org.yaml.snakeyaml.Yaml;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.io.Files.getNameWithoutExtension;
+import static org.apache.skywalking.apm.util.StringUtil.isNotBlank;
+import static org.apache.skywalking.oap.server.library.util.CollectionUtils.isEmpty;
+
+@Data
+@Slf4j
+public class LALConfigs {
+ private List<LALConfig> rules;
+
+ public static List<LALConfigs> load(final String path, final List<String> files) throws Exception {
+ if (isEmpty(files)) {
+ return Collections.emptyList();
+ }
+
+ checkArgument(isNotBlank(path), "path cannot be blank");
+
+ try {
+ final File[] rules = ResourceUtils.getPathFiles(path);
+
+ return Arrays.stream(rules)
+ .filter(File::isFile)
+ .filter(it -> {
+ //noinspection UnstableApiUsage
+ return files.contains(getNameWithoutExtension(it.getName()));
+ })
+ .map(f -> {
+ try (final Reader r = new FileReader(f)) {
+ return new Yaml().loadAs(r, LALConfigs.class);
+ } catch (IOException e) {
+ log.debug("Failed to read file {}", f, e);
+ }
+ return null;
+ })
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ } catch (FileNotFoundException e) {
+ throw new ModuleStartException("Failed to load LAL config rules", e);
+ }
+ }
+}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleConfig.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleConfig.java
index a39d813..3b11355 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleConfig.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleConfig.java
@@ -17,7 +17,52 @@
package org.apache.skywalking.oap.log.analyzer.provider;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import java.util.List;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rule;
+import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rules;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import static java.util.Objects.nonNull;
+
+@EqualsAndHashCode(callSuper = false)
public class LogAnalyzerModuleConfig extends ModuleConfig {
+ @Getter
+ @Setter
+ private String lalPath = "lal";
+
+ @Getter
+ @Setter
+ private String malPath = "log-mal-rules";
+
+ @Getter
+ @Setter
+ private String lalFiles = "default.yaml";
+
+ @Getter
+ @Setter
+ private String malFiles;
+
+ private List<Rule> meterConfigs;
+
+ public List<String> lalFiles() {
+ return Splitter.on(",").omitEmptyStrings().splitToList(Strings.nullToEmpty(getLalFiles()));
+ }
+
+ public List<Rule> malConfigs() throws ModuleStartException {
+ if (nonNull(meterConfigs)) {
+ return meterConfigs;
+ }
+ final List<String> files = Splitter.on(",")
+ .omitEmptyStrings()
+ .splitToList(Strings.nullToEmpty(getMalFiles()));
+ meterConfigs = Rules.loadRules(getMalPath(), files);
+
+ return meterConfigs;
+ }
}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleProvider.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleProvider.java
index 5e16064..ecf9d0e 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleProvider.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleProvider.java
@@ -20,8 +20,7 @@ package org.apache.skywalking.oap.log.analyzer.provider;
import org.apache.skywalking.oap.log.analyzer.module.LogAnalyzerModule;
import org.apache.skywalking.oap.log.analyzer.provider.log.ILogAnalyzerService;
import org.apache.skywalking.oap.log.analyzer.provider.log.LogAnalyzerServiceImpl;
-import org.apache.skywalking.oap.log.analyzer.provider.log.listener.RecordAnalysisListener;
-import org.apache.skywalking.oap.log.analyzer.provider.log.listener.TrafficAnalysisListener;
+import org.apache.skywalking.oap.log.analyzer.provider.log.listener.LogFilterListener;
import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
@@ -59,12 +58,15 @@ public class LogAnalyzerModuleProvider extends ModuleProvider {
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
- logAnalyzerService.addListenerFactory(new RecordAnalysisListener.Factory(getManager(), moduleConfig));
- logAnalyzerService.addListenerFactory(new TrafficAnalysisListener.Factory(getManager(), moduleConfig));
+ try {
+ logAnalyzerService.addListenerFactory(new LogFilterListener.Factory(getManager(), moduleConfig));
+ } catch (final Exception e) {
+ throw new ModuleStartException("Failed to create LAL listener.", e);
+ }
}
@Override
- public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
+ public void notifyAfterCompleted() throws ServiceNotProvidedException {
}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/LogAnalyzer.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/LogAnalyzer.java
index 73b26f0..172b82f 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/LogAnalyzer.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/LogAnalyzer.java
@@ -65,6 +65,6 @@ public class LogAnalyzer {
private void createListeners() {
factoryManager.getLogAnalysisListenerFactories()
- .forEach(factory -> listeners.add(factory.create(moduleManager, moduleConfig)));
+ .forEach(factory -> listeners.add(factory.create()));
}
}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListener.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListener.java
index 3186bfa..8e1e96f 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListener.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListener.java
@@ -31,6 +31,7 @@ public interface LogAnalysisListener {
/**
* Parse the raw data from the probe.
+ * @return {@code this} for chaining.
*/
- void parse(LogData.Builder logData);
+ LogAnalysisListener parse(LogData.Builder logData);
}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListenerFactory.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListenerFactory.java
index 3d770df..78d1e9f 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListenerFactory.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListenerFactory.java
@@ -17,14 +17,11 @@
package org.apache.skywalking.oap.log.analyzer.provider.log.listener;
-import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
-
/**
* LogAnalysisListenerFactory implementation creates the listener instance when required.
* Every LogAnalysisListener could have its own creation factory.
*/
public interface LogAnalysisListenerFactory {
- LogAnalysisListener create(ModuleManager moduleManager, LogAnalyzerModuleConfig moduleConfig);
+ LogAnalysisListener create();
}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogFilterListener.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogFilterListener.java
new file mode 100644
index 0000000..e5d715b
--- /dev/null
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogFilterListener.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.log.analyzer.provider.log.listener;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.network.logging.v3.LogData;
+import org.apache.skywalking.oap.log.analyzer.dsl.Binding;
+import org.apache.skywalking.oap.log.analyzer.dsl.DSL;
+import org.apache.skywalking.oap.log.analyzer.provider.LALConfig;
+import org.apache.skywalking.oap.log.analyzer.provider.LALConfigs;
+import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+
+@Slf4j
+@RequiredArgsConstructor
+public class LogFilterListener implements LogAnalysisListener {
+ private final List<DSL> dsls;
+
+ @Override
+ public void build() {
+ dsls.forEach(dsl -> {
+ try {
+ dsl.evaluate();
+ } catch (final Exception e) {
+ log.warn("Failed to evaluate dsl: {}", dsl, e);
+ }
+ });
+ }
+
+ @Override
+ public LogAnalysisListener parse(final LogData.Builder logData) {
+ dsls.forEach(dsl -> dsl.bind(new Binding().log(logData.build())));
+ return this;
+ }
+
+ public static class Factory implements LogAnalysisListenerFactory {
+ private final List<DSL> dsls;
+
+ public Factory(final ModuleManager moduleManager, final LogAnalyzerModuleConfig config) throws Exception {
+ dsls = new ArrayList<>();
+
+ final List<LALConfig> configList = LALConfigs.load(config.getLalPath(), config.lalFiles())
+ .stream()
+ .flatMap(it -> it.getRules().stream())
+ .collect(Collectors.toList());
+ for (final LALConfig c : configList) {
+ dsls.add(DSL.of(moduleManager, config, c.getDsl()));
+ }
+ }
+
+ @Override
+ public LogAnalysisListener create() {
+ return new LogFilterListener(dsls);
+ }
+ }
+}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/RecordAnalysisListener.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/RecordAnalysisListener.java
index 94e8fd5..cf5b667 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/RecordAnalysisListener.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/RecordAnalysisListener.java
@@ -57,7 +57,7 @@ public class RecordAnalysisListener implements LogAnalysisListener {
}
@Override
- public void parse(final LogData.Builder logData) {
+ public LogAnalysisListener parse(final LogData.Builder logData) {
LogDataBody body = logData.getBody();
log.setUniqueId(UUID.randomUUID().toString().replace("-", ""));
// timestamp
@@ -105,6 +105,7 @@ public class RecordAnalysisListener implements LogAnalysisListener {
log.setTagsRawData(logData.getTags().toByteArray());
}
log.getTags().addAll(appendSearchableTags(logData));
+ return this;
}
private Collection<Tag> appendSearchableTags(LogData.Builder logData) {
@@ -112,9 +113,7 @@ public class RecordAnalysisListener implements LogAnalysisListener {
logData.getTags().getDataList().forEach(tag -> {
if (searchableTagKeys.contains(tag.getKey())) {
final Tag logTag = new Tag(tag.getKey(), tag.getValue());
- if (!logTags.contains(logTag)) {
- logTags.add(logTag);
- }
+ logTags.add(logTag);
}
});
return logTags;
@@ -139,8 +138,7 @@ public class RecordAnalysisListener implements LogAnalysisListener {
}
@Override
- public LogAnalysisListener create(final ModuleManager moduleManager,
- final LogAnalyzerModuleConfig moduleConfig) {
+ public LogAnalysisListener create() {
return new RecordAnalysisListener(sourceReceiver, namingControl, searchableTagKeys);
}
}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/TrafficAnalysisListener.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/TrafficAnalysisListener.java
index 45c149e..0cf6590 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/TrafficAnalysisListener.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/TrafficAnalysisListener.java
@@ -61,7 +61,7 @@ public class TrafficAnalysisListener implements LogAnalysisListener {
}
@Override
- public void parse(final LogData.Builder logData) {
+ public LogAnalysisListener parse(final LogData.Builder logData) {
final long timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(), DownSampling.Minute);
// to service traffic
String serviceName = namingControl.formatServiceName(logData.getService());
@@ -85,6 +85,7 @@ public class TrafficAnalysisListener implements LogAnalysisListener {
endpointMeta.setEndpoint(namingControl.formatEndpointName(serviceName, logData.getEndpoint()));
endpointMeta.setTimeBucket(timeBucket);
}
+ return this;
}
public static class Factory implements LogAnalysisListenerFactory {
@@ -96,13 +97,12 @@ public class TrafficAnalysisListener implements LogAnalysisListener {
.provider()
.getService(SourceReceiver.class);
this.namingControl = moduleManager.find(CoreModule.NAME)
- .provider().
- getService(NamingControl.class);
+ .provider()
+ .getService(NamingControl.class);
}
@Override
- public LogAnalysisListener create(final ModuleManager moduleManager,
- final LogAnalyzerModuleConfig moduleConfig) {
+ public LogAnalysisListener create() {
return new TrafficAnalysisListener(sourceReceiver, namingControl);
}
}
diff --git a/oap-server/server-bootstrap/pom.xml b/oap-server/server-bootstrap/pom.xml
index 306cf3d..9fac393 100644
--- a/oap-server/server-bootstrap/pom.xml
+++ b/oap-server/server-bootstrap/pom.xml
@@ -283,6 +283,8 @@
<exclude>otel-oc-rules/</exclude>
<exclude>ui-initialized-templates/</exclude>
<exclude>zabbix-rules/</exclude>
+ <exclude>lal/</exclude>
+ <exclude>log-mal-rules/</exclude>
</excludes>
</configuration>
</plugin>
diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml
index 9b5779b..6ff73d9 100755
--- a/oap-server/server-bootstrap/src/main/resources/application.yml
+++ b/oap-server/server-bootstrap/src/main/resources/application.yml
@@ -230,6 +230,8 @@ agent-analyzer:
log-analyzer:
selector: ${SW_LOG_ANALYZER:default}
default:
+ lalFiles: ${SW_LOG_LAL_FILES:default}
+ malFiles: ${SW_LOG_MAL_FILES:""}
event-analyzer:
selector: ${SW_EVENT_ANALYZER:default}
diff --git a/test/e2e/e2e-test/docker/log/docker-compose.influxdb.yml b/oap-server/server-bootstrap/src/main/resources/lal/default.yaml
similarity index 56%
copy from test/e2e/e2e-test/docker/log/docker-compose.influxdb.yml
copy to oap-server/server-bootstrap/src/main/resources/lal/default.yaml
index 2cb6abb..fb18825 100644
--- a/test/e2e/e2e-test/docker/log/docker-compose.influxdb.yml
+++ b/oap-server/server-bootstrap/src/main/resources/lal/default.yaml
@@ -13,38 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-version: '2.1'
-
-services:
- influxdb:
- image: influxdb:1.7.9
- expose:
- - 8086
- networks:
- - e2e
- healthcheck:
- test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/8086"]
- interval: 5s
- timeout: 60s
- retries: 120
-
- oap:
- extends:
- file: ../base-compose.yml
- service: oap
- environment:
- SW_STORAGE: influxdb
- depends_on:
- influxdb:
- condition: service_healthy
-
- provider:
- extends:
- file: ../base-compose.yml
- service: provider
- depends_on:
- oap:
- condition: service_healthy
-
-networks:
- e2e:
\ No newline at end of file
+# The default LAL script to save all logs, behaving like the versions before 8.5.0.
+rules:
+ - name: default
+ dsl: |
+ filter {
+ sink {
+ }
+ }
diff --git a/test/e2e/e2e-test/docker/log/docker-compose.influxdb.yml b/oap-server/server-bootstrap/src/main/resources/log-mal-rules/placeholder.yaml
similarity index 56%
copy from test/e2e/e2e-test/docker/log/docker-compose.influxdb.yml
copy to oap-server/server-bootstrap/src/main/resources/log-mal-rules/placeholder.yaml
index 2cb6abb..034440c 100644
--- a/test/e2e/e2e-test/docker/log/docker-compose.influxdb.yml
+++ b/oap-server/server-bootstrap/src/main/resources/log-mal-rules/placeholder.yaml
@@ -13,38 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-version: '2.1'
-
-services:
- influxdb:
- image: influxdb:1.7.9
- expose:
- - 8086
- networks:
- - e2e
- healthcheck:
- test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/8086"]
- interval: 5s
- timeout: 60s
- retries: 120
-
- oap:
- extends:
- file: ../base-compose.yml
- service: oap
- environment:
- SW_STORAGE: influxdb
- depends_on:
- influxdb:
- condition: service_healthy
-
- provider:
- extends:
- file: ../base-compose.yml
- service: provider
- depends_on:
- oap:
- condition: service_healthy
-
-networks:
- e2e:
\ No newline at end of file
+# Refer to examples in config-examples/log-mal.yaml
diff --git a/test/e2e/e2e-test/docker/log/docker-compose.es6.yml b/test/e2e/e2e-test/docker/log/docker-compose.es6.yml
index 595f976..04b2cf9 100644
--- a/test/e2e/e2e-test/docker/log/docker-compose.es6.yml
+++ b/test/e2e/e2e-test/docker/log/docker-compose.es6.yml
@@ -25,7 +25,7 @@ services:
environment:
- discovery.type=single-node
healthcheck:
- test: [ "CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9200" ]
+ test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9200"]
interval: 5s
timeout: 60s
retries: 120
@@ -36,6 +36,11 @@ services:
service: oap
environment:
SW_STORAGE: elasticsearch
+ SW_LOG_LAL_FILES: test
+ SW_LOG_MAL_FILES: test
+ volumes:
+ - ./lal.yaml:/skywalking/config/lal/test.yaml
+ - ./log-mal.yaml:/skywalking/config/log-mal-rules/test.yaml
depends_on:
es:
condition: service_healthy
@@ -48,4 +53,4 @@ services:
oap:
condition: service_healthy
networks:
- e2e:
\ No newline at end of file
+ e2e:
diff --git a/test/e2e/e2e-test/docker/log/docker-compose.es7.yml b/test/e2e/e2e-test/docker/log/docker-compose.es7.yml
index 37adf3f..89bf372 100644
--- a/test/e2e/e2e-test/docker/log/docker-compose.es7.yml
+++ b/test/e2e/e2e-test/docker/log/docker-compose.es7.yml
@@ -36,6 +36,11 @@ services:
service: oap-es7
environment:
SW_STORAGE: elasticsearch7
+ SW_LOG_LAL_FILES: test
+ SW_LOG_MAL_FILES: test
+ volumes:
+ - ./lal.yaml:/skywalking/config/lal/test.yaml
+ - ./log-mal.yaml:/skywalking/config/log-mal-rules/test.yaml
depends_on:
es:
condition: service_healthy
@@ -48,4 +53,4 @@ services:
oap:
condition: service_healthy
networks:
- e2e:
\ No newline at end of file
+ e2e:
diff --git a/test/e2e/e2e-test/docker/log/docker-compose.h2.yml b/test/e2e/e2e-test/docker/log/docker-compose.h2.yml
index 128621b..c184f99 100644
--- a/test/e2e/e2e-test/docker/log/docker-compose.h2.yml
+++ b/test/e2e/e2e-test/docker/log/docker-compose.h2.yml
@@ -25,7 +25,7 @@ services:
expose:
- 1521
healthcheck:
- test: ["CMD", "sh", "-c", "nc -z 127.0.0.1 1521"]
+ test: [ "CMD", "sh", "-c", "nc -z 127.0.0.1 1521" ]
interval: 5s
timeout: 60s
retries: 120
@@ -37,6 +37,11 @@ services:
environment:
SW_STORAGE: h2
SW_STORAGE_H2_URL: jdbc:h2:tcp://h2db:1521/skywalking-oap-db
+ SW_LOG_LAL_FILES: test
+ SW_LOG_MAL_FILES: test
+ volumes:
+ - ./lal.yaml:/skywalking/config/lal/test.yaml
+ - ./log-mal.yaml:/skywalking/config/log-mal-rules/test.yaml
depends_on:
h2db:
condition: service_healthy
@@ -50,4 +55,4 @@ services:
condition: service_healthy
networks:
- e2e:
\ No newline at end of file
+ e2e:
diff --git a/test/e2e/e2e-test/docker/log/docker-compose.influxdb.yml b/test/e2e/e2e-test/docker/log/docker-compose.influxdb.yml
index 2cb6abb..9f905ef 100644
--- a/test/e2e/e2e-test/docker/log/docker-compose.influxdb.yml
+++ b/test/e2e/e2e-test/docker/log/docker-compose.influxdb.yml
@@ -34,6 +34,11 @@ services:
service: oap
environment:
SW_STORAGE: influxdb
+ SW_LOG_LAL_FILES: test
+ SW_LOG_MAL_FILES: test
+ volumes:
+ - ./lal.yaml:/skywalking/config/lal/test.yaml
+ - ./log-mal.yaml:/skywalking/config/log-mal-rules/test.yaml
depends_on:
influxdb:
condition: service_healthy
@@ -47,4 +52,4 @@ services:
condition: service_healthy
networks:
- e2e:
\ No newline at end of file
+ e2e:
diff --git a/test/e2e/e2e-test/docker/log/docker-compose.mysql.yml b/test/e2e/e2e-test/docker/log/docker-compose.mysql.yml
index 4661e5e..9a0d94a 100644
--- a/test/e2e/e2e-test/docker/log/docker-compose.mysql.yml
+++ b/test/e2e/e2e-test/docker/log/docker-compose.mysql.yml
@@ -38,6 +38,11 @@ services:
service: oap
environment:
SW_STORAGE: mysql
+ SW_LOG_LAL_FILES: test
+ SW_LOG_MAL_FILES: test
+ volumes:
+ - ./lal.yaml:/skywalking/config/lal/test.yaml
+ - ./log-mal.yaml:/skywalking/config/log-mal-rules/test.yaml
depends_on:
mysql:
condition: service_healthy
@@ -52,4 +57,4 @@ services:
condition: service_healthy
networks:
- e2e:
\ No newline at end of file
+ e2e:
diff --git a/test/e2e/e2e-test/docker/log/docker-compose.influxdb.yml b/test/e2e/e2e-test/docker/log/lal.yaml
similarity index 56%
copy from test/e2e/e2e-test/docker/log/docker-compose.influxdb.yml
copy to test/e2e/e2e-test/docker/log/lal.yaml
index 2cb6abb..d0d6a19 100644
--- a/test/e2e/e2e-test/docker/log/docker-compose.influxdb.yml
+++ b/test/e2e/e2e-test/docker/log/lal.yaml
@@ -13,38 +13,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-version: '2.1'
-
-services:
- influxdb:
- image: influxdb:1.7.9
- expose:
- - 8086
- networks:
- - e2e
- healthcheck:
- test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/8086"]
- interval: 5s
- timeout: 60s
- retries: 120
-
- oap:
- extends:
- file: ../base-compose.yml
- service: oap
- environment:
- SW_STORAGE: influxdb
- depends_on:
- influxdb:
- condition: service_healthy
-
- provider:
- extends:
- file: ../base-compose.yml
- service: provider
- depends_on:
- oap:
- condition: service_healthy
-
-networks:
- e2e:
\ No newline at end of file
+rules:
+ - name: example
+ dsl: |
+ filter {
+ text {
+ regexp $/(?s)(?<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}.\d{3}) \[TID:(?<tid>.+?)] \[(?<thread>.+?)] (?<level>\w{4,}) (?<logger>.{1,36}) (?<msg>.+)/$
+ }
+ extractor {
+ metrics {
+ timestamp log.timestamp
+ labels level: parsed.level, service: log.service, instance: log.serviceInstance
+ name "log_count"
+ value 1
+ }
+ }
+ sink {
+ }
+ }
diff --git a/test/e2e/e2e-test/docker/log/log-mal.yaml b/test/e2e/e2e-test/docker/log/log-mal.yaml
new file mode 100644
index 0000000..b2f8ff8
--- /dev/null
+++ b/test/e2e/e2e-test/docker/log/log-mal.yaml
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This will parse a textual representation of a duration. The formats
+# accepted are based on the ISO-8601 duration format {@code PnDTnHnMn.nS}
+# with days considered to be exactly 24 hours.
+# <p>
+# Examples:
+# <pre>
+# "PT20.345S" -- parses as "20.345 seconds"
+# "PT15M" -- parses as "15 minutes" (where a minute is 60 seconds)
+# "PT10H" -- parses as "10 hours" (where an hour is 3600 seconds)
+# "P2D" -- parses as "2 days" (where a day is 24 hours or 86400 seconds)
+# "P2DT3H4M" -- parses as "2 days, 3 hours and 4 minutes"
+# "P-6H3M" -- parses as "-6 hours and +3 minutes"
+# "-P6H3M" -- parses as "-6 hours and -3 minutes"
+# "-P-6H+3M" -- parses as "+6 hours and -3 minutes"
+# </pre>
+
+expSuffix: instance(['service'], ['instance'])
+metricPrefix: log
+metricsRules:
+ - name: count_info
+ exp: log_count.tagEqual('level', 'INFO').sum(['service', 'instance'])
diff --git a/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/log/LogE2E.java b/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/log/LogE2E.java
index a3ffb4d..23cb4d1 100644
--- a/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/log/LogE2E.java
+++ b/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/log/LogE2E.java
@@ -25,13 +25,15 @@ import org.apache.skywalking.e2e.annotation.DockerCompose;
import org.apache.skywalking.e2e.base.SkyWalkingE2E;
import org.apache.skywalking.e2e.base.SkyWalkingTestAdapter;
import org.apache.skywalking.e2e.common.HostAndPort;
+import org.apache.skywalking.e2e.metrics.AtLeastOneOfMetricsMatcher;
+import org.apache.skywalking.e2e.metrics.MetricsValueMatcher;
+import org.apache.skywalking.e2e.metrics.ReadMetrics;
+import org.apache.skywalking.e2e.metrics.ReadMetricsQuery;
import org.apache.skywalking.e2e.retryable.RetryableTest;
import org.apache.skywalking.e2e.service.Service;
import org.apache.skywalking.e2e.service.ServicesMatcher;
import org.apache.skywalking.e2e.service.ServicesQuery;
-import org.apache.skywalking.e2e.service.endpoint.EndpointQuery;
-import org.apache.skywalking.e2e.service.endpoint.Endpoints;
-import org.apache.skywalking.e2e.service.endpoint.EndpointsMatcher;
+import org.apache.skywalking.e2e.service.instance.Instance;
import org.apache.skywalking.e2e.service.instance.Instances;
import org.apache.skywalking.e2e.service.instance.InstancesMatcher;
import org.apache.skywalking.e2e.service.instance.InstancesQuery;
@@ -136,12 +138,26 @@ public class LogE2E extends SkyWalkingTestAdapter {
LOGGER.info("instances: {}", instances);
load("expected/log/instances.yml").as(InstancesMatcher.class).verify(instances);
- }
- private void verifyServiceEndpoints(final Service service) throws Exception {
- final Endpoints endpoints = graphql.endpoints(new EndpointQuery().serviceId(service.getKey()));
- LOGGER.info("endpoints: {}", endpoints);
+ verifyInstanceMetrics(service, instances);
+ }
- load("expected/log/endpoints.yml").as(EndpointsMatcher.class).verify(endpoints);
+ private void verifyInstanceMetrics(final Service service, final Instances instances) throws Exception {
+ for (Instance instance : instances.getInstances()) {
+ final String metricsName = "log_count_info";
+ LOGGER.info("verifying service instance response time: {}", instance);
+ final ReadMetrics instanceMetrics = graphql.readMetrics(
+ new ReadMetricsQuery().stepByMinute().metricsName(metricsName)
+ .serviceName(service.getLabel()).instanceName(instance.getLabel())
+ );
+
+ LOGGER.info("{}: {}", metricsName, instanceMetrics);
+ final AtLeastOneOfMetricsMatcher instanceRespTimeMatcher = new AtLeastOneOfMetricsMatcher();
+ final MetricsValueMatcher greaterThanZero = new MetricsValueMatcher();
+ greaterThanZero.setValue("gt 0");
+ instanceRespTimeMatcher.setValue(greaterThanZero);
+ instanceRespTimeMatcher.verify(instanceMetrics.getValues());
+ }
}
+
}