You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/02/23 12:07:15 UTC

[skywalking] branch master updated: Introduce log analysis language (LAL) (#6388)

This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 96f1c12  Introduce log analysis language (LAL) (#6388)
96f1c12 is described below

commit 96f1c12159297f67cd6cd724b552b182bf44c573
Author: Zhenxu Ke <ke...@apache.org>
AuthorDate: Tue Feb 23 20:06:48 2021 +0800

    Introduce log analysis language (LAL) (#6388)
---
 CHANGES.md                                         |   1 +
 apm-dist-es7/src/main/assembly/binary-es7.xml      |  10 +-
 apm-dist/src/main/assembly/binary.xml              |  10 +-
 .../alarm-settings.yml}                            |   0
 dist-material/config-examples/lal.yaml             |  50 +++
 dist-material/config-examples/log-mal.yaml         |  37 +++
 docs/en/concepts-and-designs/lal.md                | 342 +++++++++++++++++++++
 oap-server/analyzer/log-analyzer/pom.xml           |  11 +-
 .../skywalking/oap/log/analyzer/dsl/Binding.java   | 117 +++++++
 .../skywalking/oap/log/analyzer/dsl/DSL.java       |  58 ++++
 .../oap/log/analyzer/dsl/spec/AbstractSpec.java    |  52 ++++
 .../analyzer/dsl/spec/extractor/ExtractorSpec.java | 197 ++++++++++++
 .../log/analyzer/dsl/spec/filter/FilterSpec.java   | 160 ++++++++++
 .../spec/parser/JsonParserSpec.java}               |  26 +-
 .../analyzer/dsl/spec/parser/TextParserSpec.java   |  57 ++++
 .../spec/parser/YamlParserSpec.java}               |  29 +-
 .../log/analyzer/dsl/spec/sink/SamplerSpec.java    |  84 +++++
 .../oap/log/analyzer/dsl/spec/sink/SinkSpec.java   |  61 ++++
 .../dsl/spec/sink/sampler/RateLimitingSampler.java | 107 +++++++
 .../spec/sink/sampler/Sampler.java}                |  27 +-
 ...LogAnalyzerModuleConfig.java => LALConfig.java} |  11 +-
 .../oap/log/analyzer/provider/LALConfigs.java      |  77 +++++
 .../analyzer/provider/LogAnalyzerModuleConfig.java |  45 +++
 .../provider/LogAnalyzerModuleProvider.java        |  12 +-
 .../oap/log/analyzer/provider/log/LogAnalyzer.java |   2 +-
 .../provider/log/listener/LogAnalysisListener.java |   3 +-
 .../log/listener/LogAnalysisListenerFactory.java   |   5 +-
 .../provider/log/listener/LogFilterListener.java   |  76 +++++
 .../log/listener/RecordAnalysisListener.java       |  10 +-
 .../log/listener/TrafficAnalysisListener.java      |  10 +-
 oap-server/server-bootstrap/pom.xml                |   2 +
 .../src/main/resources/application.yml             |   2 +
 .../src/main/resources/lal/default.yaml            |  43 +--
 .../main/resources/log-mal-rules/placeholder.yaml  |  36 +--
 .../e2e/e2e-test/docker/log/docker-compose.es6.yml |   9 +-
 .../e2e/e2e-test/docker/log/docker-compose.es7.yml |   7 +-
 test/e2e/e2e-test/docker/log/docker-compose.h2.yml |   9 +-
 .../docker/log/docker-compose.influxdb.yml         |   7 +-
 .../e2e-test/docker/log/docker-compose.mysql.yml   |   7 +-
 .../log/{docker-compose.influxdb.yml => lal.yaml}  |  53 ++--
 test/e2e/e2e-test/docker/log/log-mal.yaml          |  36 +++
 .../java/org/apache/skywalking/e2e/log/LogE2E.java |  32 +-
 42 files changed, 1762 insertions(+), 168 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 40f7c03..1e596f0 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -27,6 +27,7 @@ Release Notes.
 * Storage plugin supports postgresql.
 * Fix kubernetes.client.opeanapi.ApiException.
 * Remove filename suffix in the meter active file config.
+* Introduce log analysis language (LAL).
 
 #### UI
 * Update selector scroller to show in all pages.
diff --git a/apm-dist-es7/src/main/assembly/binary-es7.xml b/apm-dist-es7/src/main/assembly/binary-es7.xml
index d5b84cf..060bbd2 100644
--- a/apm-dist-es7/src/main/assembly/binary-es7.xml
+++ b/apm-dist-es7/src/main/assembly/binary-es7.xml
@@ -40,7 +40,13 @@
             <includes>
                 <include>log4j2.xml</include>
                 <include>alarm-settings.yml</include>
-                <include>alarm-settings-sample.yml</include>
+            </includes>
+        </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/../dist-material</directory>
+            <outputDirectory>/</outputDirectory>
+            <includes>
+                <include>config-examples/*</include>
             </includes>
         </fileSet>
         <fileSet>
@@ -59,6 +65,8 @@
                 <include>zabbix-rules/*.yaml</include>
                 <include>otel-oc-rules/*</include>
                 <include>ui-initialized-templates/*</include>
+                <include>lal/*</include>
+                <include>log-mal-rules/*</include>
             </includes>
             <outputDirectory>/config</outputDirectory>
         </fileSet>
diff --git a/apm-dist/src/main/assembly/binary.xml b/apm-dist/src/main/assembly/binary.xml
index 56c0f7a..a81c44d 100644
--- a/apm-dist/src/main/assembly/binary.xml
+++ b/apm-dist/src/main/assembly/binary.xml
@@ -40,7 +40,13 @@
             <includes>
                 <include>log4j2.xml</include>
                 <include>alarm-settings.yml</include>
-                <include>alarm-settings-sample.yml</include>
+            </includes>
+        </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/../dist-material</directory>
+            <outputDirectory>/</outputDirectory>
+            <includes>
+                <include>config-examples/*</include>
             </includes>
         </fileSet>
         <fileSet>
@@ -59,6 +65,8 @@
                 <include>zabbix-rules/*.yaml</include>
                 <include>otel-oc-rules/*</include>
                 <include>ui-initialized-templates/*</include>
+                <include>lal/*</include>
+                <include>log-mal-rules/*</include>
             </includes>
             <outputDirectory>/config</outputDirectory>
         </fileSet>
diff --git a/dist-material/alarm-settings-sample.yml b/dist-material/config-examples/alarm-settings.yml
similarity index 100%
rename from dist-material/alarm-settings-sample.yml
rename to dist-material/config-examples/alarm-settings.yml
diff --git a/dist-material/config-examples/lal.yaml b/dist-material/config-examples/lal.yaml
new file mode 100644
index 0000000..f20e359
--- /dev/null
+++ b/dist-material/config-examples/lal.yaml
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Example config file of path: config/lal/config.yaml
+rules:
+  - name: example
+    dsl: |
+      filter {
+        if (log.service == "TestService") {
+          abort {}
+        }
+        text {
+          if (!regexp($/(?s)(?<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}.\d{3}) \[TID:(?<tid>.+?)] \[(?<thread>.+?)] (?<level>\w{4,}) (?<logger>.{1,36}) (?<msg>.+)/$)) {
+            abort {}
+          }
+        }
+        extractor {
+          metrics {
+            timestamp log.timestamp
+            labels level: parsed.level, service: log.service, instance: log.serviceInstance
+            name "log_count"
+            value 1
+          }
+        }
+        sink {
+          sampler {
+            if (log.service == "ImportantApp") {
+              rateLimit("ImportantAppSampler") {
+                qps 300
+              }
+            } else {
+              rateLimit("OtherSampler") {
+                qps 30
+              }
+            }
+          }
+        }
+      }
diff --git a/dist-material/config-examples/log-mal.yaml b/dist-material/config-examples/log-mal.yaml
new file mode 100644
index 0000000..78a98cc
--- /dev/null
+++ b/dist-material/config-examples/log-mal.yaml
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This will parse a textual representation of a duration. The formats
+# accepted are based on the ISO-8601 duration format {@code PnDTnHnMn.nS}
+# with days considered to be exactly 24 hours.
+# <p>
+# Examples:
+# <pre>
+#    "PT20.345S" -- parses as "20.345 seconds"
+#    "PT15M"     -- parses as "15 minutes" (where a minute is 60 seconds)
+#    "PT10H"     -- parses as "10 hours" (where an hour is 3600 seconds)
+#    "P2D"       -- parses as "2 days" (where a day is 24 hours or 86400 seconds)
+#    "P2DT3H4M"  -- parses as "2 days, 3 hours and 4 minutes"
+#    "P-6H3M"    -- parses as "-6 hours and +3 minutes"
+#    "-P6H3M"    -- parses as "-6 hours and -3 minutes"
+#    "-P-6H+3M"  -- parses as "+6 hours and -3 minutes"
+# </pre>
+
+# Example config file of path: config/log-mal-rules/config.yaml
+expSuffix: instance(['service'], ['instance'])
+metricPrefix: log
+metricsRules:
+  - name: count_info
+    exp: log_count.tagEqual('level', 'INFO').sum(['service', 'instance'])
diff --git a/docs/en/concepts-and-designs/lal.md b/docs/en/concepts-and-designs/lal.md
new file mode 100644
index 0000000..90cad9a
--- /dev/null
+++ b/docs/en/concepts-and-designs/lal.md
@@ -0,0 +1,342 @@
+# Log Analysis Language
+
+Log Analysis Language (LAL) in SkyWalking is essentially a Domain-Specific Language (DSL) to analyze logs. You can use
+LAL to parse, extract, and save the logs, as well as correlate the logs with traces (by extracting the trace id,
+segment id and span id) and metrics (by generating metrics from the logs and sending them to the meter system).
+
+The LAL config files are in YAML format, and are located under directory `lal`, you can
+set `log-analyzer/default/lalFiles` in the `application.yml` file or set environment variable `SW_LOG_LAL_FILES` to
+activate specific LAL config files.
+
+## Filter
+
+A filter is a group of [parser](#parser), [extractor](#extractor) and [sink](#sink). Users can use one or more filters
+to organize their processing logic. Every piece of log will be sent to all filters in an LAL rule. The piece of log
+sent into the filter is available as property `log` in the LAL, therefore you can access the log service name
+via `log.service`, for all available fields of `log`, please refer to [the protocol definition](https://github.com/apache/skywalking-data-collect-protocol/blob/master/logging/Logging.proto#L41).
+
+All components are executed sequentially in the orders they are declared.
+
+### Global Functions
+
+There are globally available functions that you can use in all components (i.e. parsers, extractors, and sinks)
+when needed.
+
+- `abort`
+
+By default, all components declared are executed no matter what flags (`dropped`, `saved`, etc.) have been set. There
+are cases where you may want the filter chain to stop earlier when specified conditions are met. `abort` function aborts
+the remaining filter chain from where it's declared, all the remaining components won't be executed at all.
+`abort` function serves as a fast-fail mechanism in LAL.
+
+```groovy
+filter {
+    if (log.service == "TestingService") { // Don't waste resources on TestingServices
+        abort {} // all remaining components won't be executed at all
+    }
+    text {
+        if (!regexp("(?<timestamp>\\d{8}) (?<thread>\\w+) (?<level>\\w+) (?<traceId>\\w+) (?<msg>.+)")) {
+            // if the logs don't match this regexp, skip it
+            abort {}
+        }
+    }
+    // ... extractors, sinks
+}
+```
+
+Note that when you put `regexp` in an `if` statement, you need to surround the expression with `()`
+like `regexp(<the expression>)`, instead of `regexp <the expression>`.
+
+### Parser
+
+Parsers are responsible for parsing the raw logs into structured data in SkyWalking for further processing. There are 3
+types of parsers at the moment, namely `json`, `yaml`, and `text`.
+
+When a piece of log is parsed, there is a corresponding property available, called `parsed`, injected by LAL.
+Property `parsed` is typically a map, containing all the fields parsed from the raw logs, for example, if the parser
+is `json` / `yaml`, `parsed` is a map containing all the key-values in the `json` / `yaml`, if the parser is `text`
+, `parsed` is a map containing all the captured groups and their values (for `regexp` and `grok`). See examples below.
+
+#### `json`
+
+<!-- TODO: is structured in the reported (gRPC) `LogData`, not much to do -->
+
+#### `yaml`
+
+<!-- TODO: is structured in the reported (gRPC) `LogData`, not much to do -->
+
+#### `text`
+
+For unstructured logs, there are some `text` parsers for use.
+
+- `regexp`
+
+`regexp` parser uses a regular expression (`regexp`) to parse the logs. It leverages the captured groups of the regexp,
+all the captured groups can be used later in the extractors or sinks.
+`regexp` returns a `boolean` indicating whether the log matches the pattern or not.
+
+```groovy
+filter {
+    text {
+        regexp "(?<timestamp>\\d{8}) (?<thread>\\w+) (?<level>\\w+) (?<traceId>\\w+) (?<msg>.+)"
+        // this is just a demo pattern
+    }
+    extractor {
+        tag level: parsed.level
+        // we add a tag called `level` and its value is parsed.level, captured from the regexp above
+        traceId parsed.traceId
+        // we also extract the trace id from the parsed result, which will be used to associate the log with the trace
+    }
+    // ...
+}
+```
+
+- `grok`
+
+<!-- TODO: grok Java library has poor performance, need to benchmark it, the idea is basically the same with `regexp` above -->
+
+### Extractor
+
+Extractors aim to extract metadata from the logs. The metadata can be a service name, a service instance name, an
+endpoint name, or even a trace ID, all of which can be associated with the existing traces and metrics.
+
+- `service`
+
+`service` extracts the service name from the `parsed` result, and set it into the `LogData`, which will be persisted (if
+not dropped) and is used to associate with traces / metrics.
+
+- `instance`
+
+`instance` extracts the service instance name from the `parsed` result, and set it into the `LogData`, which will be
+persisted (if not dropped) and is used to associate with traces / metrics.
+
+- `endpoint`
+
+`endpoint` extracts the endpoint name from the `parsed` result, and set it into the `LogData`, which will be
+persisted (if not dropped) and is used to associate with traces / metrics.
+
+- `traceId`
+
+`traceId` extracts the trace ID from the `parsed` result, and set it into the `LogData`, which will be persisted (if not
+dropped) and is used to associate with traces / metrics.
+
+- `segmentId`
+
+`segmentId` extracts the segment ID from the `parsed` result, and set it into the `LogData`, which will be persisted (if
+not dropped) and is used to associate with traces / metrics.
+
+- `spanId`
+
+`spanId` extracts the span ID from the `parsed` result, and set it into the `LogData`, which will be persisted (if not
+dropped) and is used to associate with traces / metrics.
+
+- `timestamp`
+
+`timestamp` extracts the timestamp from the `parsed` result, and set it into the `LogData`, which will be persisted (if
+not dropped) and is used to associate with traces / metrics.
+
+The unit of `timestamp` is millisecond.
+
+- `tag`
+
+`tag` extracts the tags from the `parsed` result, and set them into the `LogData`. The form of this extractor is
+something like `tag key1: value, key2: value2`, you can use the properties of `parsed` as both keys and values.
+
+```groovy
+filter {
+    // ... parser
+
+    extractor {
+        tag level: parsed.level, (parsed.statusCode): parsed.statusMsg
+        tag anotherKey: "anotherConstantValue"
+    }
+}
+```
+
+- `metrics`
+
+`metrics` extracts / generates metrics from the logs, and sends the generated metrics to the meter system, you can
+configure [MAL](mal.md) for further analysis of these metrics. The dedicated MAL config files are under
+directory `log-mal-rules`, you can set `log-analyzer/default/malFiles` to enable configured files.
+
+```yaml
+# application.yml
+# ...
+log-analyzer:
+  selector: ${SW_LOG_ANALYZER:default}
+  default:
+    lalFiles: ${SW_LOG_LAL_FILES:my-lal-config} # files are under "lal" directory
+    malFiles: ${SW_LOG_MAL_FILES:my-lal-mal-config,another-lal-mal-config} # files are under "log-mal-rules" directory
+```
+
+Examples are as follows:
+
+```groovy
+filter {
+    // ...
+    extractor {
+        service parsed.serviceName
+        metrics {
+            name "log_count"
+            timestamp parsed.timestamp
+            labels level: parsed.level, service: parsed.service, instance: parsed.instance
+            value 1
+        }
+        metrics {
+            name "http_response_time"
+            timestamp parsed.timestamp
+            labels status_code: parsed.statusCode, service: parsed.service, instance: parsed.instance
+            value parsed.duration
+        }
+    }
+    // ...
+}
+```
+
+The extractor above generates a metric named `log_count`, with tag key `level` and value `1`. After this, you can
+configure MAL rules to calculate the log count grouping by logging level like this:
+
+```yaml
+# ... other configurations of MAL
+
+metricsRules:
+  - name: log_count_debug
+    exp: log_count.tagEqual('level', 'DEBUG').sum(['service', 'instance']).increase('PT1M')
+  - name: log_count_error
+    exp: log_count.tagEqual('level', 'ERROR').sum(['service', 'instance']).increase('PT1M')
+
+```
+
+The other metrics generated is `http_response_time`, so that you can configure MAL rules to generate more useful metrics
+like percentiles.
+
+```yaml
+# ... other configurations of MAL
+
+metricsRules:
+  - name: response_time_percentile
+    exp: http_response_time.sum(['le', 'service', 'instance']).increase('PT5M').histogram().histogram_percentile([50,70,90,99])
+```
+
+### Sink
+
+Sinks are the persistent layer of the LAL. By default, all the logs of each filter are persisted into the storage.
+However, there are some mechanisms that allow you to selectively save some logs, or even drop all the logs after you've
+extracted useful information, such as metrics.
+
+#### Sampler
+
+Sampler allows you to save the logs in a sampling manner. Currently, sampling strategy `rateLimit` is supported, welcome
+to contribute more sampling strategies. If multiple samplers are specified, the last one determines the final sampling
+result, see examples in [Enforcer](#enforcer).
+
+`rateLimit` samples `n` logs at most in 1 second. `rateLimit("SamplerID")` requires an ID for the sampler, sampler
+declarations with the same ID share the same sampler instance, and thus share the same `qps` and resetting logic.
+
+Examples:
+
+```groovy
+filter {
+    // ... parser
+
+    sink {
+        sampler {
+            if (parsed.service == "ImportantApp") {
+                rateLimit("ImportantAppSampler") {
+                    qps 30  // samples 30 pieces of logs every second for service "ImportantApp"
+                }
+            } else {
+                rateLimit("OtherSampler") {
+                    qps 3   // samples 3 pieces of logs every second for other services than "ImportantApp"
+                }
+            }
+        }
+    }
+}
+```
+
+#### Dropper
+
+Dropper is a special sink, meaning that all the logs are dropped without any exception. This is useful when you want to
+drop debugging logs,
+
+```groovy
+filter {
+    // ... parser
+
+    sink {
+        if (parsed.level == "DEBUG") {
+            dropper {}
+        } else {
+            sampler {
+                // ... configs
+            }
+        }
+    }
+}
+```
+
+or you have multiple filters, some of which are for extracting metrics, only one of them needs to be persisted.
+
+```groovy
+filter { // filter A: this is for persistence
+    // ... parser
+
+    sink {
+        sampler {
+            // .. sampler configs
+        }
+    }
+}
+filter { // filter B:
+    // ... extractors to generate many metrics
+    extractor {
+        metrics {
+            // ... metrics
+        }
+    }
+    sink {
+        dropper {} // drop all logs because they have been saved in "filter A" above.
+    }
+}
+```
+
+#### Enforcer
+
+Enforcer is another special sink that forcibly samples the log, a typical use case of enforcer is when you have
+configured a sampler and want to save some logs forcibly, for example, to save error logs even if the sampling mechanism
+is configured.
+
+```groovy
+filter {
+    // ... parser
+
+    sink {
+        sampler {
+            // ... sampler configs
+        }
+        if (parsed.level == "ERROR" || parsed.userId == "TestingUserId") { // sample error logs or testing users' logs (userId == "TestingUserId") even if the sampling strategy is configured
+            enforcer {
+            }
+        }
+    }
+}
+```
+
+You can use `enforcer` and `dropper` to simulate a probabilistic sampler like this.
+
+```groovy
+filter {
+    // ... parser
+
+    sink {
+        sampler { // simulate a probabilistic sampler with sampler rate 30% (not accurate though)
+            if (Math.abs(Math.random()) < 0.3) {
+                enforcer {}
+            } else {
+                dropper {}
+            }
+        }
+    }
+}
+```
diff --git a/oap-server/analyzer/log-analyzer/pom.xml b/oap-server/analyzer/log-analyzer/pom.xml
index e16e2eb..6e2fbce 100644
--- a/oap-server/analyzer/log-analyzer/pom.xml
+++ b/oap-server/analyzer/log-analyzer/pom.xml
@@ -33,6 +33,15 @@
             <artifactId>server-core</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.skywalking</groupId>
+            <artifactId>meter-analyzer</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.groovy</groupId>
+            <artifactId>groovy</artifactId>
+        </dependency>
     </dependencies>
 
-</project>
\ No newline at end of file
+</project>
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/Binding.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/Binding.java
new file mode 100644
index 0000000..7502865
--- /dev/null
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/Binding.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.log.analyzer.dsl;
+
+import java.util.Map;
+import java.util.regex.Matcher;
+import lombok.Getter;
+import org.apache.skywalking.apm.network.logging.v3.LogData;
+
+/**
+ * The binding bridge between OAP and the DSL, which provides some convenient methods to ease the use of the raw {@link groovy.lang.Binding#setProperty(java.lang.String, java.lang.Object)} and {@link
+ * groovy.lang.Binding#getProperty(java.lang.String)}.
+ */
+public class Binding extends groovy.lang.Binding {
+    public static final String KEY_LOG = "log";
+
+    public static final String KEY_PARSED = "parsed";
+
+    public static final String KEY_SAVE = "save";
+
+    public static final String KEY_ABORT = "abort";
+
+    public Binding() {
+        setProperty(KEY_PARSED, new Parsed());
+    }
+
+    public Binding log(final LogData.Builder log) {
+        setProperty(KEY_LOG, log);
+        setProperty(KEY_SAVE, true);
+        setProperty(KEY_ABORT, false);
+        return this;
+    }
+
+    public Binding log(final LogData log) {
+        return log(log.toBuilder());
+    }
+
+    public LogData.Builder log() {
+        return (LogData.Builder) getProperty(KEY_LOG);
+    }
+
+    public Binding parsed(final Matcher parsed) {
+        parsed().matcher = parsed;
+        return this;
+    }
+
+    public Binding parsed(final Map<String, Object> parsed) {
+        parsed().map = parsed;
+        return this;
+    }
+
+    public Parsed parsed() {
+        return (Parsed) getProperty(KEY_PARSED);
+    }
+
+    public Binding save() {
+        setProperty(KEY_SAVE, true);
+        return this;
+    }
+
+    public Binding drop() {
+        setProperty(KEY_SAVE, false);
+        return this;
+    }
+
+    public boolean shouldSave() {
+        return (boolean) getProperty(KEY_SAVE);
+    }
+
+    public Binding abort() {
+        setProperty(KEY_ABORT, true);
+        return this;
+    }
+
+    public boolean shouldAbort() {
+        return (boolean) getProperty(KEY_ABORT);
+    }
+
+    public static class Parsed {
+        @Getter
+        private Matcher matcher;
+
+        @Getter
+        private Map<String, Object> map;
+
+        public Object getAt(final String key) {
+            if (matcher != null) {
+                return matcher.group(key);
+            }
+            if (map != null) {
+                return map.get(key);
+            }
+            return null;
+        }
+
+        @SuppressWarnings("unused")
+        public Object propertyMissing(final String name) {
+            return getAt(name);
+        }
+    }
+}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/DSL.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/DSL.java
new file mode 100644
index 0000000..c5fad4e
--- /dev/null
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/DSL.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.log.analyzer.dsl;
+
+import groovy.lang.GroovyShell;
+import groovy.util.DelegatingScript;
+import lombok.AccessLevel;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.filter.FilterSpec;
+import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.codehaus.groovy.control.CompilerConfiguration;
+
+@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+public class DSL {
+    private final DelegatingScript script;
+
+    private final FilterSpec filterSpec;
+
+    public static DSL of(final ModuleManager moduleManager,
+                         final LogAnalyzerModuleConfig config,
+                         final String dsl) throws ModuleStartException {
+        final CompilerConfiguration cc = new CompilerConfiguration();
+        cc.setScriptBaseClass(DelegatingScript.class.getName());
+
+        final GroovyShell sh = new GroovyShell(cc);
+        final DelegatingScript script = (DelegatingScript) sh.parse(dsl);
+        final FilterSpec filterSpec = new FilterSpec(moduleManager, config);
+        script.setDelegate(filterSpec);
+
+        return new DSL(script, filterSpec);
+    }
+
+    public void bind(final Binding binding) {
+        this.filterSpec.bind(binding);
+    }
+
+    public void evaluate() {
+        script.run();
+    }
+}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/AbstractSpec.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/AbstractSpec.java
new file mode 100644
index 0000000..260d61f
--- /dev/null
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/AbstractSpec.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.log.analyzer.dsl.spec;
+
+import groovy.lang.Closure;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.experimental.Accessors;
+import org.apache.skywalking.oap.log.analyzer.dsl.Binding;
+import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+
+@Getter
+@RequiredArgsConstructor
+@Accessors(fluent = true)
+public abstract class AbstractSpec {
+    private final ModuleManager moduleManager;
+
+    private final LogAnalyzerModuleConfig moduleConfig;
+
+    protected static final ThreadLocal<Binding> BINDING = ThreadLocal.withInitial(Binding::new);
+
+    public void bind(final Binding b) {
+        BINDING.set(b);
+    }
+
+    @SuppressWarnings("unused")
+    public void abort(final Closure<Void> cl) {
+        BINDING.get().abort();
+    }
+
+    @SuppressWarnings("unused")
+    public Object propertyMissing(final String name) {
+        return BINDING.get().getVariable(name);
+    }
+}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/extractor/ExtractorSpec.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/extractor/ExtractorSpec.java
new file mode 100644
index 0000000..0d68fe2
--- /dev/null
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/extractor/ExtractorSpec.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.log.analyzer.dsl.spec.extractor;
+
+import com.google.common.collect.ImmutableMap;
+import groovy.lang.Closure;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import lombok.experimental.Delegate;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
+import org.apache.skywalking.apm.network.logging.v3.LogData;
+import org.apache.skywalking.apm.network.logging.v3.TraceContext;
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.AbstractSpec;
+import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
+import org.apache.skywalking.oap.meter.analyzer.MetricConvert;
+import org.apache.skywalking.oap.meter.analyzer.dsl.Sample;
+import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily;
+import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamilyBuilder;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+
+import static java.util.Objects.nonNull;
+import static org.apache.skywalking.apm.util.StringUtil.isNotBlank;
+
+/**
+ * Backs the {@code extractor} block of a LAL script: its methods copy values into the
+ * {@link LogData.Builder} held by the current thread's binding, or hand metrics samples to the
+ * MAL converters built from the module configuration.
+ *
+ * <p>Every method returns immediately once the binding reports {@code shouldAbort()}, and every
+ * setter ignores a {@code null} argument.
+ */
+public class ExtractorSpec extends AbstractSpec {
+
+    private final List<MetricConvert> metricConverts;
+
+    /**
+     * Creates the spec and one {@link MetricConvert} per MAL configuration of the module.
+     *
+     * @param moduleManager used to look up the {@link MeterSystem} service of the core module
+     * @param moduleConfig  supplies the MAL configurations
+     * @throws ModuleStartException declared for callers; propagated from the set-up of the converters
+     */
+    public ExtractorSpec(final ModuleManager moduleManager,
+                         final LogAnalyzerModuleConfig moduleConfig) throws ModuleStartException {
+        super(moduleManager, moduleConfig);
+
+        final MeterSystem meterSystem = moduleManager.find(CoreModule.NAME).provider().getService(MeterSystem.class);
+
+        metricConverts = moduleConfig.malConfigs()
+                                     .stream()
+                                     .map(it -> new MetricConvert(it, meterSystem))
+                                     .collect(Collectors.toList());
+    }
+
+    /**
+     * Sets the service name of the log.
+     *
+     * @param service the service name; ignored when {@code null}
+     */
+    @SuppressWarnings("unused")
+    public void service(final String service) {
+        if (BINDING.get().shouldAbort()) {
+            return;
+        }
+        if (nonNull(service)) {
+            BINDING.get().log().setService(service);
+        }
+    }
+
+    /**
+     * Sets the service instance name of the log.
+     *
+     * @param instance the instance name; ignored when {@code null}
+     */
+    @SuppressWarnings("unused")
+    public void instance(final String instance) {
+        if (BINDING.get().shouldAbort()) {
+            return;
+        }
+        if (nonNull(instance)) {
+            BINDING.get().log().setServiceInstance(instance);
+        }
+    }
+
+    /**
+     * Sets the endpoint name of the log.
+     *
+     * @param endpoint the endpoint name; ignored when {@code null}
+     */
+    @SuppressWarnings("unused")
+    public void endpoint(final String endpoint) {
+        if (BINDING.get().shouldAbort()) {
+            return;
+        }
+        if (nonNull(endpoint)) {
+            BINDING.get().log().setEndpoint(endpoint);
+        }
+    }
+
+    /**
+     * Appends tags to the log. Entries with a blank key, a {@code null} value or a value whose
+     * string form is blank are skipped; values are stored as their string form.
+     *
+     * @param kv the tags to add; nothing happens when it is empty
+     */
+    @SuppressWarnings("unused")
+    public void tag(final Map<String, Object> kv) {
+        if (BINDING.get().shouldAbort()) {
+            return;
+        }
+        if (CollectionUtils.isEmpty(kv)) {
+            return;
+        }
+        final LogData.Builder logData = BINDING.get().log();
+        logData.setTags(
+            logData.getTags()
+                   .toBuilder()
+                   .addAllData(
+                       kv.entrySet()
+                         .stream()
+                         .filter(it -> isNotBlank(it.getKey()))
+                         .filter(it -> nonNull(it.getValue()) && isNotBlank(Objects.toString(it.getValue())))
+                         .map(it -> KeyStringValuePair.newBuilder().setKey(it.getKey()).setValue(Objects.toString(it.getValue())).build())
+                         .collect(Collectors.toList())
+                   )
+        );
+        BINDING.get().log(logData);
+    }
+
+    /**
+     * Sets the trace id in the trace context of the log.
+     *
+     * @param traceId the trace id; ignored when {@code null}
+     */
+    @SuppressWarnings("unused")
+    public void traceId(final String traceId) {
+        if (BINDING.get().shouldAbort()) {
+            return;
+        }
+        if (nonNull(traceId)) {
+            final LogData.Builder logData = BINDING.get().log();
+            final TraceContext.Builder traceContext = logData.getTraceContext().toBuilder();
+            traceContext.setTraceId(traceId);
+            logData.setTraceContext(traceContext);
+        }
+    }
+
+    /**
+     * Sets the trace segment id in the trace context of the log.
+     *
+     * @param segmentId the segment id; ignored when {@code null}
+     */
+    @SuppressWarnings("unused")
+    public void segmentId(final String segmentId) {
+        if (BINDING.get().shouldAbort()) {
+            return;
+        }
+        if (nonNull(segmentId)) {
+            final LogData.Builder logData = BINDING.get().log();
+            final TraceContext.Builder traceContext = logData.getTraceContext().toBuilder();
+            traceContext.setTraceSegmentId(segmentId);
+            logData.setTraceContext(traceContext);
+        }
+    }
+
+    /**
+     * Sets the span id in the trace context of the log.
+     *
+     * <p>Like {@link #timestamp(String)}, a value that is not a plain sequence of digits, or that
+     * does not fit into an {@code int}, is ignored instead of failing the whole script with a
+     * {@link NumberFormatException}.
+     *
+     * @param spanId the span id as a decimal string; ignored when {@code null} or not parsable
+     */
+    @SuppressWarnings("unused")
+    public void spanId(final String spanId) {
+        if (BINDING.get().shouldAbort()) {
+            return;
+        }
+        if (nonNull(spanId) && StringUtils.isNumeric(spanId)) {
+            final int parsedSpanId;
+            try {
+                parsedSpanId = Integer.parseInt(spanId);
+            } catch (final NumberFormatException ignored) {
+                // All digits but outside the int range: treated like any other unparsable value.
+                return;
+            }
+            final LogData.Builder logData = BINDING.get().log();
+            final TraceContext.Builder traceContext = logData.getTraceContext().toBuilder();
+            traceContext.setSpanId(parsedSpanId);
+            logData.setTraceContext(traceContext);
+        }
+    }
+
+    /**
+     * Sets the timestamp of the log.
+     *
+     * @param timestamp the timestamp as a decimal string; ignored when {@code null}, when it is not
+     *                  a plain sequence of digits, or when it does not fit into a {@code long}
+     */
+    @SuppressWarnings("unused")
+    public void timestamp(final String timestamp) {
+        if (BINDING.get().shouldAbort()) {
+            return;
+        }
+        if (nonNull(timestamp) && StringUtils.isNumeric(timestamp)) {
+            final long parsedTimestamp;
+            try {
+                parsedTimestamp = Long.parseLong(timestamp);
+            } catch (final NumberFormatException ignored) {
+                // All digits but outside the long range: treated like any other unparsable value.
+                return;
+            }
+            BINDING.get().log().setTimestamp(parsedTimestamp);
+        }
+    }
+
+    /**
+     * Builds one {@link Sample} by running {@code cl} against a fresh {@link SampleBuilder} and
+     * passes it, wrapped in a single-entry map keyed by the sample name, to every MAL converter.
+     *
+     * @param cl the closure that populates the sample; it is called with a {@link SampleBuilder}
+     *           as its delegate
+     */
+    @SuppressWarnings("unused")
+    public void metrics(final Closure<Void> cl) {
+        if (BINDING.get().shouldAbort()) {
+            return;
+        }
+        final SampleBuilder builder = new SampleBuilder();
+        cl.setDelegate(builder);
+        cl.call();
+
+        final Sample sample = builder.build();
+
+        metricConverts.forEach(it -> it.toMeter(
+            ImmutableMap.<String, SampleFamily>builder()
+                .put(sample.getName(), SampleFamilyBuilder.newBuilder(sample).build())
+                .build()
+        ));
+    }
+
+    /**
+     * Delegate of the {@code metrics} closure: forwards to {@link Sample.SampleBuilder} and only
+     * replaces {@code labels} so that blank entries never reach the sample.
+     */
+    public static class SampleBuilder {
+        @Delegate
+        private final Sample.SampleBuilder sampleBuilder = Sample.builder();
+
+        /**
+         * Sets the labels of the sample, dropping entries whose key or value is blank.
+         *
+         * @param labels the labels given by the script
+         * @return the underlying sample builder
+         */
+        @SuppressWarnings("unused")
+        public Sample.SampleBuilder labels(final Map<String, String> labels) {
+            final Map<String, String> filtered = labels.entrySet()
+                                                       .stream()
+                                                       .filter(it -> isNotBlank(it.getKey()) && isNotBlank(it.getValue()))
+                                                       .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+            return sampleBuilder.labels(ImmutableMap.copyOf(filtered));
+        }
+    }
+}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/filter/FilterSpec.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/filter/FilterSpec.java
new file mode 100644
index 0000000..9cfe593
--- /dev/null
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/filter/FilterSpec.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.log.analyzer.dsl.spec.filter;
+
+import com.google.gson.reflect.TypeToken;
+import com.google.protobuf.TextFormat;
+import groovy.lang.Closure;
+import java.lang.reflect.Type;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.skywalking.apm.network.logging.v3.LogData;
+import org.apache.skywalking.oap.log.analyzer.dsl.Binding;
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.AbstractSpec;
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.extractor.ExtractorSpec;
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.parser.JsonParserSpec;
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.parser.TextParserSpec;
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.parser.YamlParserSpec;
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.sink.SinkSpec;
+import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
+import org.apache.skywalking.oap.log.analyzer.provider.log.listener.LogAnalysisListenerFactory;
+import org.apache.skywalking.oap.log.analyzer.provider.log.listener.RecordAnalysisListener;
+import org.apache.skywalking.oap.log.analyzer.provider.log.listener.TrafficAnalysisListener;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Backs the {@code filter} block of a LAL script. Each nested block of the script ({@code text},
+ * {@code json}, {@code yaml}, {@code extractor}, {@code sink}) is run with the matching spec as
+ * its closure delegate; all of them are skipped once the current binding reports
+ * {@code shouldAbort()}.
+ */
+public class FilterSpec extends AbstractSpec {
+    private static final Logger LOGGER = LoggerFactory.getLogger(FilterSpec.class);
+
+    private final List<LogAnalysisListenerFactory> factories;
+
+    private final TextParserSpec textParser;
+
+    private final JsonParserSpec jsonParser;
+
+    private final YamlParserSpec yamlParser;
+
+    private final ExtractorSpec extractor;
+
+    private final SinkSpec sink;
+
+    private final Type parsedType;
+
+    /**
+     * Creates the filter spec together with its listener factories and all nested specs.
+     *
+     * @param moduleManager the module manager handed to every nested spec and factory
+     * @param moduleConfig  the log analyzer configuration handed to every nested spec and factory
+     * @throws ModuleStartException when a nested spec or factory fails to initialise
+     */
+    public FilterSpec(final ModuleManager moduleManager,
+                      final LogAnalyzerModuleConfig moduleConfig) throws ModuleStartException {
+        super(moduleManager, moduleConfig);
+
+        // Target type of the "json" parser: a generic string-keyed map.
+        final TypeToken<Map<String, Object>> mapToken = new TypeToken<Map<String, Object>>() {
+        };
+        parsedType = mapToken.getType();
+
+        final LogAnalysisListenerFactory recordFactory =
+            new RecordAnalysisListener.Factory(moduleManager(), moduleConfig());
+        final LogAnalysisListenerFactory trafficFactory =
+            new TrafficAnalysisListener.Factory(moduleManager(), moduleConfig());
+        factories = Arrays.asList(recordFactory, trafficFactory);
+
+        textParser = new TextParserSpec(moduleManager(), moduleConfig());
+        jsonParser = new JsonParserSpec(moduleManager(), moduleConfig());
+        yamlParser = new YamlParserSpec(moduleManager(), moduleConfig());
+
+        extractor = new ExtractorSpec(moduleManager(), moduleConfig());
+
+        sink = new SinkSpec(moduleManager(), moduleConfig());
+    }
+
+    /**
+     * Runs {@code cl} with the text parser spec as delegate.
+     *
+     * @param cl the body of the {@code text} block
+     */
+    @SuppressWarnings("unused")
+    public void text(final Closure<Void> cl) {
+        if (!BINDING.get().shouldAbort()) {
+            cl.setDelegate(textParser);
+            cl.call();
+        }
+    }
+
+    /**
+     * Runs {@code cl} with the JSON parser spec as delegate, then parses the JSON body of the log
+     * into a map and stores it as the parsed result of the binding.
+     *
+     * @param cl the body of the {@code json} block
+     */
+    @SuppressWarnings("unused")
+    public void json(final Closure<Void> cl) {
+        if (BINDING.get().shouldAbort()) {
+            return;
+        }
+        cl.setDelegate(jsonParser);
+        cl.call();
+
+        final String rawJson = BINDING.get().log().getBody().getJson().getJson();
+        final Map<String, Object> fields = jsonParser.create().fromJson(rawJson, parsedType);
+
+        BINDING.get().parsed(fields);
+    }
+
+    /**
+     * Runs {@code cl} with the YAML parser spec as delegate, then loads the YAML body of the log
+     * and stores the result, cast to a map, as the parsed result of the binding.
+     *
+     * @param cl the body of the {@code yaml} block
+     */
+    @SuppressWarnings({"unused", "unchecked"})
+    public void yaml(final Closure<Void> cl) {
+        if (BINDING.get().shouldAbort()) {
+            return;
+        }
+        cl.setDelegate(yamlParser);
+        cl.call();
+
+        final String rawYaml = BINDING.get().log().getBody().getYaml().getYaml();
+        final Object loaded = yamlParser.create().load(rawYaml);
+
+        BINDING.get().parsed((Map<String, Object>) loaded);
+    }
+
+    /**
+     * Runs {@code cl} with the extractor spec as delegate.
+     *
+     * @param cl the body of the {@code extractor} block
+     */
+    @SuppressWarnings("unused")
+    public void extractor(final Closure<Void> cl) {
+        if (!BINDING.get().shouldAbort()) {
+            cl.setDelegate(extractor);
+            cl.call();
+        }
+    }
+
+    /**
+     * Runs {@code cl} with the sink spec as delegate and afterwards either drops the log (when the
+     * binding says it should not be saved) or feeds it to a new listener from every factory.
+     *
+     * @param cl the body of the {@code sink} block
+     */
+    @SuppressWarnings("unused")
+    public void sink(final Closure<Void> cl) {
+        if (BINDING.get().shouldAbort()) {
+            return;
+        }
+        cl.setDelegate(sink);
+        cl.call();
+
+        final Binding binding = BINDING.get();
+        final LogData.Builder log = binding.log();
+
+        if (binding.shouldSave()) {
+            for (final LogAnalysisListenerFactory factory : factories) {
+                factory.create().parse(log).build();
+            }
+            return;
+        }
+
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Log is dropped: {}", TextFormat.shortDebugString(log));
+        }
+    }
+
+    /**
+     * Entry point of a script: simply invokes the {@code filter} block.
+     *
+     * @param cl the body of the {@code filter} block
+     */
+    @SuppressWarnings("unused")
+    public void filter(final Closure<Void> cl) {
+        cl.call();
+    }
+}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListenerFactory.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/parser/JsonParserSpec.java
similarity index 58%
copy from oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListenerFactory.java
copy to oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/parser/JsonParserSpec.java
index 3d770df..6bb716d 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListenerFactory.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/parser/JsonParserSpec.java
@@ -6,25 +6,35 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
+ *
  */
 
-package org.apache.skywalking.oap.log.analyzer.provider.log.listener;
+package org.apache.skywalking.oap.log.analyzer.dsl.spec.parser;
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.AbstractSpec;
 import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 
-/**
- * LogAnalysisListenerFactory implementation creates the listener instance when required.
- * Every LogAnalysisListener could have its own creation factory.
- */
-public interface LogAnalysisListenerFactory {
+public class JsonParserSpec extends AbstractSpec { // spec that hands out Gson parsers built from one GsonBuilder
+    private final GsonBuilder gsonBuilder;
+
+    public JsonParserSpec(final ModuleManager moduleManager,
+                          final LogAnalyzerModuleConfig moduleConfig) {
+        super(moduleManager, moduleConfig);
+
+        gsonBuilder = new GsonBuilder();
+    }
 
-    LogAnalysisListener create(ModuleManager moduleManager, LogAnalyzerModuleConfig moduleConfig);
+    public Gson create() { // builds a new Gson from the builder on every call
+        return gsonBuilder.create();
+    }
 }
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/parser/TextParserSpec.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/parser/TextParserSpec.java
new file mode 100644
index 0000000..0ffccab
--- /dev/null
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/parser/TextParserSpec.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.log.analyzer.dsl.spec.parser;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.skywalking.apm.network.logging.v3.LogData;
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.AbstractSpec;
+import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+
+/**
+ * Backs the {@code text} block of a LAL script and matches the plain-text body of the log against
+ * a regular expression.
+ */
+public class TextParserSpec extends AbstractSpec {
+    public TextParserSpec(final ModuleManager moduleManager,
+                          final LogAnalyzerModuleConfig moduleConfig) {
+        super(moduleManager, moduleConfig);
+    }
+
+    /**
+     * Compiles {@code regexp} and delegates to {@link #regexp(Pattern)}.
+     *
+     * @param regexp the regular expression source
+     * @return {@code true} when the pattern is found in the text body of the log
+     */
+    @SuppressWarnings("unused")
+    public boolean regexp(final String regexp) {
+        final Pattern compiled = Pattern.compile(regexp);
+        return regexp(compiled);
+    }
+
+    /**
+     * Searches the text body of the log for {@code pattern}; on a hit the {@link Matcher} is stored
+     * as the parsed result of the current binding.
+     *
+     * @param pattern the pattern to look for
+     * @return {@code true} when the pattern is found, {@code false} when it is not or when the
+     *         binding has been aborted
+     */
+    public boolean regexp(final Pattern pattern) {
+        if (BINDING.get().shouldAbort()) {
+            return false;
+        }
+        final LogData.Builder logBuilder = BINDING.get().log();
+        final String text = logBuilder.getBody().getText().getText();
+        final Matcher found = pattern.matcher(text);
+        if (!found.find()) {
+            return false;
+        }
+        BINDING.get().parsed(found);
+        return true;
+    }
+
+    /**
+     * Not implemented yet.
+     *
+     * @param grok the grok expression
+     * @return always {@code false}
+     */
+    public boolean grok(final String grok) {
+        // TODO
+        return false;
+    }
+
+}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListenerFactory.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/parser/YamlParserSpec.java
similarity index 51%
copy from oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListenerFactory.java
copy to oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/parser/YamlParserSpec.java
index 3d770df..2bb22b9 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListenerFactory.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/parser/YamlParserSpec.java
@@ -6,25 +6,38 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
+ *
  */
 
-package org.apache.skywalking.oap.log.analyzer.provider.log.listener;
+package org.apache.skywalking.oap.log.analyzer.dsl.spec.parser;
 
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.AbstractSpec;
 import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.yaml.snakeyaml.DumperOptions;
+import org.yaml.snakeyaml.LoaderOptions;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.SafeConstructor;
+import org.yaml.snakeyaml.representer.Representer;
 
-/**
- * LogAnalysisListenerFactory implementation creates the listener instance when required.
- * Every LogAnalysisListener could have its own creation factory.
- */
-public interface LogAnalysisListenerFactory {
+public class YamlParserSpec extends AbstractSpec { // spec that hands out Yaml parsers sharing one LoaderOptions
+    private final LoaderOptions loaderOptions;
+
+    public YamlParserSpec(final ModuleManager moduleManager,
+                          final LogAnalyzerModuleConfig moduleConfig) {
+        super(moduleManager, moduleConfig);
+
+        loaderOptions = new LoaderOptions();
+    }
 
-    LogAnalysisListener create(ModuleManager moduleManager, LogAnalyzerModuleConfig moduleConfig);
+    public Yaml create() { // new Yaml on every call: SafeConstructor, fresh Representer/DumperOptions, shared loaderOptions
+        return new Yaml(new SafeConstructor(), new Representer(), new DumperOptions(), loaderOptions);
+    }
 }
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/sink/SamplerSpec.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/sink/SamplerSpec.java
new file mode 100644
index 0000000..178fa06
--- /dev/null
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/sink/SamplerSpec.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.log.analyzer.dsl.spec.sink;
+
+import groovy.lang.Closure;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.AbstractSpec;
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.sink.sampler.RateLimitingSampler;
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.sink.sampler.Sampler;
+import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SamplerSpec extends AbstractSpec {
+    private static final Logger LOGGER = LoggerFactory.getLogger(SamplerSpec.class);
+
+    private final Map<String, Sampler> samplers;
+    private final RateLimitingSampler.ResetHandler rlsResetHandler;
+
+    public SamplerSpec(final ModuleManager moduleManager,
+                       final LogAnalyzerModuleConfig moduleConfig) {
+        super(moduleManager, moduleConfig);
+
+        samplers = new ConcurrentHashMap<>();
+        rlsResetHandler = new RateLimitingSampler.ResetHandler();
+    }
+
+    @SuppressWarnings("unused")
+    public void rateLimit(final String id, final Closure<Void> cl) {
+        if (BINDING.get().shouldAbort()) {
+            return;
+        }
+        final RateLimitingSampler newSampler = new RateLimitingSampler(rlsResetHandler);
+        cl.setDelegate(newSampler);
+        cl.call();
+
+        final Sampler sampler = samplers.computeIfAbsent(id, $ -> Sampler.NOOP);
+        if (Objects.equals(sampler, newSampler)) { // Unchanged
+            sampleWith(sampler);
+            return;
+        }
+
+        try {
+            sampler.close();
+        } catch (final Exception e) {
+            LOGGER.error("Failed to cancel old sampler: {}", sampler, e);
+        }
+
+        samplers.put(id, newSampler.start());
+
+        sampleWith(newSampler);
+    }
+
+    private void sampleWith(final Sampler sampler) {
+        if (BINDING.get().shouldAbort()) {
+            return;
+        }
+        if (sampler.sample()) {
+            BINDING.get().save();
+        } else {
+            BINDING.get().drop();
+        }
+    }
+
+}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/sink/SinkSpec.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/sink/SinkSpec.java
new file mode 100644
index 0000000..ee78be9
--- /dev/null
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/sink/SinkSpec.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.log.analyzer.dsl.spec.sink;
+
+import groovy.lang.Closure;
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.AbstractSpec;
+import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+
+/**
+ * Backs the {@code sink} block of a LAL script, where the script decides whether the current log
+ * is saved or dropped. Nothing happens once the current binding reports {@code shouldAbort()}.
+ */
+public class SinkSpec extends AbstractSpec {
+
+    private final SamplerSpec sampler;
+
+    public SinkSpec(final ModuleManager moduleManager,
+                    final LogAnalyzerModuleConfig moduleConfig) {
+        super(moduleManager, moduleConfig);
+
+        sampler = new SamplerSpec(moduleManager(), moduleConfig());
+    }
+
+    /**
+     * Runs {@code cl} with the sampler spec as delegate.
+     *
+     * @param cl the body of the {@code sampler} block
+     */
+    @SuppressWarnings("unused")
+    public void sampler(final Closure<Void> cl) {
+        if (!BINDING.get().shouldAbort()) {
+            cl.setDelegate(sampler);
+            cl.call();
+        }
+    }
+
+    /**
+     * Marks the current log to be saved.
+     *
+     * @param cl the closure passed by the script; it is not invoked
+     */
+    @SuppressWarnings("unused")
+    public void enforcer(final Closure<Void> cl) {
+        if (!BINDING.get().shouldAbort()) {
+            BINDING.get().save();
+        }
+    }
+
+    /**
+     * Marks the current log to be dropped.
+     *
+     * @param cl the closure passed by the script; it is not invoked
+     */
+    @SuppressWarnings("unused")
+    public void dropper(final Closure<Void> cl) {
+        if (!BINDING.get().shouldAbort()) {
+            BINDING.get().drop();
+        }
+    }
+}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/sink/sampler/RateLimitingSampler.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/sink/sampler/RateLimitingSampler.java
new file mode 100644
index 0000000..287fe24
--- /dev/null
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/sink/sampler/RateLimitingSampler.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.log.analyzer.dsl.spec.sink.sampler;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * A {@link Sampler} that accepts at most {@link #qps()} calls to {@link #sample()} between two
+ * resets of its counter. While started, the counter is reset once per second by the shared
+ * {@link ResetHandler}.
+ *
+ * <p>Two samplers are equal when they have the same {@code qps}; this is a configuration
+ * comparison only and says nothing about the identity or the counter of the samplers.
+ */
+@Accessors(fluent = true)
+@EqualsAndHashCode(of = {"qps"})
+public class RateLimitingSampler implements Sampler {
+    @Getter
+    @Setter
+    private volatile int qps;
+
+    /** Number of {@link #sample()} calls since the last reset. */
+    private final AtomicInteger factor = new AtomicInteger();
+
+    private final ResetHandler resetHandler;
+
+    /**
+     * @param resetHandler the handler that periodically resets this sampler while it is started
+     */
+    public RateLimitingSampler(final ResetHandler resetHandler) {
+        this.resetHandler = resetHandler;
+    }
+
+    /**
+     * Registers this sampler with the reset handler.
+     *
+     * @return this sampler
+     */
+    @Override
+    public RateLimitingSampler start() {
+        resetHandler.start(this);
+        return this;
+    }
+
+    /** Unregisters this sampler from the reset handler. */
+    @Override
+    public void close() {
+        resetHandler.close(this);
+    }
+
+    /**
+     * @return {@code true} while fewer than {@code qps} calls have been made since the last reset
+     */
+    @Override
+    public boolean sample() {
+        return factor.getAndIncrement() < qps;
+    }
+
+    /**
+     * Sets the call counter back to zero.
+     *
+     * @return this sampler
+     */
+    @Override
+    public RateLimitingSampler reset() {
+        factor.set(0);
+        return this;
+    }
+
+    /**
+     * Resets all registered samplers once per second. The scheduler is created when the first
+     * sampler is registered and shut down when the last one is removed.
+     */
+    @Slf4j
+    public static class ResetHandler {
+        // Identity-based on purpose: samplers compare equal when they merely share the same qps,
+        // so an equals-based set would merge distinct samplers, leaving all but one of them
+        // without resets and letting the close of one unregister another.
+        private final Set<Sampler> samplers = Collections.newSetFromMap(new IdentityHashMap<>());
+
+        // Kept so that the worker thread can be released when the last sampler is closed.
+        private ScheduledExecutorService executor;
+
+        private volatile ScheduledFuture<?> future;
+
+        private volatile boolean started = false;
+
+        /** Registers {@code sampler} and starts the periodic reset if it is not running yet. */
+        private synchronized void start(final Sampler sampler) {
+            samplers.add(sampler);
+
+            if (!started) {
+                executor = Executors.newSingleThreadScheduledExecutor();
+                future = executor.scheduleAtFixedRate(this::reset, 1, 1, TimeUnit.SECONDS);
+                started = true;
+            }
+        }
+
+        /** Unregisters {@code sampler}; stops the periodic reset once no sampler is left. */
+        private synchronized void close(final Sampler sampler) {
+            samplers.remove(sampler);
+
+            if (samplers.isEmpty() && future != null) {
+                future.cancel(true);
+                future = null;
+                if (executor != null) {
+                    // Cancelling the task alone would leave the idle worker thread alive forever.
+                    executor.shutdownNow();
+                    executor = null;
+                }
+                started = false;
+            }
+        }
+
+        /** Resets every registered sampler; a failing sampler does not stop the others. */
+        private synchronized void reset() {
+            samplers.forEach(sampler -> {
+                try {
+                    sampler.reset();
+                } catch (final Exception e) {
+                    log.error("Failed to reset sampler {}.", sampler, e);
+                }
+            });
+        }
+    }
+}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleConfig.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/sink/sampler/Sampler.java
similarity index 60%
copy from oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleConfig.java
copy to oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/sink/sampler/Sampler.java
index a39d813..61e315c 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleConfig.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/sink/sampler/Sampler.java
@@ -6,18 +6,37 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
+ *
  */
 
-package org.apache.skywalking.oap.log.analyzer.provider;
+package org.apache.skywalking.oap.log.analyzer.dsl.spec.sink.sampler;
+
+public interface Sampler extends AutoCloseable {
+    Sampler NOOP = new Sampler() {
+        @Override
+        public boolean sample() {
+            return false;
+        }
+
+        @Override
+        public void close() {
+        }
+    };
+
+    boolean sample();
 
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+    default Sampler start() {
+        return this;
+    }
 
-public class LogAnalyzerModuleConfig extends ModuleConfig {
+    default Sampler reset() {
+        return this;
+    }
 }
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleConfig.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LALConfig.java
similarity index 81%
copy from oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleConfig.java
copy to oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LALConfig.java
index a39d813..89c331f 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleConfig.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LALConfig.java
@@ -6,18 +6,23 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
+ *
  */
 
 package org.apache.skywalking.oap.log.analyzer.provider;
 
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import lombok.Data;
+
+@Data
+public class LALConfig {
+    private String name;
 
-public class LogAnalyzerModuleConfig extends ModuleConfig {
+    private String dsl;
 }
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LALConfigs.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LALConfigs.java
new file mode 100644
index 0000000..3ed21d8
--- /dev/null
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LALConfigs.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.log.analyzer.provider;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.library.util.ResourceUtils;
+import org.yaml.snakeyaml.Yaml;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.io.Files.getNameWithoutExtension;
+import static org.apache.skywalking.apm.util.StringUtil.isNotBlank;
+import static org.apache.skywalking.oap.server.library.util.CollectionUtils.isEmpty;
+
+/**
+ * The content of one LAL rule file: the list of rules declared under its {@code rules} key.
+ */
+@Data
+@Slf4j
+public class LALConfigs {
+    private List<LALConfig> rules;
+
+    /**
+     * Loads the selected LAL rule files from {@code path}.
+     *
+     * @param path  location of the rule files, resolved by {@code ResourceUtils.getPathFiles}.
+     * @param files names of the rule files to load. An entry selects a file when it equals the file
+     *              name with or without its extension, so both "default" and "default.yaml" select
+     *              the file "default.yaml".
+     * @return one {@link LALConfigs} per selected file that could be read; an empty list if
+     * {@code files} is empty.
+     * @throws IllegalArgumentException if {@code files} is not empty and {@code path} is blank.
+     * @throws ModuleStartException     if resolving {@code path} raises a {@link FileNotFoundException}.
+     */
+    public static List<LALConfigs> load(final String path, final List<String> files) throws Exception {
+        if (isEmpty(files)) {
+            return Collections.emptyList();
+        }
+
+        checkArgument(isNotBlank(path), "path cannot be blank");
+
+        try {
+            final File[] rules = ResourceUtils.getPathFiles(path);
+
+            return Arrays.stream(rules)
+                         .filter(File::isFile)
+                         .filter(it -> isSelected(it, files))
+                         .map(LALConfigs::read)
+                         .filter(Objects::nonNull)
+                         .collect(Collectors.toList());
+        } catch (FileNotFoundException e) {
+            throw new ModuleStartException("Failed to load LAL config rules", e);
+        }
+    }
+
+    /**
+     * Tells whether {@code file} is named in {@code files}, with or without its extension.
+     */
+    private static boolean isSelected(final File file, final List<String> files) {
+        final String name = file.getName();
+        //noinspection UnstableApiUsage
+        return files.contains(name) || files.contains(getNameWithoutExtension(name));
+    }
+
+    /**
+     * Parses one rule file, decoded as UTF-8 rather than with the platform default charset.
+     *
+     * @return the parsed content, or {@code null} if the file cannot be read. The failure is logged at
+     * WARN level so that a selected but unreadable rule file does not go unnoticed.
+     */
+    private static LALConfigs read(final File file) {
+        try (final Reader r = new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8)) {
+            return new Yaml().loadAs(r, LALConfigs.class);
+        } catch (IOException e) {
+            log.warn("Failed to read file {}", file, e);
+        }
+        return null;
+    }
+}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleConfig.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleConfig.java
index a39d813..3b11355 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleConfig.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleConfig.java
@@ -17,7 +17,84 @@
 
 package org.apache.skywalking.oap.log.analyzer.provider;
 
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import java.util.List;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rule;
+import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rules;
 import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
 
+import static java.util.Objects.nonNull;
+
+/**
+ * Settings of the log analyzer module: the locations and names of the LAL and log MAL rule files.
+ */
+@EqualsAndHashCode(callSuper = false)
 public class LogAnalyzerModuleConfig extends ModuleConfig {
+    /**
+     * Location of the LAL rule files, passed to {@link LALConfigs#load}.
+     */
+    @Getter
+    @Setter
+    private String lalPath = "lal";
+
+    /**
+     * Location of the MAL rule files, passed to {@code Rules.loadRules}.
+     */
+    @Getter
+    @Setter
+    private String malPath = "log-mal-rules";
+
+    /**
+     * Comma-separated names of the LAL rule files to load, given without extension: {@link LALConfigs#load}
+     * compares each entry with the file name stripped of its extension, so "default" selects "default.yaml".
+     */
+    @Getter
+    @Setter
+    private String lalFiles = "default";
+
+    /**
+     * Comma-separated names of the MAL rule files, passed to {@code Rules.loadRules}; may be {@code null}.
+     */
+    @Getter
+    @Setter
+    private String malFiles;
+
+    /**
+     * MAL rules cached by the first call to {@link #malConfigs()}.
+     */
+    private List<Rule> meterConfigs;
+
+    /**
+     * @return the entries of {@link #getLalFiles()}, trimmed and with empty entries dropped; never {@code null}.
+     */
+    public List<String> lalFiles() {
+        return splitFileNames(getLalFiles());
+    }
+
+    /**
+     * Loads the MAL rules named by {@link #getMalFiles()} from {@link #getMalPath()} on the first call and
+     * returns the cached list on later calls.
+     *
+     * @throws ModuleStartException propagated from {@code Rules.loadRules}.
+     */
+    public List<Rule> malConfigs() throws ModuleStartException {
+        if (nonNull(meterConfigs)) {
+            return meterConfigs;
+        }
+        meterConfigs = Rules.loadRules(getMalPath(), splitFileNames(getMalFiles()));
+
+        return meterConfigs;
+    }
+
+    /**
+     * Splits a comma-separated list; entries are trimmed so that "a, b" yields "a" and "b".
+     */
+    private static List<String> splitFileNames(final String files) {
+        return Splitter.on(",").trimResults().omitEmptyStrings().splitToList(Strings.nullToEmpty(files));
+    }
 }
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleProvider.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleProvider.java
index 5e16064..ecf9d0e 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleProvider.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleProvider.java
@@ -20,8 +20,7 @@ package org.apache.skywalking.oap.log.analyzer.provider;
 import org.apache.skywalking.oap.log.analyzer.module.LogAnalyzerModule;
 import org.apache.skywalking.oap.log.analyzer.provider.log.ILogAnalyzerService;
 import org.apache.skywalking.oap.log.analyzer.provider.log.LogAnalyzerServiceImpl;
-import org.apache.skywalking.oap.log.analyzer.provider.log.listener.RecordAnalysisListener;
-import org.apache.skywalking.oap.log.analyzer.provider.log.listener.TrafficAnalysisListener;
+import org.apache.skywalking.oap.log.analyzer.provider.log.listener.LogFilterListener;
 import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.library.module.ModuleConfig;
@@ -59,12 +58,15 @@ public class LogAnalyzerModuleProvider extends ModuleProvider {
 
     @Override
     public void start() throws ServiceNotProvidedException, ModuleStartException {
-        logAnalyzerService.addListenerFactory(new RecordAnalysisListener.Factory(getManager(), moduleConfig));
-        logAnalyzerService.addListenerFactory(new TrafficAnalysisListener.Factory(getManager(), moduleConfig));
+        try {
+            logAnalyzerService.addListenerFactory(new LogFilterListener.Factory(getManager(), moduleConfig));
+        } catch (final Exception e) {
+            throw new ModuleStartException("Failed to create LAL listener.", e);
+        }
     }
 
     @Override
-    public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
+    public void notifyAfterCompleted() throws ServiceNotProvidedException {
 
     }
 
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/LogAnalyzer.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/LogAnalyzer.java
index 73b26f0..172b82f 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/LogAnalyzer.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/LogAnalyzer.java
@@ -65,6 +65,6 @@ public class LogAnalyzer {
 
     private void createListeners() {
         factoryManager.getLogAnalysisListenerFactories()
-                      .forEach(factory -> listeners.add(factory.create(moduleManager, moduleConfig)));
+                      .forEach(factory -> listeners.add(factory.create()));
     }
 }
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListener.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListener.java
index 3186bfa..8e1e96f 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListener.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListener.java
@@ -31,6 +31,7 @@ public interface LogAnalysisListener {
 
     /**
      * Parse the raw data from the probe.
+     * @return {@code this} for chaining.
      */
-    void parse(LogData.Builder logData);
+    LogAnalysisListener parse(LogData.Builder logData);
 }
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListenerFactory.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListenerFactory.java
index 3d770df..78d1e9f 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListenerFactory.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListenerFactory.java
@@ -17,14 +17,11 @@
 
 package org.apache.skywalking.oap.log.analyzer.provider.log.listener;
 
-import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
-
 /**
  * LogAnalysisListenerFactory implementation creates the listener instance when required.
  * Every LogAnalysisListener could have its own creation factory.
  */
 public interface LogAnalysisListenerFactory {
 
-    LogAnalysisListener create(ModuleManager moduleManager, LogAnalyzerModuleConfig moduleConfig);
+    LogAnalysisListener create();
 }
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogFilterListener.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogFilterListener.java
new file mode 100644
index 0000000..e5d715b
--- /dev/null
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogFilterListener.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.log.analyzer.provider.log.listener;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.network.logging.v3.LogData;
+import org.apache.skywalking.oap.log.analyzer.dsl.Binding;
+import org.apache.skywalking.oap.log.analyzer.dsl.DSL;
+import org.apache.skywalking.oap.log.analyzer.provider.LALConfig;
+import org.apache.skywalking.oap.log.analyzer.provider.LALConfigs;
+import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+
+/**
+ * A {@link LogAnalysisListener} that binds each incoming log to the configured LAL scripts in
+ * {@link #parse} and evaluates them in {@link #build}.
+ */
+@Slf4j
+@RequiredArgsConstructor
+public class LogFilterListener implements LogAnalysisListener {
+    private final List<DSL> dsls;
+
+    /**
+     * Evaluates every script. A failing script is logged and does not stop the remaining ones.
+     */
+    @Override
+    public void build() {
+        dsls.forEach(dsl -> {
+            try {
+                dsl.evaluate();
+            } catch (final Exception e) {
+                log.warn("Failed to evaluate dsl: {}", dsl, e);
+            }
+        });
+    }
+
+    /**
+     * Binds the log to every script, each through its own {@link Binding}.
+     *
+     * @return {@code this} for chaining.
+     */
+    @Override
+    public LogAnalysisListener parse(final LogData.Builder logData) {
+        // Build the message once: it does not depend on the script, so all bindings can share it.
+        final LogData data = logData.build();
+        dsls.forEach(dsl -> dsl.bind(new Binding().log(data)));
+        return this;
+    }
+
+    /**
+     * Creates the DSL instances of the configured LAL scripts once and hands them to every listener.
+     */
+    public static class Factory implements LogAnalysisListenerFactory {
+        // NOTE(review): these DSL instances are shared by all listeners returned from create();
+        // presumably DSL#bind is safe to call concurrently on one instance -- confirm.
+        private final List<DSL> dsls;
+
+        /**
+         * @throws Exception propagated from {@code LALConfigs.load} or {@code DSL.of}.
+         */
+        public Factory(final ModuleManager moduleManager, final LogAnalyzerModuleConfig config) throws Exception {
+            dsls = new ArrayList<>();
+
+            final List<LALConfig> configList = LALConfigs.load(config.getLalPath(), config.lalFiles())
+                                                         .stream()
+                                                         .map(LALConfigs::getRules)
+                                                         // Skip files that declare no rules.
+                                                         .filter(Objects::nonNull)
+                                                         .flatMap(List::stream)
+                                                         .collect(Collectors.toList());
+            for (final LALConfig c : configList) {
+                dsls.add(DSL.of(moduleManager, config, c.getDsl()));
+            }
+        }
+
+        @Override
+        public LogAnalysisListener create() {
+            return new LogFilterListener(dsls);
+        }
+    }
+}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/RecordAnalysisListener.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/RecordAnalysisListener.java
index 94e8fd5..cf5b667 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/RecordAnalysisListener.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/RecordAnalysisListener.java
@@ -57,7 +57,7 @@ public class RecordAnalysisListener implements LogAnalysisListener {
     }
 
     @Override
-    public void parse(final LogData.Builder logData) {
+    public LogAnalysisListener parse(final LogData.Builder logData) {
         LogDataBody body = logData.getBody();
         log.setUniqueId(UUID.randomUUID().toString().replace("-", ""));
         // timestamp
@@ -105,6 +105,7 @@ public class RecordAnalysisListener implements LogAnalysisListener {
             log.setTagsRawData(logData.getTags().toByteArray());
         }
         log.getTags().addAll(appendSearchableTags(logData));
+        return this;
     }
 
     private Collection<Tag> appendSearchableTags(LogData.Builder logData) {
@@ -112,9 +113,7 @@ public class RecordAnalysisListener implements LogAnalysisListener {
         logData.getTags().getDataList().forEach(tag -> {
             if (searchableTagKeys.contains(tag.getKey())) {
                 final Tag logTag = new Tag(tag.getKey(), tag.getValue());
-                if (!logTags.contains(logTag)) {
-                    logTags.add(logTag);
-                }
+                logTags.add(logTag);
             }
         });
         return logTags;
@@ -139,8 +138,7 @@ public class RecordAnalysisListener implements LogAnalysisListener {
         }
 
         @Override
-        public LogAnalysisListener create(final ModuleManager moduleManager,
-                                          final LogAnalyzerModuleConfig moduleConfig) {
+        public LogAnalysisListener create() {
             return new RecordAnalysisListener(sourceReceiver, namingControl, searchableTagKeys);
         }
     }
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/TrafficAnalysisListener.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/TrafficAnalysisListener.java
index 45c149e..0cf6590 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/TrafficAnalysisListener.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/TrafficAnalysisListener.java
@@ -61,7 +61,7 @@ public class TrafficAnalysisListener implements LogAnalysisListener {
     }
 
     @Override
-    public void parse(final LogData.Builder logData) {
+    public LogAnalysisListener parse(final LogData.Builder logData) {
         final long timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(), DownSampling.Minute);
         // to service traffic
         String serviceName = namingControl.formatServiceName(logData.getService());
@@ -85,6 +85,7 @@ public class TrafficAnalysisListener implements LogAnalysisListener {
             endpointMeta.setEndpoint(namingControl.formatEndpointName(serviceName, logData.getEndpoint()));
             endpointMeta.setTimeBucket(timeBucket);
         }
+        return this;
     }
 
     public static class Factory implements LogAnalysisListenerFactory {
@@ -96,13 +97,12 @@ public class TrafficAnalysisListener implements LogAnalysisListener {
                                                .provider()
                                                .getService(SourceReceiver.class);
             this.namingControl = moduleManager.find(CoreModule.NAME)
-                                              .provider().
-                                                  getService(NamingControl.class);
+                                              .provider()
+                                              .getService(NamingControl.class);
         }
 
         @Override
-        public LogAnalysisListener create(final ModuleManager moduleManager,
-                                          final LogAnalyzerModuleConfig moduleConfig) {
+        public LogAnalysisListener create() {
             return new TrafficAnalysisListener(sourceReceiver, namingControl);
         }
     }
diff --git a/oap-server/server-bootstrap/pom.xml b/oap-server/server-bootstrap/pom.xml
index 306cf3d..9fac393 100644
--- a/oap-server/server-bootstrap/pom.xml
+++ b/oap-server/server-bootstrap/pom.xml
@@ -283,6 +283,8 @@
                         <exclude>otel-oc-rules/</exclude>
                         <exclude>ui-initialized-templates/</exclude>
                         <exclude>zabbix-rules/</exclude>
+                        <exclude>lal/</exclude>
+                        <exclude>log-mal-rules/</exclude>
                     </excludes>
                 </configuration>
             </plugin>
diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml
index 9b5779b..6ff73d9 100755
--- a/oap-server/server-bootstrap/src/main/resources/application.yml
+++ b/oap-server/server-bootstrap/src/main/resources/application.yml
@@ -230,6 +230,8 @@ agent-analyzer:
 log-analyzer:
   selector: ${SW_LOG_ANALYZER:default}
   default:
+    lalFiles: ${SW_LOG_LAL_FILES:default}
+    malFiles: ${SW_LOG_MAL_FILES:""}
 
 event-analyzer:
   selector: ${SW_EVENT_ANALYZER:default}
diff --git a/test/e2e/e2e-test/docker/log/docker-compose.influxdb.yml b/oap-server/server-bootstrap/src/main/resources/lal/default.yaml
similarity index 56%
copy from test/e2e/e2e-test/docker/log/docker-compose.influxdb.yml
copy to oap-server/server-bootstrap/src/main/resources/lal/default.yaml
index 2cb6abb..fb18825 100644
--- a/test/e2e/e2e-test/docker/log/docker-compose.influxdb.yml
+++ b/oap-server/server-bootstrap/src/main/resources/lal/default.yaml
@@ -13,38 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-version: '2.1'
-
-services:
-  influxdb:
-    image: influxdb:1.7.9
-    expose:
-      - 8086
-    networks:
-      - e2e
-    healthcheck:
-      test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/8086"]
-      interval: 5s
-      timeout: 60s
-      retries: 120
-
-  oap:
-    extends:
-      file: ../base-compose.yml
-      service: oap
-    environment:
-      SW_STORAGE: influxdb
-    depends_on:
-      influxdb:
-        condition: service_healthy
-
-  provider:
-    extends:
-      file: ../base-compose.yml
-      service: provider
-    depends_on:
-      oap:
-        condition: service_healthy
-
-networks:
-  e2e:
\ No newline at end of file
+# The default LAL script to save all logs, behaving like the versions before 8.5.0.
+rules:
+  - name: default
+    dsl: |
+      filter {
+        sink {
+        }
+      }
diff --git a/test/e2e/e2e-test/docker/log/docker-compose.influxdb.yml b/oap-server/server-bootstrap/src/main/resources/log-mal-rules/placeholder.yaml
similarity index 56%
copy from test/e2e/e2e-test/docker/log/docker-compose.influxdb.yml
copy to oap-server/server-bootstrap/src/main/resources/log-mal-rules/placeholder.yaml
index 2cb6abb..034440c 100644
--- a/test/e2e/e2e-test/docker/log/docker-compose.influxdb.yml
+++ b/oap-server/server-bootstrap/src/main/resources/log-mal-rules/placeholder.yaml
@@ -13,38 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-version: '2.1'
-
-services:
-  influxdb:
-    image: influxdb:1.7.9
-    expose:
-      - 8086
-    networks:
-      - e2e
-    healthcheck:
-      test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/8086"]
-      interval: 5s
-      timeout: 60s
-      retries: 120
-
-  oap:
-    extends:
-      file: ../base-compose.yml
-      service: oap
-    environment:
-      SW_STORAGE: influxdb
-    depends_on:
-      influxdb:
-        condition: service_healthy
-
-  provider:
-    extends:
-      file: ../base-compose.yml
-      service: provider
-    depends_on:
-      oap:
-        condition: service_healthy
-
-networks:
-  e2e:
\ No newline at end of file
+# Refer to examples in config-examples/log-mal.yaml
diff --git a/test/e2e/e2e-test/docker/log/docker-compose.es6.yml b/test/e2e/e2e-test/docker/log/docker-compose.es6.yml
index 595f976..04b2cf9 100644
--- a/test/e2e/e2e-test/docker/log/docker-compose.es6.yml
+++ b/test/e2e/e2e-test/docker/log/docker-compose.es6.yml
@@ -25,7 +25,7 @@ services:
     environment:
       - discovery.type=single-node
     healthcheck:
-      test: [ "CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9200" ]
+      test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9200"]
       interval: 5s
       timeout: 60s
       retries: 120
@@ -36,6 +36,11 @@ services:
       service: oap
     environment:
       SW_STORAGE: elasticsearch
+      SW_LOG_LAL_FILES: test
+      SW_LOG_MAL_FILES: test
+    volumes:
+      - ./lal.yaml:/skywalking/config/lal/test.yaml
+      - ./log-mal.yaml:/skywalking/config/log-mal-rules/test.yaml
     depends_on:
       es:
         condition: service_healthy
@@ -48,4 +53,4 @@ services:
       oap:
         condition: service_healthy
 networks:
-  e2e:
\ No newline at end of file
+  e2e:
diff --git a/test/e2e/e2e-test/docker/log/docker-compose.es7.yml b/test/e2e/e2e-test/docker/log/docker-compose.es7.yml
index 37adf3f..89bf372 100644
--- a/test/e2e/e2e-test/docker/log/docker-compose.es7.yml
+++ b/test/e2e/e2e-test/docker/log/docker-compose.es7.yml
@@ -36,6 +36,11 @@ services:
       service: oap-es7
     environment:
       SW_STORAGE: elasticsearch7
+      SW_LOG_LAL_FILES: test
+      SW_LOG_MAL_FILES: test
+    volumes:
+      - ./lal.yaml:/skywalking/config/lal/test.yaml
+      - ./log-mal.yaml:/skywalking/config/log-mal-rules/test.yaml
     depends_on:
       es:
         condition: service_healthy
@@ -48,4 +53,4 @@ services:
       oap:
         condition: service_healthy
 networks:
-  e2e:
\ No newline at end of file
+  e2e:
diff --git a/test/e2e/e2e-test/docker/log/docker-compose.h2.yml b/test/e2e/e2e-test/docker/log/docker-compose.h2.yml
index 128621b..c184f99 100644
--- a/test/e2e/e2e-test/docker/log/docker-compose.h2.yml
+++ b/test/e2e/e2e-test/docker/log/docker-compose.h2.yml
@@ -25,7 +25,7 @@ services:
     expose:
       - 1521
     healthcheck:
-      test: ["CMD", "sh", "-c", "nc -z 127.0.0.1 1521"]
+      test: [ "CMD", "sh", "-c", "nc -z 127.0.0.1 1521" ]
       interval: 5s
       timeout: 60s
       retries: 120
@@ -37,6 +37,11 @@ services:
     environment:
       SW_STORAGE: h2
       SW_STORAGE_H2_URL: jdbc:h2:tcp://h2db:1521/skywalking-oap-db
+      SW_LOG_LAL_FILES: test
+      SW_LOG_MAL_FILES: test
+    volumes:
+      - ./lal.yaml:/skywalking/config/lal/test.yaml
+      - ./log-mal.yaml:/skywalking/config/log-mal-rules/test.yaml
     depends_on:
       h2db:
         condition: service_healthy
@@ -50,4 +55,4 @@ services:
         condition: service_healthy
 
 networks:
-  e2e:
\ No newline at end of file
+  e2e:
diff --git a/test/e2e/e2e-test/docker/log/docker-compose.influxdb.yml b/test/e2e/e2e-test/docker/log/docker-compose.influxdb.yml
index 2cb6abb..9f905ef 100644
--- a/test/e2e/e2e-test/docker/log/docker-compose.influxdb.yml
+++ b/test/e2e/e2e-test/docker/log/docker-compose.influxdb.yml
@@ -34,6 +34,11 @@ services:
       service: oap
     environment:
       SW_STORAGE: influxdb
+      SW_LOG_LAL_FILES: test
+      SW_LOG_MAL_FILES: test
+    volumes:
+      - ./lal.yaml:/skywalking/config/lal/test.yaml
+      - ./log-mal.yaml:/skywalking/config/log-mal-rules/test.yaml
     depends_on:
       influxdb:
         condition: service_healthy
@@ -47,4 +52,4 @@ services:
         condition: service_healthy
 
 networks:
-  e2e:
\ No newline at end of file
+  e2e:
diff --git a/test/e2e/e2e-test/docker/log/docker-compose.mysql.yml b/test/e2e/e2e-test/docker/log/docker-compose.mysql.yml
index 4661e5e..9a0d94a 100644
--- a/test/e2e/e2e-test/docker/log/docker-compose.mysql.yml
+++ b/test/e2e/e2e-test/docker/log/docker-compose.mysql.yml
@@ -38,6 +38,11 @@ services:
       service: oap
     environment:
       SW_STORAGE: mysql
+      SW_LOG_LAL_FILES: test
+      SW_LOG_MAL_FILES: test
+    volumes:
+      - ./lal.yaml:/skywalking/config/lal/test.yaml
+      - ./log-mal.yaml:/skywalking/config/log-mal-rules/test.yaml
     depends_on:
       mysql:
         condition: service_healthy
@@ -52,4 +57,4 @@ services:
         condition: service_healthy
 
 networks:
-  e2e:
\ No newline at end of file
+  e2e:
diff --git a/test/e2e/e2e-test/docker/log/docker-compose.influxdb.yml b/test/e2e/e2e-test/docker/log/lal.yaml
similarity index 56%
copy from test/e2e/e2e-test/docker/log/docker-compose.influxdb.yml
copy to test/e2e/e2e-test/docker/log/lal.yaml
index 2cb6abb..d0d6a19 100644
--- a/test/e2e/e2e-test/docker/log/docker-compose.influxdb.yml
+++ b/test/e2e/e2e-test/docker/log/lal.yaml
@@ -13,38 +13,21 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-version: '2.1'
-
-services:
-  influxdb:
-    image: influxdb:1.7.9
-    expose:
-      - 8086
-    networks:
-      - e2e
-    healthcheck:
-      test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/8086"]
-      interval: 5s
-      timeout: 60s
-      retries: 120
-
-  oap:
-    extends:
-      file: ../base-compose.yml
-      service: oap
-    environment:
-      SW_STORAGE: influxdb
-    depends_on:
-      influxdb:
-        condition: service_healthy
-
-  provider:
-    extends:
-      file: ../base-compose.yml
-      service: provider
-    depends_on:
-      oap:
-        condition: service_healthy
-
-networks:
-  e2e:
\ No newline at end of file
+rules:
+  - name: example
+    dsl: |
+      filter {
+        text {
+          regexp $/(?s)(?<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}.\d{3}) \[TID:(?<tid>.+?)] \[(?<thread>.+?)] (?<level>\w{4,}) (?<logger>.{1,36}) (?<msg>.+)/$
+        }
+        extractor {
+          metrics {
+            timestamp log.timestamp
+            labels level: parsed.level, service: log.service, instance: log.serviceInstance
+            name "log_count"
+            value 1
+          }
+        }
+        sink {
+        }
+      }
diff --git a/test/e2e/e2e-test/docker/log/log-mal.yaml b/test/e2e/e2e-test/docker/log/log-mal.yaml
new file mode 100644
index 0000000..b2f8ff8
--- /dev/null
+++ b/test/e2e/e2e-test/docker/log/log-mal.yaml
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Durations are parsed from their textual representation. The formats
+# accepted are based on the ISO-8601 duration format `PnDTnHnMn.nS`
+# with days considered to be exactly 24 hours.
+#
+# Examples:
+# <pre>
+#    "PT20.345S" -- parses as "20.345 seconds"
+#    "PT15M"     -- parses as "15 minutes" (where a minute is 60 seconds)
+#    "PT10H"     -- parses as "10 hours" (where an hour is 3600 seconds)
+#    "P2D"       -- parses as "2 days" (where a day is 24 hours or 86400 seconds)
+#    "P2DT3H4M"  -- parses as "2 days, 3 hours and 4 minutes"
+#    "PT-6H3M"   -- parses as "-6 hours and +3 minutes"
+#    "-PT6H3M"   -- parses as "-6 hours and -3 minutes"
+#    "-PT-6H+3M" -- parses as "+6 hours and -3 minutes"
+# </pre>
+
+expSuffix: instance(['service'], ['instance'])
+metricPrefix: log
+metricsRules:
+  - name: count_info
+    exp: log_count.tagEqual('level', 'INFO').sum(['service', 'instance'])
diff --git a/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/log/LogE2E.java b/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/log/LogE2E.java
index a3ffb4d..23cb4d1 100644
--- a/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/log/LogE2E.java
+++ b/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/log/LogE2E.java
@@ -25,13 +25,15 @@ import org.apache.skywalking.e2e.annotation.DockerCompose;
 import org.apache.skywalking.e2e.base.SkyWalkingE2E;
 import org.apache.skywalking.e2e.base.SkyWalkingTestAdapter;
 import org.apache.skywalking.e2e.common.HostAndPort;
+import org.apache.skywalking.e2e.metrics.AtLeastOneOfMetricsMatcher;
+import org.apache.skywalking.e2e.metrics.MetricsValueMatcher;
+import org.apache.skywalking.e2e.metrics.ReadMetrics;
+import org.apache.skywalking.e2e.metrics.ReadMetricsQuery;
 import org.apache.skywalking.e2e.retryable.RetryableTest;
 import org.apache.skywalking.e2e.service.Service;
 import org.apache.skywalking.e2e.service.ServicesMatcher;
 import org.apache.skywalking.e2e.service.ServicesQuery;
-import org.apache.skywalking.e2e.service.endpoint.EndpointQuery;
-import org.apache.skywalking.e2e.service.endpoint.Endpoints;
-import org.apache.skywalking.e2e.service.endpoint.EndpointsMatcher;
+import org.apache.skywalking.e2e.service.instance.Instance;
 import org.apache.skywalking.e2e.service.instance.Instances;
 import org.apache.skywalking.e2e.service.instance.InstancesMatcher;
 import org.apache.skywalking.e2e.service.instance.InstancesQuery;
@@ -136,12 +138,26 @@ public class LogE2E extends SkyWalkingTestAdapter {
 
         LOGGER.info("instances: {}", instances);
         load("expected/log/instances.yml").as(InstancesMatcher.class).verify(instances);
-    }
 
-    private void verifyServiceEndpoints(final Service service) throws Exception {
-        final Endpoints endpoints = graphql.endpoints(new EndpointQuery().serviceId(service.getKey()));
-        LOGGER.info("endpoints: {}", endpoints);
+        verifyInstanceMetrics(service, instances);
+    }
 
-        load("expected/log/endpoints.yml").as(EndpointsMatcher.class).verify(endpoints);
+    private void verifyInstanceMetrics(final Service service, final Instances instances) throws Exception {
+        // Every instance must report at least one "log_count_info" value greater than zero (see log-mal.yaml: prefix "log", rule "count_info").
+        final String metricsName = "log_count_info";
+        for (Instance instance : instances.getInstances()) {
+            LOGGER.info("verifying service instance log metrics: {}", instance);
+            final ReadMetrics instanceMetrics = graphql.readMetrics(
+                new ReadMetricsQuery().stepByMinute().metricsName(metricsName)
+                                      .serviceName(service.getLabel()).instanceName(instance.getLabel())
+            );
+            LOGGER.info("{}: {}", metricsName, instanceMetrics);
+            final AtLeastOneOfMetricsMatcher instanceMetricsMatcher = new AtLeastOneOfMetricsMatcher();
+            final MetricsValueMatcher greaterThanZero = new MetricsValueMatcher();
+            greaterThanZero.setValue("gt 0");
+            instanceMetricsMatcher.setValue(greaterThanZero);
+            instanceMetricsMatcher.verify(instanceMetrics.getValues());
+        }
     }
+
 }